diff --git a/custom/4coder_async_tasks.cpp b/custom/4coder_async_tasks.cpp index b4f88b09..3a0e9adf 100644 --- a/custom/4coder_async_tasks.cpp +++ b/custom/4coder_async_tasks.cpp @@ -4,55 +4,55 @@ // TOP -global Async_System async_system = {}; +// TODO(allen): put atomic wrappers in base type layer +#define atomic_write_b32(p,v) (*(p)=(v)) +#define atomic_read_b32(p) (*(p)) + +global Async_System global_async_system = {}; function Async_Node* -async_pop_node(void){ - system_mutex_acquire(async_system.mutex); - for (;async_system.task_count == 0;){ - system_condition_variable_wait(async_system.cv, async_system.mutex); +async_pop_node(Async_System *async_system){ + for (;async_system->task_count == 0;){ + system_condition_variable_wait(async_system->cv, async_system->mutex); } - Async_Node *node = async_system.task_first; - sll_queue_pop(async_system.task_first, async_system.task_last); - async_system.task_count -= 1; - system_mutex_release(async_system.mutex); - node->next = 0; - return(node); + Node *node = async_system->task_sent.next; + Assert(node != &async_system->task_sent); + dll_remove(node); + async_system->task_count -= 1; + Async_Node *a_node = CastFromMember(Async_Node, node, node); + a_node->next = 0; + return(a_node); } function Async_Task -async_push_node(Async_Task_Function_Type *func, Data data){ - Async_Task result = async_system.task_id_counter; - async_system.task_id_counter += 1; +async_push_node(Async_System *async_system, Async_Task_Function_Type *func, Data data){ + Async_Task result = async_system->task_id_counter; + async_system->task_id_counter += 1; - system_mutex_acquire(async_system.mutex); - Async_Node *node = async_system.free_nodes; + Async_Node *node = async_system->free_nodes; if (node == 0){ - node = push_array(&async_system.node_arena, Async_Node, 1); + node = push_array(&async_system->node_arena, Async_Node, 1); } else{ - sll_stack_pop(async_system.free_nodes); + sll_stack_pop(async_system->free_nodes); } node->task = result; node->thread = 0; node->func = func; - node->data.data = (u8*)heap_allocate(&async_system.node_heap, data.size); + node->data.data = (u8*)heap_allocate(&async_system->node_heap, data.size); block_copy(node->data.data, data.data, data.size); node->data.size = data.size; - sll_queue_push(async_system.task_first, async_system.task_last, node); - async_system.task_count += 1; - system_condition_variable_signal(async_system.cv); - system_mutex_release(async_system.mutex); + dll_insert_back(&async_system->task_sent, &node->node); + async_system->task_count += 1; + system_condition_variable_signal(async_system->cv); return(result); } function void -async_free_node(Async_Node *node){ - system_mutex_acquire(async_system.mutex); - heap_free(&async_system.node_heap, node->data.data); - sll_stack_push(async_system.free_nodes, node); - system_mutex_release(async_system.mutex); +async_free_node(Async_System *async_system, Async_Node *node){ + heap_free(&async_system->node_heap, node->data.data); + sll_stack_push(async_system->free_nodes, node); } function void @@ -67,43 +67,61 @@ async_task_thread(void *thread_ptr){ thread_ctx_init(tctx, ThreadKind_AsyncTasks, allocator, allocator); Async_Thread *thread = (Async_Thread*)thread_ptr; + Async_System *async_system = thread->async_system; Application_Links app = {}; app.tctx = tctx; - app.cmd_context = async_system.cmd_context; + app.cmd_context = async_system->cmd_context; Async_Context ctx = {&app, thread}; for (;;){ - Async_Node *node = async_pop_node(); + system_mutex_acquire(async_system->mutex); + Async_Node *node = async_pop_node(async_system); node->thread = thread; thread->node = node; thread->task = node->task; + thread->cancel_signal = false; + thread->join_signal = false; + system_mutex_release(async_system->mutex); + node->func(&ctx, node->data); + + system_mutex_acquire(async_system->mutex); + if (thread->join_signal){ + system_condition_variable_signal(async_system->join_cv); + } + node->thread = 0; thread->node = 0; thread->task = 0; - async_free_node(node); + thread->cancel_signal = false; + thread->join_signal = false; + async_free_node(async_system, node); + system_mutex_release(async_system->mutex); } } function Async_Node* -async_get_pending_node(Async_Task task){ +async_get_pending_node(Async_System *async_system, Async_Task task){ Async_Node *result = 0; - for (Async_Node *node = async_system.task_first; - node != 0; - node = node->next){ - if (node->task == task){ - result = node; - break; + if (task != 0){ + for (Node *node = async_system->task_sent.next; + node != &async_system->task_sent; + node = node->next){ + Async_Node *a_node = CastFromMember(Async_Node, node, node); + if (a_node->task == task){ + result = a_node; + break; + } } } return(result); } function Async_Node* -async_get_running_node(Async_Task task){ +async_get_running_node(Async_System *async_system, Async_Task task){ Async_Node *result = 0; - if (async_system.thread.task == task){ - + if (task != 0 && async_system->thread.task == task){ + result = async_system->thread.node; } return(result); } @@ -111,63 +129,106 @@ async_get_running_node(Async_Task task){ //////////////////////////////// function void -async_task_handler_init(Application_Links *app){ - block_zero_struct(&async_system); - async_system.cmd_context = app->cmd_context; - async_system.node_arena = make_arena_system(KB(4)); - heap_init(&async_system.node_heap, &async_system.node_arena); - async_system.mutex = system_mutex_make(); - async_system.cv = system_condition_variable_make(); - async_system.thread.thread = system_thread_launch(async_task_thread, &async_system.thread); +async_task_handler_init(Application_Links *app, Async_System *async_system){ + block_zero_struct(async_system); + async_system->cmd_context = app->cmd_context; + async_system->node_arena = make_arena_system(KB(4)); + heap_init(&async_system->node_heap, &async_system->node_arena); + async_system->mutex = system_mutex_make(); + async_system->cv = system_condition_variable_make(); + async_system->join_cv = system_condition_variable_make(); + dll_init_sentinel(&async_system->task_sent); + async_system->thread.thread = system_thread_launch(async_task_thread, &async_system->thread); } function Async_Task -async_task_no_dep(Async_Task_Function_Type *func, Data data){ - return(async_push_node(func, data)); +async_task_no_dep(Async_System *async_system, Async_Task_Function_Type *func, Data data){ + system_mutex_acquire(async_system->mutex); + Async_Task result = async_push_node(async_system, func, data); + system_mutex_release(async_system->mutex); + return(result); } function Async_Task -async_task_single_dep(Async_Task_Function_Type *func, Data data, Async_Task dependency){ +async_task_single_dep(Async_System *async_system, Async_Task_Function_Type *func, Data data, Async_Task dependency){ NotImplemented; } function b32 -async_task_is_pending(Async_Task task){ - system_mutex_acquire(async_system.mutex); - Async_Node *node = async_get_pending_node(task); - system_mutex_release(async_system.mutex); +async_task_is_pending(Async_System *async_system, Async_Task task){ + system_mutex_acquire(async_system->mutex); + Async_Node *node = async_get_pending_node(async_system, task); + system_mutex_release(async_system->mutex); return(node != 0); } function b32 -async_task_is_running(Async_Task task){ - system_mutex_acquire(async_system.mutex); - Async_Node *node = async_get_running_node(task); - system_mutex_release(async_system.mutex); +async_task_is_running(Async_System *async_system, Async_Task task){ + system_mutex_acquire(async_system->mutex); + Async_Node *node = async_get_running_node(async_system, task); + system_mutex_release(async_system->mutex); return(node != 0); } function b32 -async_task_is_running_or_pending(Async_Task task){ - system_mutex_acquire(async_system.mutex); - Async_Node *node = async_get_pending_node(task); +async_task_is_running_or_pending(Async_System *async_system, Async_Task task){ + system_mutex_acquire(async_system->mutex); + Async_Node *node = async_get_pending_node(async_system, task); if (node != 0){ - node = async_get_running_node(task); + node = async_get_running_node(async_system, task); } - system_mutex_release(async_system.mutex); + system_mutex_release(async_system->mutex); return(node != 0); } function void -async_task_cancel(Async_Task task){ - system_mutex_acquire(async_system.mutex); - NotImplemented; - system_mutex_release(async_system.mutex); +async_task_cancel(Async_System *async_system, Async_Task task){ + system_mutex_acquire(async_system->mutex); + Async_Node *node = async_get_pending_node(async_system, task); + if (node != 0){ + dll_remove(&node->node); + async_system->task_count -= 1; + async_free_node(async_system, node); + } + else{ + node = async_get_running_node(async_system, task); + if (node != 0){ + b32 *cancel_signal = &node->thread->cancel_signal; + atomic_write_b32(cancel_signal, true); + } + } + system_mutex_release(async_system->mutex); } function void -async_task_join(Async_Task task){ - NotImplemented; +async_task_join(Async_System *async_system, Async_Task task){ + system_mutex_acquire(async_system->mutex); + Async_Node *node = async_get_pending_node(async_system, task); + b32 wait_for_join = false; + if (node != 0){ + dll_remove(&node->node); + dll_insert(&async_system->task_sent, &node->node); + node->thread->join_signal = true; + wait_for_join = true; + } + else{ + node = async_get_running_node(async_system, task); + if (node != 0){ + node->thread->join_signal = true; + wait_for_join = true; + } + } + if (wait_for_join){ + system_condition_variable_wait(async_system->join_cv, async_system->mutex); + } + system_mutex_release(async_system->mutex); +} + +function b32 +async_check_canceled(Async_Context *actx){ + b32 *cancel_signal = &actx->thread->cancel_signal; + b32 result = atomic_read_b32(cancel_signal); + return(result); } // BOTTOM diff --git a/custom/4coder_async_tasks.h b/custom/4coder_async_tasks.h index ab1d4ac9..dd3b5efb 100644 --- a/custom/4coder_async_tasks.h +++ b/custom/4coder_async_tasks.h @@ -12,12 +12,18 @@ typedef u64 Async_Task; struct Async_Thread{ System_Thread thread; + struct Async_System *async_system; struct Async_Node *node; Async_Task task; + b32 cancel_signal; + b32 join_signal; }; struct Async_Node{ - Async_Node *next; + union{ + Async_Node *next; + Node node; + }; Async_Task task; Async_Thread *thread; Async_Task_Function_Type *func; @@ -31,10 +37,10 @@ struct Async_System{ Arena node_arena; System_Mutex mutex; System_Condition_Variable cv; + System_Condition_Variable join_cv; Async_Task task_id_counter; Async_Node *free_nodes; - Async_Node *task_first; - Async_Node *task_last; + Node task_sent; i32 task_count; Async_Thread thread; diff --git a/custom/4coder_default_bindings.cpp b/custom/4coder_default_bindings.cpp index d636362f..6b295fd3 100644 --- a/custom/4coder_default_bindings.cpp +++ b/custom/4coder_default_bindings.cpp @@ -19,7 +19,7 @@ custom_layer_init(Application_Links *app){ mapping_init(tctx, &framework_mapping); setup_default_mapping(&framework_mapping); global_prof_init(); - async_task_handler_init(app); + async_task_handler_init(app, &global_async_system); } #endif diff --git a/custom/4coder_default_hooks.cpp b/custom/4coder_default_hooks.cpp index 8ba19564..b5a1dc70 100644 --- a/custom/4coder_default_hooks.cpp +++ b/custom/4coder_default_hooks.cpp @@ -689,7 +689,7 @@ BUFFER_HOOK_SIG(default_begin_buffer){ } if (use_lexer){ - Async_Task lex_task = async_task_no_dep(do_full_lex_async, make_data_struct(&buffer_id)); + Async_Task lex_task = async_task_no_dep(&global_async_system, do_full_lex_async, make_data_struct(&buffer_id)); Async_Task *lex_task_ptr = scope_attachment(app, scope, buffer_lex_task, Async_Task); *lex_task_ptr = lex_task; } @@ -796,7 +796,7 @@ BUFFER_EDIT_RANGE_SIG(default_buffer_edit_range){ else{ scratch.restore(); base_free(allocator, ptr->tokens); - Async_Task lex_task = async_task_no_dep(do_full_lex_async, make_data_struct(&buffer_id)); + Async_Task lex_task = async_task_no_dep(&global_async_system, do_full_lex_async, make_data_struct(&buffer_id)); Async_Task *lex_task_ptr = scope_attachment(app, scope, buffer_lex_task, Async_Task); *lex_task_ptr = lex_task; }