Trying to improve threading, thread queue

This commit is contained in:
Krzosa Karol
2024-07-06 07:55:38 +02:00
parent 30fa22aed5
commit e3a176b2f9
6 changed files with 201 additions and 108 deletions

30
src/basic/thread_queue.h Normal file
View File

@@ -0,0 +1,30 @@
#define WORK_FUNCTION(name) void name(void *data)
typedef WORK_FUNCTION(WorkQueueCallback);
struct WorkQueueEntry {
WorkQueueCallback *callback;
void *data;
};
struct ThreadCtx {
int thread_index;
};
struct WorkQueue {
WorkQueueEntry entries[256];
int64_t volatile index_to_write;
int64_t volatile index_to_read;
int64_t volatile completion_index;
int64_t volatile completion_goal;
void *semaphore;
};
struct ThreadStartupInfo {
uint32_t thread_id;
int32_t thread_index;
WorkQueue *queue;
};
void PushWork(WorkQueue *wq, void *data, WorkQueueCallback *callback);
void InitWorkQueue(WorkQueue *queue, uint32_t thread_count, ThreadStartupInfo *info);
void WaitUntilCompletion(WorkQueue *wq);

View File

@@ -1,4 +1,5 @@
#include "filesystem.h"
#include "thread_queue.h"
#ifndef NOMINMAX
#define NOMINMAX
@@ -8,6 +9,9 @@
#endif
#include <windows.h>
#include <stdio.h>
#include <intrin.h>
#include "win32_thread.cpp"
// Basic begin
void *VReserve(size_t size) {

View File

@@ -0,0 +1,83 @@
int64_t AtomicIncrement(volatile int64_t *i) {
return InterlockedIncrement64(i);
}
int64_t AtomicCompareAndSwap(volatile int64_t *dst, int64_t exchange, int64_t comperand) {
return InterlockedCompareExchange64(dst, exchange, comperand);
}
void PushWork(WorkQueue *wq, void *data, WorkQueueCallback *callback) {
uint32_t new_index = (wq->index_to_write + 1) % Lengthof(wq->entries);
assert(new_index != wq->index_to_read);
WorkQueueEntry *entry = wq->entries + wq->index_to_write;
entry->data = data;
entry->callback = callback;
wq->completion_goal += 1;
_WriteBarrier();
wq->index_to_write = new_index;
ReleaseSemaphore(wq->semaphore, 1, 0);
}
bool TryDoingWork(WorkQueue *wq) {
bool should_sleep = false;
int64_t original_index_to_read = wq->index_to_read;
int64_t new_index_to_read = (original_index_to_read + 1) % Lengthof(wq->entries);
if (original_index_to_read != wq->index_to_write) {
int64_t index = AtomicCompareAndSwap(&wq->index_to_read, new_index_to_read, original_index_to_read);
if (index == original_index_to_read) {
WorkQueueEntry *entry = wq->entries + index;
entry->callback(entry->data);
AtomicIncrement(&wq->completion_index);
}
} else {
should_sleep = true;
}
return should_sleep;
}
DWORD WINAPI WorkQueueThreadEntry(LPVOID param) {
auto ti = (ThreadStartupInfo *)param;
ThreadCtx ctx = {};
ctx.thread_index = ti->thread_index;
InitScratch();
for (;;) {
if (TryDoingWork(ti->queue)) {
WaitForSingleObject(ti->queue->semaphore, INFINITE);
}
}
}
void InitWorkQueue(WorkQueue *queue, uint32_t thread_count, ThreadStartupInfo *info) {
queue->index_to_read = 0;
queue->index_to_write = 0;
queue->completion_index = 0;
queue->completion_goal = 0;
queue->semaphore = CreateSemaphoreExA(0, 0, thread_count, 0, 0, SEMAPHORE_ALL_ACCESS);
Assert(queue->semaphore != INVALID_HANDLE_VALUE);
for (uint32_t i = 0; i < thread_count; i++) {
ThreadStartupInfo *ti = info + i;
ti->thread_index = i;
ti->queue = queue;
DWORD thread_id = 0;
HANDLE thread_handle = CreateThread(0, 0, WorkQueueThreadEntry, ti, 0, &thread_id);
Assert(thread_handle != INVALID_HANDLE_VALUE);
ti->thread_id = thread_id;
CloseHandle(thread_handle);
}
}
void WaitUntilCompletion(WorkQueue *wq) {
while (wq->completion_goal != wq->completion_index) {
TryDoingWork(wq);
}
}
bool IsWorkCompleted(WorkQueue *wq) {
bool result = wq->completion_goal == wq->completion_index;
return result;
}

View File

@@ -55,21 +55,8 @@ struct TimeFile {
struct ParseThreadIO {
Array<String> input_files;
// output
Arena *arena;
Array<TimeFile> time_files;
};
void ParseThreadEntry(ParseThreadIO *io) {
io->arena = AllocArena();
io->time_files.allocator = *io->arena;
For(io->input_files) {
Array<TimeString> time_strings = ParseSrtFile(io->arena, it);
io->time_files.add({time_strings, it});
}
}
struct XToTimeString {
String string; // String inside transcript arena
uint16_t hour;
@@ -77,9 +64,28 @@ struct XToTimeString {
uint16_t second;
String filepath;
};
Arena XArena;
void AddFolder(String folder, Array<String> *filenames, Array<XToTimeString> *x_to_time_string) {
Arena XArena;
std::mutex XArenaAddMutex;
Array<XToTimeString> XToTime;
WORK_FUNCTION(ParseFilesWork) {
ParseThreadIO *io = (ParseThreadIO *)data;
ForItem(it_time_file, io->input_files) {
Scratch scratch;
Array<TimeString> time_strings = ParseSrtFile(scratch, it_time_file);
XArenaAddMutex.lock();
For(time_strings) {
String s = Copy(XArena, it.string);
s.data[s.len] = ' ';
XToTime.add({s, it.hour, it.minute, it.second, it_time_file});
}
XArenaAddMutex.unlock();
}
}
void XAddFolder(String folder, Array<String> *filenames) {
Scratch scratch;
Array<String> srt_files = {scratch};
@@ -91,11 +97,10 @@ void AddFolder(String folder, Array<String> *filenames, Array<XToTimeString> *x_
}
}
int64_t thread_count = 16;
Array<std::thread *> threads = {scratch};
int64_t files_per_thread = srt_files.len / thread_count;
int64_t remainder = srt_files.len % thread_count;
int64_t fi = 0;
int64_t thread_count = 16;
int64_t files_per_thread = srt_files.len / thread_count;
int64_t remainder = srt_files.len % thread_count;
int64_t fi = 0;
Array<ParseThreadIO> io = {scratch};
io.reserve(thread_count);
@@ -108,28 +113,14 @@ void AddFolder(String folder, Array<String> *filenames, Array<XToTimeString> *x_
ParseThreadIO *i = io.alloc();
i->input_files = files;
threads.add(new std::thread(ParseThreadEntry, i));
PushWork(&MainWorkQueue, (void *)i, ParseFilesWork);
}
For(threads) {
it->join();
delete it;
}
ForItem(it_io, io) {
ForItem(it_time_file, it_io.time_files) {
For(it_time_file.time_strings) {
String s = Copy(XArena, it.string);
s.data[s.len] = ' ';
x_to_time_string->add({s, it.hour, it.minute, it.second, it_time_file.file});
}
}
Release(it_io.arena);
}
WaitUntilCompletion(&MainWorkQueue);
}
XToTimeString *FindItem(Array<XToTimeString> &x_to_time_string, String string) {
For(x_to_time_string) {
XToTimeString *XFindItem(String string) {
For(XToTime) {
uintptr_t begin = (uintptr_t)(it.string.data);
uintptr_t end = (uintptr_t)(it.string.data + it.string.len);
uintptr_t needle = (uintptr_t)string.data;

View File

@@ -1,6 +1,7 @@
#define BASIC_IMPL
#include "../basic/basic.h"
#include "../basic/filesystem.h"
#include "../basic/thread_queue.h"
#include <thread>
#include <semaphore>
@@ -11,9 +12,10 @@
#include "imgui_impl_sdl2.h"
#include "imgui_impl_opengl3.h"
#include <SDL.h>
#include <SDL_opengl.h>
#include "glad.h"
Arena Perm;
Arena Perm;
WorkQueue MainWorkQueue;
#include "loading_thread.cpp"
#include "searching_thread.cpp"
@@ -35,13 +37,12 @@ int main(int, char **) {
InitArena(&XArena);
XArena.align = 0;
ThreadStartupInfo infos[16] = {};
InitWorkQueue(&MainWorkQueue, 16, infos);
memcpy(Prompt, "read=D:/zizek", sizeof("read=D:/zizek"));
std::thread search_thread(SearchThreadEntry);
int64_t chosen_text = 0;
int64_t match_search_offset = 0;
Array<String> filenames = {};
Array<XToTimeString> x_to_time_string = {};
Array<String> filenames = {};
if (SDL_Init(SDL_INIT_VIDEO | SDL_INIT_TIMER | SDL_INIT_GAMECONTROLLER) != 0) {
printf("Error: %s\n", SDL_GetError());
@@ -67,6 +68,12 @@ int main(int, char **) {
SDL_GLContext gl_context = SDL_GL_CreateContext(window);
SDL_GL_MakeCurrent(window, gl_context);
if (!gladLoadGLLoader((GLADloadproc)SDL_GL_GetProcAddress)) {
Assert(0);
return -1;
}
SDL_GL_SetSwapInterval(1); // Enable vsync
// Setup Dear ImGui context
@@ -138,13 +145,7 @@ int main(int, char **) {
{
ImGuiWindowFlags window_flags = 0;
window_flags |= ImGuiWindowFlags_NoTitleBar;
window_flags |= ImGuiWindowFlags_NoScrollbar;
window_flags |= ImGuiWindowFlags_NoMove;
window_flags |= ImGuiWindowFlags_NoResize;
window_flags |= ImGuiWindowFlags_NoCollapse;
ImGuiWindowFlags window_flags = ImGuiWindowFlags_NoTitleBar | ImGuiWindowFlags_NoScrollbar | ImGuiWindowFlags_NoMove | ImGuiWindowFlags_NoResize | ImGuiWindowFlags_NoCollapse;
const ImGuiViewport *main_viewport = ImGui::GetMainViewport();
ImGui::SetNextWindowPos(ImVec2(0, 0), ImGuiCond_Always);
ImGui::SetNextWindowSize(ImVec2(main_viewport->Size.x, 20), ImGuiCond_Always);
@@ -153,48 +154,42 @@ int main(int, char **) {
if (set_focus_to_input) ImGui::SetKeyboardFocusHere(0);
if (tab_press && ImGui::IsWindowFocused()) set_focus_to_list = true;
if (ImGui::InputText("Input your query", Prompt, sizeof(Prompt))) {
match_search_offset = 0;
SearchThreadStopSearching = true;
SearchThreadSemaphore.release();
StartSearchingForMatches();
}
ImGui::End();
if (enter_press) {
String prompt = Prompt;
if (StartsWith(prompt, "read=")) {
AddFolder(prompt.skip(5), &filenames, &x_to_time_string);
XAddFolder(prompt.skip(5), &filenames);
memset(Prompt, 0, sizeof(Prompt));
}
}
}
{
ImGuiWindowFlags window_flags = 0;
window_flags |= ImGuiWindowFlags_NoTitleBar;
window_flags |= ImGuiWindowFlags_NoMove;
window_flags |= ImGuiWindowFlags_NoResize;
window_flags |= ImGuiWindowFlags_NoCollapse;
ImGuiWindowFlags window_flags = ImGuiWindowFlags_NoTitleBar | ImGuiWindowFlags_NoMove | ImGuiWindowFlags_NoResize | ImGuiWindowFlags_NoCollapse;
const ImGuiViewport *main_viewport = ImGui::GetMainViewport();
ImGui::SetNextWindowPos(ImVec2(0, 30), ImGuiCond_Always);
ImGui::SetNextWindowSize(ImVec2(main_viewport->Size.x, main_viewport->Size.y - 30), ImGuiCond_Always);
ImGui::Begin("result", NULL, window_flags);
if (!ImGui::IsWindowFocused() && set_focus_to_list) ImGui::SetKeyboardFocusHere(0);
SearchThreadArrayMutex.lock();
Array<String> matches = LockSearchResults();
defer { UnlockSearchResults(); };
float font_size = ImGui::GetFontSize();
ImFont *font = ImGui::GetFont();
int64_t chars_per_line = (int64_t)(main_viewport->WorkSize.x / 2) / (int64_t)font->GetCharAdvance('_') - strlen(Prompt) * 2;
// int64_t chars_per_line = 200;
ImGuiListClipper clipper;
clipper.Begin((int)Matches.len);
clipper.Begin((int)matches.len);
while (clipper.Step()) {
for (int i = clipper.DisplayStart; i < clipper.DisplayEnd; i++) {
ImGui::PushID(i);
defer { ImGui::PopID(); };
auto &it = Matches[i];
auto &it = matches[i];
uintptr_t begin_region = (uintptr_t)XArena.data;
uintptr_t end_region = (uintptr_t)XArena.data + XArena.len;
@@ -213,7 +208,7 @@ int main(int, char **) {
String string = Format(frame_arena, "%.*s**%.*s**%.*s",
FmtString(left), FmtString(middle), FmtString(right));
XToTimeString *item = FindItem(x_to_time_string, middle);
XToTimeString *item = XFindItem(middle);
if (ImGui::Button(string.data)) {
String base = ChopLastPeriod(item->filepath); // .srt
base = ChopLastPeriod(base); // .en
@@ -235,8 +230,6 @@ int main(int, char **) {
ImGui::Text(SkipToLastSlash(item->filepath).data);
}
}
SearchThreadArrayMutex.unlock();
ImGui::End();
}
@@ -250,8 +243,6 @@ int main(int, char **) {
SDL_Delay(16);
}
SearchThreadClose(search_thread);
// Cleanup
ImGui_ImplOpenGL3_Shutdown();
ImGui_ImplSDL2_Shutdown();

View File

@@ -1,45 +1,39 @@
Arena MatchesArena;
Array<String> Matches = {MatchesArena};
Array<String> Matches = {};
char Prompt[256];
std::mutex SearchThreadArrayMutex;
bool SearchThreadStopSearching;
std::mutex SearchThreadArrayMutex;
std::binary_semaphore SearchThreadSemaphore{0};
bool SearchThreadStopSearching = false;
bool SearchThreadRunning = true;
void SearchThreadEntry() {
InitArena(&MatchesArena);
for (;;) {
SearchThreadSemaphore.acquire();
if (!SearchThreadRunning) break;
SearchThreadStopSearching = false;
WORK_FUNCTION(SearchForMatchesWork) {
if (Prompt[0] == 0) return;
if (Prompt[0]) {
SearchThreadArrayMutex.lock();
{
Matches.clear();
}
SearchThreadArrayMutex.unlock();
SearchThreadArrayMutex.lock();
Matches.clear();
SearchThreadArrayMutex.unlock();
String buffer = {(char *)XArena.data, (int64_t)XArena.len};
String find = Prompt;
int64_t index = 0;
while (Seek(buffer, find, &index, SeekFlag_IgnoreCase)) {
String found = {buffer.data + index, find.len};
SearchThreadArrayMutex.lock();
{
Matches.add(found);
}
SearchThreadArrayMutex.unlock();
if (SearchThreadStopSearching) break;
buffer = buffer.skip(index + find.len);
}
}
String buffer = {(char *)XArena.data, (int64_t)XArena.len};
String find = Prompt;
int64_t index = 0;
while (Seek(buffer, find, &index, SeekFlag_IgnoreCase)) {
String found = {buffer.data + index, find.len};
Matches.add(found);
buffer = buffer.skip(index + find.len);
if (SearchThreadStopSearching) return;
}
}
void SearchThreadClose(std::thread &thread) {
SearchThreadRunning = false;
SearchThreadSemaphore.release();
thread.join();
void StartSearchingForMatches() {
SearchThreadStopSearching = true;
WaitUntilCompletion(&MainWorkQueue);
SearchThreadStopSearching = false;
PushWork(&MainWorkQueue, NULL, SearchForMatchesWork);
}
Array<String> LockSearchResults() {
SearchThreadArrayMutex.lock();
Array<String> copy = Matches;
return copy;
}
void UnlockSearchResults() {
SearchThreadArrayMutex.unlock();
}