Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/mfplat/mfplat.spec | 2 + dlls/mfplat/queue.c | 92 ++++++++++++++++++++++++++++++++--------- include/mfapi.h | 2 + 3 files changed, 77 insertions(+), 19 deletions(-)
diff --git a/dlls/mfplat/mfplat.spec b/dlls/mfplat/mfplat.spec index c47af72ed8..be5d8b2205 100644 --- a/dlls/mfplat/mfplat.spec +++ b/dlls/mfplat/mfplat.spec @@ -123,7 +123,9 @@ @ stdcall MFLockWorkQueue(long) @ stdcall MFPutWaitingWorkItem(long long ptr ptr) @ stdcall MFPutWorkItem(long ptr ptr) +@ stdcall MFPutWorkItem2(long long ptr ptr) @ stdcall MFPutWorkItemEx(long ptr) +@ stdcall MFPutWorkItemEx2(long long ptr) @ stub MFRecordError @ stdcall MFRemovePeriodicCallback(long) @ stdcall MFScheduleWorkItem(ptr ptr int64 ptr) diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c index e579168770..c2cc6233b5 100644 --- a/dlls/mfplat/queue.c +++ b/dlls/mfplat/queue.c @@ -59,10 +59,17 @@ struct work_item } u; };
+static const TP_CALLBACK_PRIORITY priorities[] = +{ + TP_CALLBACK_PRIORITY_HIGH, + TP_CALLBACK_PRIORITY_NORMAL, + TP_CALLBACK_PRIORITY_LOW, +}; + struct queue { TP_POOL *pool; - TP_CALLBACK_ENVIRON_V3 env; + TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)]; CRITICAL_SECTION cs; struct list pending_items; }; @@ -160,16 +167,23 @@ static void release_work_item(struct work_item *item)
static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *queue) { - unsigned int max_thread; + TP_CALLBACK_ENVIRON_V3 env; + unsigned int max_thread, i;
queue->pool = CreateThreadpool(NULL); - memset(&queue->env, 0, sizeof(queue->env)); - queue->env.Version = 3; - queue->env.Size = sizeof(queue->env); - queue->env.Pool = queue->pool; - queue->env.CleanupGroup = CreateThreadpoolCleanupGroup(); - queue->env.CleanupGroupCancelCallback = standard_queue_cleanup_callback; - queue->env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL; + + memset(&env, 0, sizeof(env)); + env.Version = 3; + env.Size = sizeof(env); + env.Pool = queue->pool; + env.CleanupGroup = CreateThreadpoolCleanupGroup(); + env.CleanupGroupCancelCallback = standard_queue_cleanup_callback; + env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL; + for (i = 0; i < ARRAY_SIZE(queue->envs); ++i) + { + queue->envs[i] = env; + queue->envs[i].CallbackPriority = priorities[i]; + } list_init(&queue->pending_items); InitializeCriticalSection(&queue->cs);
@@ -267,7 +281,7 @@ static void shutdown_queue(struct queue *queue) if (!queue->pool) return;
- CloseThreadpoolCleanupGroupMembers(queue->env.CleanupGroup, TRUE, NULL); + CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL); CloseThreadpool(queue->pool); queue->pool = NULL;
@@ -339,15 +353,23 @@ static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void release_work_item(item); }
-static HRESULT queue_submit_item(struct queue *queue, IMFAsyncResult *result) +static HRESULT queue_submit_item(struct queue *queue, LONG priority, IMFAsyncResult *result) { + TP_CALLBACK_PRIORITY callback_priority; struct work_item *item; TP_WORK *work_object;
if (!(item = alloc_work_item(queue, result))) return E_OUTOFMEMORY;
- work_object = CreateThreadpoolWork(standard_queue_worker, item, (TP_CALLBACK_ENVIRON *)&queue->env); + if (priority == 0) + callback_priority = TP_CALLBACK_PRIORITY_NORMAL; + else if (priority < 0) + callback_priority = TP_CALLBACK_PRIORITY_LOW; + else + callback_priority = TP_CALLBACK_PRIORITY_HIGH; + work_object = CreateThreadpoolWork(standard_queue_worker, item, + (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]); SubmitThreadpoolWork(work_object);
TRACE("dispatched %p.\n", result); @@ -355,7 +377,7 @@ static HRESULT queue_submit_item(struct queue *queue, IMFAsyncResult *result) return S_OK; }
-static HRESULT queue_put_work_item(DWORD queue_id, IMFAsyncResult *result) +static HRESULT queue_put_work_item(DWORD queue_id, LONG priority, IMFAsyncResult *result) { struct queue *queue; HRESULT hr; @@ -363,7 +385,7 @@ static HRESULT queue_put_work_item(DWORD queue_id, IMFAsyncResult *result) if (FAILED(hr = grab_queue(queue_id, &queue))) return hr;
- hr = queue_submit_item(queue, result); + hr = queue_submit_item(queue, priority, result);
return hr; } @@ -380,7 +402,7 @@ static HRESULT invoke_async_callback(IMFAsyncResult *result) if (FAILED(lock_user_queue(queue))) queue = MFASYNC_CALLBACK_QUEUE_STANDARD;
- hr = queue_put_work_item(queue, result); + hr = queue_put_work_item(queue, 0, result);
unlock_user_queue(queue);
@@ -486,7 +508,8 @@ static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priorit else callback = waiting_item_callback;
- item->u.wait_object = CreateThreadpoolWait(callback, item, (TP_CALLBACK_ENVIRON *)&queue->env); + item->u.wait_object = CreateThreadpoolWait(callback, item, + (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]); SetThreadpoolWait(item->u.wait_object, event, NULL);
TRACE("dispatched %p.\n", result); @@ -519,7 +542,8 @@ static HRESULT queue_submit_timer(struct queue *queue, IMFAsyncResult *result, I filetime.dwLowDateTime = t.u.LowPart; filetime.dwHighDateTime = t.u.HighPart;
- item->u.timer_object = CreateThreadpoolTimer(callback, item, (TP_CALLBACK_ENVIRON *)&queue->env); + item->u.timer_object = CreateThreadpoolTimer(callback, item, + (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]); SetThreadpoolTimer(item->u.timer_object, &filetime, period, 0);
TRACE("dispatched %p.\n", result); @@ -828,7 +852,27 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown * if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result))) return hr;
- hr = queue_put_work_item(queue, result); + hr = queue_put_work_item(queue, 0, result); + + IMFAsyncResult_Release(result); + + return hr; +} + +/*********************************************************************** + * MFPutWorkItem2 (mfplat.@) + */ +HRESULT WINAPI MFPutWorkItem2(DWORD queue, LONG priority, IMFAsyncCallback *callback, IUnknown *state) +{ + IMFAsyncResult *result; + HRESULT hr; + + TRACE("%#x, %d, %p, %p.\n", queue, priority, callback, state); + + if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result))) + return hr; + + hr = queue_put_work_item(queue, priority, result);
IMFAsyncResult_Release(result);
@@ -842,7 +886,17 @@ HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result) { TRACE("%#x, %p\n", queue, result);
- return queue_put_work_item(queue, result); + return queue_put_work_item(queue, 0, result); +} + +/*********************************************************************** + * MFPutWorkItemEx2 (mfplat.@) + */ +HRESULT WINAPI MFPutWorkItemEx2(DWORD queue, LONG priority, IMFAsyncResult *result) +{ + TRACE("%#x, %d, %p\n", queue, priority, result); + + return queue_put_work_item(queue, priority, result); }
/*********************************************************************** diff --git a/include/mfapi.h b/include/mfapi.h index fd5fbebc58..21876fe574 100644 --- a/include/mfapi.h +++ b/include/mfapi.h @@ -173,7 +173,9 @@ HRESULT WINAPI MFTEnumEx(GUID category, UINT32 flags, const MFT_REGISTER_TYPE_IN HRESULT WINAPI MFInvokeCallback(IMFAsyncResult *result); HRESULT WINAPI MFLockPlatform(void); HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown *state); +HRESULT WINAPI MFPutWorkItem2(DWORD queue, LONG priority, IMFAsyncCallback *callback, IUnknown *state); HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result); +HRESULT WINAPI MFPutWorkItemEx2(DWORD queue, LONG priority, IMFAsyncResult *result); HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, INT64 timeout, MFWORKITEM_KEY *key); HRESULT WINAPI MFScheduleWorkItemEx(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key); HRESULT WINAPI MFTRegister(CLSID clsid, GUID category, LPWSTR name, UINT32 flags, UINT32 cinput,