Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=47843 Signed-off-by: Rémi Bernon rbernon@codeweavers.com --- dlls/ntdll/threadpool.c | 432 ++++++++++++++++++++-------------------- 1 file changed, 216 insertions(+), 216 deletions(-)
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c index 1d64bd82bf4..db645118059 100644 --- a/dlls/ntdll/threadpool.c +++ b/dlls/ntdll/threadpool.c @@ -553,222 +553,6 @@ static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout return pTime; }
-static void delete_wait_work_item(struct wait_work_item *wait_work_item) -{ - NtClose( wait_work_item->CancelEvent ); - RtlFreeHeap( GetProcessHeap(), 0, wait_work_item ); -} - -static DWORD CALLBACK wait_thread_proc(LPVOID Arg) -{ - struct wait_work_item *wait_work_item = Arg; - NTSTATUS status; - BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) != 0; - HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent }; - LARGE_INTEGER timeout; - HANDLE completion_event; - - TRACE("\n"); - - while (TRUE) - { - status = NtWaitForMultipleObjects( 2, handles, TRUE, alertable, - get_nt_timeout( &timeout, wait_work_item->Milliseconds ) ); - if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT) - { - BOOLEAN TimerOrWaitFired; - - if (status == STATUS_WAIT_0) - { - TRACE( "object %p signaled, calling callback %p with context %p\n", - wait_work_item->Object, wait_work_item->Callback, - wait_work_item->Context ); - TimerOrWaitFired = FALSE; - } - else - { - TRACE( "wait for object %p timed out, calling callback %p with context %p\n", - wait_work_item->Object, wait_work_item->Callback, - wait_work_item->Context ); - TimerOrWaitFired = TRUE; - } - InterlockedExchange( &wait_work_item->CallbackInProgress, TRUE ); - if (wait_work_item->CompletionEvent) - { - TRACE( "Work has been canceled.\n" ); - break; - } - wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired ); - InterlockedExchange( &wait_work_item->CallbackInProgress, FALSE ); - - if (wait_work_item->Flags & WT_EXECUTEONLYONCE) - break; - } - else if (status != STATUS_USER_APC) - break; - } - - - if (InterlockedIncrement( &wait_work_item->DeleteCount ) == 2 ) - { - completion_event = wait_work_item->CompletionEvent; - delete_wait_work_item( wait_work_item ); - if (completion_event && completion_event != INVALID_HANDLE_VALUE) - NtSetEvent( completion_event, NULL ); - } - - return 0; -} - -/*********************************************************************** - * 
RtlRegisterWait (NTDLL.@) - * - * Registers a wait for a handle to become signaled. - * - * PARAMS - * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it. - * Object [I] Object to wait to become signaled. - * Callback [I] Callback function to execute when the wait times out or the handle is signaled. - * Context [I] Context to pass to the callback function when it is executed. - * Milliseconds [I] Number of milliseconds to wait before timing out. - * Flags [I] Flags. See notes. - * - * RETURNS - * Success: STATUS_SUCCESS. - * Failure: Any NTSTATUS code. - * - * NOTES - * Flags can be one or more of the following: - *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread. - *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread. - *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent. - *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time. - *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token. 
- */ -NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object, - RTL_WAITORTIMERCALLBACKFUNC Callback, - PVOID Context, ULONG Milliseconds, ULONG Flags) -{ - struct wait_work_item *wait_work_item; - NTSTATUS status; - - TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags ); - - wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) ); - if (!wait_work_item) - return STATUS_NO_MEMORY; - - wait_work_item->Object = Object; - wait_work_item->Callback = Callback; - wait_work_item->Context = Context; - wait_work_item->Milliseconds = Milliseconds; - wait_work_item->Flags = Flags; - wait_work_item->CallbackInProgress = FALSE; - wait_work_item->DeleteCount = 0; - wait_work_item->CompletionEvent = NULL; - - status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE ); - if (status != STATUS_SUCCESS) - { - RtlFreeHeap( GetProcessHeap(), 0, wait_work_item ); - return status; - } - - Flags = Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD | - WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION); - status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags ); - if (status != STATUS_SUCCESS) - { - delete_wait_work_item( wait_work_item ); - return status; - } - - *NewWaitObject = wait_work_item; - return status; -} - -/*********************************************************************** - * RtlDeregisterWaitEx (NTDLL.@) - * - * Cancels a wait operation and frees the resources associated with calling - * RtlRegisterWait(). - * - * PARAMS - * WaitObject [I] Handle to the wait object to free. - * - * RETURNS - * Success: STATUS_SUCCESS. - * Failure: Any NTSTATUS code. 
- */ -NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent) -{ - struct wait_work_item *wait_work_item = WaitHandle; - NTSTATUS status; - HANDLE LocalEvent = NULL; - int CallbackInProgress; - - TRACE( "(%p %p)\n", WaitHandle, CompletionEvent ); - - if (WaitHandle == NULL) - return STATUS_INVALID_HANDLE; - - InterlockedExchangePointer( &wait_work_item->CompletionEvent, INVALID_HANDLE_VALUE ); - CallbackInProgress = wait_work_item->CallbackInProgress; - TRACE( "callback in progress %u\n", CallbackInProgress ); - if (CompletionEvent == INVALID_HANDLE_VALUE || !CallbackInProgress) - { - status = NtCreateEvent( &LocalEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE ); - if (status != STATUS_SUCCESS) - return status; - InterlockedExchangePointer( &wait_work_item->CompletionEvent, LocalEvent ); - } - else if (CompletionEvent != NULL) - { - InterlockedExchangePointer( &wait_work_item->CompletionEvent, CompletionEvent ); - } - - NtSetEvent( wait_work_item->CancelEvent, NULL ); - - if (InterlockedIncrement( &wait_work_item->DeleteCount ) == 2 ) - { - status = STATUS_SUCCESS; - delete_wait_work_item( wait_work_item ); - } - else if (LocalEvent) - { - TRACE( "Waiting for completion event\n" ); - NtWaitForSingleObject( LocalEvent, FALSE, NULL ); - status = STATUS_SUCCESS; - } - else - { - status = STATUS_PENDING; - } - - if (LocalEvent) - NtClose( LocalEvent ); - - return status; -} - -/*********************************************************************** - * RtlDeregisterWait (NTDLL.@) - * - * Cancels a wait operation and frees the resources associated with calling - * RtlRegisterWait(). - * - * PARAMS - * WaitObject [I] Handle to the wait object to free. - * - * RETURNS - * Success: STATUS_SUCCESS. - * Failure: Any NTSTATUS code. - */ -NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle) -{ - return RtlDeregisterWaitEx(WaitHandle, NULL); -} -
/************************** Timer Queue Impl **************************/
@@ -3350,3 +3134,219 @@ NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORM
return STATUS_SUCCESS; } + +static void delete_wait_work_item(struct wait_work_item *wait_work_item) +{ + NtClose( wait_work_item->CancelEvent ); + RtlFreeHeap( GetProcessHeap(), 0, wait_work_item ); +} + +static DWORD CALLBACK wait_thread_proc(LPVOID Arg) +{ + struct wait_work_item *wait_work_item = Arg; + NTSTATUS status; + BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) != 0; + HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent }; + LARGE_INTEGER timeout; + HANDLE completion_event; + + TRACE("\n"); + + while (TRUE) + { + status = NtWaitForMultipleObjects( 2, handles, TRUE, alertable, + get_nt_timeout( &timeout, wait_work_item->Milliseconds ) ); + if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT) + { + BOOLEAN TimerOrWaitFired; + + if (status == STATUS_WAIT_0) + { + TRACE( "object %p signaled, calling callback %p with context %p\n", + wait_work_item->Object, wait_work_item->Callback, + wait_work_item->Context ); + TimerOrWaitFired = FALSE; + } + else + { + TRACE( "wait for object %p timed out, calling callback %p with context %p\n", + wait_work_item->Object, wait_work_item->Callback, + wait_work_item->Context ); + TimerOrWaitFired = TRUE; + } + InterlockedExchange( &wait_work_item->CallbackInProgress, TRUE ); + if (wait_work_item->CompletionEvent) + { + TRACE( "Work has been canceled.\n" ); + break; + } + wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired ); + InterlockedExchange( &wait_work_item->CallbackInProgress, FALSE ); + + if (wait_work_item->Flags & WT_EXECUTEONLYONCE) + break; + } + else if (status != STATUS_USER_APC) + break; + } + + + if (InterlockedIncrement( &wait_work_item->DeleteCount ) == 2 ) + { + completion_event = wait_work_item->CompletionEvent; + delete_wait_work_item( wait_work_item ); + if (completion_event && completion_event != INVALID_HANDLE_VALUE) + NtSetEvent( completion_event, NULL ); + } + + return 0; +} + 
+/*********************************************************************** + * RtlRegisterWait (NTDLL.@) + * + * Registers a wait for a handle to become signaled. + * + * PARAMS + * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it. + * Object [I] Object to wait to become signaled. + * Callback [I] Callback function to execute when the wait times out or the handle is signaled. + * Context [I] Context to pass to the callback function when it is executed. + * Milliseconds [I] Number of milliseconds to wait before timing out. + * Flags [I] Flags. See notes. + * + * RETURNS + * Success: STATUS_SUCCESS. + * Failure: Any NTSTATUS code. + * + * NOTES + * Flags can be one or more of the following: + *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread. + *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread. + *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent. + *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time. + *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token. 
+ */ +NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object, + RTL_WAITORTIMERCALLBACKFUNC Callback, + PVOID Context, ULONG Milliseconds, ULONG Flags) +{ + struct wait_work_item *wait_work_item; + NTSTATUS status; + + TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags ); + + wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) ); + if (!wait_work_item) + return STATUS_NO_MEMORY; + + wait_work_item->Object = Object; + wait_work_item->Callback = Callback; + wait_work_item->Context = Context; + wait_work_item->Milliseconds = Milliseconds; + wait_work_item->Flags = Flags; + wait_work_item->CallbackInProgress = FALSE; + wait_work_item->DeleteCount = 0; + wait_work_item->CompletionEvent = NULL; + + status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE ); + if (status != STATUS_SUCCESS) + { + RtlFreeHeap( GetProcessHeap(), 0, wait_work_item ); + return status; + } + + Flags = Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD | + WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION); + status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags ); + if (status != STATUS_SUCCESS) + { + delete_wait_work_item( wait_work_item ); + return status; + } + + *NewWaitObject = wait_work_item; + return status; +} + +/*********************************************************************** + * RtlDeregisterWaitEx (NTDLL.@) + * + * Cancels a wait operation and frees the resources associated with calling + * RtlRegisterWait(). + * + * PARAMS + * WaitObject [I] Handle to the wait object to free. + * + * RETURNS + * Success: STATUS_SUCCESS. + * Failure: Any NTSTATUS code. 
+ */ +NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent) +{ + struct wait_work_item *wait_work_item = WaitHandle; + NTSTATUS status; + HANDLE LocalEvent = NULL; + int CallbackInProgress; + + TRACE( "(%p %p)\n", WaitHandle, CompletionEvent ); + + if (WaitHandle == NULL) + return STATUS_INVALID_HANDLE; + + InterlockedExchangePointer( &wait_work_item->CompletionEvent, INVALID_HANDLE_VALUE ); + CallbackInProgress = wait_work_item->CallbackInProgress; + TRACE( "callback in progress %u\n", CallbackInProgress ); + if (CompletionEvent == INVALID_HANDLE_VALUE || !CallbackInProgress) + { + status = NtCreateEvent( &LocalEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE ); + if (status != STATUS_SUCCESS) + return status; + InterlockedExchangePointer( &wait_work_item->CompletionEvent, LocalEvent ); + } + else if (CompletionEvent != NULL) + { + InterlockedExchangePointer( &wait_work_item->CompletionEvent, CompletionEvent ); + } + + NtSetEvent( wait_work_item->CancelEvent, NULL ); + + if (InterlockedIncrement( &wait_work_item->DeleteCount ) == 2 ) + { + status = STATUS_SUCCESS; + delete_wait_work_item( wait_work_item ); + } + else if (LocalEvent) + { + TRACE( "Waiting for completion event\n" ); + NtWaitForSingleObject( LocalEvent, FALSE, NULL ); + status = STATUS_SUCCESS; + } + else + { + status = STATUS_PENDING; + } + + if (LocalEvent) + NtClose( LocalEvent ); + + return status; +} + +/*********************************************************************** + * RtlDeregisterWait (NTDLL.@) + * + * Cancels a wait operation and frees the resources associated with calling + * RtlRegisterWait(). + * + * PARAMS + * WaitObject [I] Handle to the wait object to free. + * + * RETURNS + * Success: STATUS_SUCCESS. + * Failure: Any NTSTATUS code. + */ +NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle) +{ + return RtlDeregisterWaitEx(WaitHandle, NULL); +}
It will be submitted eventually, so there is no need to force it; forcing it also makes supporting the WT_EXECUTEINWAITTHREAD flag harder.
Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=47843 Signed-off-by: Rémi Bernon rbernon@codeweavers.com --- dlls/ntdll/threadpool.c | 9 --------- 1 file changed, 9 deletions(-)
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c index db645118059..bef9a11905e 100644 --- a/dlls/ntdll/threadpool.c +++ b/dlls/ntdll/threadpool.c @@ -2941,7 +2941,6 @@ VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout ) { struct threadpool_object *this = impl_from_TP_WAIT( wait ); ULONGLONG timestamp = TIMEOUT_INFINITE; - BOOL submit_wait = FALSE;
TRACE( "%p %p %p\n", wait, handle, timeout );
@@ -2965,11 +2964,6 @@ VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout ) NtQuerySystemTime( &now ); timestamp = now.QuadPart - timestamp; } - else if (!timestamp) - { - submit_wait = TRUE; - handle = NULL; - } }
/* Add wait object back into one of the queues. */ @@ -2990,9 +2984,6 @@ VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout ) }
RtlLeaveCriticalSection( &waitqueue.cs ); - - if (submit_wait) - tp_object_submit( this, FALSE ); }
/***********************************************************************
To execute a threadpool_object's callbacks.
Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=47843 Signed-off-by: Rémi Bernon rbernon@codeweavers.com --- dlls/ntdll/threadpool.c | 303 +++++++++++++++++++++------------------- 1 file changed, 160 insertions(+), 143 deletions(-)
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c index bef9a11905e..e9ca32a3be7 100644 --- a/dlls/ntdll/threadpool.c +++ b/dlls/ntdll/threadpool.c @@ -372,6 +372,7 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
static void CALLBACK threadpool_worker_proc( void *param ); static void tp_object_submit( struct threadpool_object *object, BOOL signaled ); +static void tp_object_execute( struct threadpool_object *object ); static void tp_object_prepare_shutdown( struct threadpool_object *object ); static BOOL tp_object_release( struct threadpool_object *object ); static struct threadpool *default_threadpool = NULL; @@ -2095,18 +2096,171 @@ static struct list *threadpool_get_next_item( const struct threadpool *pool ) }
/*********************************************************************** - * threadpool_worker_proc (internal) + * tp_object_execute (internal) + * + * Executes a threadpool object callback, object->pool->cs has to be + * held. */ -static void CALLBACK threadpool_worker_proc( void *param ) +static void tp_object_execute( struct threadpool_object *object ) { TP_CALLBACK_INSTANCE *callback_instance; struct threadpool_instance instance; struct io_completion completion; - struct threadpool *pool = param; + struct threadpool *pool = object->pool; TP_WAIT_RESULT wait_result = 0; + NTSTATUS status; + + object->num_pending_callbacks--; + + /* For wait objects check if they were signaled or have timed out. */ + if (object->type == TP_OBJECT_TYPE_WAIT) + { + wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT; + if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--; + } + else if (object->type == TP_OBJECT_TYPE_IO) + { + assert( object->u.io.completion_count ); + completion = object->u.io.completions[--object->u.io.completion_count]; + object->u.io.pending_count--; + } + + /* Leave critical section and do the actual callback. */ + object->num_associated_callbacks++; + object->num_running_callbacks++; + RtlLeaveCriticalSection( &pool->cs ); + + /* Initialize threadpool instance struct. 
*/ + callback_instance = (TP_CALLBACK_INSTANCE *)&instance; + instance.object = object; + instance.threadid = GetCurrentThreadId(); + instance.associated = TRUE; + instance.may_run_long = object->may_run_long; + instance.cleanup.critical_section = NULL; + instance.cleanup.mutex = NULL; + instance.cleanup.semaphore = NULL; + instance.cleanup.semaphore_count = 0; + instance.cleanup.event = NULL; + instance.cleanup.library = NULL; + + switch (object->type) + { + case TP_OBJECT_TYPE_SIMPLE: + { + TRACE( "executing simple callback %p(%p, %p)\n", + object->u.simple.callback, callback_instance, object->userdata ); + object->u.simple.callback( callback_instance, object->userdata ); + TRACE( "callback %p returned\n", object->u.simple.callback ); + break; + } + + case TP_OBJECT_TYPE_WORK: + { + TRACE( "executing work callback %p(%p, %p, %p)\n", + object->u.work.callback, callback_instance, object->userdata, object ); + object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object ); + TRACE( "callback %p returned\n", object->u.work.callback ); + break; + } + + case TP_OBJECT_TYPE_TIMER: + { + TRACE( "executing timer callback %p(%p, %p, %p)\n", + object->u.timer.callback, callback_instance, object->userdata, object ); + object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object ); + TRACE( "callback %p returned\n", object->u.timer.callback ); + break; + } + + case TP_OBJECT_TYPE_WAIT: + { + TRACE( "executing wait callback %p(%p, %p, %p, %u)\n", + object->u.wait.callback, callback_instance, object->userdata, object, wait_result ); + object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result ); + TRACE( "callback %p returned\n", object->u.wait.callback ); + break; + } + + case TP_OBJECT_TYPE_IO: + { + TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n", + object->u.io.callback, callback_instance, object->userdata, + completion.cvalue, &completion.iosb, (TP_IO *)object ); + object->u.io.callback( 
callback_instance, object->userdata, + (void *)completion.cvalue, &completion.iosb, (TP_IO *)object ); + TRACE( "callback %p returned\n", object->u.io.callback ); + break; + } + + default: + assert(0); + break; + } + + /* Execute finalization callback. */ + if (object->finalization_callback) + { + TRACE( "executing finalization callback %p(%p, %p)\n", + object->finalization_callback, callback_instance, object->userdata ); + object->finalization_callback( callback_instance, object->userdata ); + TRACE( "callback %p returned\n", object->finalization_callback ); + } + + /* Execute cleanup tasks. */ + if (instance.cleanup.critical_section) + { + RtlLeaveCriticalSection( instance.cleanup.critical_section ); + } + if (instance.cleanup.mutex) + { + status = NtReleaseMutant( instance.cleanup.mutex, NULL ); + if (status != STATUS_SUCCESS) goto skip_cleanup; + } + if (instance.cleanup.semaphore) + { + status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL ); + if (status != STATUS_SUCCESS) goto skip_cleanup; + } + if (instance.cleanup.event) + { + status = NtSetEvent( instance.cleanup.event, NULL ); + if (status != STATUS_SUCCESS) goto skip_cleanup; + } + if (instance.cleanup.library) + { + LdrUnloadDll( instance.cleanup.library ); + } + +skip_cleanup: + RtlEnterCriticalSection( &pool->cs ); + + /* Simple callbacks are automatically shutdown after execution. 
*/ + if (object->type == TP_OBJECT_TYPE_SIMPLE) + { + tp_object_prepare_shutdown( object ); + object->shutdown = TRUE; + } + + object->num_running_callbacks--; + if (object_is_finished( object, TRUE )) + RtlWakeAllConditionVariable( &object->group_finished_event ); + + if (instance.associated) + { + object->num_associated_callbacks--; + if (object_is_finished( object, FALSE )) + RtlWakeAllConditionVariable( &object->finished_event ); + } +} + +/*********************************************************************** + * threadpool_worker_proc (internal) + */ +static void CALLBACK threadpool_worker_proc( void *param ) +{ + struct threadpool *pool = param; LARGE_INTEGER timeout; struct list *ptr; - NTSTATUS status;
TRACE( "starting worker thread for pool %p\n", pool );
@@ -2121,151 +2275,14 @@ static void CALLBACK threadpool_worker_proc( void *param ) /* If further pending callbacks are queued, move the work item to * the end of the pool list. Otherwise remove it from the pool. */ list_remove( &object->pool_entry ); - if (--object->num_pending_callbacks) + if (object->num_pending_callbacks > 1) tp_object_prio_queue( object );
- /* For wait objects check if they were signaled or have timed out. */ - if (object->type == TP_OBJECT_TYPE_WAIT) - { - wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT; - if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--; - } - else if (object->type == TP_OBJECT_TYPE_IO) - { - assert( object->u.io.completion_count ); - completion = object->u.io.completions[--object->u.io.completion_count]; - object->u.io.pending_count--; - } + tp_object_execute( object );
- /* Leave critical section and do the actual callback. */ - object->num_associated_callbacks++; - object->num_running_callbacks++; - RtlLeaveCriticalSection( &pool->cs ); - - /* Initialize threadpool instance struct. */ - callback_instance = (TP_CALLBACK_INSTANCE *)&instance; - instance.object = object; - instance.threadid = GetCurrentThreadId(); - instance.associated = TRUE; - instance.may_run_long = object->may_run_long; - instance.cleanup.critical_section = NULL; - instance.cleanup.mutex = NULL; - instance.cleanup.semaphore = NULL; - instance.cleanup.semaphore_count = 0; - instance.cleanup.event = NULL; - instance.cleanup.library = NULL; - - switch (object->type) - { - case TP_OBJECT_TYPE_SIMPLE: - { - TRACE( "executing simple callback %p(%p, %p)\n", - object->u.simple.callback, callback_instance, object->userdata ); - object->u.simple.callback( callback_instance, object->userdata ); - TRACE( "callback %p returned\n", object->u.simple.callback ); - break; - } - - case TP_OBJECT_TYPE_WORK: - { - TRACE( "executing work callback %p(%p, %p, %p)\n", - object->u.work.callback, callback_instance, object->userdata, object ); - object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object ); - TRACE( "callback %p returned\n", object->u.work.callback ); - break; - } - - case TP_OBJECT_TYPE_TIMER: - { - TRACE( "executing timer callback %p(%p, %p, %p)\n", - object->u.timer.callback, callback_instance, object->userdata, object ); - object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object ); - TRACE( "callback %p returned\n", object->u.timer.callback ); - break; - } - - case TP_OBJECT_TYPE_WAIT: - { - TRACE( "executing wait callback %p(%p, %p, %p, %u)\n", - object->u.wait.callback, callback_instance, object->userdata, object, wait_result ); - object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result ); - TRACE( "callback %p returned\n", object->u.wait.callback ); - break; - } - - case 
TP_OBJECT_TYPE_IO: - { - TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n", - object->u.io.callback, callback_instance, object->userdata, - completion.cvalue, &completion.iosb, (TP_IO *)object ); - object->u.io.callback( callback_instance, object->userdata, - (void *)completion.cvalue, &completion.iosb, (TP_IO *)object ); - TRACE( "callback %p returned\n", object->u.io.callback ); - break; - } - - default: - assert(0); - break; - } - - /* Execute finalization callback. */ - if (object->finalization_callback) - { - TRACE( "executing finalization callback %p(%p, %p)\n", - object->finalization_callback, callback_instance, object->userdata ); - object->finalization_callback( callback_instance, object->userdata ); - TRACE( "callback %p returned\n", object->finalization_callback ); - } - - /* Execute cleanup tasks. */ - if (instance.cleanup.critical_section) - { - RtlLeaveCriticalSection( instance.cleanup.critical_section ); - } - if (instance.cleanup.mutex) - { - status = NtReleaseMutant( instance.cleanup.mutex, NULL ); - if (status != STATUS_SUCCESS) goto skip_cleanup; - } - if (instance.cleanup.semaphore) - { - status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL ); - if (status != STATUS_SUCCESS) goto skip_cleanup; - } - if (instance.cleanup.event) - { - status = NtSetEvent( instance.cleanup.event, NULL ); - if (status != STATUS_SUCCESS) goto skip_cleanup; - } - if (instance.cleanup.library) - { - LdrUnloadDll( instance.cleanup.library ); - } - - skip_cleanup: - RtlEnterCriticalSection( &pool->cs ); assert(pool->num_busy_workers); pool->num_busy_workers--;
- /* Simple callbacks are automatically shutdown after execution. */ - if (object->type == TP_OBJECT_TYPE_SIMPLE) - { - tp_object_prepare_shutdown( object ); - object->shutdown = TRUE; - } - - object->num_running_callbacks--; - if (object_is_finished( object, TRUE )) - RtlWakeAllConditionVariable( &object->group_finished_event ); - - if (instance.associated) - { - object->num_associated_callbacks--; - if (object_is_finished( object, FALSE )) - RtlWakeAllConditionVariable( &object->finished_event ); - } - tp_object_release( object ); }
This adds several internal flags to the TP_WAIT object to support the implementation:
* WT_EXECUTEONLYONCE: waits are re-queued unless it is set.
* WT_EXECUTEINWAITTHREAD: call the callback in the wait thread when set.
* WT_EXECUTEINIOTHREAD: call alertable NtWaitForMultipleObjects in the wait thread when set, and also call the callback in the wait thread, as for WT_EXECUTEINWAITTHREAD. The worker threads use non-alertable waits otherwise.
Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=47843 Signed-off-by: Rémi Bernon rbernon@codeweavers.com --- dlls/kernel32/tests/thread.c | 1 - dlls/ntdll/tests/threadpool.c | 6 +- dlls/ntdll/threadpool.c | 264 +++++++++++++--------------------- 3 files changed, 100 insertions(+), 171 deletions(-)
diff --git a/dlls/kernel32/tests/thread.c b/dlls/kernel32/tests/thread.c index b69f69fadaa..e861aa751e9 100644 --- a/dlls/kernel32/tests/thread.c +++ b/dlls/kernel32/tests/thread.c @@ -1367,7 +1367,6 @@ static void CALLBACK waitthread_test_function(PVOID p, BOOLEAN TimerOrWaitFired)
SetEvent(param->trigger_event); ret = WaitForSingleObject(param->wait_event, 100); - todo_wine ok(ret == WAIT_TIMEOUT, "wait should have timed out\n"); SetEvent(param->complete_event); } diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c index e18ad4dd76d..6b301eafcbd 100644 --- a/dlls/ntdll/tests/threadpool.c +++ b/dlls/ntdll/tests/threadpool.c @@ -319,7 +319,7 @@ static void test_RtlRegisterWait(void) result = WaitForSingleObject(semaphores[0], 200); ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result); ok(info.userdata == 1, "expected info.userdata = 1, got %u\n", info.userdata); - todo_wine ok(info.threadid == threadid, "unexpected different wait thread id %x\n", info.threadid); + ok(info.threadid == threadid, "unexpected different wait thread id %x\n", info.threadid); result = WaitForSingleObject(semaphores[1], 0); ok(result == WAIT_TIMEOUT, "WaitForSingleObject returned %u\n", result); Sleep(50); @@ -350,7 +350,7 @@ static void test_RtlRegisterWait(void) result = WaitForSingleObject(semaphores[0], 200); ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result); ok(info.userdata == 1, "expected info.userdata = 1, got %u\n", info.userdata); - todo_wine ok(info.threadid == threadid, "unexpected different wait thread id %x\n", info.threadid); + ok(info.threadid == threadid, "unexpected different wait thread id %x\n", info.threadid); result = WaitForSingleObject(semaphores[1], 0); ok(result == WAIT_TIMEOUT, "WaitForSingleObject returned %u\n", result); Sleep(50); @@ -433,7 +433,6 @@ static void test_RtlRegisterWait(void) ok(!status, "RtlDeregisterWaitEx failed with status %x\n", status); ok(info.userdata == 0, "expected info.userdata = 0, got %u\n", info.userdata); result = WaitForSingleObject(event, 200); - todo_wine ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
/* test RtlDeregisterWaitEx after wait expired */ @@ -470,7 +469,6 @@ static void test_RtlRegisterWait(void) ok(!status, "RtlDeregisterWaitEx failed with status %x\n", status); ok(info.userdata == 0x10000, "expected info.userdata = 0x10000, got %u\n", info.userdata); result = WaitForSingleObject(event, 200); - todo_wine ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
/* test RtlDeregisterWaitEx while callback is running */ diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c index e9ca32a3be7..331149855ab 100644 --- a/dlls/ntdll/threadpool.c +++ b/dlls/ntdll/threadpool.c @@ -68,19 +68,6 @@ static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug = 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") } };
-struct wait_work_item -{ - HANDLE Object; - HANDLE CancelEvent; - WAITORTIMERCALLBACK Callback; - PVOID Context; - ULONG Milliseconds; - ULONG Flags; - HANDLE CompletionEvent; - LONG DeleteCount; - int CallbackInProgress; -}; - struct timer_queue; struct queue_timer { @@ -170,6 +157,7 @@ struct threadpool_object struct list pool_entry; RTL_CONDITION_VARIABLE finished_event; RTL_CONDITION_VARIABLE group_finished_event; + HANDLE completed_event; LONG num_pending_callbacks; LONG num_running_callbacks; LONG num_associated_callbacks; @@ -206,6 +194,8 @@ struct threadpool_object struct list wait_entry; ULONGLONG timeout; HANDLE handle; + DWORD flags; + RTL_WAITORTIMERCALLBACKFUNC rtl_callback; } wait; struct { @@ -302,6 +292,7 @@ struct waitqueue_bucket struct list reserved; struct list waiting; HANDLE update_event; + BOOL alertable; };
/* global I/O completion queue object */ @@ -372,7 +363,7 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
static void CALLBACK threadpool_worker_proc( void *param ); static void tp_object_submit( struct threadpool_object *object, BOOL signaled ); -static void tp_object_execute( struct threadpool_object *object ); +static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread ); static void tp_object_prepare_shutdown( struct threadpool_object *object ); static BOOL tp_object_release( struct threadpool_object *object ); static struct threadpool *default_threadpool = NULL; @@ -1266,9 +1257,21 @@ static void CALLBACK waitqueue_thread_proc( void *param ) if (wait->u.wait.timeout <= now.QuadPart) { /* Wait object timed out. */ - list_remove( &wait->u.wait.wait_entry ); - list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); - tp_object_submit( wait, FALSE ); + if ((wait->u.wait.flags & WT_EXECUTEONLYONCE)) + { + list_remove( &wait->u.wait.wait_entry ); + list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); + } + if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD))) + { + InterlockedIncrement( &wait->refcount ); + wait->num_pending_callbacks++; + RtlEnterCriticalSection( &wait->pool->cs ); + tp_object_execute( wait, TRUE ); + RtlLeaveCriticalSection( &wait->pool->cs ); + tp_object_release( wait ); + } + else tp_object_submit( wait, FALSE ); } else { @@ -1290,7 +1293,7 @@ static void CALLBACK waitqueue_thread_proc( void *param ) assert( num_handles == 0 ); RtlLeaveCriticalSection( &waitqueue.cs ); timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000; - status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, FALSE, &timeout ); + status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout ); RtlEnterCriticalSection( &waitqueue.cs );
if (status == STATUS_TIMEOUT && !bucket->objcount) @@ -1300,7 +1303,7 @@ static void CALLBACK waitqueue_thread_proc( void *param ) { handles[num_handles] = bucket->update_event; RtlLeaveCriticalSection( &waitqueue.cs ); - status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, FALSE, &timeout ); + status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout ); RtlEnterCriticalSection( &waitqueue.cs );
if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles) @@ -1311,9 +1314,20 @@ static void CALLBACK waitqueue_thread_proc( void *param ) { /* Wait object signaled. */ assert( wait->u.wait.bucket == bucket ); - list_remove( &wait->u.wait.wait_entry ); - list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); - tp_object_submit( wait, TRUE ); + if ((wait->u.wait.flags & WT_EXECUTEONLYONCE)) + { + list_remove( &wait->u.wait.wait_entry ); + list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); + } + if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD))) + { + wait->u.wait.signaled++; + wait->num_pending_callbacks++; + RtlEnterCriticalSection( &wait->pool->cs ); + tp_object_execute( wait, TRUE ); + RtlLeaveCriticalSection( &wait->pool->cs ); + } + else tp_object_submit( wait, TRUE ); } else WARN("wait object %p triggered while object was destroyed\n", wait); @@ -1335,7 +1349,7 @@ static void CALLBACK waitqueue_thread_proc( void *param ) struct waitqueue_bucket *other_bucket; LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry ) { - if (other_bucket != bucket && other_bucket->objcount && + if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable && other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3) { other_bucket->objcount += bucket->objcount; @@ -1395,6 +1409,7 @@ static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait ) struct waitqueue_bucket *bucket; NTSTATUS status; HANDLE thread; + BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0; assert( wait->type == TP_OBJECT_TYPE_WAIT );
wait->u.wait.signaled = 0; @@ -1408,7 +1423,7 @@ static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait ) /* Try to assign to existing bucket if possible. */ LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry ) { - if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS) + if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable) { list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); wait->u.wait.bucket = bucket; @@ -1428,6 +1443,7 @@ static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait ) }
bucket->objcount = 0; + bucket->alertable = alertable; list_init( &bucket->reserved ); list_init( &bucket->waiting );
@@ -1860,6 +1876,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa memset( &object->pool_entry, 0, sizeof(object->pool_entry) ); RtlInitializeConditionVariable( &object->finished_event ); RtlInitializeConditionVariable( &object->group_finished_event ); + object->completed_event = NULL; object->num_pending_callbacks = 0; object->num_running_callbacks = 0; object->num_associated_callbacks = 0; @@ -2077,6 +2094,9 @@ static BOOL tp_object_release( struct threadpool_object *object ) if (object->race_dll) LdrUnloadDll( object->race_dll );
+ if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE) + NtSetEvent( object->completed_event, NULL ); + RtlFreeHeap( GetProcessHeap(), 0, object ); return TRUE; } @@ -2101,7 +2121,7 @@ static struct list *threadpool_get_next_item( const struct threadpool *pool ) * Executes a threadpool object callback, object->pool->cs has to be * held. */ -static void tp_object_execute( struct threadpool_object *object ) +static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread ) { TP_CALLBACK_INSTANCE *callback_instance; struct threadpool_instance instance; @@ -2129,6 +2149,7 @@ static void tp_object_execute( struct threadpool_object *object ) object->num_associated_callbacks++; object->num_running_callbacks++; RtlLeaveCriticalSection( &pool->cs ); + if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
/* Initialize threadpool instance struct. */ callback_instance = (TP_CALLBACK_INSTANCE *)&instance; @@ -2232,6 +2253,7 @@ static void tp_object_execute( struct threadpool_object *object ) }
skip_cleanup: + if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs ); RtlEnterCriticalSection( &pool->cs );
/* Simple callbacks are automatically shutdown after execution. */ @@ -2278,7 +2300,7 @@ static void CALLBACK threadpool_worker_proc( void *param ) if (object->num_pending_callbacks > 1) tp_object_prio_queue( object );
- tp_object_execute( object ); + tp_object_execute( object, FALSE );
assert(pool->num_busy_workers); pool->num_busy_workers--; @@ -2418,18 +2440,13 @@ NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID return STATUS_SUCCESS; }
-/*********************************************************************** - * TpAllocWait (NTDLL.@) - */ -NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, - TP_CALLBACK_ENVIRON *environment ) +static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, + TP_CALLBACK_ENVIRON *environment, DWORD flags ) { struct threadpool_object *object; struct threadpool *pool; NTSTATUS status;
- TRACE( "%p %p %p %p\n", out, callback, userdata, environment ); - object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) ); if (!object) return STATUS_NO_MEMORY; @@ -2443,6 +2460,7 @@ NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID us
object->type = TP_OBJECT_TYPE_WAIT; object->u.wait.callback = callback; + object->u.wait.flags = flags;
status = tp_waitqueue_lock( object ); if (status) @@ -2458,6 +2476,16 @@ NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID us return STATUS_SUCCESS; }
+/*********************************************************************** + * TpAllocWait (NTDLL.@) + */ +NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, + TP_CALLBACK_ENVIRON *environment ) +{ + TRACE( "%p %p %p %p\n", out, callback, userdata, environment ); + return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE ); +} + /*********************************************************************** * TpAllocWork (NTDLL.@) */ @@ -3143,71 +3171,10 @@ NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORM return STATUS_SUCCESS; }
-static void delete_wait_work_item(struct wait_work_item *wait_work_item) +static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result ) { - NtClose( wait_work_item->CancelEvent ); - RtlFreeHeap( GetProcessHeap(), 0, wait_work_item ); -} - -static DWORD CALLBACK wait_thread_proc(LPVOID Arg) -{ - struct wait_work_item *wait_work_item = Arg; - NTSTATUS status; - BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) != 0; - HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent }; - LARGE_INTEGER timeout; - HANDLE completion_event; - - TRACE("\n"); - - while (TRUE) - { - status = NtWaitForMultipleObjects( 2, handles, TRUE, alertable, - get_nt_timeout( &timeout, wait_work_item->Milliseconds ) ); - if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT) - { - BOOLEAN TimerOrWaitFired; - - if (status == STATUS_WAIT_0) - { - TRACE( "object %p signaled, calling callback %p with context %p\n", - wait_work_item->Object, wait_work_item->Callback, - wait_work_item->Context ); - TimerOrWaitFired = FALSE; - } - else - { - TRACE( "wait for object %p timed out, calling callback %p with context %p\n", - wait_work_item->Object, wait_work_item->Callback, - wait_work_item->Context ); - TimerOrWaitFired = TRUE; - } - InterlockedExchange( &wait_work_item->CallbackInProgress, TRUE ); - if (wait_work_item->CompletionEvent) - { - TRACE( "Work has been canceled.\n" ); - break; - } - wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired ); - InterlockedExchange( &wait_work_item->CallbackInProgress, FALSE ); - - if (wait_work_item->Flags & WT_EXECUTEONLYONCE) - break; - } - else if (status != STATUS_USER_APC) - break; - } - - - if (InterlockedIncrement( &wait_work_item->DeleteCount ) == 2 ) - { - completion_event = wait_work_item->CompletionEvent; - delete_wait_work_item( wait_work_item ); - if (completion_event && completion_event != INVALID_HANDLE_VALUE) - NtSetEvent( 
completion_event, NULL ); - } - - return 0; + struct threadpool_object *object = impl_from_TP_WAIT(wait); + object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 ); }
/*********************************************************************** @@ -3235,46 +3202,34 @@ static DWORD CALLBACK wait_thread_proc(LPVOID Arg) *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time. *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token. */ -NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object, - RTL_WAITORTIMERCALLBACKFUNC Callback, - PVOID Context, ULONG Milliseconds, ULONG Flags) +NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback, + void *context, ULONG milliseconds, ULONG flags ) { - struct wait_work_item *wait_work_item; + struct threadpool_object *object; + TP_CALLBACK_ENVIRON environment; + LARGE_INTEGER timeout; NTSTATUS status; + TP_WAIT *wait;
- TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags ); - - wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) ); - if (!wait_work_item) - return STATUS_NO_MEMORY; + TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %u, flags %x\n", + out, handle, callback, context, milliseconds, flags );
- wait_work_item->Object = Object; - wait_work_item->Callback = Callback; - wait_work_item->Context = Context; - wait_work_item->Milliseconds = Milliseconds; - wait_work_item->Flags = Flags; - wait_work_item->CallbackInProgress = FALSE; - wait_work_item->DeleteCount = 0; - wait_work_item->CompletionEvent = NULL; + memset( &environment, 0, sizeof(environment) ); + environment.Version = 1; + environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0; + environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
- status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE ); - if (status != STATUS_SUCCESS) - { - RtlFreeHeap( GetProcessHeap(), 0, wait_work_item ); + flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD); + if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags ))) return status; - }
- Flags = Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD | - WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION); - status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags ); - if (status != STATUS_SUCCESS) - { - delete_wait_work_item( wait_work_item ); - return status; - } + object = impl_from_TP_WAIT(wait); + object->u.wait.rtl_callback = callback;
- *NewWaitObject = wait_work_item; - return status; + TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) ); + + *out = object; + return STATUS_SUCCESS; }
/*********************************************************************** @@ -3290,54 +3245,31 @@ NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object, * Success: STATUS_SUCCESS. * Failure: Any NTSTATUS code. */ -NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent) +NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event ) { - struct wait_work_item *wait_work_item = WaitHandle; + struct threadpool_object *object = handle; NTSTATUS status; - HANDLE LocalEvent = NULL; - int CallbackInProgress;
- TRACE( "(%p %p)\n", WaitHandle, CompletionEvent ); + TRACE( "handle %p, event %p\n", handle, event );
- if (WaitHandle == NULL) - return STATUS_INVALID_HANDLE; + if (!object) return STATUS_INVALID_HANDLE;
- InterlockedExchangePointer( &wait_work_item->CompletionEvent, INVALID_HANDLE_VALUE ); - CallbackInProgress = wait_work_item->CallbackInProgress; - TRACE( "callback in progress %u\n", CallbackInProgress ); - if (CompletionEvent == INVALID_HANDLE_VALUE || !CallbackInProgress) - { - status = NtCreateEvent( &LocalEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE ); - if (status != STATUS_SUCCESS) - return status; - InterlockedExchangePointer( &wait_work_item->CompletionEvent, LocalEvent ); - } - else if (CompletionEvent != NULL) - { - InterlockedExchangePointer( &wait_work_item->CompletionEvent, CompletionEvent ); - } + TpSetWait( (TP_WAIT *)object, NULL, NULL );
- NtSetEvent( wait_work_item->CancelEvent, NULL ); - - if (InterlockedIncrement( &wait_work_item->DeleteCount ) == 2 ) - { - status = STATUS_SUCCESS; - delete_wait_work_item( wait_work_item ); - } - else if (LocalEvent) - { - TRACE( "Waiting for completion event\n" ); - NtWaitForSingleObject( LocalEvent, FALSE, NULL ); - status = STATUS_SUCCESS; - } + if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE ); else { - status = STATUS_PENDING; + assert( object->completed_event == NULL ); + object->completed_event = event; }
- if (LocalEvent) - NtClose( LocalEvent ); + RtlEnterCriticalSection( &object->pool->cs ); + if (object->num_pending_callbacks + object->num_running_callbacks + + object->num_associated_callbacks) status = STATUS_PENDING; + else status = STATUS_SUCCESS; + RtlLeaveCriticalSection( &object->pool->cs );
+ TpReleaseWait( (TP_WAIT *)object ); return status; }
Hi,
While running your changed tests, I think I found new failures. Being a bot and all I'm not very good at pattern recognition, so I might be wrong, but could you please double-check?
Full results can be found at: https://testbot.winehq.org/JobDetails.pl?Key=85446
Your paranoid android.
=== w8 (32 bit report) ===
ntdll: threadpool.c:1478: Test failed: expected approximately 600 ticks, got 812
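The wait handle has to be stored in the caller's output pointer before the wait queue thread is allowed to run the callback. Otherwise we may execute the callback before the value is actually returned from RegisterWaitForSingleObject.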
Gears Tactics shares a pointer to the returned handle with its callbacks and calls UnregisterWait from there. This creates a race condition that sometimes causes a double free.
Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=47843 Signed-off-by: Rémi Bernon <rbernon@codeweavers.com> --- dlls/kernel32/sync.c | 3 +- dlls/kernel32/tests/thread.c | 61 ++++++++++++++++++++++++++++++++++++ dlls/ntdll/threadpool.c | 3 ++ 3 files changed, 66 insertions(+), 1 deletion(-)
diff --git a/dlls/kernel32/sync.c b/dlls/kernel32/sync.c index ab392c5d995..331cd80772a 100644 --- a/dlls/kernel32/sync.c +++ b/dlls/kernel32/sync.c @@ -110,7 +110,8 @@ DWORD WINAPI DECLSPEC_HOTPATCH GetTickCount(void) BOOL WINAPI RegisterWaitForSingleObject( HANDLE *wait, HANDLE object, WAITORTIMERCALLBACK callback, void *context, ULONG timeout, ULONG flags ) { - return (*wait = RegisterWaitForSingleObjectEx( object, callback, context, timeout, flags)) != NULL; + if (!set_ntstatus( RtlRegisterWait( wait, object, callback, context, timeout, flags ))) return FALSE; + return TRUE; }
/*********************************************************************** diff --git a/dlls/kernel32/tests/thread.c b/dlls/kernel32/tests/thread.c index e861aa751e9..f33e05741a2 100644 --- a/dlls/kernel32/tests/thread.c +++ b/dlls/kernel32/tests/thread.c @@ -1346,6 +1346,15 @@ static void CALLBACK signaled_function(PVOID p, BOOLEAN TimerOrWaitFired) ok(!TimerOrWaitFired, "wait shouldn't have timed out\n"); }
+static void CALLBACK wait_complete_function(PVOID p, BOOLEAN TimerOrWaitFired) +{ + HANDLE event = p; + DWORD res; + ok(!TimerOrWaitFired, "wait shouldn't have timed out\n"); + res = WaitForSingleObject(event, INFINITE); + ok(res == WAIT_OBJECT_0, "WaitForSingleObject returned %x\n", res); +} + static void CALLBACK timeout_function(PVOID p, BOOLEAN TimerOrWaitFired) { HANDLE event = p; @@ -1371,6 +1380,23 @@ static void CALLBACK waitthread_test_function(PVOID p, BOOLEAN TimerOrWaitFired) SetEvent(param->complete_event); }
+struct unregister_params +{ + HANDLE wait_handle; + HANDLE complete_event; +}; + +static void CALLBACK unregister_function(PVOID p, BOOLEAN TimerOrWaitFired) +{ + struct unregister_params *param = p; + HANDLE wait_handle = param->wait_handle; + BOOL ret; + ok(wait_handle != INVALID_HANDLE_VALUE, "invalid wait handle\n"); + ret = pUnregisterWait(param->wait_handle); + todo_wine ok(ret, "UnregisterWait failed with error %d\n", GetLastError()); + SetEvent(param->complete_event); +} + static void test_RegisterWaitForSingleObject(void) { BOOL ret; @@ -1379,6 +1405,8 @@ static void test_RegisterWaitForSingleObject(void) HANDLE complete_event; HANDLE waitthread_trigger_event, waitthread_wait_event; struct waitthread_test_param param; + struct unregister_params unregister_param; + DWORD i;
if (!pRegisterWaitForSingleObject || !pUnregisterWait) { @@ -1411,8 +1439,26 @@ static void test_RegisterWaitForSingleObject(void) ret = pUnregisterWait(wait_handle); ok(ret, "UnregisterWait failed with error %d\n", GetLastError());
+ /* test unregister while running */ + + SetEvent(handle); + ret = pRegisterWaitForSingleObject(&wait_handle, handle, wait_complete_function, complete_event, INFINITE, WT_EXECUTEONLYONCE); + ok(ret, "RegisterWaitForSingleObject failed with error %d\n", GetLastError()); + + /* give worker thread chance to start */ + Sleep(50); + ret = pUnregisterWait(wait_handle); + ok(!ret, "UnregisterWait succeeded\n"); + ok(GetLastError() == ERROR_IO_PENDING, "UnregisterWait failed with error %d\n", GetLastError()); + + /* give worker thread chance to complete */ + SetEvent(complete_event); + Sleep(50); + /* test timeout case */
+ ResetEvent(handle); + ret = pRegisterWaitForSingleObject(&wait_handle, handle, timeout_function, complete_event, 0, WT_EXECUTEONLYONCE); ok(ret, "RegisterWaitForSingleObject failed with error %d\n", GetLastError());
@@ -1442,6 +1488,21 @@ static void test_RegisterWaitForSingleObject(void) ret = pUnregisterWait(wait_handle); ok(ret, "UnregisterWait failed with error %d\n", GetLastError());
+ /* the callback execution should be sequentially consistent with the wait handle return, + even if the event is already set */ + + for (i = 0; i < 100; ++i) + { + SetEvent(handle); + unregister_param.complete_event = complete_event; + unregister_param.wait_handle = INVALID_HANDLE_VALUE; + + ret = pRegisterWaitForSingleObject(&unregister_param.wait_handle, handle, unregister_function, &unregister_param, INFINITE, WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD); + ok(ret, "RegisterWaitForSingleObject failed with error %d\n", GetLastError()); + + WaitForSingleObject(complete_event, INFINITE); + } + /* test multiple waits with WT_EXECUTEINWAITTHREAD. * Windows puts multiple waits on the same wait thread, and using WT_EXECUTEINWAITTHREAD causes the callbacks to run serially. */ diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c index 331149855ab..bdc1ebddd7d 100644 --- a/dlls/ntdll/threadpool.c +++ b/dlls/ntdll/threadpool.c @@ -3226,9 +3226,12 @@ NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALL object = impl_from_TP_WAIT(wait); object->u.wait.rtl_callback = callback;
+ RtlEnterCriticalSection( &waitqueue.cs ); TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
*out = object; + RtlLeaveCriticalSection( &waitqueue.cs ); + return STATUS_SUCCESS; }
Hi,
While running your changed tests, I think I found new failures. Being a bot and all I'm not very good at pattern recognition, so I might be wrong, but could you please double-check?
Full results can be found at: https://testbot.winehq.org/JobDetails.pl?Key=85447
Your paranoid android.
=== debiant2 (32 bit report) ===
ntdll: change.c:277: Test failed: should be ready
=== debiant2 (32 bit WoW report) ===
kernel32: comm.c:918: Test failed: OutQueue should not be empty
Hi,
While running your changed tests, I think I found new failures. Being a bot and all I'm not very good at pattern recognition, so I might be wrong, but could you please double-check?
Full results can be found at: https://testbot.winehq.org/JobDetails.pl?Key=85443
Your paranoid android.
=== debiant2 (32 bit WoW report) ===
ntdll: threadpool.c:1962: Test failed: WaitForSingleObject returned 258