Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com> --- dlls/mfplat/mfplat.spec | 1 + dlls/mfplat/queue.c | 288 +++++++++++++++++++++++++++++-------- dlls/mfplat/tests/mfplat.c | 5 +- 3 files changed, 233 insertions(+), 61 deletions(-)
diff --git a/dlls/mfplat/mfplat.spec b/dlls/mfplat/mfplat.spec index a218087f6a..c43746ec7b 100644 --- a/dlls/mfplat/mfplat.spec +++ b/dlls/mfplat/mfplat.spec @@ -120,6 +120,7 @@ @ stub MFJoinIoPort @ stdcall MFLockPlatform() @ stdcall MFLockWorkQueue(long) +@ stdcall MFPutWaitingWorkItem(long long ptr ptr) @ stdcall MFPutWorkItem(long ptr ptr) @ stdcall MFPutWorkItemEx(long ptr) @ stub MFRecordError diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c index 8c7cf195e5..1434645f49 100644 --- a/dlls/mfplat/queue.c +++ b/dlls/mfplat/queue.c @@ -25,6 +25,7 @@
#include "wine/debug.h" #include "wine/heap.h" +#include "wine/list.h"
#include "mfplat_private.h"
@@ -33,16 +34,34 @@ WINE_DEFAULT_DEBUG_CHANNEL(mfplat); #define FIRST_USER_QUEUE_HANDLE 5 #define MAX_USER_QUEUE_HANDLES 124
+#define WAIT_ITEM_KEY_MASK (0x82000000) + +static LONG next_item_key; + +static MFWORKITEM_KEY generate_item_key(DWORD mask) +{ + return ((MFWORKITEM_KEY)mask << 32) | InterlockedIncrement(&next_item_key); +} + struct work_item { + struct list entry; LONG refcount; IMFAsyncResult *result; + struct queue *queue; + MFWORKITEM_KEY key; + union + { + TP_WAIT *wait_object; + } u; };
struct queue { TP_POOL *pool; TP_CALLBACK_ENVIRON env; + CRITICAL_SECTION cs; + struct list pending_items; };
struct queue_handle @@ -121,6 +140,8 @@ static struct work_item * alloc_work_item(struct queue *queue, IMFAsyncResult *r item->result = result; IMFAsyncResult_AddRef(item->result); item->refcount = 1; + item->queue = queue; + list_init(&item->entry);
return item; } @@ -143,6 +164,8 @@ static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *que queue->env.Pool = queue->pool; queue->env.CleanupGroup = CreateThreadpoolCleanupGroup(); queue->env.CleanupGroupCancelCallback = standard_queue_cleanup_callback; + list_init(&queue->pending_items); + InitializeCriticalSection(&queue->cs);
max_thread = (queue_type == MF_STANDARD_WORKQUEUE || queue_type == MF_WINDOW_WORKQUEUE) ? 1 : 4;
@@ -210,14 +233,68 @@ void init_system_queues(void) LeaveCriticalSection(&queues_section); }
+static HRESULT lock_user_queue(DWORD queue) +{ + HRESULT hr = MF_E_INVALID_WORKQUEUE; + struct queue_handle *entry; + + if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK)) + return S_OK; + + EnterCriticalSection(&queues_section); + entry = get_queue_obj(queue); + if (entry && entry->refcount) + { + entry->refcount++; + hr = S_OK; + } + LeaveCriticalSection(&queues_section); + return hr; +} + +static HRESULT unlock_user_queue(DWORD queue) +{ + HRESULT hr = MF_E_INVALID_WORKQUEUE; + struct queue_handle *entry; + + if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK)) + return S_OK; + + EnterCriticalSection(&queues_section); + entry = get_queue_obj(queue); + if (entry && entry->refcount) + { + if (--entry->refcount == 0) + { + entry->obj = next_free_user_queue; + next_free_user_queue = entry; + } + hr = S_OK; + } + LeaveCriticalSection(&queues_section); + return hr; +} + static void shutdown_queue(struct queue *queue) { + struct work_item *item, *item2; + if (!queue->pool) return;
CloseThreadpoolCleanupGroupMembers(queue->env.CleanupGroup, TRUE, NULL); CloseThreadpool(queue->pool); queue->pool = NULL; + + EnterCriticalSection(&queue->cs); + LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry) + { + list_remove(&item->entry); + release_work_item(item); + } + LeaveCriticalSection(&queue->cs); + + DeleteCriticalSection(&queue->cs); }
void shutdown_system_queues(void) @@ -234,6 +311,11 @@ void shutdown_system_queues(void) LeaveCriticalSection(&queues_section); }
+static void grab_work_item(struct work_item *item) +{ + InterlockedIncrement(&item->refcount); +} + static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work) { struct work_item *item = context; @@ -275,6 +357,124 @@ static HRESULT queue_put_work_item(DWORD queue_id, IMFAsyncResult *result) return hr; }
+static HRESULT invoke_async_callback(IMFAsyncResult *result) +{ + MFASYNCRESULT *result_data = (MFASYNCRESULT *)result; + DWORD queue = MFASYNC_CALLBACK_QUEUE_STANDARD, flags; + HRESULT hr; + + if (FAILED(IMFAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue))) + queue = MFASYNC_CALLBACK_QUEUE_STANDARD; + + if (FAILED(lock_user_queue(queue))) + queue = MFASYNC_CALLBACK_QUEUE_STANDARD; + + hr = queue_put_work_item(queue, result); + + unlock_user_queue(queue); + + return hr; +} + +static void queue_release_pending_item(struct work_item *item) +{ + EnterCriticalSection(&item->queue->cs); + if (item->key) + { + list_remove(&item->entry); + item->key = 0; + release_work_item(item); + } + LeaveCriticalSection(&item->queue->cs); +} + +static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait, + TP_WAIT_RESULT wait_result) +{ + struct work_item *item = context; + + TRACE("result object %p.\n", item->result); + + invoke_async_callback(item->result); + + release_work_item(item); +} + +static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait, + TP_WAIT_RESULT wait_result) +{ + struct work_item *item = context; + + TRACE("result object %p.\n", item->result); + + queue_release_pending_item(item); + + invoke_async_callback(item->result); + + release_work_item(item); +} + +static void queue_mark_item_pending(DWORD mask, struct work_item *item, MFWORKITEM_KEY *key) +{ + *key = generate_item_key(mask); + item->key = *key; + + EnterCriticalSection(&item->queue->cs); + list_add_tail(&item->queue->pending_items, &item->entry); + grab_work_item(item); + LeaveCriticalSection(&item->queue->cs); +} + +static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IMFAsyncResult *result, + MFWORKITEM_KEY *key) +{ + PTP_WAIT_CALLBACK callback; + struct work_item *item; + + if (!(item = alloc_work_item(queue, result))) + return E_OUTOFMEMORY; + + if 
(key) + { + queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key); + callback = waiting_item_cancelable_callback; + } + else + callback = waiting_item_callback; + + item->u.wait_object = CreateThreadpoolWait(callback, item, &queue->env); + SetThreadpoolWait(item->u.wait_object, event, NULL); + + TRACE("dispatched %p.\n", result); + + return S_OK; +} + +static HRESULT queue_cancel_item(struct queue *queue, MFWORKITEM_KEY key) +{ + HRESULT hr = MF_E_NOT_FOUND; + struct work_item *item; + + EnterCriticalSection(&queue->cs); + LIST_FOR_EACH_ENTRY(item, &queue->pending_items, struct work_item, entry) + { + if (item->key == key) + { + key >>= 32; + if ((key & WAIT_ITEM_KEY_MASK) == WAIT_ITEM_KEY_MASK) + CloseThreadpoolWait(item->u.wait_object); + else + WARN("Unknown item key mask %#x.\n", (DWORD)key); + queue_release_pending_item(item); + hr = S_OK; + break; + } + } + LeaveCriticalSection(&queue->cs); + + return hr; +} + static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_id) { struct queue_handle *entry; @@ -313,48 +513,6 @@ static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_ return S_OK; }
-static HRESULT lock_user_queue(DWORD queue) -{ - HRESULT hr = MF_E_INVALID_WORKQUEUE; - struct queue_handle *entry; - - if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK)) - return S_OK; - - EnterCriticalSection(&queues_section); - entry = get_queue_obj(queue); - if (entry && entry->refcount) - { - entry->refcount++; - hr = S_OK; - } - LeaveCriticalSection(&queues_section); - return hr; -} - -static HRESULT unlock_user_queue(DWORD queue) -{ - HRESULT hr = MF_E_INVALID_WORKQUEUE; - struct queue_handle *entry; - - if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK)) - return S_OK; - - EnterCriticalSection(&queues_section); - entry = get_queue_obj(queue); - if (entry && entry->refcount) - { - if (--entry->refcount == 0) - { - entry->obj = next_free_user_queue; - next_free_user_queue = entry; - } - hr = S_OK; - } - LeaveCriticalSection(&queues_section); - return hr; -} - struct async_result { MFASYNCRESULT result; @@ -606,23 +764,9 @@ HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result) */ HRESULT WINAPI MFInvokeCallback(IMFAsyncResult *result) { - MFASYNCRESULT *result_data = (MFASYNCRESULT *)result; - DWORD queue = MFASYNC_CALLBACK_QUEUE_STANDARD, flags; - HRESULT hr; - TRACE("%p.\n", result);
- if (FAILED(IMFAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue))) - queue = MFASYNC_CALLBACK_QUEUE_STANDARD; - - if (FAILED(MFLockWorkQueue(queue))) - queue = MFASYNC_CALLBACK_QUEUE_STANDARD; - - hr = MFPutWorkItemEx(queue, result); - - MFUnlockWorkQueue(queue); - - return hr; + return invoke_async_callback(result); }
static HRESULT schedule_work_item(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key) @@ -662,12 +806,38 @@ HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, I return hr; }
+/*********************************************************************** + * MFPutWaitingWorkItem (mfplat.@) + */ +HRESULT WINAPI MFPutWaitingWorkItem(HANDLE event, LONG priority, IMFAsyncResult *result, MFWORKITEM_KEY *key) +{ + struct queue *queue; + HRESULT hr; + + TRACE("%p, %d, %p, %p.\n", event, priority, result, key); + + if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue))) + return hr; + + hr = queue_submit_wait(queue, event, priority, result, key); + + return hr; +} + /*********************************************************************** * MFCancelWorkItem (mfplat.@) */ HRESULT WINAPI MFCancelWorkItem(MFWORKITEM_KEY key) { - FIXME("%s.\n", wine_dbgstr_longlong(key)); + struct queue *queue; + HRESULT hr;
- return E_NOTIMPL; + TRACE("%s.\n", wine_dbgstr_longlong(key)); + + if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue))) + return hr; + + hr = queue_cancel_item(queue, key); + + return hr; } diff --git a/dlls/mfplat/tests/mfplat.c b/dlls/mfplat/tests/mfplat.c index 790eeaf6d9..cd55b3973b 100644 --- a/dlls/mfplat/tests/mfplat.c +++ b/dlls/mfplat/tests/mfplat.c @@ -1060,12 +1060,11 @@ todo_wine ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr);
hr = MFCancelWorkItem(key); -todo_wine ok(hr == MF_E_NOT_FOUND || broken(hr == S_OK) /* < win10 */, "Unexpected hr %#x.\n", hr);
if (!pMFPutWaitingWorkItem) { - skip("Waiting items are not supported.\n"); + win_skip("Waiting items are not supported.\n"); return; }
@@ -1087,9 +1086,11 @@ todo_wine IMFAsyncResult_Release(result);
hr = MFScheduleWorkItem(&callback, NULL, -5000, &key); +todo_wine ok(hr == S_OK, "Failed to schedule item, hr %#x.\n", hr);
hr = MFCancelWorkItem(key); +todo_wine ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr);
hr = MFShutdown();