101 lines
2.8 KiB
C++
101 lines
2.8 KiB
C++
|
|
// @Section: Work Queue
|
|
// Expands to the declarator of a work-queue callback called `name`:
// a function that takes one untyped `data` pointer and returns nothing.
#define WORK_QUEUE_CALLBACK(name) void name(void *data)
|
|
// Function type of a work-queue callback (void(void *data)); the queue stores
// pointers to functions of this type.
typedef WORK_QUEUE_CALLBACK(WorkQueueCallback);
|
|
|
|
struct WorkQueueEntry {
|
|
WorkQueueCallback *callback;
|
|
void *data;
|
|
};
|
|
|
|
struct WorkQueue {
|
|
WorkQueueEntry entries[256];
|
|
S64 volatile index_to_write;
|
|
S64 volatile index_to_read;
|
|
S64 volatile completion_index;
|
|
S64 volatile completion_goal;
|
|
HANDLE semaphore;
|
|
};
|
|
|
|
struct ThreadStartupInfo {
|
|
DWORD thread_id;
|
|
S32 thread_index;
|
|
WorkQueue *queue;
|
|
};
|
|
|
|
// Atomically adds one to *i and returns the incremented value.
// Thin wrapper over InterlockedIncrement64.
S64 atomic_increment(volatile S64 *i) {
    S64 incremented = InterlockedIncrement64(i);
    return incremented;
}
|
|
|
|
// Atomically stores `exchange` into *dst if *dst currently equals `comperand`.
// Returns the value *dst held before the call, so the swap took place exactly
// when the return value equals `comperand`.
// Thin wrapper over InterlockedCompareExchange64.
S64 atomic_compare_and_swap(volatile S64 *dst, S64 exchange, S64 comperand) {
    S64 previous = InterlockedCompareExchange64(dst, exchange, comperand);
    return previous;
}
|
|
|
|
// Appends one entry (callback + data) to the queue and releases the queue's
// semaphore once so a waiting worker can pick it up.
//
// wq        queue to push onto
// data      pointer handed to the callback when the entry runs
// callback  function to run for this entry
//
// Asserts that the queue is not full. Nothing here guards against two threads
// pushing at the same time.
void push_work(WorkQueue *wq, void *data, WorkQueueCallback *callback) {
    U32 next_write = (wq->index_to_write + 1) % buff_cap(wq->entries);
    // One slot always stays unused so that "full" (next write == read) can be
    // told apart from "empty" (write == read).
    assert(next_write != wq->index_to_read);

    WorkQueueEntry *slot = &wq->entries[wq->index_to_write];
    slot->callback = callback;
    slot->data = data;

    wq->completion_goal = wq->completion_goal + 1;

    // Compiler barrier: the entry contents and the goal must be stored before
    // the new write index is published to the workers.
    _WriteBarrier();
    wq->index_to_write = next_write;

    ReleaseSemaphore(wq->semaphore, 1, 0);
}
|
|
|
|
// Tries to claim and run a single queue entry.
//
// Returns true when the queue was observed empty (the caller may go to sleep),
// false otherwise. A false return covers both "ran one entry" and "lost the
// race for the entry to another thread"; in the latter case nothing was run.
bool try_doing_work(WorkQueue *wq) {
    S64 claimed_index = wq->index_to_read;
    if(claimed_index == wq->index_to_write) {
        // Read index has caught up with the write index: nothing is queued.
        return true;
    }

    S64 next_index = (claimed_index + 1) % buff_cap(wq->entries);

    // Advance the read index only if no other thread advanced it first.
    S64 seen_index = atomic_compare_and_swap(&wq->index_to_read, next_index, claimed_index);
    if(seen_index != claimed_index) {
        return false;
    }

    // The slot is ours: run it, then record the completion.
    WorkQueueEntry *entry = &wq->entries[seen_index];
    entry->callback(entry->data);
    atomic_increment(&wq->completion_index);
    return false;
}
|
|
|
|
// Entry point of a worker thread. `param` is the ThreadStartupInfo set up for
// this thread by init_work_queue.
//
// Loops forever: runs queued work while there is any, and blocks on the queue's
// semaphore whenever try_doing_work reports the queue empty. Never returns.
DWORD WINAPI thread_proc(LPVOID param) {
    ThreadStartupInfo *startup = (ThreadStartupInfo *)param;

    // Per-thread context; only its thread_index is filled in here.
    Thread_Ctx ctx = {};
    ctx.thread_index = startup->thread_index;

    for(;;) {
        bool queue_is_empty = try_doing_work(startup->queue);
        if(queue_is_empty) {
            WaitForSingleObject(startup->queue->semaphore, INFINITE);
        }
    }
}
|
|
|
|
// Resets `queue` to the empty state, creates its semaphore and starts
// `thread_count` worker threads running thread_proc.
//
// queue         queue to initialise
// thread_count  number of worker threads to start; also used as the
//               semaphore's maximum count (its initial count is 0)
// info          array of at least `thread_count` ThreadStartupInfo records.
//               Each thread keeps a pointer to its record, so the array must
//               stay valid for as long as the threads run.
//
// Thread handles are closed immediately; the threads keep running and are
// never joined from here.
void init_work_queue(WorkQueue *queue, U32 thread_count, ThreadStartupInfo *info) {
    queue->index_to_read = 0;
    queue->index_to_write = 0;
    queue->completion_index = 0;
    queue->completion_goal = 0;

    queue->semaphore = CreateSemaphoreExA(0, 0, thread_count, 0, 0, SEMAPHORE_ALL_ACCESS);
    // CreateSemaphoreExA reports failure with NULL, not INVALID_HANDLE_VALUE.
    assert_msg(queue->semaphore != NULL, "Failed to create semaphore");

    for(U32 i = 0; i < thread_count; i++) {
        ThreadStartupInfo *ti = info + i;
        ti->thread_index = i;
        ti->queue = queue;

        HANDLE thread_handle = CreateThread(0, 0, thread_proc, ti, 0, &ti->thread_id);
        // CreateThread also reports failure with NULL.
        assert_msg(thread_handle != NULL, "Failed to create thread");

        // The handle is not needed after start-up; skip the close when the
        // assert is compiled out and creation failed.
        if(thread_handle != NULL) {
            CloseHandle(thread_handle);
        }
    }
}
|
|
|
|
// Returns once completion_index has caught up with completion_goal, i.e. every
// pushed entry has finished running.
//
// The calling thread does not sleep: it keeps calling try_doing_work, running
// queued entries itself, and spins when the queue is empty while other threads
// are still finishing theirs.
void wait_until_completion(WorkQueue *wq) {
    for(;;) {
        if(wq->completion_goal == wq->completion_index) {
            break;
        }
        try_doing_work(wq);
    }
}
|