Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/mfplat/queue.c | 150 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 147 insertions(+), 3 deletions(-)
diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c index 3b599840cb..8c7cf195e5 100644 --- a/dlls/mfplat/queue.c +++ b/dlls/mfplat/queue.c @@ -33,9 +33,16 @@ WINE_DEFAULT_DEBUG_CHANNEL(mfplat); #define FIRST_USER_QUEUE_HANDLE 5 #define MAX_USER_QUEUE_HANDLES 124
+struct work_item +{ + LONG refcount; + IMFAsyncResult *result; +}; + struct queue { TP_POOL *pool; + TP_CALLBACK_ENVIRON env; };
struct queue_handle @@ -86,9 +93,104 @@ enum system_queue_index
static struct queue system_queues[SYS_QUEUE_COUNT];
+static struct queue *get_system_queue(DWORD queue_id) +{ + switch (queue_id) + { + case MFASYNC_CALLBACK_QUEUE_STANDARD: + case MFASYNC_CALLBACK_QUEUE_RT: + case MFASYNC_CALLBACK_QUEUE_IO: + case MFASYNC_CALLBACK_QUEUE_TIMER: + case MFASYNC_CALLBACK_QUEUE_MULTITHREADED: + case MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION: + return &system_queues[queue_id - 1]; + default: + return NULL; + } +} + +static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data) +{ +} + +static struct work_item * alloc_work_item(struct queue *queue, IMFAsyncResult *result) +{ + struct work_item *item; + + item = heap_alloc_zero(sizeof(*item)); + item->result = result; + IMFAsyncResult_AddRef(item->result); + item->refcount = 1; + + return item; +} + +static void release_work_item(struct work_item *item) +{ + if (InterlockedDecrement(&item->refcount) == 0) + { + IMFAsyncResult_Release(item->result); + heap_free(item); + } +} + static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *queue) { + unsigned int max_thread; + queue->pool = CreateThreadpool(NULL); + queue->env.Version = 1; + queue->env.Pool = queue->pool; + queue->env.CleanupGroup = CreateThreadpoolCleanupGroup(); + queue->env.CleanupGroupCancelCallback = standard_queue_cleanup_callback; + + max_thread = (queue_type == MF_STANDARD_WORKQUEUE || queue_type == MF_WINDOW_WORKQUEUE) ? 1 : 4; + + SetThreadpoolThreadMinimum(queue->pool, 1); + SetThreadpoolThreadMaximum(queue->pool, max_thread); + + if (queue_type == MF_WINDOW_WORKQUEUE) + FIXME("MF_WINDOW_WORKQUEUE is not supported.\n"); +} + +static HRESULT grab_queue(DWORD queue_id, struct queue **ret) +{ + struct queue *queue = get_system_queue(queue_id); + MFASYNC_WORKQUEUE_TYPE queue_type; + struct queue_handle *entry; + + if (!system_queues[SYS_QUEUE_STANDARD].pool) + return MF_E_SHUTDOWN; + + if (queue && queue->pool) + { + *ret = queue; + return S_OK; + } + else if (queue) + { + EnterCriticalSection(&queues_section); + switch (queue_id) + { + case MFASYNC_CALLBACK_QUEUE_IO: + case MFASYNC_CALLBACK_QUEUE_MULTITHREADED: + case MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION: + queue_type = MF_MULTITHREADED_WORKQUEUE; + break; + default: + queue_type = MF_STANDARD_WORKQUEUE; + } + init_work_queue(queue_type, queue); + LeaveCriticalSection(&queues_section); + *ret = queue; + return S_OK; + } + + /* Handles user queues. */ + if ((entry = get_queue_obj(queue_id))) + *ret = entry->obj; + + return *ret ? S_OK : MF_E_INVALID_WORKQUEUE; }
void init_system_queues(void) @@ -113,6 +215,7 @@ static void shutdown_queue(struct queue *queue) if (!queue->pool) return;
+ CloseThreadpoolCleanupGroupMembers(queue->env.CleanupGroup, TRUE, NULL); CloseThreadpool(queue->pool); queue->pool = NULL; } @@ -131,6 +234,47 @@ void shutdown_system_queues(void) LeaveCriticalSection(&queues_section); }
+static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work) +{ + struct work_item *item = context; + MFASYNCRESULT *result = (MFASYNCRESULT *)item->result; + + TRACE("result object %p.\n", result); + + IMFAsyncCallback_Invoke(result->pCallback, item->result); + + release_work_item(item); +} + +static HRESULT queue_submit_item(struct queue *queue, IMFAsyncResult *result) +{ + struct work_item *item; + TP_WORK *work_object; + + if (!(item = alloc_work_item(queue, result))) + return E_OUTOFMEMORY; + + work_object = CreateThreadpoolWork(standard_queue_worker, item, &queue->env); + SubmitThreadpoolWork(work_object); + + TRACE("dispatched %p.\n", result); + + return S_OK; +} + +static HRESULT queue_put_work_item(DWORD queue_id, IMFAsyncResult *result) +{ + struct queue *queue; + HRESULT hr; + + if (FAILED(hr = grab_queue(queue_id, &queue))) + return hr; + + hr = queue_submit_item(queue, result); + + return hr; +} + static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_id) { struct queue_handle *entry; @@ -440,7 +584,7 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown * if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result))) return hr;
- hr = MFPutWorkItemEx(queue, result); + hr = queue_put_work_item(queue, result);
IMFAsyncResult_Release(result);
@@ -452,9 +596,9 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown * */ HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result) { - FIXME("%#x, %p\n", queue, result); + TRACE("%#x, %p\n", queue, result);
- return E_NOTIMPL; + return queue_put_work_item(queue, result); }
/***********************************************************************