Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com> --- dlls/rtworkq/queue.c | 173 ++++++++++++++++++++++++++++++++++++++ dlls/rtworkq/rtworkq.spec | 4 +- include/rtworkq.idl | 11 +++ 3 files changed, 186 insertions(+), 2 deletions(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c index 7897ef5dc9..31a17c68ae 100644 --- a/dlls/rtworkq/queue.c +++ b/dlls/rtworkq/queue.c @@ -23,11 +23,108 @@ #include "rtworkq.h" #include "wine/debug.h" #include "wine/heap.h" +#include "wine/list.h"
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
+static CRITICAL_SECTION queues_section; /* Declared first so the debug record below can take its address. */ +static CRITICAL_SECTION_DEBUG queues_critsect_debug = +{ + 0, 0, &queues_section, + { &queues_critsect_debug.ProcessLocksList, &queues_critsect_debug.ProcessLocksList }, + 0, 0, { (DWORD_PTR)(__FILE__ ": queues_section") } +}; +static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0, 0 }; /* Statically initialized; held around system queue setup and teardown. */ + static LONG platform_lock;
+enum system_queue_index /* Slots of system_queues[]; SYS_QUEUE_COUNT sizes that array. */ +{ + SYS_QUEUE_STANDARD = 0, + SYS_QUEUE_RT, + SYS_QUEUE_IO, + SYS_QUEUE_TIMER, + SYS_QUEUE_MULTITHREADED, + SYS_QUEUE_DO_NOT_USE, + SYS_QUEUE_LONG_FUNCTION, + SYS_QUEUE_COUNT, +}; + +struct work_item /* Reference-counted item; release_work_item() performs the teardown. */ +{ + struct list entry; /* Link used by shutdown_queue() when walking pending_items. */ + LONG refcount; /* Decremented atomically by release_work_item(). */ + IRtwqAsyncResult *result; /* Released when refcount reaches zero. */ + struct queue *queue; + RTWQWORKITEM_KEY key; + union + { + TP_WAIT *wait_object; + TP_TIMER *timer_object; + } u; +}; + +static const TP_CALLBACK_PRIORITY priorities[] = /* struct queue keeps one callback environment per entry. */ +{ + TP_CALLBACK_PRIORITY_HIGH, + TP_CALLBACK_PRIORITY_NORMAL, + TP_CALLBACK_PRIORITY_LOW, +}; + +struct queue +{ + TP_POOL *pool; /* NULL while the queue is not set up; see init_system_queues() and shutdown_queue(). */ + TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)]; /* envs[i] carries priorities[i]; filled in by init_work_queue(). */ + CRITICAL_SECTION cs; /* Held by shutdown_queue() while it drains pending_items. */ + struct list pending_items; +}; + +static struct queue system_queues[SYS_QUEUE_COUNT]; /* Static storage, so every slot starts with a NULL pool. */ + +static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data) /* Empty callback installed as CleanupGroupCancelCallback by init_work_queue(). */ +{ +} + +static void release_work_item(struct work_item *item) /* Drops one reference; the last one releases item->result and frees the item. */ +{ + if (InterlockedDecrement(&item->refcount) == 0) + { + IRtwqAsyncResult_Release(item->result); + heap_free(item); + } +} + +static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue) /* Creates the pool for one queue and prepares its per-priority environments, lock and pending list. */ +{ + TP_CALLBACK_ENVIRON_V3 env; + unsigned int max_thread, i; + + queue->pool = CreateThreadpool(NULL); /* NOTE(review): the result is not checked for NULL before it is used below. */ + + memset(&env, 0, sizeof(env)); + env.Version = 3; + env.Size = sizeof(env); + env.Pool = queue->pool; + env.CleanupGroup = CreateThreadpoolCleanupGroup(); /* One group shared by every priority environment of this queue. */ + env.CleanupGroupCancelCallback = standard_queue_cleanup_callback; + env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL; + for (i = 0; i < ARRAY_SIZE(queue->envs); ++i) + { + queue->envs[i] = env; /* Copy the template, then override only the priority. */ + queue->envs[i].CallbackPriority = priorities[i]; + } + list_init(&queue->pending_items); + InitializeCriticalSection(&queue->cs); + + max_thread = (queue_type == RTWQ_STANDARD_WORKQUEUE || queue_type == RTWQ_WINDOW_WORKQUEUE) ? 
1 : 4; /* Standard and window queues are limited to one thread, other types to four. */ + + SetThreadpoolThreadMinimum(queue->pool, 1); + SetThreadpoolThreadMaximum(queue->pool, max_thread); + + if (queue_type == RTWQ_WINDOW_WORKQUEUE) + FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n"); +} + struct async_result { RTWQASYNCRESULT result; @@ -217,3 +314,79 @@ HRESULT WINAPI RtwqUnlockPlatform(void)
return S_OK; } + +static void init_system_queues(void) /* Sets up the standard system queue once; serialized by queues_section. */ +{ + /* Always initialize standard queue, keep the rest lazy. */ + + EnterCriticalSection(&queues_section); + + if (system_queues[SYS_QUEUE_STANDARD].pool) /* A non-NULL pool means the queue is already set up. */ + { + LeaveCriticalSection(&queues_section); + return; + } + + init_work_queue(RTWQ_STANDARD_WORKQUEUE, &system_queues[SYS_QUEUE_STANDARD]); + + LeaveCriticalSection(&queues_section); +} + +HRESULT WINAPI RtwqStartup(void) /* Takes a platform reference; the 0 -> 1 transition sets up the system queues. Always returns S_OK. */ +{ + if (InterlockedIncrement(&platform_lock) == 1) + { + init_system_queues(); + } + + return S_OK; +} + +static void shutdown_queue(struct queue *queue) /* Tears down one queue; does nothing when its pool is NULL. */ +{ + struct work_item *item, *item2; + + if (!queue->pool) + return; + + CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL); /* TRUE asks for pending callbacks to be cancelled. NOTE(review): the group itself is never passed to CloseThreadpoolCleanupGroup() - confirm whether that is intended. */ + CloseThreadpool(queue->pool); + queue->pool = NULL; /* Marks the slot as not set up for later pool checks. */ + + EnterCriticalSection(&queue->cs); + LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry) + { + list_remove(&item->entry); + release_work_item(item); /* Drops the reference held on behalf of the pending list. */ + } + LeaveCriticalSection(&queue->cs); + + DeleteCriticalSection(&queue->cs); /* init_work_queue() initializes cs again if the queue is set up later. */ +} + +static void shutdown_system_queues(void) /* Shuts down every slot under queues_section; shutdown_queue() skips slots without a pool. */ +{ + unsigned int i; + + EnterCriticalSection(&queues_section); + + for (i = 0; i < ARRAY_SIZE(system_queues); ++i) + { + shutdown_queue(&system_queues[i]); + } + + LeaveCriticalSection(&queues_section); +} + +HRESULT WINAPI RtwqShutdown(void) /* Drops a platform reference; the 1 -> 0 transition shuts the system queues down. Always returns S_OK. */ +{ + if (platform_lock <= 0) /* NOTE(review): this read and the decrement below are separate steps, so concurrent callers could both pass the check. */ + return S_OK; + + if (InterlockedExchangeAdd(&platform_lock, -1) == 1) /* Returns the value before the add, so 1 means this call dropped the last reference. */ + { + shutdown_system_queues(); + } + + return S_OK; +} diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec index 900aa5f230..8c352593ff 100644 --- a/dlls/rtworkq/rtworkq.spec +++ b/dlls/rtworkq/rtworkq.spec @@ -28,8 +28,8 @@ @ stub RtwqSetDeadline @ stub RtwqSetDeadline2 @ stub RtwqSetLongRunning -@ stub RtwqShutdown -@ stub RtwqStartup +@ stdcall RtwqShutdown() +@ stdcall RtwqStartup() @ stub RtwqUnjoinWorkQueue @ stdcall RtwqUnlockPlatform() @ stub RtwqUnlockWorkQueue diff --git a/include/rtworkq.idl b/include/rtworkq.idl 
index d22b09ca50..c2e8237bba 100644 --- a/include/rtworkq.idl +++ b/include/rtworkq.idl @@ -18,6 +18,15 @@
import "unknwn.idl";
+typedef enum /* Queue kind; init_work_queue() in dlls/rtworkq/queue.c picks the thread limit from it. */ +{ + RTWQ_STANDARD_WORKQUEUE = 0, + RTWQ_WINDOW_WORKQUEUE = 1, /* queue.c prints a FIXME for this type (not supported). */ + RTWQ_MULTITHREADED_WORKQUEUE = 2, +} RTWQ_WORKQUEUE_TYPE; + +typedef unsigned __int64 RTWQWORKITEM_KEY; /* Unsigned 64-bit key; stored in struct work_item in queue.c. */ + [ object, uuid(ac6b7889-0740-4d51-8619-905994a55cc6), @@ -59,4 +68,6 @@ cpp_quote("} RTWQASYNCRESULT;")
cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);") cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);") +cpp_quote("HRESULT WINAPI RtwqShutdown(void);") /* Defined in dlls/rtworkq/queue.c. */ +cpp_quote("HRESULT WINAPI RtwqStartup(void);") /* Defined in dlls/rtworkq/queue.c. */ cpp_quote("HRESULT WINAPI RtwqUnlockPlatform(void);")