/* * Mr. 4th Dimention - Allen Webster * * 18.07.2017 * * Cross platform logic for work queues. * */ // TOP internal void job_proc(System_Functions *system, Thread_Context *thread, Work_Queue *queue, Thread_Group *group, Thread_Memory *thread_memory){ i32 thread_index = thread->id - 1; i32 cancel_lock = group->cancel_lock0 + thread_index; i32 cancel_cv = group->cancel_cv0 + thread_index; if (thread_memory->size == 0){ i32 new_size = KB(64); thread_memory->data = system_memory_allocate(new_size); thread_memory->size = new_size; } for (;;){ u32 read_index = queue->read_position; u32 write_index = queue->write_position; if (read_index != write_index){ u32 next_read_index = (read_index + 1) % QUEUE_WRAP; u32 safe_read_index = InterlockedCompareExchange(&queue->read_position, next_read_index, read_index); if (safe_read_index == read_index){ Full_Job_Data *full_job = queue->jobs + safe_read_index; // NOTE(allen): This is interlocked so that it plays nice // with the cancel job routine, which may try to cancel this job // at the same time that we try to run it i32 safe_running_thread =InterlockedCompareExchange(&full_job->running_thread, thread->id, THREAD_NOT_ASSIGNED); if (safe_running_thread == THREAD_NOT_ASSIGNED){ thread->job_id = full_job->id; thread->running = true; full_job->job.callback(system, thread, thread_memory, full_job->job.data); system_schedule_step(); thread->running = false; system_acquire_lock(cancel_lock); if (thread->cancel){ thread->cancel = 0; system_signal_cv(cancel_lock, cancel_cv); } system_release_lock(cancel_lock); } } } else{ system_wait_on(queue->semaphore); } } } internal void initialize_unbounded_queue(Unbounded_Work_Queue *source_queue){ i32 max = 512; source_queue->jobs = (Full_Job_Data*)system_memory_allocate(max*sizeof(Full_Job_Data)); source_queue->count = 0; source_queue->max = max; source_queue->skip = 0; } inline i32 get_work_queue_available_space(i32 write, i32 read){ // NOTE(allen): The only time that queue->write_position == queue->read_position // is allowed is when the queue is empty. Thus if (write_position+1 == read_position) // the available space is zero. So these computations both end up leaving one slot unused. // TODO(allen): The only way I can think to easily eliminate this is to have read and write wrap // at twice the size of the underlying array but modulo their values into the array then if write // has caught up with read it still will not be equal... but lots of modulos... ehh. i32 available_space = 0; if (write >= read){ available_space = QUEUE_WRAP - (write - read) - 1; } else{ available_space = (read - write) - 1; } return(available_space); } #define UNBOUNDED_SKIP_MAX 128 internal i32 flush_to_direct_queue(Unbounded_Work_Queue *source_queue, Work_Queue *queue, i32 thread_count){ // NOTE(allen): It is understood that read_position may be changed by other // threads but it will only make more space in the queue if it is changed. // Meanwhile write_position should not ever be changed by anything but the // main thread in this system, so it will not be interlocked. u32 read_position = queue->read_position; u32 write_position = queue->write_position; u32 available_space = get_work_queue_available_space(write_position, read_position); u32 available_jobs = source_queue->count - source_queue->skip; u32 writable_count = Min(available_space, available_jobs); if (writable_count > 0){ u32 count1 = writable_count; if (count1+write_position > QUEUE_WRAP){ count1 = QUEUE_WRAP - write_position; } u32 count2 = writable_count - count1; Full_Job_Data *job_src1 = source_queue->jobs + source_queue->skip; Full_Job_Data *job_src2 = job_src1 + count1; Full_Job_Data *job_dst1 = queue->jobs + write_position; Full_Job_Data *job_dst2 = queue->jobs; Assert((job_src1->id % QUEUE_WRAP) == write_position); memcpy(job_dst1, job_src1, sizeof(Full_Job_Data)*count1); memcpy(job_dst2, job_src2, sizeof(Full_Job_Data)*count2); queue->write_position = (write_position + writable_count) % QUEUE_WRAP; source_queue->skip += writable_count; if (source_queue->skip == source_queue->count){ source_queue->skip = source_queue->count = 0; } else if (source_queue->skip > UNBOUNDED_SKIP_MAX){ u32 left_over = source_queue->count - source_queue->skip; memmove(source_queue->jobs, source_queue->jobs + source_queue->skip, sizeof(Full_Job_Data)*left_over); source_queue->count = left_over; source_queue->skip = 0; } } i32 semaphore_release_count = writable_count; if (semaphore_release_count > thread_count){ semaphore_release_count = thread_count; } for (i32 i = 0; i < semaphore_release_count; ++i){ system_release_semaphore(queue->semaphore); } return(semaphore_release_count); } // Note(allen): post_job puts the job on the unbounded queue. // The unbounded queue is entirely managed by the main thread. // The thread safe queue is bounded in size so the unbounded // queue is periodically flushed into the direct work queue. internal u32 post_job(Thread_Group *group, Work_Queue *direct_queue, Job_Data job){ Unbounded_Work_Queue *queue = &group->queue; u32 result = queue->next_job_id++; if (queue->count >= queue->max){ u32 new_max = round_up_pot_u32(queue->count + 1); Full_Job_Data *new_jobs = (Full_Job_Data*)system_memory_allocate(new_max*sizeof(Full_Job_Data)); memcpy(new_jobs, queue->jobs, queue->count); system_memory_free(queue->jobs, queue->max*sizeof(Full_Job_Data)); queue->jobs = new_jobs; queue->max = new_max; } Full_Job_Data full_job = {0}; full_job.job = job; full_job.running_thread = THREAD_NOT_ASSIGNED; full_job.id = result; queue->jobs[queue->count++] = full_job; flush_to_direct_queue(queue, direct_queue, group->count); return(result); } internal void cancel_job(Thread_Group *group, Work_Queue *queue, u32 job_id){ Unbounded_Work_Queue *source_queue = &group->queue; b32 handled_in_unbounded = false; if (source_queue->skip < source_queue->count){ Full_Job_Data *first_job = source_queue->jobs + source_queue->skip; if (first_job->id <= job_id){ u32 index = source_queue->skip + (job_id - first_job->id); Full_Job_Data *job = source_queue->jobs + index; job->running_thread = 0; handled_in_unbounded = true; } } if (!handled_in_unbounded){ Full_Job_Data *job = queue->jobs + (job_id % QUEUE_WRAP); Assert(job->id == job_id); u32 thread_id = InterlockedCompareExchange(&job->running_thread, 0, THREAD_NOT_ASSIGNED); if (thread_id != THREAD_NOT_ASSIGNED && thread_id != 0){ i32 thread_index = thread_id - 1; i32 cancel_lock = group->cancel_lock0 + thread_index; i32 cancel_cv = group->cancel_cv0 + thread_index; Thread_Context *thread = group->threads + thread_index; system_acquire_lock(cancel_lock); thread->cancel = true; system_release_lock(FRAME_LOCK); do{ system_wait_cv(cancel_lock, cancel_cv); }while (thread->cancel); system_acquire_lock(FRAME_LOCK); system_release_lock(cancel_lock); } } } internal b32 check_cancel_status(Thread_Group *group, Thread_Context *thread){ b32 result = false; i32 thread_index = thread->id - 1; i32 cancel_lock = group->cancel_lock0 + thread_index; system_acquire_lock(cancel_lock); if (thread->cancel){ result = true; } system_release_lock(cancel_lock); return(result); } internal void grow_thread_memory(Thread_Memory *memory){ system_acquire_lock(CANCEL_LOCK0 + memory->id - 1); void *old_data = memory->data; i32 old_size = memory->size; i32 new_size = l_round_up_i32(memory->size*2, KB(4)); memory->data = system_memory_allocate(new_size); memory->size = new_size; if (old_data != 0){ memcpy(memory->data, old_data, old_size); system_memory_free(old_data, old_size); } system_release_lock(CANCEL_LOCK0 + memory->id - 1); } internal void dbg_get_thread_states(Thread_Group *group, Work_Queue *queue, b8 *running, i32 *pending){ Unbounded_Work_Queue *source_queue = &group->queue; u32 write = queue->write_position; u32 read = queue->read_position; if (write < read){ write += QUEUE_WRAP; } *pending = (i32)(write - read) + source_queue->count - source_queue->skip; for (i32 i = 0; i < group->count; ++i){ running[i] = (group->threads[i].running != 0); } } // BOTTOM