Introduce a tp_object_execute helper to execute a threadpool_object's callbacks.
Signed-off-by: Rémi Bernon <rbernon@codeweavers.com> --- dlls/ntdll/threadpool.c | 303 +++++++++++++++++++++------------------- 1 file changed, 160 insertions(+), 143 deletions(-)
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c index bef9a11905e..e9ca32a3be7 100644 --- a/dlls/ntdll/threadpool.c +++ b/dlls/ntdll/threadpool.c @@ -372,6 +372,7 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
static void CALLBACK threadpool_worker_proc( void *param ); static void tp_object_submit( struct threadpool_object *object, BOOL signaled ); +static void tp_object_execute( struct threadpool_object *object ); static void tp_object_prepare_shutdown( struct threadpool_object *object ); static BOOL tp_object_release( struct threadpool_object *object ); static struct threadpool *default_threadpool = NULL; @@ -2095,18 +2096,171 @@ static struct list *threadpool_get_next_item( const struct threadpool *pool ) }
/*********************************************************************** - * threadpool_worker_proc (internal) + * tp_object_execute (internal) + * + * Executes a threadpool object callback, object->pool->cs has to be + * held. */ -static void CALLBACK threadpool_worker_proc( void *param ) +static void tp_object_execute( struct threadpool_object *object ) { TP_CALLBACK_INSTANCE *callback_instance; struct threadpool_instance instance; struct io_completion completion; - struct threadpool *pool = param; + struct threadpool *pool = object->pool; TP_WAIT_RESULT wait_result = 0; + NTSTATUS status; + + object->num_pending_callbacks--; + + /* For wait objects check if they were signaled or have timed out. */ + if (object->type == TP_OBJECT_TYPE_WAIT) + { + wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT; + if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--; + } + else if (object->type == TP_OBJECT_TYPE_IO) + { + assert( object->u.io.completion_count ); + completion = object->u.io.completions[--object->u.io.completion_count]; + object->u.io.pending_count--; + } + + /* Leave critical section and do the actual callback. */ + object->num_associated_callbacks++; + object->num_running_callbacks++; + RtlLeaveCriticalSection( &pool->cs ); + + /* Initialize threadpool instance struct. 
*/ + callback_instance = (TP_CALLBACK_INSTANCE *)&instance; + instance.object = object; + instance.threadid = GetCurrentThreadId(); + instance.associated = TRUE; + instance.may_run_long = object->may_run_long; + instance.cleanup.critical_section = NULL; + instance.cleanup.mutex = NULL; + instance.cleanup.semaphore = NULL; + instance.cleanup.semaphore_count = 0; + instance.cleanup.event = NULL; + instance.cleanup.library = NULL; + + switch (object->type) + { + case TP_OBJECT_TYPE_SIMPLE: + { + TRACE( "executing simple callback %p(%p, %p)\n", + object->u.simple.callback, callback_instance, object->userdata ); + object->u.simple.callback( callback_instance, object->userdata ); + TRACE( "callback %p returned\n", object->u.simple.callback ); + break; + } + + case TP_OBJECT_TYPE_WORK: + { + TRACE( "executing work callback %p(%p, %p, %p)\n", + object->u.work.callback, callback_instance, object->userdata, object ); + object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object ); + TRACE( "callback %p returned\n", object->u.work.callback ); + break; + } + + case TP_OBJECT_TYPE_TIMER: + { + TRACE( "executing timer callback %p(%p, %p, %p)\n", + object->u.timer.callback, callback_instance, object->userdata, object ); + object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object ); + TRACE( "callback %p returned\n", object->u.timer.callback ); + break; + } + + case TP_OBJECT_TYPE_WAIT: + { + TRACE( "executing wait callback %p(%p, %p, %p, %u)\n", + object->u.wait.callback, callback_instance, object->userdata, object, wait_result ); + object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result ); + TRACE( "callback %p returned\n", object->u.wait.callback ); + break; + } + + case TP_OBJECT_TYPE_IO: + { + TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n", + object->u.io.callback, callback_instance, object->userdata, + completion.cvalue, &completion.iosb, (TP_IO *)object ); + object->u.io.callback( 
callback_instance, object->userdata, + (void *)completion.cvalue, &completion.iosb, (TP_IO *)object ); + TRACE( "callback %p returned\n", object->u.io.callback ); + break; + } + + default: + assert(0); + break; + } + + /* Execute finalization callback. */ + if (object->finalization_callback) + { + TRACE( "executing finalization callback %p(%p, %p)\n", + object->finalization_callback, callback_instance, object->userdata ); + object->finalization_callback( callback_instance, object->userdata ); + TRACE( "callback %p returned\n", object->finalization_callback ); + } + + /* Execute cleanup tasks. */ + if (instance.cleanup.critical_section) + { + RtlLeaveCriticalSection( instance.cleanup.critical_section ); + } + if (instance.cleanup.mutex) + { + status = NtReleaseMutant( instance.cleanup.mutex, NULL ); + if (status != STATUS_SUCCESS) goto skip_cleanup; + } + if (instance.cleanup.semaphore) + { + status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL ); + if (status != STATUS_SUCCESS) goto skip_cleanup; + } + if (instance.cleanup.event) + { + status = NtSetEvent( instance.cleanup.event, NULL ); + if (status != STATUS_SUCCESS) goto skip_cleanup; + } + if (instance.cleanup.library) + { + LdrUnloadDll( instance.cleanup.library ); + } + +skip_cleanup: + RtlEnterCriticalSection( &pool->cs ); + + /* Simple callbacks are automatically shutdown after execution. 
*/ + if (object->type == TP_OBJECT_TYPE_SIMPLE) + { + tp_object_prepare_shutdown( object ); + object->shutdown = TRUE; + } + + object->num_running_callbacks--; + if (object_is_finished( object, TRUE )) + RtlWakeAllConditionVariable( &object->group_finished_event ); + + if (instance.associated) + { + object->num_associated_callbacks--; + if (object_is_finished( object, FALSE )) + RtlWakeAllConditionVariable( &object->finished_event ); + } +} + +/*********************************************************************** + * threadpool_worker_proc (internal) + */ +static void CALLBACK threadpool_worker_proc( void *param ) +{ + struct threadpool *pool = param; LARGE_INTEGER timeout; struct list *ptr; - NTSTATUS status;
TRACE( "starting worker thread for pool %p\n", pool );
@@ -2121,151 +2275,14 @@ static void CALLBACK threadpool_worker_proc( void *param ) /* If further pending callbacks are queued, move the work item to * the end of the pool list. Otherwise remove it from the pool. */ list_remove( &object->pool_entry ); - if (--object->num_pending_callbacks) + if (object->num_pending_callbacks > 1) tp_object_prio_queue( object );
- /* For wait objects check if they were signaled or have timed out. */ - if (object->type == TP_OBJECT_TYPE_WAIT) - { - wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT; - if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--; - } - else if (object->type == TP_OBJECT_TYPE_IO) - { - assert( object->u.io.completion_count ); - completion = object->u.io.completions[--object->u.io.completion_count]; - object->u.io.pending_count--; - } + tp_object_execute( object );
- /* Leave critical section and do the actual callback. */ - object->num_associated_callbacks++; - object->num_running_callbacks++; - RtlLeaveCriticalSection( &pool->cs ); - - /* Initialize threadpool instance struct. */ - callback_instance = (TP_CALLBACK_INSTANCE *)&instance; - instance.object = object; - instance.threadid = GetCurrentThreadId(); - instance.associated = TRUE; - instance.may_run_long = object->may_run_long; - instance.cleanup.critical_section = NULL; - instance.cleanup.mutex = NULL; - instance.cleanup.semaphore = NULL; - instance.cleanup.semaphore_count = 0; - instance.cleanup.event = NULL; - instance.cleanup.library = NULL; - - switch (object->type) - { - case TP_OBJECT_TYPE_SIMPLE: - { - TRACE( "executing simple callback %p(%p, %p)\n", - object->u.simple.callback, callback_instance, object->userdata ); - object->u.simple.callback( callback_instance, object->userdata ); - TRACE( "callback %p returned\n", object->u.simple.callback ); - break; - } - - case TP_OBJECT_TYPE_WORK: - { - TRACE( "executing work callback %p(%p, %p, %p)\n", - object->u.work.callback, callback_instance, object->userdata, object ); - object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object ); - TRACE( "callback %p returned\n", object->u.work.callback ); - break; - } - - case TP_OBJECT_TYPE_TIMER: - { - TRACE( "executing timer callback %p(%p, %p, %p)\n", - object->u.timer.callback, callback_instance, object->userdata, object ); - object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object ); - TRACE( "callback %p returned\n", object->u.timer.callback ); - break; - } - - case TP_OBJECT_TYPE_WAIT: - { - TRACE( "executing wait callback %p(%p, %p, %p, %u)\n", - object->u.wait.callback, callback_instance, object->userdata, object, wait_result ); - object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result ); - TRACE( "callback %p returned\n", object->u.wait.callback ); - break; - } - - case 
TP_OBJECT_TYPE_IO: - { - TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n", - object->u.io.callback, callback_instance, object->userdata, - completion.cvalue, &completion.iosb, (TP_IO *)object ); - object->u.io.callback( callback_instance, object->userdata, - (void *)completion.cvalue, &completion.iosb, (TP_IO *)object ); - TRACE( "callback %p returned\n", object->u.io.callback ); - break; - } - - default: - assert(0); - break; - } - - /* Execute finalization callback. */ - if (object->finalization_callback) - { - TRACE( "executing finalization callback %p(%p, %p)\n", - object->finalization_callback, callback_instance, object->userdata ); - object->finalization_callback( callback_instance, object->userdata ); - TRACE( "callback %p returned\n", object->finalization_callback ); - } - - /* Execute cleanup tasks. */ - if (instance.cleanup.critical_section) - { - RtlLeaveCriticalSection( instance.cleanup.critical_section ); - } - if (instance.cleanup.mutex) - { - status = NtReleaseMutant( instance.cleanup.mutex, NULL ); - if (status != STATUS_SUCCESS) goto skip_cleanup; - } - if (instance.cleanup.semaphore) - { - status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL ); - if (status != STATUS_SUCCESS) goto skip_cleanup; - } - if (instance.cleanup.event) - { - status = NtSetEvent( instance.cleanup.event, NULL ); - if (status != STATUS_SUCCESS) goto skip_cleanup; - } - if (instance.cleanup.library) - { - LdrUnloadDll( instance.cleanup.library ); - } - - skip_cleanup: - RtlEnterCriticalSection( &pool->cs ); assert(pool->num_busy_workers); pool->num_busy_workers--;
- /* Simple callbacks are automatically shutdown after execution. */ - if (object->type == TP_OBJECT_TYPE_SIMPLE) - { - tp_object_prepare_shutdown( object ); - object->shutdown = TRUE; - } - - object->num_running_callbacks--; - if (object_is_finished( object, TRUE )) - RtlWakeAllConditionVariable( &object->group_finished_event ); - - if (instance.associated) - { - object->num_associated_callbacks--; - if (object_is_finished( object, FALSE )) - RtlWakeAllConditionVariable( &object->finished_event ); - } - tp_object_release( object ); }