Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/ntdll/tests/threadpool.c | 17 ++++++--- dlls/ntdll/threadpool.c | 67 ++++++++++++++++++++++++++++++----- 2 files changed, 72 insertions(+), 12 deletions(-)
diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c index 3bd2d994ec..bf5493cac0 100644 --- a/dlls/ntdll/tests/threadpool.c +++ b/dlls/ntdll/tests/threadpool.c @@ -517,12 +517,21 @@ static void test_tp_simple(void) memset(&environment3, 0, sizeof(environment3)); environment3.Version = 3; environment3.Pool = pool; - environment3.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL; environment3.Size = sizeof(environment3); + + for (i = 0; i < 3; ++i) + { + environment3.CallbackPriority = TP_CALLBACK_PRIORITY_HIGH + i; + status = pTpSimpleTryPost(simple_cb, semaphore, (TP_CALLBACK_ENVIRON *)&environment3); + ok(!status, "TpSimpleTryPost failed with status %x\n", status); + result = WaitForSingleObject(semaphore, 1000); + ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result); + } + + environment3.CallbackPriority = 10; status = pTpSimpleTryPost(simple_cb, semaphore, (TP_CALLBACK_ENVIRON *)&environment3); - ok(!status, "TpSimpleTryPost failed with status %x\n", status); - result = WaitForSingleObject(semaphore, 1000); - ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result); + ok(status == STATUS_INVALID_PARAMETER || broken(!status) /* Vista does not support priorities */, + "TpSimpleTryPost failed with status %x\n", status);
/* test with invalid version number */ memset(&environment, 0, sizeof(environment)); diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c index bf7449cdfa..a7ad321a8b 100644 --- a/dlls/ntdll/threadpool.c +++ b/dlls/ntdll/threadpool.c @@ -123,8 +123,8 @@ struct threadpool LONG objcount; BOOL shutdown; CRITICAL_SECTION cs; - /* pool of work items, locked via .cs */ - struct list pool; + /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */ + struct list pools[3]; RTL_CONDITION_VARIABLE update_event; /* information about worker threads, locked via .cs */ int max_workers; @@ -155,6 +155,7 @@ struct threadpool_object PTP_SIMPLE_CALLBACK finalization_callback; BOOL may_run_long; HMODULE race_dll; + TP_CALLBACK_PRIORITY priority; /* information about the group, locked via .group->cs */ struct list group_entry; BOOL is_group_member; @@ -1648,6 +1649,7 @@ static void tp_waitqueue_unlock( struct threadpool_object *wait ) static NTSTATUS tp_threadpool_alloc( struct threadpool **out ) { struct threadpool *pool; + unsigned int i;
pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) ); if (!pool) @@ -1660,7 +1662,8 @@ static NTSTATUS tp_threadpool_alloc( struct threadpool **out ) RtlInitializeCriticalSection( &pool->cs ); pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
- list_init( &pool->pool ); + for (i = 0; i < ARRAY_SIZE(pool->pools); ++i) + list_init( &pool->pools[i] ); RtlInitializeConditionVariable( &pool->update_event );
pool->max_workers = 500; @@ -1696,6 +1699,8 @@ static void tp_threadpool_shutdown( struct threadpool *pool ) */ static BOOL tp_threadpool_release( struct threadpool *pool ) { + unsigned int i; + if (interlocked_dec( &pool->refcount )) return FALSE;
@@ -1703,7 +1708,8 @@ static BOOL tp_threadpool_release( struct threadpool *pool )
assert( pool->shutdown ); assert( !pool->objcount ); - assert( list_empty( &pool->pool ) ); + for (i = 0; i < ARRAY_SIZE(pool->pools); ++i) + assert( list_empty( &pool->pools[i] ) );
pool->cs.DebugInfo->Spare[0] = 0; RtlDeleteCriticalSection( &pool->cs ); @@ -1725,7 +1731,25 @@ static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON NTSTATUS status = STATUS_SUCCESS;
if (environment) + { + /* Validate environment parameters. */ + if (environment->Version == 3) + { + TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment; + + switch (environment3->CallbackPriority) + { + case TP_CALLBACK_PRIORITY_HIGH: + case TP_CALLBACK_PRIORITY_NORMAL: + case TP_CALLBACK_PRIORITY_LOW: + break; + default: + return STATUS_INVALID_PARAMETER; + } + } + pool = (struct threadpool *)environment->Pool; + }
if (!pool) { @@ -1860,6 +1884,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa object->finalization_callback = NULL; object->may_run_long = 0; object->race_dll = NULL; + object->priority = TP_CALLBACK_PRIORITY_NORMAL;
memset( &object->group_entry, 0, sizeof(object->group_entry) ); object->is_group_member = FALSE; @@ -1881,6 +1906,13 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa object->finalization_callback = environment->FinalizationCallback; object->may_run_long = environment->u.s.LongFunction != 0; object->race_dll = environment->RaceDll; + if (environment->Version == 3) + { + TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment; + + object->priority = environment_v3->CallbackPriority; + assert( object->priority < ARRAY_SIZE(pool->pools) ); + }
if (environment->ActivationContext) FIXME( "activation context not supported yet\n" ); @@ -1916,6 +1948,11 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa tp_object_release( object ); }
+static void tp_object_prio_queue( struct threadpool_object *object ) +{ + list_add_tail( &object->pool->pools[object->priority], &object->pool_entry ); +} + /*********************************************************************** * tp_object_submit (internal) * @@ -1940,7 +1977,7 @@ static void tp_object_submit( struct threadpool_object *object, BOOL signaled ) /* Queue work item and increment refcount. */ interlocked_inc( &object->refcount ); if (!object->num_pending_callbacks++) - list_add_tail( &pool->pool, &object->pool_entry ); + tp_object_prio_queue( object );
/* Count how often the object was signaled. */ if (object->type == TP_OBJECT_TYPE_WAIT && signaled) @@ -2061,6 +2098,20 @@ static BOOL tp_object_release( struct threadpool_object *object ) return TRUE; }
+static struct list *threadpool_get_next_item( const struct threadpool *pool ) +{ + struct list *ptr; + unsigned int i; + + for (i = 0; i < ARRAY_SIZE(pool->pools); ++i) + { + if ((ptr = list_head( &pool->pools[i] ))) + break; + } + + return ptr; +} + /*********************************************************************** * threadpool_worker_proc (internal) */ @@ -2080,7 +2131,7 @@ static void CALLBACK threadpool_worker_proc( void *param ) pool->num_busy_workers--; for (;;) { - while ((ptr = list_head( &pool->pool ))) + while ((ptr = threadpool_get_next_item( pool ))) { struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry ); assert( object->num_pending_callbacks > 0 ); @@ -2089,7 +2140,7 @@ static void CALLBACK threadpool_worker_proc( void *param ) * the end of the pool list. Otherwise remove it from the pool. */ list_remove( &object->pool_entry ); if (--object->num_pending_callbacks) - list_add_tail( &pool->pool, &object->pool_entry ); + tp_object_prio_queue( object );
/* For wait objects check if they were signaled or have timed out. */ if (object->type == TP_OBJECT_TYPE_WAIT) @@ -2230,7 +2281,7 @@ static void CALLBACK threadpool_worker_proc( void *param ) * can be terminated. */ timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000; if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT && - !list_head( &pool->pool ) && (pool->num_workers > max( pool->min_workers, 1 ) || + !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) || (!pool->min_workers && !pool->objcount))) { break;