This adds several internal flags to TP_WAIT object to support the implementation:
* WT_EXECUTEONLYONCE: waits are re-queued unless it is set.
* WT_EXECUTEINWAITTHREAD: call the callback in the wait thread when set.
* WT_EXECUTEINIOTHREAD: call alertable NtWaitForMultipleObjects in wait thread when set, as well the callback in the wait thread, as for WT_EXECUTEINWAITTHREAD. The worker threads use non-alertable waits otherwise.
Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=47843 Signed-off-by: Rémi Bernon rbernon@codeweavers.com --- dlls/kernel32/tests/thread.c | 1 - dlls/ntdll/tests/threadpool.c | 6 +- dlls/ntdll/threadpool.c | 264 +++++++++++++--------------------- 3 files changed, 100 insertions(+), 171 deletions(-)
diff --git a/dlls/kernel32/tests/thread.c b/dlls/kernel32/tests/thread.c index b69f69fadaa..e861aa751e9 100644 --- a/dlls/kernel32/tests/thread.c +++ b/dlls/kernel32/tests/thread.c @@ -1367,7 +1367,6 @@ static void CALLBACK waitthread_test_function(PVOID p, BOOLEAN TimerOrWaitFired)
SetEvent(param->trigger_event); ret = WaitForSingleObject(param->wait_event, 100); - todo_wine ok(ret == WAIT_TIMEOUT, "wait should have timed out\n"); SetEvent(param->complete_event); } diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c index e18ad4dd76d..6b301eafcbd 100644 --- a/dlls/ntdll/tests/threadpool.c +++ b/dlls/ntdll/tests/threadpool.c @@ -319,7 +319,7 @@ static void test_RtlRegisterWait(void) result = WaitForSingleObject(semaphores[0], 200); ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result); ok(info.userdata == 1, "expected info.userdata = 1, got %u\n", info.userdata); - todo_wine ok(info.threadid == threadid, "unexpected different wait thread id %x\n", info.threadid); + ok(info.threadid == threadid, "unexpected different wait thread id %x\n", info.threadid); result = WaitForSingleObject(semaphores[1], 0); ok(result == WAIT_TIMEOUT, "WaitForSingleObject returned %u\n", result); Sleep(50); @@ -350,7 +350,7 @@ static void test_RtlRegisterWait(void) result = WaitForSingleObject(semaphores[0], 200); ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result); ok(info.userdata == 1, "expected info.userdata = 1, got %u\n", info.userdata); - todo_wine ok(info.threadid == threadid, "unexpected different wait thread id %x\n", info.threadid); + ok(info.threadid == threadid, "unexpected different wait thread id %x\n", info.threadid); result = WaitForSingleObject(semaphores[1], 0); ok(result == WAIT_TIMEOUT, "WaitForSingleObject returned %u\n", result); Sleep(50); @@ -433,7 +433,6 @@ static void test_RtlRegisterWait(void) ok(!status, "RtlDeregisterWaitEx failed with status %x\n", status); ok(info.userdata == 0, "expected info.userdata = 0, got %u\n", info.userdata); result = WaitForSingleObject(event, 200); - todo_wine ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
/* test RtlDeregisterWaitEx after wait expired */ @@ -470,7 +469,6 @@ static void test_RtlRegisterWait(void) ok(!status, "RtlDeregisterWaitEx failed with status %x\n", status); ok(info.userdata == 0x10000, "expected info.userdata = 0x10000, got %u\n", info.userdata); result = WaitForSingleObject(event, 200); - todo_wine ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
/* test RtlDeregisterWaitEx while callback is running */ diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c index e9ca32a3be7..331149855ab 100644 --- a/dlls/ntdll/threadpool.c +++ b/dlls/ntdll/threadpool.c @@ -68,19 +68,6 @@ static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug = 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") } };
-struct wait_work_item -{ - HANDLE Object; - HANDLE CancelEvent; - WAITORTIMERCALLBACK Callback; - PVOID Context; - ULONG Milliseconds; - ULONG Flags; - HANDLE CompletionEvent; - LONG DeleteCount; - int CallbackInProgress; -}; - struct timer_queue; struct queue_timer { @@ -170,6 +157,7 @@ struct threadpool_object struct list pool_entry; RTL_CONDITION_VARIABLE finished_event; RTL_CONDITION_VARIABLE group_finished_event; + HANDLE completed_event; LONG num_pending_callbacks; LONG num_running_callbacks; LONG num_associated_callbacks; @@ -206,6 +194,8 @@ struct threadpool_object struct list wait_entry; ULONGLONG timeout; HANDLE handle; + DWORD flags; + RTL_WAITORTIMERCALLBACKFUNC rtl_callback; } wait; struct { @@ -302,6 +292,7 @@ struct waitqueue_bucket struct list reserved; struct list waiting; HANDLE update_event; + BOOL alertable; };
/* global I/O completion queue object */ @@ -372,7 +363,7 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
static void CALLBACK threadpool_worker_proc( void *param ); static void tp_object_submit( struct threadpool_object *object, BOOL signaled ); -static void tp_object_execute( struct threadpool_object *object ); +static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread ); static void tp_object_prepare_shutdown( struct threadpool_object *object ); static BOOL tp_object_release( struct threadpool_object *object ); static struct threadpool *default_threadpool = NULL; @@ -1266,9 +1257,21 @@ static void CALLBACK waitqueue_thread_proc( void *param ) if (wait->u.wait.timeout <= now.QuadPart) { /* Wait object timed out. */ - list_remove( &wait->u.wait.wait_entry ); - list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); - tp_object_submit( wait, FALSE ); + if ((wait->u.wait.flags & WT_EXECUTEONLYONCE)) + { + list_remove( &wait->u.wait.wait_entry ); + list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); + } + if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD))) + { + InterlockedIncrement( &wait->refcount ); + wait->num_pending_callbacks++; + RtlEnterCriticalSection( &wait->pool->cs ); + tp_object_execute( wait, TRUE ); + RtlLeaveCriticalSection( &wait->pool->cs ); + tp_object_release( wait ); + } + else tp_object_submit( wait, FALSE ); } else { @@ -1290,7 +1293,7 @@ static void CALLBACK waitqueue_thread_proc( void *param ) assert( num_handles == 0 ); RtlLeaveCriticalSection( &waitqueue.cs ); timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000; - status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, FALSE, &timeout ); + status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout ); RtlEnterCriticalSection( &waitqueue.cs );
if (status == STATUS_TIMEOUT && !bucket->objcount) @@ -1300,7 +1303,7 @@ static void CALLBACK waitqueue_thread_proc( void *param ) { handles[num_handles] = bucket->update_event; RtlLeaveCriticalSection( &waitqueue.cs ); - status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, FALSE, &timeout ); + status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout ); RtlEnterCriticalSection( &waitqueue.cs );
if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles) @@ -1311,9 +1314,20 @@ static void CALLBACK waitqueue_thread_proc( void *param ) { /* Wait object signaled. */ assert( wait->u.wait.bucket == bucket ); - list_remove( &wait->u.wait.wait_entry ); - list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); - tp_object_submit( wait, TRUE ); + if ((wait->u.wait.flags & WT_EXECUTEONLYONCE)) + { + list_remove( &wait->u.wait.wait_entry ); + list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); + } + if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD))) + { + wait->u.wait.signaled++; + wait->num_pending_callbacks++; + RtlEnterCriticalSection( &wait->pool->cs ); + tp_object_execute( wait, TRUE ); + RtlLeaveCriticalSection( &wait->pool->cs ); + } + else tp_object_submit( wait, TRUE ); } else WARN("wait object %p triggered while object was destroyed\n", wait); @@ -1335,7 +1349,7 @@ static void CALLBACK waitqueue_thread_proc( void *param ) struct waitqueue_bucket *other_bucket; LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry ) { - if (other_bucket != bucket && other_bucket->objcount && + if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable && other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3) { other_bucket->objcount += bucket->objcount; @@ -1395,6 +1409,7 @@ static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait ) struct waitqueue_bucket *bucket; NTSTATUS status; HANDLE thread; + BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0; assert( wait->type == TP_OBJECT_TYPE_WAIT );
wait->u.wait.signaled = 0; @@ -1408,7 +1423,7 @@ static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait ) /* Try to assign to existing bucket if possible. */ LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry ) { - if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS) + if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable) { list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry ); wait->u.wait.bucket = bucket; @@ -1428,6 +1443,7 @@ static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait ) }
bucket->objcount = 0; + bucket->alertable = alertable; list_init( &bucket->reserved ); list_init( &bucket->waiting );
@@ -1860,6 +1876,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa memset( &object->pool_entry, 0, sizeof(object->pool_entry) ); RtlInitializeConditionVariable( &object->finished_event ); RtlInitializeConditionVariable( &object->group_finished_event ); + object->completed_event = NULL; object->num_pending_callbacks = 0; object->num_running_callbacks = 0; object->num_associated_callbacks = 0; @@ -2077,6 +2094,9 @@ static BOOL tp_object_release( struct threadpool_object *object ) if (object->race_dll) LdrUnloadDll( object->race_dll );
+ if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE) + NtSetEvent( object->completed_event, NULL ); + RtlFreeHeap( GetProcessHeap(), 0, object ); return TRUE; } @@ -2101,7 +2121,7 @@ static struct list *threadpool_get_next_item( const struct threadpool *pool ) * Executes a threadpool object callback, object->pool->cs has to be * held. */ -static void tp_object_execute( struct threadpool_object *object ) +static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread ) { TP_CALLBACK_INSTANCE *callback_instance; struct threadpool_instance instance; @@ -2129,6 +2149,7 @@ static void tp_object_execute( struct threadpool_object *object ) object->num_associated_callbacks++; object->num_running_callbacks++; RtlLeaveCriticalSection( &pool->cs ); + if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
/* Initialize threadpool instance struct. */ callback_instance = (TP_CALLBACK_INSTANCE *)&instance; @@ -2232,6 +2253,7 @@ static void tp_object_execute( struct threadpool_object *object ) }
skip_cleanup: + if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs ); RtlEnterCriticalSection( &pool->cs );
/* Simple callbacks are automatically shutdown after execution. */ @@ -2278,7 +2300,7 @@ static void CALLBACK threadpool_worker_proc( void *param ) if (object->num_pending_callbacks > 1) tp_object_prio_queue( object );
- tp_object_execute( object ); + tp_object_execute( object, FALSE );
assert(pool->num_busy_workers); pool->num_busy_workers--; @@ -2418,18 +2440,13 @@ NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID return STATUS_SUCCESS; }
-/*********************************************************************** - * TpAllocWait (NTDLL.@) - */ -NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, - TP_CALLBACK_ENVIRON *environment ) +static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, + TP_CALLBACK_ENVIRON *environment, DWORD flags ) { struct threadpool_object *object; struct threadpool *pool; NTSTATUS status;
- TRACE( "%p %p %p %p\n", out, callback, userdata, environment ); - object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) ); if (!object) return STATUS_NO_MEMORY; @@ -2443,6 +2460,7 @@ NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID us
object->type = TP_OBJECT_TYPE_WAIT; object->u.wait.callback = callback; + object->u.wait.flags = flags;
status = tp_waitqueue_lock( object ); if (status) @@ -2458,6 +2476,16 @@ NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID us return STATUS_SUCCESS; }
+/*********************************************************************** + * TpAllocWait (NTDLL.@) + */ +NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata, + TP_CALLBACK_ENVIRON *environment ) +{ + TRACE( "%p %p %p %p\n", out, callback, userdata, environment ); + return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE ); +} + /*********************************************************************** * TpAllocWork (NTDLL.@) */ @@ -3143,71 +3171,10 @@ NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORM return STATUS_SUCCESS; }
-static void delete_wait_work_item(struct wait_work_item *wait_work_item) +static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result ) { - NtClose( wait_work_item->CancelEvent ); - RtlFreeHeap( GetProcessHeap(), 0, wait_work_item ); -} - -static DWORD CALLBACK wait_thread_proc(LPVOID Arg) -{ - struct wait_work_item *wait_work_item = Arg; - NTSTATUS status; - BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) != 0; - HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent }; - LARGE_INTEGER timeout; - HANDLE completion_event; - - TRACE("\n"); - - while (TRUE) - { - status = NtWaitForMultipleObjects( 2, handles, TRUE, alertable, - get_nt_timeout( &timeout, wait_work_item->Milliseconds ) ); - if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT) - { - BOOLEAN TimerOrWaitFired; - - if (status == STATUS_WAIT_0) - { - TRACE( "object %p signaled, calling callback %p with context %p\n", - wait_work_item->Object, wait_work_item->Callback, - wait_work_item->Context ); - TimerOrWaitFired = FALSE; - } - else - { - TRACE( "wait for object %p timed out, calling callback %p with context %p\n", - wait_work_item->Object, wait_work_item->Callback, - wait_work_item->Context ); - TimerOrWaitFired = TRUE; - } - InterlockedExchange( &wait_work_item->CallbackInProgress, TRUE ); - if (wait_work_item->CompletionEvent) - { - TRACE( "Work has been canceled.\n" ); - break; - } - wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired ); - InterlockedExchange( &wait_work_item->CallbackInProgress, FALSE ); - - if (wait_work_item->Flags & WT_EXECUTEONLYONCE) - break; - } - else if (status != STATUS_USER_APC) - break; - } - - - if (InterlockedIncrement( &wait_work_item->DeleteCount ) == 2 ) - { - completion_event = wait_work_item->CompletionEvent; - delete_wait_work_item( wait_work_item ); - if (completion_event && completion_event != INVALID_HANDLE_VALUE) - NtSetEvent( completion_event, NULL ); - } - - return 0; + struct threadpool_object *object = impl_from_TP_WAIT(wait); + object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 ); }
/*********************************************************************** @@ -3235,46 +3202,34 @@ static DWORD CALLBACK wait_thread_proc(LPVOID Arg) *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time. *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token. */ -NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object, - RTL_WAITORTIMERCALLBACKFUNC Callback, - PVOID Context, ULONG Milliseconds, ULONG Flags) +NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback, + void *context, ULONG milliseconds, ULONG flags ) { - struct wait_work_item *wait_work_item; + struct threadpool_object *object; + TP_CALLBACK_ENVIRON environment; + LARGE_INTEGER timeout; NTSTATUS status; + TP_WAIT *wait;
- TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags ); - - wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) ); - if (!wait_work_item) - return STATUS_NO_MEMORY; + TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %u, flags %x\n", + out, handle, callback, context, milliseconds, flags );
- wait_work_item->Object = Object; - wait_work_item->Callback = Callback; - wait_work_item->Context = Context; - wait_work_item->Milliseconds = Milliseconds; - wait_work_item->Flags = Flags; - wait_work_item->CallbackInProgress = FALSE; - wait_work_item->DeleteCount = 0; - wait_work_item->CompletionEvent = NULL; + memset( &environment, 0, sizeof(environment) ); + environment.Version = 1; + environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0; + environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
- status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE ); - if (status != STATUS_SUCCESS) - { - RtlFreeHeap( GetProcessHeap(), 0, wait_work_item ); + flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD); + if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags ))) return status; - }
- Flags = Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD | - WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION); - status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags ); - if (status != STATUS_SUCCESS) - { - delete_wait_work_item( wait_work_item ); - return status; - } + object = impl_from_TP_WAIT(wait); + object->u.wait.rtl_callback = callback;
- *NewWaitObject = wait_work_item; - return status; + TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) ); + + *out = object; + return STATUS_SUCCESS; }
/*********************************************************************** @@ -3290,54 +3245,31 @@ NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object, * Success: STATUS_SUCCESS. * Failure: Any NTSTATUS code. */ -NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent) +NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event ) { - struct wait_work_item *wait_work_item = WaitHandle; + struct threadpool_object *object = handle; NTSTATUS status; - HANDLE LocalEvent = NULL; - int CallbackInProgress;
- TRACE( "(%p %p)\n", WaitHandle, CompletionEvent ); + TRACE( "handle %p, event %p\n", handle, event );
- if (WaitHandle == NULL) - return STATUS_INVALID_HANDLE; + if (!object) return STATUS_INVALID_HANDLE;
- InterlockedExchangePointer( &wait_work_item->CompletionEvent, INVALID_HANDLE_VALUE ); - CallbackInProgress = wait_work_item->CallbackInProgress; - TRACE( "callback in progress %u\n", CallbackInProgress ); - if (CompletionEvent == INVALID_HANDLE_VALUE || !CallbackInProgress) - { - status = NtCreateEvent( &LocalEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE ); - if (status != STATUS_SUCCESS) - return status; - InterlockedExchangePointer( &wait_work_item->CompletionEvent, LocalEvent ); - } - else if (CompletionEvent != NULL) - { - InterlockedExchangePointer( &wait_work_item->CompletionEvent, CompletionEvent ); - } + TpSetWait( (TP_WAIT *)object, NULL, NULL );
- NtSetEvent( wait_work_item->CancelEvent, NULL ); - - if (InterlockedIncrement( &wait_work_item->DeleteCount ) == 2 ) - { - status = STATUS_SUCCESS; - delete_wait_work_item( wait_work_item ); - } - else if (LocalEvent) - { - TRACE( "Waiting for completion event\n" ); - NtWaitForSingleObject( LocalEvent, FALSE, NULL ); - status = STATUS_SUCCESS; - } + if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE ); else { - status = STATUS_PENDING; + assert( object->completed_event == NULL ); + object->completed_event = event; }
- if (LocalEvent) - NtClose( LocalEvent ); + RtlEnterCriticalSection( &object->pool->cs ); + if (object->num_pending_callbacks + object->num_running_callbacks + + object->num_associated_callbacks) status = STATUS_PENDING; + else status = STATUS_SUCCESS; + RtlLeaveCriticalSection( &object->pool->cs );
+ TpReleaseWait( (TP_WAIT *)object ); return status; }