Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/rtworkq/queue.c | 327 ++++++++++++++++++++++++++++++++------ dlls/rtworkq/rtworkq.spec | 2 +- include/rtworkq.idl | 1 + 3 files changed, 280 insertions(+), 50 deletions(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c index e5026b6298..65007602c7 100644 --- a/dlls/rtworkq/queue.c +++ b/dlls/rtworkq/queue.c @@ -97,6 +97,16 @@ enum rtwq_callback_queue_id RTWQ_CALLBACK_QUEUE_ALL = 0xffffffff, };
+/* Should be kept in sync with corresponding MFASYNC_ constants. */ +enum rtwq_callback_flags +{ + RTWQ_FAST_IO_PROCESSING_CALLBACK = 0x00000001, + RTWQ_SIGNAL_CALLBACK = 0x00000002, + RTWQ_BLOCKING_CALLBACK = 0x00000004, + RTWQ_REPLY_CALLBACK = 0x00000008, + RTWQ_LOCALIZE_REMOTE_CALLBACK = 0x00000010, +}; + enum system_queue_index { SYS_QUEUE_STANDARD = 0, @@ -115,10 +125,12 @@ struct work_item LONG refcount; struct list entry; IRtwqAsyncResult *result; + IRtwqAsyncResult *reply_result; struct queue *queue; RTWQWORKITEM_KEY key; LONG priority; DWORD flags; + PTP_SIMPLE_CALLBACK finalization_callback; union { TP_WAIT *wait_object; @@ -152,15 +164,123 @@ struct queue_desc { RTWQ_WORKQUEUE_TYPE queue_type; const struct queue_ops *ops; + DWORD target_queue; };
struct queue { + IRtwqAsyncCallback IRtwqAsyncCallback_iface; const struct queue_ops *ops; TP_POOL *pool; TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)]; CRITICAL_SECTION cs; struct list pending_items; + DWORD id; + /* Data used for serial queues only. */ + PTP_SIMPLE_CALLBACK finalization_callback; + DWORD target_queue; +}; + +static void shutdown_queue(struct queue *queue); + +static HRESULT lock_user_queue(DWORD queue) +{ + HRESULT hr = RTWQ_E_INVALID_WORKQUEUE; + struct queue_handle *entry; + + if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK)) + return S_OK; + + EnterCriticalSection(&queues_section); + entry = get_queue_obj(queue); + if (entry && entry->refcount) + { + entry->refcount++; + hr = S_OK; + } + LeaveCriticalSection(&queues_section); + return hr; +} + +static HRESULT unlock_user_queue(DWORD queue) +{ + HRESULT hr = RTWQ_E_INVALID_WORKQUEUE; + struct queue_handle *entry; + + if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK)) + return S_OK; + + EnterCriticalSection(&queues_section); + entry = get_queue_obj(queue); + if (entry && entry->refcount) + { + if (--entry->refcount == 0) + { + shutdown_queue((struct queue *)entry->obj); + heap_free(entry->obj); + entry->obj = next_free_user_queue; + next_free_user_queue = entry; + } + hr = S_OK; + } + LeaveCriticalSection(&queues_section); + return hr; +} + +static struct queue *queue_impl_from_IRtwqAsyncCallback(IRtwqAsyncCallback *iface) +{ + return CONTAINING_RECORD(iface, struct queue, IRtwqAsyncCallback_iface); +} + +static HRESULT WINAPI queue_serial_callback_QueryInterface(IRtwqAsyncCallback *iface, REFIID riid, void **obj) +{ + if (IsEqualIID(riid, &IID_IRtwqAsyncCallback) || + IsEqualIID(riid, &IID_IUnknown)) + { + *obj = iface; + IRtwqAsyncCallback_AddRef(iface); + return S_OK; + } + + *obj = NULL; + return E_NOINTERFACE; +} + +static ULONG WINAPI queue_serial_callback_AddRef(IRtwqAsyncCallback *iface) +{ + return 2; +} + +static ULONG WINAPI queue_serial_callback_Release(IRtwqAsyncCallback *iface) +{ + return 1; +} + +static HRESULT WINAPI queue_serial_callback_GetParameters(IRtwqAsyncCallback *iface, DWORD *flags, DWORD *queue_id) +{ + struct queue *queue = queue_impl_from_IRtwqAsyncCallback(iface); + + *flags = 0; + *queue_id = queue->id; + + return S_OK; +} + +static HRESULT WINAPI queue_serial_callback_Invoke(IRtwqAsyncCallback *iface, IRtwqAsyncResult *result) +{ + /* Reply callback won't be called in a regular way, pending items and chained queues will make it + unnecessary complicated to reach actual work queue that's able to execute this item. Instead + serial queues are cleaned up right away on submit(). */ + return S_OK; +} + +static const IRtwqAsyncCallbackVtbl queue_serial_callback_vtbl = +{ + queue_serial_callback_QueryInterface, + queue_serial_callback_AddRef, + queue_serial_callback_Release, + queue_serial_callback_GetParameters, + queue_serial_callback_Invoke, };
static struct queue system_queues[SYS_QUEUE_COUNT]; @@ -181,6 +301,8 @@ static struct queue *get_system_queue(DWORD queue_id) } }
+static HRESULT grab_queue(DWORD queue_id, struct queue **ret); + static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data) { } @@ -237,7 +359,11 @@ static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void
TRACE("result object %p.\n", result);
- IRtwqAsyncCallback_Invoke(result->pCallback, item->result); + /* Submitting from serial queue in reply mode, use different result object acting as receipt token. + It's submitted to user callback still, but when invoked, special serial queue callback will be used + to ensure correct destination queue. */ + + IRtwqAsyncCallback_Invoke(result->pCallback, item->reply_result ? item->reply_result : item->result);
IUnknown_Release(&item->IUnknown_iface); } @@ -245,6 +371,7 @@ static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void static void pool_queue_submit(struct queue *queue, struct work_item *item) { TP_CALLBACK_PRIORITY callback_priority; + TP_CALLBACK_ENVIRON_V3 env; TP_WORK *work_object;
if (item->priority == 0) @@ -253,8 +380,14 @@ static void pool_queue_submit(struct queue *queue, struct work_item *item) callback_priority = TP_CALLBACK_PRIORITY_LOW; else callback_priority = TP_CALLBACK_PRIORITY_HIGH; - work_object = CreateThreadpoolWork(standard_queue_worker, item, - (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]); + + env = queue->envs[callback_priority]; + env.FinalizationCallback = item->finalization_callback; + /* Worker pool callback will release one reference. Grab one more to keep object alive when + we need finalization callback. */ + if (item->finalization_callback) + IUnknown_AddRef(&item->IUnknown_iface); + work_object = CreateThreadpoolWork(standard_queue_worker, item, (TP_CALLBACK_ENVIRON *)&env); SubmitThreadpoolWork(work_object);
TRACE("dispatched %p.\n", item->result); @@ -267,6 +400,129 @@ static const struct queue_ops pool_queue_ops = pool_queue_submit, };
+static struct work_item * serial_queue_get_next(struct queue *queue, struct work_item *item) +{ + struct work_item *next_item = NULL; + + list_remove(&item->entry); + if (!list_empty(&item->queue->pending_items)) + next_item = LIST_ENTRY(list_head(&item->queue->pending_items), struct work_item, entry); + + return next_item; +} + +static void CALLBACK serial_queue_finalization_callback(PTP_CALLBACK_INSTANCE instance, void *user_data) +{ + struct work_item *item = (struct work_item *)user_data, *next_item; + struct queue *target_queue, *queue = item->queue; + HRESULT hr; + + EnterCriticalSection(&queue->cs); + + if ((next_item = serial_queue_get_next(queue, item))) + { + if (SUCCEEDED(hr = grab_queue(queue->target_queue, &target_queue))) + target_queue->ops->submit(target_queue, next_item); + else + WARN("Failed to grab queue for id %#x, hr %#x.\n", queue->target_queue, hr); + } + + LeaveCriticalSection(&queue->cs); + + IUnknown_Release(&item->IUnknown_iface); +} + +static HRESULT serial_queue_init(const struct queue_desc *desc, struct queue *queue) +{ + queue->IRtwqAsyncCallback_iface.lpVtbl = &queue_serial_callback_vtbl; + queue->target_queue = desc->target_queue; + lock_user_queue(queue->target_queue); + queue->finalization_callback = serial_queue_finalization_callback; + + return S_OK; +} + +static BOOL serial_queue_shutdown(struct queue *queue) +{ + unlock_user_queue(queue->target_queue); + + return TRUE; +} + +static struct work_item * serial_queue_is_ack_token(struct queue *queue, struct work_item *item) +{ + RTWQASYNCRESULT *async_result = (RTWQASYNCRESULT *)item->result; + struct work_item *head; + + if (list_empty(&queue->pending_items)) + return NULL; + + head = LIST_ENTRY(list_head(&queue->pending_items), struct work_item, entry); + if (head->reply_result == item->result && async_result->pCallback == &queue->IRtwqAsyncCallback_iface) + return head; + + return NULL; +} + +static void serial_queue_submit(struct queue *queue, struct work_item *item) +{ + struct work_item *head, *next_item = NULL; + struct queue *target_queue; + HRESULT hr; + + /* In reply mode queue will advance when 'reply_result' is invoked, in regular mode it will advance automatically, + via finalization callback. */ + + if (item->flags & RTWQ_REPLY_CALLBACK) + { + if (FAILED(hr = RtwqCreateAsyncResult(NULL, &queue->IRtwqAsyncCallback_iface, NULL, &item->reply_result))) + WARN("Failed to create reply object, hr %#x.\n", hr); + } + else + item->finalization_callback = queue->finalization_callback; + + /* Serial queues could be chained together, detach from current queue before transitioning item to this one. + Items are not detached when submitted to pool queues, because pool queues won't forward them further. */ + EnterCriticalSection(&item->queue->cs); + list_remove(&item->entry); + LeaveCriticalSection(&item->queue->cs); + + EnterCriticalSection(&queue->cs); + + item->queue = queue; + + if ((head = serial_queue_is_ack_token(queue, item))) + { + /* Ack receipt token - pop waiting item, advance. */ + next_item = serial_queue_get_next(queue, head); + IUnknown_Release(&head->IUnknown_iface); + } + else + { + if (list_empty(&queue->pending_items)) + next_item = item; + list_add_tail(&queue->pending_items, &item->entry); + IUnknown_AddRef(&item->IUnknown_iface); + } + + if (next_item) + { + if (SUCCEEDED(hr = grab_queue(queue->target_queue, &target_queue))) + target_queue->ops->submit(target_queue, next_item); + else + WARN("Failed to grab queue for id %#x, hr %#x.\n", queue->target_queue, hr); + } + + LeaveCriticalSection(&queue->cs); +} + +static const struct queue_ops serial_queue_ops = +{ + serial_queue_init, + serial_queue_shutdown, + serial_queue_submit, +}; + static HRESULT WINAPI work_item_QueryInterface(IUnknown *iface, REFIID riid, void **obj) { if (IsEqualIID(riid, &IID_IUnknown)) @@ -293,8 +549,10 @@ static ULONG WINAPI work_item_Release(IUnknown *iface)
if (!refcount) { - IRtwqAsyncResult_Release(item->result); - heap_free(item); + if (item->reply_result) + IRtwqAsyncResult_Release(item->reply_result); + IRtwqAsyncResult_Release(item->result); + heap_free(item); }
return refcount; @@ -375,6 +633,7 @@ static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
desc.queue_type = queue_type; desc.ops = &pool_queue_ops; + desc.target_queue = 0; init_work_queue(&desc, queue); LeaveCriticalSection(&queues_section); *ret = queue; @@ -388,25 +647,6 @@ static HRESULT grab_queue(DWORD queue_id, struct queue **ret) return *ret ? S_OK : RTWQ_E_INVALID_WORKQUEUE; }
-static HRESULT lock_user_queue(DWORD queue) -{ - HRESULT hr = RTWQ_E_INVALID_WORKQUEUE; - struct queue_handle *entry; - - if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK)) - return S_OK; - - EnterCriticalSection(&queues_section); - entry = get_queue_obj(queue); - if (entry && entry->refcount) - { - entry->refcount++; - hr = S_OK; - } - LeaveCriticalSection(&queues_section); - return hr; -} - static void shutdown_queue(struct queue *queue) { struct work_item *item, *item2; @@ -427,31 +667,6 @@ static void shutdown_queue(struct queue *queue) memset(queue, 0, sizeof(*queue)); }
-static HRESULT unlock_user_queue(DWORD queue) -{ - HRESULT hr = RTWQ_E_INVALID_WORKQUEUE; - struct queue_handle *entry; - - if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK)) - return S_OK; - - EnterCriticalSection(&queues_section); - entry = get_queue_obj(queue); - if (entry && entry->refcount) - { - if (--entry->refcount == 0) - { - shutdown_queue((struct queue *)entry->obj); - heap_free(entry->obj); - entry->obj = next_free_user_queue; - next_free_user_queue = entry; - } - hr = S_OK; - } - LeaveCriticalSection(&queues_section); - return hr; -} - static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result) { struct work_item *item; @@ -915,6 +1130,7 @@ static void init_system_queues(void)
desc.queue_type = RTWQ_STANDARD_WORKQUEUE; desc.ops = &pool_queue_ops; + desc.target_queue = 0; init_work_queue(&desc, &system_queues[SYS_QUEUE_STANDARD]);
LeaveCriticalSection(&queues_section); @@ -1168,6 +1384,7 @@ HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queu
desc.queue_type = queue_type; desc.ops = &pool_queue_ops; + desc.target_queue = 0; return alloc_user_queue(&desc, queue); }
@@ -1233,3 +1450,15 @@ HRESULT WINAPI RtwqCancelDeadline(HANDLE request)
return E_NOTIMPL; } + +HRESULT WINAPI RtwqAllocateSerialWorkQueue(DWORD target_queue, DWORD *queue) +{ + struct queue_desc desc; + + TRACE("%#x, %p.\n", target_queue, queue); + + desc.queue_type = RTWQ_STANDARD_WORKQUEUE; + desc.ops = &serial_queue_ops; + desc.target_queue = target_queue; + return alloc_user_queue(&desc, queue); +} diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec index 1a8909e098..adc05d679d 100644 --- a/dlls/rtworkq/rtworkq.spec +++ b/dlls/rtworkq/rtworkq.spec @@ -1,5 +1,5 @@ @ stdcall RtwqAddPeriodicCallback(ptr ptr ptr) -@ stub RtwqAllocateSerialWorkQueue +@ stdcall RtwqAllocateSerialWorkQueue(long ptr) @ stdcall RtwqAllocateWorkQueue(long ptr) @ stub RtwqBeginRegisterWorkQueueWithMMCSS @ stub RtwqBeginUnregisterWorkQueueWithMMCSS diff --git a/include/rtworkq.idl b/include/rtworkq.idl index 325bebfde5..1a452c3edc 100644 --- a/include/rtworkq.idl +++ b/include/rtworkq.idl @@ -78,6 +78,7 @@ cpp_quote("} RTWQASYNCRESULT;") cpp_quote("typedef void (WINAPI *RTWQPERIODICCALLBACK)(IUnknown *context);")
cpp_quote("HRESULT WINAPI RtwqAddPeriodicCallback(RTWQPERIODICCALLBACK callback, IUnknown *context, DWORD *key);") +cpp_quote("HRESULT WINAPI RtwqAllocateSerialWorkQueue(DWORD target_queue, DWORD *queue);") cpp_quote("HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queue);") cpp_quote("HRESULT WINAPI RtwqCancelDeadline(HANDLE request);") cpp_quote("HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key);")