Async with cancel and join

This commit is contained in:
Allen Webster 2019-10-21 22:17:24 -07:00
parent c7f69d26a0
commit 1396317884
4 changed files with 144 additions and 77 deletions

View File

@ -4,55 +4,55 @@
// TOP // TOP
global Async_System async_system = {}; // TODO(allen): put atomic wrappers in base type layer
#define atomic_write_b32(p,v) (*(p)=(v))
#define atomic_read_b32(p) (*(p))
global Async_System global_async_system = {};
function Async_Node* function Async_Node*
async_pop_node(void){ async_pop_node(Async_System *async_system){
system_mutex_acquire(async_system.mutex); for (;async_system->task_count == 0;){
for (;async_system.task_count == 0;){ system_condition_variable_wait(async_system->cv, async_system->mutex);
system_condition_variable_wait(async_system.cv, async_system.mutex);
} }
Async_Node *node = async_system.task_first; Node *node = async_system->task_sent.next;
sll_queue_pop(async_system.task_first, async_system.task_last); Assert(node != &async_system->task_sent);
async_system.task_count -= 1; dll_remove(node);
system_mutex_release(async_system.mutex); async_system->task_count -= 1;
node->next = 0; Async_Node *a_node = CastFromMember(Async_Node, node, node);
return(node); a_node->next = 0;
return(a_node);
} }
function Async_Task function Async_Task
async_push_node(Async_Task_Function_Type *func, Data data){ async_push_node(Async_System *async_system, Async_Task_Function_Type *func, Data data){
Async_Task result = async_system.task_id_counter; Async_Task result = async_system->task_id_counter;
async_system.task_id_counter += 1; async_system->task_id_counter += 1;
system_mutex_acquire(async_system.mutex); Async_Node *node = async_system->free_nodes;
Async_Node *node = async_system.free_nodes;
if (node == 0){ if (node == 0){
node = push_array(&async_system.node_arena, Async_Node, 1); node = push_array(&async_system->node_arena, Async_Node, 1);
} }
else{ else{
sll_stack_pop(async_system.free_nodes); sll_stack_pop(async_system->free_nodes);
} }
node->task = result; node->task = result;
node->thread = 0; node->thread = 0;
node->func = func; node->func = func;
node->data.data = (u8*)heap_allocate(&async_system.node_heap, data.size); node->data.data = (u8*)heap_allocate(&async_system->node_heap, data.size);
block_copy(node->data.data, data.data, data.size); block_copy(node->data.data, data.data, data.size);
node->data.size = data.size; node->data.size = data.size;
sll_queue_push(async_system.task_first, async_system.task_last, node); dll_insert_back(&async_system->task_sent, &node->node);
async_system.task_count += 1; async_system->task_count += 1;
system_condition_variable_signal(async_system.cv); system_condition_variable_signal(async_system->cv);
system_mutex_release(async_system.mutex);
return(result); return(result);
} }
function void function void
async_free_node(Async_Node *node){ async_free_node(Async_System *async_system, Async_Node *node){
system_mutex_acquire(async_system.mutex); heap_free(&async_system->node_heap, node->data.data);
heap_free(&async_system.node_heap, node->data.data); sll_stack_push(async_system->free_nodes, node);
sll_stack_push(async_system.free_nodes, node);
system_mutex_release(async_system.mutex);
} }
function void function void
@ -67,43 +67,61 @@ async_task_thread(void *thread_ptr){
thread_ctx_init(tctx, ThreadKind_AsyncTasks, allocator, allocator); thread_ctx_init(tctx, ThreadKind_AsyncTasks, allocator, allocator);
Async_Thread *thread = (Async_Thread*)thread_ptr; Async_Thread *thread = (Async_Thread*)thread_ptr;
Async_System *async_system = thread->async_system;
Application_Links app = {}; Application_Links app = {};
app.tctx = tctx; app.tctx = tctx;
app.cmd_context = async_system.cmd_context; app.cmd_context = async_system->cmd_context;
Async_Context ctx = {&app, thread}; Async_Context ctx = {&app, thread};
for (;;){ for (;;){
Async_Node *node = async_pop_node(); system_mutex_acquire(async_system->mutex);
Async_Node *node = async_pop_node(async_system);
node->thread = thread; node->thread = thread;
thread->node = node; thread->node = node;
thread->task = node->task; thread->task = node->task;
thread->cancel_signal = false;
thread->join_signal = false;
system_mutex_release(async_system->mutex);
node->func(&ctx, node->data); node->func(&ctx, node->data);
system_mutex_acquire(async_system->mutex);
if (thread->join_signal){
system_condition_variable_signal(async_system->join_cv);
}
node->thread = 0;
thread->node = 0; thread->node = 0;
thread->task = 0; thread->task = 0;
async_free_node(node); thread->cancel_signal = false;
thread->join_signal = false;
async_free_node(async_system, node);
system_mutex_release(async_system->mutex);
} }
} }
function Async_Node* function Async_Node*
async_get_pending_node(Async_Task task){ async_get_pending_node(Async_System *async_system, Async_Task task){
Async_Node *result = 0; Async_Node *result = 0;
for (Async_Node *node = async_system.task_first; if (task != 0){
node != 0; for (Node *node = async_system->task_sent.next;
node = node->next){ node != &async_system->task_sent;
if (node->task == task){ node = node->next){
result = node; Async_Node *a_node = CastFromMember(Async_Node, node, node);
break; if (a_node->task == task){
result = a_node;
break;
}
} }
} }
return(result); return(result);
} }
function Async_Node* function Async_Node*
async_get_running_node(Async_Task task){ async_get_running_node(Async_System *async_system, Async_Task task){
Async_Node *result = 0; Async_Node *result = 0;
if (async_system.thread.task == task){ if (task != 0 && async_system->thread.task == task){
result = async_system->thread.node;
} }
return(result); return(result);
} }
@ -111,63 +129,106 @@ async_get_running_node(Async_Task task){
//////////////////////////////// ////////////////////////////////
function void function void
async_task_handler_init(Application_Links *app){ async_task_handler_init(Application_Links *app, Async_System *async_system){
block_zero_struct(&async_system); block_zero_struct(async_system);
async_system.cmd_context = app->cmd_context; async_system->cmd_context = app->cmd_context;
async_system.node_arena = make_arena_system(KB(4)); async_system->node_arena = make_arena_system(KB(4));
heap_init(&async_system.node_heap, &async_system.node_arena); heap_init(&async_system->node_heap, &async_system->node_arena);
async_system.mutex = system_mutex_make(); async_system->mutex = system_mutex_make();
async_system.cv = system_condition_variable_make(); async_system->cv = system_condition_variable_make();
async_system.thread.thread = system_thread_launch(async_task_thread, &async_system.thread); async_system->join_cv = system_condition_variable_make();
dll_init_sentinel(&async_system->task_sent);
async_system->thread.thread = system_thread_launch(async_task_thread, &async_system->thread);
} }
function Async_Task function Async_Task
async_task_no_dep(Async_Task_Function_Type *func, Data data){ async_task_no_dep(Async_System *async_system, Async_Task_Function_Type *func, Data data){
return(async_push_node(func, data)); system_mutex_acquire(async_system->mutex);
Async_Task result = async_push_node(async_system, func, data);
system_mutex_release(async_system->mutex);
return(result);
} }
function Async_Task function Async_Task
async_task_single_dep(Async_Task_Function_Type *func, Data data, Async_Task dependency){ async_task_single_dep(Async_System *async_system, Async_Task_Function_Type *func, Data data, Async_Task dependency){
NotImplemented; NotImplemented;
} }
function b32 function b32
async_task_is_pending(Async_Task task){ async_task_is_pending(Async_System *async_system, Async_Task task){
system_mutex_acquire(async_system.mutex); system_mutex_acquire(async_system->mutex);
Async_Node *node = async_get_pending_node(task); Async_Node *node = async_get_pending_node(async_system, task);
system_mutex_release(async_system.mutex); system_mutex_release(async_system->mutex);
return(node != 0); return(node != 0);
} }
function b32 function b32
async_task_is_running(Async_Task task){ async_task_is_running(Async_System *async_system, Async_Task task){
system_mutex_acquire(async_system.mutex); system_mutex_acquire(async_system->mutex);
Async_Node *node = async_get_running_node(task); Async_Node *node = async_get_running_node(async_system, task);
system_mutex_release(async_system.mutex); system_mutex_release(async_system->mutex);
return(node != 0); return(node != 0);
} }
function b32 function b32
async_task_is_running_or_pending(Async_Task task){ async_task_is_running_or_pending(Async_System *async_system, Async_Task task){
system_mutex_acquire(async_system.mutex); system_mutex_acquire(async_system->mutex);
Async_Node *node = async_get_pending_node(task); Async_Node *node = async_get_pending_node(async_system, task);
if (node != 0){ if (node != 0){
node = async_get_running_node(task); node = async_get_running_node(async_system, task);
} }
system_mutex_release(async_system.mutex); system_mutex_release(async_system->mutex);
return(node != 0); return(node != 0);
} }
function void function void
async_task_cancel(Async_Task task){ async_task_cancel(Async_System *async_system, Async_Task task){
system_mutex_acquire(async_system.mutex); system_mutex_acquire(async_system->mutex);
NotImplemented; Async_Node *node = async_get_pending_node(async_system, task);
system_mutex_release(async_system.mutex); if (node != 0){
dll_remove(&node->node);
async_system->task_count -= 1;
async_free_node(async_system, node);
}
else{
node = async_get_running_node(async_system, task);
if (node != 0){
b32 *cancel_signal = &node->thread->cancel_signal;
atomic_write_b32(cancel_signal, true);
}
}
system_mutex_release(async_system->mutex);
} }
function void function void
async_task_join(Async_Task task){ async_task_join(Async_System *async_system, Async_Task task){
NotImplemented; system_mutex_acquire(async_system->mutex);
Async_Node *node = async_get_pending_node(async_system, task);
b32 wait_for_join = false;
if (node != 0){
dll_remove(&node->node);
dll_insert(&async_system->task_sent, &node->node);
node->thread->join_signal = true;
wait_for_join = true;
}
else{
node = async_get_running_node(async_system, task);
if (node != 0){
node->thread->join_signal = true;
wait_for_join = true;
}
}
if (wait_for_join){
system_condition_variable_wait(async_system->join_cv, async_system->mutex);
}
system_mutex_release(async_system->mutex);
}
function b32
async_check_canceled(Async_Context *actx){
b32 *cancel_signal = &actx->thread->cancel_signal;
b32 result = atomic_read_b32(cancel_signal);
return(result);
} }
// BOTTOM // BOTTOM

View File

@ -12,12 +12,18 @@ typedef u64 Async_Task;
struct Async_Thread{ struct Async_Thread{
System_Thread thread; System_Thread thread;
struct Async_System *async_system;
struct Async_Node *node; struct Async_Node *node;
Async_Task task; Async_Task task;
b32 cancel_signal;
b32 join_signal;
}; };
struct Async_Node{ struct Async_Node{
Async_Node *next; union{
Async_Node *next;
Node node;
};
Async_Task task; Async_Task task;
Async_Thread *thread; Async_Thread *thread;
Async_Task_Function_Type *func; Async_Task_Function_Type *func;
@ -31,10 +37,10 @@ struct Async_System{
Arena node_arena; Arena node_arena;
System_Mutex mutex; System_Mutex mutex;
System_Condition_Variable cv; System_Condition_Variable cv;
System_Condition_Variable join_cv;
Async_Task task_id_counter; Async_Task task_id_counter;
Async_Node *free_nodes; Async_Node *free_nodes;
Async_Node *task_first; Node task_sent;
Async_Node *task_last;
i32 task_count; i32 task_count;
Async_Thread thread; Async_Thread thread;

View File

@ -19,7 +19,7 @@ custom_layer_init(Application_Links *app){
mapping_init(tctx, &framework_mapping); mapping_init(tctx, &framework_mapping);
setup_default_mapping(&framework_mapping); setup_default_mapping(&framework_mapping);
global_prof_init(); global_prof_init();
async_task_handler_init(app); async_task_handler_init(app, &global_async_system);
} }
#endif #endif

View File

@ -689,7 +689,7 @@ BUFFER_HOOK_SIG(default_begin_buffer){
} }
if (use_lexer){ if (use_lexer){
Async_Task lex_task = async_task_no_dep(do_full_lex_async, make_data_struct(&buffer_id)); Async_Task lex_task = async_task_no_dep(&global_async_system, do_full_lex_async, make_data_struct(&buffer_id));
Async_Task *lex_task_ptr = scope_attachment(app, scope, buffer_lex_task, Async_Task); Async_Task *lex_task_ptr = scope_attachment(app, scope, buffer_lex_task, Async_Task);
*lex_task_ptr = lex_task; *lex_task_ptr = lex_task;
} }
@ -796,7 +796,7 @@ BUFFER_EDIT_RANGE_SIG(default_buffer_edit_range){
else{ else{
scratch.restore(); scratch.restore();
base_free(allocator, ptr->tokens); base_free(allocator, ptr->tokens);
Async_Task lex_task = async_task_no_dep(do_full_lex_async, make_data_struct(&buffer_id)); Async_Task lex_task = async_task_no_dep(&global_async_system, do_full_lex_async, make_data_struct(&buffer_id));
Async_Task *lex_task_ptr = scope_attachment(app, scope, buffer_lex_task, Async_Task); Async_Task *lex_task_ptr = scope_attachment(app, scope, buffer_lex_task, Async_Task);
*lex_task_ptr = lex_task; *lex_task_ptr = lex_task;
} }