Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/rtworkq/queue.c | 391 +++++++++++++++++++++++++++++++++++--- dlls/rtworkq/rtworkq.spec | 6 +- include/rtworkq.idl | 12 ++ 3 files changed, 384 insertions(+), 25 deletions(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c index 31a17c68ae..7d949e8e62 100644 --- a/dlls/rtworkq/queue.c +++ b/dlls/rtworkq/queue.c @@ -27,6 +27,34 @@
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
+#define FIRST_USER_QUEUE_HANDLE 5 +#define MAX_USER_QUEUE_HANDLES 124 + +#define WAIT_ITEM_KEY_MASK (0x82000000) +#define SCHEDULED_ITEM_KEY_MASK (0x80000000) + +static LONG next_item_key; + +static RTWQWORKITEM_KEY get_item_key(DWORD mask, DWORD key) +{ + return ((RTWQWORKITEM_KEY)mask << 32) | key; +} + +static RTWQWORKITEM_KEY generate_item_key(DWORD mask) +{ + return get_item_key(mask, InterlockedIncrement(&next_item_key)); +} + +struct queue_handle +{ + void *obj; + LONG refcount; + WORD generation; +}; + +static struct queue_handle user_queues[MAX_USER_QUEUE_HANDLES]; +static struct queue_handle *next_free_user_queue; + static CRITICAL_SECTION queues_section; static CRITICAL_SECTION_DEBUG queues_critsect_debug = { @@ -38,6 +66,33 @@ static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0,
static LONG platform_lock;
+static struct queue_handle *get_queue_obj(DWORD handle) +{ + unsigned int idx = HIWORD(handle) - FIRST_USER_QUEUE_HANDLE; + + if (idx < MAX_USER_QUEUE_HANDLES && user_queues[idx].refcount) + { + if (LOWORD(handle) == user_queues[idx].generation) + return &user_queues[idx]; + } + + return NULL; +} + +/* Should be kept in sync with corresponding MFASYNC_CALLBACK_ constants. */ +enum rtwq_callback_queue_id +{ + RTWQ_CALLBACK_QUEUE_UNDEFINED = 0x00000000, + RTWQ_CALLBACK_QUEUE_STANDARD = 0x00000001, + RTWQ_CALLBACK_QUEUE_RT = 0x00000002, + RTWQ_CALLBACK_QUEUE_IO = 0x00000003, + RTWQ_CALLBACK_QUEUE_TIMER = 0x00000004, + RTWQ_CALLBACK_QUEUE_MULTITHREADED = 0x00000005, + RTWQ_CALLBACK_QUEUE_LONG_FUNCTION = 0x00000007, + RTWQ_CALLBACK_QUEUE_PRIVATE_MASK = 0xffff0000, + RTWQ_CALLBACK_QUEUE_ALL = 0xffffffff, +}; + enum system_queue_index { SYS_QUEUE_STANDARD = 0, @@ -81,10 +136,40 @@ struct queue
static struct queue system_queues[SYS_QUEUE_COUNT];
+static struct queue *get_system_queue(DWORD queue_id) +{ + switch (queue_id) + { + case RTWQ_CALLBACK_QUEUE_STANDARD: + case RTWQ_CALLBACK_QUEUE_RT: + case RTWQ_CALLBACK_QUEUE_IO: + case RTWQ_CALLBACK_QUEUE_TIMER: + case RTWQ_CALLBACK_QUEUE_MULTITHREADED: + case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION: + return &system_queues[queue_id - 1]; + default: + return NULL; + } +} + static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data) { }
+static struct work_item * alloc_work_item(struct queue *queue, IRtwqAsyncResult *result) +{ + struct work_item *item; + + item = heap_alloc_zero(sizeof(*item)); + item->result = result; + IRtwqAsyncResult_AddRef(item->result); + item->refcount = 1; + item->queue = queue; + list_init(&item->entry); + + return item; +} + static void release_work_item(struct work_item *item) { if (InterlockedDecrement(&item->refcount) == 0) @@ -94,6 +179,12 @@ static void release_work_item(struct work_item *item) } }
+static struct work_item *grab_work_item(struct work_item *item) +{ + InterlockedIncrement(&item->refcount); + return item; +} + static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue) { TP_CALLBACK_ENVIRON_V3 env; @@ -125,6 +216,255 @@ static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue) FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n"); }
+static HRESULT grab_queue(DWORD queue_id, struct queue **ret) +{ + struct queue *queue = get_system_queue(queue_id); + RTWQ_WORKQUEUE_TYPE queue_type; + struct queue_handle *entry; + + *ret = NULL; + + if (!system_queues[SYS_QUEUE_STANDARD].pool) + return RTWQ_E_SHUTDOWN; + + if (queue && queue->pool) + { + *ret = queue; + return S_OK; + } + else if (queue) + { + EnterCriticalSection(&queues_section); + switch (queue_id) + { + case RTWQ_CALLBACK_QUEUE_IO: + case RTWQ_CALLBACK_QUEUE_MULTITHREADED: + case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION: + queue_type = RTWQ_MULTITHREADED_WORKQUEUE; + break; + default: + queue_type = RTWQ_STANDARD_WORKQUEUE; + } + init_work_queue(queue_type, queue); + LeaveCriticalSection(&queues_section); + *ret = queue; + return S_OK; + } + + /* Handles user queues. */ + if ((entry = get_queue_obj(queue_id))) + *ret = entry->obj; + + return *ret ? S_OK : RTWQ_E_INVALID_WORKQUEUE; +} + +static HRESULT lock_user_queue(DWORD queue) +{ + HRESULT hr = RTWQ_E_INVALID_WORKQUEUE; + struct queue_handle *entry; + + if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK)) + return S_OK; + + EnterCriticalSection(&queues_section); + entry = get_queue_obj(queue); + if (entry && entry->refcount) + { + entry->refcount++; + hr = S_OK; + } + LeaveCriticalSection(&queues_section); + return hr; +} + +static void shutdown_queue(struct queue *queue) +{ + struct work_item *item, *item2; + + if (!queue->pool) + return; + + CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL); + CloseThreadpool(queue->pool); + queue->pool = NULL; + + EnterCriticalSection(&queue->cs); + LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry) + { + list_remove(&item->entry); + release_work_item(item); + } + LeaveCriticalSection(&queue->cs); + + DeleteCriticalSection(&queue->cs); +} + +static HRESULT unlock_user_queue(DWORD queue) +{ + HRESULT hr = RTWQ_E_INVALID_WORKQUEUE; + struct queue_handle *entry; + + if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK)) + return S_OK; + + EnterCriticalSection(&queues_section); + entry = get_queue_obj(queue); + if (entry && entry->refcount) + { + if (--entry->refcount == 0) + { + shutdown_queue((struct queue *)entry->obj); + heap_free(entry->obj); + entry->obj = next_free_user_queue; + next_free_user_queue = entry; + } + hr = S_OK; + } + LeaveCriticalSection(&queues_section); + return hr; +} + +static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work) +{ + struct work_item *item = context; + RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result; + + TRACE("result object %p.\n", result); + + IRtwqAsyncCallback_Invoke(result->pCallback, item->result); + + release_work_item(item); +} + +static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result) +{ + TP_CALLBACK_PRIORITY callback_priority; + struct work_item *item; + TP_WORK *work_object; + + if (!(item = alloc_work_item(queue, result))) + return E_OUTOFMEMORY; + + if (priority == 0) + callback_priority = TP_CALLBACK_PRIORITY_NORMAL; + else if (priority < 0) + callback_priority = TP_CALLBACK_PRIORITY_LOW; + else + callback_priority = TP_CALLBACK_PRIORITY_HIGH; + work_object = CreateThreadpoolWork(standard_queue_worker, item, + (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]); + SubmitThreadpoolWork(work_object); + + TRACE("dispatched %p.\n", result); + + return S_OK; +} + +static HRESULT queue_put_work_item(DWORD queue_id, LONG priority, IRtwqAsyncResult *result) +{ + struct queue *queue; + HRESULT hr; + + if (FAILED(hr = grab_queue(queue_id, &queue))) + return hr; + + return queue_submit_item(queue, priority, result); +} + +static HRESULT invoke_async_callback(IRtwqAsyncResult *result) +{ + RTWQASYNCRESULT *result_data = (RTWQASYNCRESULT *)result; + DWORD queue = RTWQ_CALLBACK_QUEUE_STANDARD, flags; + HRESULT hr; + + if (FAILED(IRtwqAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue))) + queue = RTWQ_CALLBACK_QUEUE_STANDARD; + + if (FAILED(lock_user_queue(queue))) + queue = RTWQ_CALLBACK_QUEUE_STANDARD; + + hr = queue_put_work_item(queue, 0, result); + + unlock_user_queue(queue); + + return hr; +} + +static void queue_release_pending_item(struct work_item *item) +{ + EnterCriticalSection(&item->queue->cs); + if (item->key) + { + list_remove(&item->entry); + item->key = 0; + release_work_item(item); + } + LeaveCriticalSection(&item->queue->cs); +} + +static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait, + TP_WAIT_RESULT wait_result) +{ + struct work_item *item = context; + + TRACE("result object %p.\n", item->result); + + invoke_async_callback(item->result); + + release_work_item(item); +} + +static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait, + TP_WAIT_RESULT wait_result) +{ + struct work_item *item = context; + + TRACE("result object %p.\n", item->result); + + queue_release_pending_item(item); + + invoke_async_callback(item->result); + + release_work_item(item); +} + +static void queue_mark_item_pending(DWORD mask, struct work_item *item, RTWQWORKITEM_KEY *key) +{ + *key = generate_item_key(mask); + item->key = *key; + + EnterCriticalSection(&item->queue->cs); + list_add_tail(&item->queue->pending_items, &item->entry); + grab_work_item(item); + LeaveCriticalSection(&item->queue->cs); +} + +static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IRtwqAsyncResult *result, + RTWQWORKITEM_KEY *key) +{ + PTP_WAIT_CALLBACK callback; + struct work_item *item; + + if (!(item = alloc_work_item(queue, result))) + return E_OUTOFMEMORY; + + if (key) + { + queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key); + callback = waiting_item_cancelable_callback; + } + else + callback = waiting_item_callback; + + item->u.wait_object = CreateThreadpoolWait(callback, item, + (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]); + SetThreadpoolWait(item->u.wait_object, event, NULL); + + TRACE("dispatched %p.\n", result); + + return S_OK; +} + struct async_result { RTWQASYNCRESULT result; @@ -342,28 +682,6 @@ HRESULT WINAPI RtwqStartup(void) return S_OK; }
-static void shutdown_queue(struct queue *queue) -{ - struct work_item *item, *item2; - - if (!queue->pool) - return; - - CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL); - CloseThreadpool(queue->pool); - queue->pool = NULL; - - EnterCriticalSection(&queue->cs); - LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry) - { - list_remove(&item->entry); - release_work_item(item); - } - LeaveCriticalSection(&queue->cs); - - DeleteCriticalSection(&queue->cs); -} - static void shutdown_system_queues(void) { unsigned int i; @@ -390,3 +708,32 @@ HRESULT WINAPI RtwqShutdown(void)
return S_OK; } + +HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key) +{ + struct queue *queue; + HRESULT hr; + + TRACE("%p, %d, %p, %p.\n", event, priority, result, key); + + if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue))) + return hr; + + hr = queue_submit_wait(queue, event, priority, result, key); + + return hr; +} + +HRESULT WINAPI RtwqLockWorkQueue(DWORD queue) +{ + TRACE("%#x.\n", queue); + + return lock_user_queue(queue); +} + +HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue) +{ + TRACE("%#x.\n", queue); + + return unlock_user_queue(queue); +} diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec index 8c352593ff..47e1c1b2ae 100644 --- a/dlls/rtworkq/rtworkq.spec +++ b/dlls/rtworkq/rtworkq.spec @@ -17,9 +17,9 @@ @ stub RtwqJoinWorkQueue @ stdcall RtwqLockPlatform() @ stub RtwqLockSharedWorkQueue -@ stub RtwqLockWorkQueue +@ stdcall RtwqLockWorkQueue(long) @ stub RtwqPutMultipleWaitingWorkItem -@ stub RtwqPutWaitingWorkItem +@ stdcall RtwqPutWaitingWorkItem(long long ptr ptr) @ stub RtwqPutWorkItem @ stub RtwqRegisterPlatformEvents @ stub RtwqRegisterPlatformWithMMCSS @@ -32,6 +32,6 @@ @ stdcall RtwqStartup() @ stub RtwqUnjoinWorkQueue @ stdcall RtwqUnlockPlatform() -@ stub RtwqUnlockWorkQueue +@ stdcall RtwqUnlockWorkQueue(long) @ stub RtwqUnregisterPlatformEvents @ stub RtwqUnregisterPlatformFromMMCSS diff --git a/include/rtworkq.idl b/include/rtworkq.idl index c2e8237bba..a899376e92 100644 --- a/include/rtworkq.idl +++ b/include/rtworkq.idl @@ -52,6 +52,15 @@ interface IRtwqAsyncCallback : IUnknown HRESULT Invoke([in] IRtwqAsyncResult *result); }
+cpp_quote("#define RTWQ_E_ERROR(x) ((HRESULT)(0xc00d0000L+x))") +cpp_quote("#define RTWQ_E_BUFFERTOOSMALL RTWQ_E_ERROR(14001)") +cpp_quote("#define RTWQ_E_NOT_INITIALIZED RTWQ_E_ERROR(14006)") +cpp_quote("#define RTWQ_E_UNEXPECTED RTWQ_E_ERROR(14011)") +cpp_quote("#define RTWQ_E_NOT_FOUND RTWQ_E_ERROR(14037)") +cpp_quote("#define RTWQ_E_OPERATION_CANCELLED RTWQ_E_ERROR(14061)") +cpp_quote("#define RTWQ_E_INVALID_WORKQUEUE RTWQ_E_ERROR(14079)") +cpp_quote("#define RTWQ_E_SHUTDOWN RTWQ_E_ERROR(16005)") + cpp_quote("#ifdef __WINESRC__") cpp_quote("typedef struct tagRTWQASYNCRESULT") cpp_quote("{") @@ -68,6 +77,9 @@ cpp_quote("} RTWQASYNCRESULT;")
cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);") cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);") +cpp_quote("HRESULT WINAPI RtwqLockWorkQueue(DWORD queue);") +cpp_quote("HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key);") cpp_quote("HRESULT WINAPI RtwqShutdown(void);") cpp_quote("HRESULT WINAPI RtwqStartup(void);") cpp_quote("HRESULT WINAPI RtwqUnlockPlatform(void);") +cpp_quote("HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue);")