Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com>
---
 dlls/rtworkq/queue.c | 162 ++++++++++++++++++++++++++++--------------
 1 file changed, 108 insertions(+), 54 deletions(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c index 38d376342d..e5026b6298 100644 --- a/dlls/rtworkq/queue.c +++ b/dlls/rtworkq/queue.c @@ -16,6 +16,8 @@ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA */
+#include <assert.h> + #define COBJMACROS #define NONAMELESSUNION
@@ -136,13 +138,25 @@ static const TP_CALLBACK_PRIORITY priorities[] = TP_CALLBACK_PRIORITY_LOW, };
+struct queue; +struct queue_desc; + +struct queue_ops +{ + HRESULT (*init)(const struct queue_desc *desc, struct queue *queue); + BOOL (*shutdown)(struct queue *queue); + void (*submit)(struct queue *queue, struct work_item *item); +}; + struct queue_desc { RTWQ_WORKQUEUE_TYPE queue_type; + const struct queue_ops *ops; };
struct queue { + const struct queue_ops *ops; TP_POOL *pool; TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)]; CRITICAL_SECTION cs; @@ -171,6 +185,88 @@ static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *gr { }
+static HRESULT pool_queue_init(const struct queue_desc *desc, struct queue *queue) +{ + TP_CALLBACK_ENVIRON_V3 env; + unsigned int max_thread, i; + + queue->pool = CreateThreadpool(NULL); + + memset(&env, 0, sizeof(env)); + env.Version = 3; + env.Size = sizeof(env); + env.Pool = queue->pool; + env.CleanupGroup = CreateThreadpoolCleanupGroup(); + env.CleanupGroupCancelCallback = standard_queue_cleanup_callback; + env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL; + for (i = 0; i < ARRAY_SIZE(queue->envs); ++i) + { + queue->envs[i] = env; + queue->envs[i].CallbackPriority = priorities[i]; + } + list_init(&queue->pending_items); + InitializeCriticalSection(&queue->cs); + + max_thread = (desc->queue_type == RTWQ_STANDARD_WORKQUEUE || desc->queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4; + + SetThreadpoolThreadMinimum(queue->pool, 1); + SetThreadpoolThreadMaximum(queue->pool, max_thread); + + if (desc->queue_type == RTWQ_WINDOW_WORKQUEUE) + FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n"); + + return S_OK; +} + +static BOOL pool_queue_shutdown(struct queue *queue) +{ + if (!queue->pool) + return FALSE; + + CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL); + CloseThreadpool(queue->pool); + queue->pool = NULL; + + return TRUE; +} + +static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work) +{ + struct work_item *item = context; + RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result; + + TRACE("result object %p.\n", result); + + IRtwqAsyncCallback_Invoke(result->pCallback, item->result); + + IUnknown_Release(&item->IUnknown_iface); +} + +static void pool_queue_submit(struct queue *queue, struct work_item *item) +{ + TP_CALLBACK_PRIORITY callback_priority; + TP_WORK *work_object; + + if (item->priority == 0) + callback_priority = TP_CALLBACK_PRIORITY_NORMAL; + else if (item->priority < 0) + callback_priority = TP_CALLBACK_PRIORITY_LOW; + else + callback_priority = TP_CALLBACK_PRIORITY_HIGH; + 
work_object = CreateThreadpoolWork(standard_queue_worker, item, + (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]); + SubmitThreadpoolWork(work_object); + + TRACE("dispatched %p.\n", item->result); +} + +static const struct queue_ops pool_queue_ops = +{ + pool_queue_init, + pool_queue_shutdown, + pool_queue_submit, +}; + static HRESULT WINAPI work_item_QueryInterface(IUnknown *iface, REFIID riid, void **obj) { if (IsEqualIID(riid, &IID_IUnknown)) @@ -235,33 +331,14 @@ static struct work_item * alloc_work_item(struct queue *queue, LONG priority, IR
static void init_work_queue(const struct queue_desc *desc, struct queue *queue) { - TP_CALLBACK_ENVIRON_V3 env; - unsigned int max_thread, i; - - queue->pool = CreateThreadpool(NULL); + assert(desc->ops != NULL);
- memset(&env, 0, sizeof(env)); - env.Version = 3; - env.Size = sizeof(env); - env.Pool = queue->pool; - env.CleanupGroup = CreateThreadpoolCleanupGroup(); - env.CleanupGroupCancelCallback = standard_queue_cleanup_callback; - env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL; - for (i = 0; i < ARRAY_SIZE(queue->envs); ++i) + queue->ops = desc->ops; + if (SUCCEEDED(queue->ops->init(desc, queue))) { - queue->envs[i] = env; - queue->envs[i].CallbackPriority = priorities[i]; + list_init(&queue->pending_items); + InitializeCriticalSection(&queue->cs); } - list_init(&queue->pending_items); - InitializeCriticalSection(&queue->cs); - - max_thread = (desc->queue_type == RTWQ_STANDARD_WORKQUEUE || desc->queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4; - - SetThreadpoolThreadMinimum(queue->pool, 1); - SetThreadpoolThreadMaximum(queue->pool, max_thread); - - if (desc->queue_type == RTWQ_WINDOW_WORKQUEUE) - FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n"); }
static HRESULT grab_queue(DWORD queue_id, struct queue **ret) @@ -297,6 +374,7 @@ static HRESULT grab_queue(DWORD queue_id, struct queue **ret) }
desc.queue_type = queue_type; + desc.ops = &pool_queue_ops; init_work_queue(&desc, queue); LeaveCriticalSection(&queues_section); *ret = queue; @@ -333,13 +411,9 @@ static void shutdown_queue(struct queue *queue) { struct work_item *item, *item2;
- if (!queue->pool) + if (!queue->ops || !queue->ops->shutdown(queue)) return;
- CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL); - CloseThreadpool(queue->pool); - queue->pool = NULL; - EnterCriticalSection(&queue->cs); LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry) { @@ -349,6 +423,8 @@ static void shutdown_queue(struct queue *queue) LeaveCriticalSection(&queue->cs);
DeleteCriticalSection(&queue->cs); + + memset(queue, 0, sizeof(*queue)); }
static HRESULT unlock_user_queue(DWORD queue) @@ -376,38 +452,14 @@ static HRESULT unlock_user_queue(DWORD queue) return hr; }
-static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work) -{ - struct work_item *item = context; - RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result; - - TRACE("result object %p.\n", result); - - IRtwqAsyncCallback_Invoke(result->pCallback, item->result); - - IUnknown_Release(&item->IUnknown_iface); -} - static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result) { - TP_CALLBACK_PRIORITY callback_priority; struct work_item *item; - TP_WORK *work_object;
if (!(item = alloc_work_item(queue, priority, result))) return E_OUTOFMEMORY;
- if (priority == 0) - callback_priority = TP_CALLBACK_PRIORITY_NORMAL; - else if (priority < 0) - callback_priority = TP_CALLBACK_PRIORITY_LOW; - else - callback_priority = TP_CALLBACK_PRIORITY_HIGH; - work_object = CreateThreadpoolWork(standard_queue_worker, item, - (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]); - SubmitThreadpoolWork(work_object); - - TRACE("dispatched %p.\n", result); + queue->ops->submit(queue, item);
return S_OK; } @@ -862,6 +914,7 @@ static void init_system_queues(void) }
desc.queue_type = RTWQ_STANDARD_WORKQUEUE; + desc.ops = &pool_queue_ops; init_work_queue(&desc, &system_queues[SYS_QUEUE_STANDARD]);
LeaveCriticalSection(&queues_section); @@ -1114,6 +1167,7 @@ HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queu TRACE("%d, %p.\n", queue_type, queue);
desc.queue_type = queue_type; + desc.ops = &pool_queue_ops; return alloc_user_queue(&desc, queue); }