Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com> ---
v2: fixes test failures on Windows.
dlls/mfplat/mfplat.spec | 6 ++-- dlls/mfplat/queue.c | 47 +++++++++++++++++++++++++++++ dlls/mfplat/tests/mfplat.c | 62 ++++++++++++++++++++++++++++++++++++-- include/mfapi.h | 2 ++ 4 files changed, 112 insertions(+), 5 deletions(-)
diff --git a/dlls/mfplat/mfplat.spec b/dlls/mfplat/mfplat.spec index c4e0b2abfa..39fd7a9d44 100644 --- a/dlls/mfplat/mfplat.spec +++ b/dlls/mfplat/mfplat.spec @@ -28,7 +28,7 @@ @ stub MFCalculateBitmapImageSize @ stub MFCalculateImageSize @ stub MFCancelCreateFile -@ stub MFCancelWorkItem +@ stdcall MFCancelWorkItem(int64) @ stub MFCompareFullToPartialMediaType @ stub MFCompareSockaddrAddresses @ stub MFConvertColorInfoFromDXVA @@ -124,8 +124,8 @@ @ stdcall MFPutWorkItemEx(long ptr) @ stub MFRecordError @ stub MFRemovePeriodicCallback -@ stub MFScheduleWorkItem -@ stub MFScheduleWorkItemEx +@ stdcall MFScheduleWorkItem(ptr ptr int64 ptr) +@ stdcall MFScheduleWorkItemEx(ptr int64 ptr) @ stub MFSerializeAttributesToStream @ stub MFSerializeEvent @ stub MFSerializeMediaTypeToStream diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c index ebc2ad487b..7c8790b52c 100644 --- a/dlls/mfplat/queue.c +++ b/dlls/mfplat/queue.c @@ -385,3 +385,50 @@ HRESULT WINAPI MFInvokeCallback(IMFAsyncResult *result)
return hr; } + +static HRESULT schedule_work_item(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key) +{ + FIXME("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key); + + return E_NOTIMPL; +} + +/*********************************************************************** + * MFScheduleWorkItemEx (mfplat.@) + */ +HRESULT WINAPI MFScheduleWorkItemEx(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key) +{ + TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key); + + return schedule_work_item(result, timeout, key); +} + +/*********************************************************************** + * MFScheduleWorkItem (mfplat.@) + */ +HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, INT64 timeout, MFWORKITEM_KEY *key) +{ + IMFAsyncResult *result; + HRESULT hr; + + TRACE("%p, %p, %s, %p.\n", callback, state, wine_dbgstr_longlong(timeout), key); + + if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result))) + return hr; + + hr = schedule_work_item(result, timeout, key); + + IMFAsyncResult_Release(result); + + return hr; +} + +/*********************************************************************** + * MFCancelWorkItem (mfplat.@) + */ +HRESULT WINAPI MFCancelWorkItem(MFWORKITEM_KEY key) +{ + FIXME("%s.\n", wine_dbgstr_longlong(key)); + + return E_NOTIMPL; +} diff --git a/dlls/mfplat/tests/mfplat.c b/dlls/mfplat/tests/mfplat.c index 609faa5d00..cc8a0f47b8 100644 --- a/dlls/mfplat/tests/mfplat.c +++ b/dlls/mfplat/tests/mfplat.c @@ -43,6 +43,7 @@ static HRESULT (WINAPI *pMFCreateMFByteStreamOnStream)(IStream *stream, IMFByteS static HRESULT (WINAPI *pMFCreateMemoryBuffer)(DWORD max_length, IMFMediaBuffer **buffer); static void* (WINAPI *pMFHeapAlloc)(SIZE_T size, ULONG flags, char *file, int line, EAllocationType type); static void (WINAPI *pMFHeapFree)(void *p); +static HRESULT (WINAPI *pMFPutWaitingWorkItem)(HANDLE event, LONG priority, IMFAsyncResult *result, MFWORKITEM_KEY *key);
DEFINE_GUID(GUID_NULL,0,0,0,0,0,0,0,0,0,0,0);
@@ -318,6 +319,7 @@ static void init_functions(void) X(MFCreateMemoryBuffer); X(MFHeapAlloc); X(MFHeapFree); + X(MFPutWaitingWorkItem); #undef X }
@@ -718,13 +720,13 @@ static ULONG WINAPI testcallback_Release(IMFAsyncCallback *iface)
static HRESULT WINAPI testcallback_GetParameters(IMFAsyncCallback *iface, DWORD *flags, DWORD *queue) { - ok(0, "Unexpected call.\n"); + ok(flags != NULL && queue != NULL, "Unexpected arguments.\n"); return E_NOTIMPL; }
static HRESULT WINAPI testcallback_Invoke(IMFAsyncCallback *iface, IMFAsyncResult *result) { - ok(0, "Unexpected call.\n"); + ok(result != NULL, "Unexpected result object.\n"); return E_NOTIMPL; }
@@ -1035,6 +1037,61 @@ static void test_MFHeapAlloc(void) pMFHeapFree(res); }
+static void test_scheduled_items(void) +{ + IMFAsyncCallback callback = { &testcallbackvtbl }; + IMFAsyncResult *result; + MFWORKITEM_KEY key, key2; + HRESULT hr; + + hr = MFStartup(MF_VERSION, MFSTARTUP_FULL); + ok(hr == S_OK, "Failed to start up, hr %#x.\n", hr); + + hr = MFScheduleWorkItem(&callback, NULL, -5000, &key); +todo_wine + ok(hr == S_OK, "Failed to schedule item, hr %#x.\n", hr); + + hr = MFCancelWorkItem(key); +todo_wine + ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr); + + hr = MFCancelWorkItem(key); +todo_wine + ok(hr == MF_E_NOT_FOUND || broken(hr == S_OK) /* < win10 */, "Unexpected hr %#x.\n", hr); + + if (!pMFPutWaitingWorkItem) + { + skip("Waiting items are not supported.\n"); + return; + } + + hr = MFCreateAsyncResult(NULL, &callback, NULL, &result); + ok(hr == S_OK, "Failed to create result, hr %#x.\n", hr); + + hr = pMFPutWaitingWorkItem(NULL, 0, result, &key); + ok(hr == S_OK, "Failed to add waiting item, hr %#x.\n", hr); + + hr = pMFPutWaitingWorkItem(NULL, 0, result, &key2); + ok(hr == S_OK, "Failed to add waiting item, hr %#x.\n", hr); + + hr = MFCancelWorkItem(key); + ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr); + + hr = MFCancelWorkItem(key2); + ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr); + + IMFAsyncResult_Release(result); + + hr = MFScheduleWorkItem(&callback, NULL, -5000, &key); + ok(hr == S_OK, "Failed to schedule item, hr %#x.\n", hr); + + hr = MFCancelWorkItem(key); + ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr); + + hr = MFShutdown(); + ok(hr == S_OK, "Failed to shutdown, hr %#x.\n", hr); +} + START_TEST(mfplat) { CoInitialize(NULL); @@ -1056,6 +1113,7 @@ START_TEST(mfplat) test_MFCopyImage(); test_MFCreateCollection(); test_MFHeapAlloc(); + test_scheduled_items();
CoUninitialize(); } diff --git a/include/mfapi.h b/include/mfapi.h index 7c47213d85..4b7d758488 100644 --- a/include/mfapi.h +++ b/include/mfapi.h @@ -115,6 +115,8 @@ HRESULT WINAPI MFInvokeCallback(IMFAsyncResult *result); HRESULT WINAPI MFLockPlatform(void); HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown *state); HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result); +HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, INT64 timeout, MFWORKITEM_KEY *key); +HRESULT WINAPI MFScheduleWorkItemEx(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key); HRESULT WINAPI MFTRegister(CLSID clsid, GUID category, LPWSTR name, UINT32 flags, UINT32 cinput, MFT_REGISTER_TYPE_INFO *input_types, UINT32 coutput, MFT_REGISTER_TYPE_INFO *output_types, IMFAttributes *attributes);
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com> --- dlls/mfplat/main.c | 26 ++++++-- dlls/mfplat/mfplat_private.h | 20 ++++++ dlls/mfplat/queue.c | 123 ++++++++++++++++++++++++++++------- include/mfapi.h | 8 +++ include/mfobjects.idl | 1 + 5 files changed, 150 insertions(+), 28 deletions(-) create mode 100644 dlls/mfplat/mfplat_private.h
diff --git a/dlls/mfplat/main.c b/dlls/mfplat/main.c index a38c03b4d1..2d82589dfb 100644 --- a/dlls/mfplat/main.c +++ b/dlls/mfplat/main.c @@ -36,6 +36,8 @@ #include "wine/debug.h" #include "wine/unicode.h"
+#include "mfplat_private.h" + WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
static LONG platform_lock; @@ -465,11 +467,16 @@ HRESULT WINAPI MFStartup(ULONG version, DWORD flags) #define MF_VERSION_XP MAKELONG( MF_API_VERSION, 1 ) #define MF_VERSION_WIN7 MAKELONG( MF_API_VERSION, 2 )
- FIXME("(%u, %u): stub\n", version, flags); + TRACE("%#x, %#x.\n", version, flags);
- if(version != MF_VERSION_XP && version != MF_VERSION_WIN7) + if (version != MF_VERSION_XP && version != MF_VERSION_WIN7) return MF_E_BAD_STARTUP_VERSION;
+ if (InterlockedIncrement(&platform_lock) == 1) + { + init_system_queues(); + } + return S_OK; }
@@ -478,7 +485,15 @@ HRESULT WINAPI MFStartup(ULONG version, DWORD flags) */ HRESULT WINAPI MFShutdown(void) { - FIXME("(): stub\n"); + TRACE("\n"); + + if (platform_lock <= 0) + return S_OK; + + if (InterlockedExchangeAdd(&platform_lock, -1) == 1) + { + shutdown_system_queues(); + }
return S_OK; } @@ -498,7 +513,10 @@ HRESULT WINAPI MFLockPlatform(void) */ HRESULT WINAPI MFUnlockPlatform(void) { - InterlockedDecrement(&platform_lock); + if (InterlockedDecrement(&platform_lock) == 0) + { + shutdown_system_queues(); + }
return S_OK; } diff --git a/dlls/mfplat/mfplat_private.h b/dlls/mfplat/mfplat_private.h new file mode 100644 index 0000000000..2d9e7d5906 --- /dev/null +++ b/dlls/mfplat/mfplat_private.h @@ -0,0 +1,20 @@ +/* + * Copyright 2019 Nikolay Sivov for CodeWeavers + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA + */ + +extern void init_system_queues(void) DECLSPEC_HIDDEN; +extern void shutdown_system_queues(void) DECLSPEC_HIDDEN; diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c index 7c8790b52c..9a3922fce2 100644 --- a/dlls/mfplat/queue.c +++ b/dlls/mfplat/queue.c @@ -26,33 +26,40 @@ #include "wine/debug.h" #include "wine/heap.h"
+#include "mfplat_private.h" + WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
#define FIRST_USER_QUEUE_HANDLE 5 #define MAX_USER_QUEUE_HANDLES 124
struct queue +{ + TP_POOL *pool; +}; + +struct queue_handle { void *obj; LONG refcount; WORD generation; };
-static struct queue user_queues[MAX_USER_QUEUE_HANDLES]; -static struct queue *next_free_user_queue; -static struct queue *next_unused_user_queue = user_queues; +static struct queue_handle user_queues[MAX_USER_QUEUE_HANDLES]; +static struct queue_handle *next_free_user_queue; +static struct queue_handle *next_unused_user_queue = user_queues; static WORD queue_generation;
-static CRITICAL_SECTION user_queues_section; -static CRITICAL_SECTION_DEBUG user_queues_critsect_debug = +static CRITICAL_SECTION queues_section; +static CRITICAL_SECTION_DEBUG queues_critsect_debug = { - 0, 0, &user_queues_section, - { &user_queues_critsect_debug.ProcessLocksList, &user_queues_critsect_debug.ProcessLocksList }, - 0, 0, { (DWORD_PTR)(__FILE__ ": user_queues_section") } + 0, 0, &queues_section, + { &queues_critsect_debug.ProcessLocksList, &queues_critsect_debug.ProcessLocksList }, + 0, 0, { (DWORD_PTR)(__FILE__ ": queues_section") } }; -static CRITICAL_SECTION user_queues_section = { &user_queues_critsect_debug, -1, 0, 0, 0, 0 }; +static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0, 0 };
-static struct queue *get_queue_obj(DWORD handle) +static struct queue_handle *get_queue_obj(DWORD handle) { unsigned int idx = HIWORD(handle) - FIRST_USER_QUEUE_HANDLE;
@@ -65,12 +72,79 @@ static struct queue *get_queue_obj(DWORD handle) return NULL; }
-static HRESULT alloc_user_queue(DWORD *queue) +enum system_queue_index +{ + SYS_QUEUE_STANDARD = 0, + SYS_QUEUE_RT, + SYS_QUEUE_IO, + SYS_QUEUE_TIMER, + SYS_QUEUE_MULTITHREADED, + SYS_QUEUE_DO_NOT_USE, + SYS_QUEUE_LONG_FUNCTION, + SYS_QUEUE_COUNT, +}; + +static struct queue system_queues[SYS_QUEUE_COUNT]; + +static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *queue) +{ + queue->pool = CreateThreadpool(NULL); +} + +void init_system_queues(void) +{ + /* Always initialize standard queue, keep the rest lazy. */ + + EnterCriticalSection(&queues_section); + + if (system_queues[SYS_QUEUE_STANDARD].pool) + { + LeaveCriticalSection(&queues_section); + return; + } + + init_work_queue(MF_STANDARD_WORKQUEUE, &system_queues[SYS_QUEUE_STANDARD]); + + LeaveCriticalSection(&queues_section); +} + +static void shutdown_queue(struct queue *queue) +{ + if (!queue->pool) + return; + + CloseThreadpool(queue->pool); + queue->pool = NULL; +} + +void shutdown_system_queues(void) +{ + unsigned int i; + + EnterCriticalSection(&queues_section); + + for (i = 0; i < ARRAY_SIZE(system_queues); ++i) + { + shutdown_queue(&system_queues[i]); + } + + LeaveCriticalSection(&queues_section); +} + +static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_id) { - struct queue *entry; + struct queue_handle *entry; + struct queue *queue; unsigned int idx;
- EnterCriticalSection(&user_queues_section); + *queue_id = MFASYNC_CALLBACK_QUEUE_UNDEFINED; + + queue = heap_alloc_zero(sizeof(*queue)); + if (!queue) + return E_OUTOFMEMORY; + init_work_queue(queue_type, queue); + + EnterCriticalSection(&queues_section);
entry = next_free_user_queue; if (entry) @@ -79,17 +153,18 @@ static HRESULT alloc_user_queue(DWORD *queue) entry = next_unused_user_queue++; else { - LeaveCriticalSection(&user_queues_section); + LeaveCriticalSection(&queues_section); return E_OUTOFMEMORY; }
entry->refcount = 1; - entry->obj = NULL; + entry->obj = queue; if (++queue_generation == 0xffff) queue_generation = 1; entry->generation = queue_generation; idx = entry - user_queues + FIRST_USER_QUEUE_HANDLE; - *queue = (idx << 16) | entry->generation; - LeaveCriticalSection(&user_queues_section); + *queue_id = (idx << 16) | entry->generation; + + LeaveCriticalSection(&queues_section);
return S_OK; } @@ -97,31 +172,31 @@ static HRESULT alloc_user_queue(DWORD *queue) static HRESULT lock_user_queue(DWORD queue) { HRESULT hr = MF_E_INVALID_WORKQUEUE; - struct queue *entry; + struct queue_handle *entry;
if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK)) return S_OK;
- EnterCriticalSection(&user_queues_section); + EnterCriticalSection(&queues_section); entry = get_queue_obj(queue); if (entry && entry->refcount) { entry->refcount++; hr = S_OK; } - LeaveCriticalSection(&user_queues_section); + LeaveCriticalSection(&queues_section); return hr; }
static HRESULT unlock_user_queue(DWORD queue) { HRESULT hr = MF_E_INVALID_WORKQUEUE; - struct queue *entry; + struct queue_handle *entry;
if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK)) return S_OK;
- EnterCriticalSection(&user_queues_section); + EnterCriticalSection(&queues_section); entry = get_queue_obj(queue); if (entry && entry->refcount) { @@ -132,7 +207,7 @@ static HRESULT unlock_user_queue(DWORD queue) } hr = S_OK; } - LeaveCriticalSection(&user_queues_section); + LeaveCriticalSection(&queues_section); return hr; }
@@ -309,7 +384,7 @@ HRESULT WINAPI MFAllocateWorkQueue(DWORD *queue) { TRACE("%p.\n", queue);
- return alloc_user_queue(queue); + return alloc_user_queue(MF_STANDARD_WORKQUEUE, queue); }
/*********************************************************************** diff --git a/include/mfapi.h b/include/mfapi.h index 4b7d758488..ee5cf4c069 100644 --- a/include/mfapi.h +++ b/include/mfapi.h @@ -88,7 +88,15 @@ DEFINE_GUID(MFMediaType_Video, 0x73646976, 0x0000, 0x0010, 0x80, 0x00, 0
typedef unsigned __int64 MFWORKITEM_KEY;
+typedef enum +{ + MF_STANDARD_WORKQUEUE, + MF_WINDOW_WORKQUEUE, + MF_MULTITHREADED_WORKQUEUE, +} MFASYNC_WORKQUEUE_TYPE; + HRESULT WINAPI MFAllocateWorkQueue(DWORD *queue); +HRESULT WINAPI MFAllocateWorkQueueEx(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue); HRESULT WINAPI MFCancelWorkItem(MFWORKITEM_KEY key); HRESULT WINAPI MFCopyImage(BYTE *dest, LONG deststride, const BYTE *src, LONG srcstride, DWORD width, DWORD lines); HRESULT WINAPI MFCreateAttributes(IMFAttributes **attributes, UINT32 size); diff --git a/include/mfobjects.idl b/include/mfobjects.idl index 5ea26b2c67..3245d84d1b 100644 --- a/include/mfobjects.idl +++ b/include/mfobjects.idl @@ -431,6 +431,7 @@ cpp_quote("#define MFASYNC_CALLBACK_QUEUE_STANDARD 0x00000001") cpp_quote("#define MFASYNC_CALLBACK_QUEUE_RT 0x00000002") cpp_quote("#define MFASYNC_CALLBACK_QUEUE_IO 0x00000003") cpp_quote("#define MFASYNC_CALLBACK_QUEUE_TIMER 0x00000004") +cpp_quote("#define MFASYNC_CALLBACK_QUEUE_MULTITHREADED 0x00000005") cpp_quote("#define MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION 0x00000007") cpp_quote("#define MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK 0xffff0000") cpp_quote("#define MFASYNC_CALLBACK_QUEUE_ALL 0xffffffff")
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com> --- dlls/mfplat/mfplat.spec | 2 +- dlls/mfplat/queue.c | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/dlls/mfplat/mfplat.spec b/dlls/mfplat/mfplat.spec index 39fd7a9d44..a218087f6a 100644 --- a/dlls/mfplat/mfplat.spec +++ b/dlls/mfplat/mfplat.spec @@ -17,7 +17,7 @@ @ stub LFGetGlobalPool @ stub MFAddPeriodicCallback @ stdcall MFAllocateWorkQueue(ptr) -@ stub MFAllocateWorkQueueEx +@ stdcall MFAllocateWorkQueueEx(long ptr) @ stub MFAppendCollection @ stub MFAverageTimePerFrameToFrameRate @ stub MFBeginCreateFile diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c index 9a3922fce2..70efac36e8 100644 --- a/dlls/mfplat/queue.c +++ b/dlls/mfplat/queue.c @@ -387,6 +387,16 @@ HRESULT WINAPI MFAllocateWorkQueue(DWORD *queue) return alloc_user_queue(MF_STANDARD_WORKQUEUE, queue); }
+/*********************************************************************** + * MFAllocateWorkQueueEx (mfplat.@) + */ +HRESULT WINAPI MFAllocateWorkQueueEx(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue) +{ + TRACE("%d, %p.\n", queue_type, queue); + + return alloc_user_queue(queue_type, queue); +} + /*********************************************************************** * MFLockWorkQueue (mfplat.@) */
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com> --- dlls/mfplat/queue.c | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-)
diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c index 70efac36e8..3b599840cb 100644 --- a/dlls/mfplat/queue.c +++ b/dlls/mfplat/queue.c @@ -345,12 +345,10 @@ static const IMFAsyncResultVtbl async_result_vtbl = async_result_GetStateNoAddRef, };
-HRESULT WINAPI MFCreateAsyncResult(IUnknown *object, IMFAsyncCallback *callback, IUnknown *state, IMFAsyncResult **out) +static HRESULT create_async_result(IUnknown *object, IMFAsyncCallback *callback, IUnknown *state, IMFAsyncResult **out) { struct async_result *result;
- TRACE("%p, %p, %p, %p.\n", object, callback, state, out); - if (!out) return E_INVALIDARG;
@@ -374,9 +372,21 @@ HRESULT WINAPI MFCreateAsyncResult(IUnknown *object, IMFAsyncCallback *callback,
*out = &result->result.AsyncResult;
+ TRACE("Created async result object %p.\n", *out); + return S_OK; }
+/*********************************************************************** + * MFCreateAsyncResult (mfplat.@) + */ +HRESULT WINAPI MFCreateAsyncResult(IUnknown *object, IMFAsyncCallback *callback, IUnknown *state, IMFAsyncResult **out) +{ + TRACE("%p, %p, %p, %p.\n", object, callback, state, out); + + return create_async_result(object, callback, state, out); +} + /*********************************************************************** * MFAllocateWorkQueue (mfplat.@) */
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com> --- dlls/mfplat/queue.c | 150 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 147 insertions(+), 3 deletions(-)
diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c index 3b599840cb..8c7cf195e5 100644 --- a/dlls/mfplat/queue.c +++ b/dlls/mfplat/queue.c @@ -33,9 +33,16 @@ WINE_DEFAULT_DEBUG_CHANNEL(mfplat); #define FIRST_USER_QUEUE_HANDLE 5 #define MAX_USER_QUEUE_HANDLES 124
+struct work_item +{ + LONG refcount; + IMFAsyncResult *result; +}; + struct queue { TP_POOL *pool; + TP_CALLBACK_ENVIRON env; };
struct queue_handle @@ -86,9 +93,104 @@ enum system_queue_index
static struct queue system_queues[SYS_QUEUE_COUNT];
+static struct queue *get_system_queue(DWORD queue_id) +{ + switch (queue_id) + { + case MFASYNC_CALLBACK_QUEUE_STANDARD: + case MFASYNC_CALLBACK_QUEUE_RT: + case MFASYNC_CALLBACK_QUEUE_IO: + case MFASYNC_CALLBACK_QUEUE_TIMER: + case MFASYNC_CALLBACK_QUEUE_MULTITHREADED: + case MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION: + return &system_queues[queue_id - 1]; + default: + return NULL; + } +} + +static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data) +{ +} + +static struct work_item * alloc_work_item(struct queue *queue, IMFAsyncResult *result) +{ + struct work_item *item; + + item = heap_alloc_zero(sizeof(*item)); + item->result = result; + IMFAsyncResult_AddRef(item->result); + item->refcount = 1; + + return item; +} + +static void release_work_item(struct work_item *item) +{ + if (InterlockedDecrement(&item->refcount) == 0) + { + IMFAsyncResult_Release(item->result); + heap_free(item); + } +} + static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *queue) { + unsigned int max_thread; + queue->pool = CreateThreadpool(NULL); + queue->env.Version = 1; + queue->env.Pool = queue->pool; + queue->env.CleanupGroup = CreateThreadpoolCleanupGroup(); + queue->env.CleanupGroupCancelCallback = standard_queue_cleanup_callback; + + max_thread = (queue_type == MF_STANDARD_WORKQUEUE || queue_type == MF_WINDOW_WORKQUEUE) ? 
1 : 4; + + SetThreadpoolThreadMinimum(queue->pool, 1); + SetThreadpoolThreadMaximum(queue->pool, max_thread); + + if (queue_type == MF_WINDOW_WORKQUEUE) + FIXME("MF_WINDOW_WORKQUEUE is not supported.\n"); +} + +static HRESULT grab_queue(DWORD queue_id, struct queue **ret) +{ + struct queue *queue = get_system_queue(queue_id); + MFASYNC_WORKQUEUE_TYPE queue_type; + struct queue_handle *entry; + + if (!system_queues[SYS_QUEUE_STANDARD].pool) + return MF_E_SHUTDOWN; + + if (queue && queue->pool) + { + *ret = queue; + return S_OK; + } + else if (queue) + { + EnterCriticalSection(&queues_section); + switch (queue_id) + { + case MFASYNC_CALLBACK_QUEUE_IO: + case MFASYNC_CALLBACK_QUEUE_MULTITHREADED: + case MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION: + queue_type = MF_MULTITHREADED_WORKQUEUE; + break; + default: + queue_type = MF_STANDARD_WORKQUEUE; + } + init_work_queue(queue_type, queue); + LeaveCriticalSection(&queues_section); + *ret = queue; + return S_OK; + } + + /* Handles user queues. */ + if ((entry = get_queue_obj(queue_id))) + *ret = entry->obj; + + return *ret ? S_OK : MF_E_INVALID_WORKQUEUE; }
void init_system_queues(void) @@ -113,6 +215,7 @@ static void shutdown_queue(struct queue *queue) if (!queue->pool) return;
+ CloseThreadpoolCleanupGroupMembers(queue->env.CleanupGroup, TRUE, NULL); CloseThreadpool(queue->pool); queue->pool = NULL; } @@ -131,6 +234,47 @@ void shutdown_system_queues(void) LeaveCriticalSection(&queues_section); }
+static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work) +{ + struct work_item *item = context; + MFASYNCRESULT *result = (MFASYNCRESULT *)item->result; + + TRACE("result object %p.\n", result); + + IMFAsyncCallback_Invoke(result->pCallback, item->result); + + release_work_item(item); +} + +static HRESULT queue_submit_item(struct queue *queue, IMFAsyncResult *result) +{ + struct work_item *item; + TP_WORK *work_object; + + if (!(item = alloc_work_item(queue, result))) + return E_OUTOFMEMORY; + + work_object = CreateThreadpoolWork(standard_queue_worker, item, &queue->env); + SubmitThreadpoolWork(work_object); + + TRACE("dispatched %p.\n", result); + + return S_OK; +} + +static HRESULT queue_put_work_item(DWORD queue_id, IMFAsyncResult *result) +{ + struct queue *queue; + HRESULT hr; + + if (FAILED(hr = grab_queue(queue_id, &queue))) + return hr; + + hr = queue_submit_item(queue, result); + + return hr; +} + static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_id) { struct queue_handle *entry; @@ -440,7 +584,7 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown * if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result))) return hr;
- hr = MFPutWorkItemEx(queue, result); + hr = queue_put_work_item(queue, result);
IMFAsyncResult_Release(result);
@@ -452,9 +596,9 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown * */ HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result) { - FIXME("%#x, %p\n", queue, result); + TRACE("%#x, %p\n", queue, result);
- return E_NOTIMPL; + return queue_put_work_item(queue, result); }
/***********************************************************************
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com> --- dlls/mfplat/tests/mfplat.c | 62 +++++++++++++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-)
diff --git a/dlls/mfplat/tests/mfplat.c b/dlls/mfplat/tests/mfplat.c index cc8a0f47b8..e5e074ee82 100644 --- a/dlls/mfplat/tests/mfplat.c +++ b/dlls/mfplat/tests/mfplat.c @@ -44,6 +44,7 @@ static HRESULT (WINAPI *pMFCreateMemoryBuffer)(DWORD max_length, IMFMediaBuffer static void* (WINAPI *pMFHeapAlloc)(SIZE_T size, ULONG flags, char *file, int line, EAllocationType type); static void (WINAPI *pMFHeapFree)(void *p); static HRESULT (WINAPI *pMFPutWaitingWorkItem)(HANDLE event, LONG priority, IMFAsyncResult *result, MFWORKITEM_KEY *key); +static HRESULT (WINAPI *pMFAllocateSerialWorkQueue)(DWORD queue, DWORD *serial_queue);
DEFINE_GUID(GUID_NULL,0,0,0,0,0,0,0,0,0,0,0);
@@ -312,7 +313,8 @@ static void init_functions(void) { HMODULE mod = GetModuleHandleA("mfplat.dll");
-#define X(f) if (!(p##f = (void*)GetProcAddress(mod, #f))) return; +#define X(f) p##f = (void*)GetProcAddress(mod, #f) + X(MFAllocateSerialWorkQueue); X(MFCopyImage); X(MFCreateSourceResolver); X(MFCreateMFByteStreamOnStream); @@ -1092,6 +1094,63 @@ todo_wine ok(hr == S_OK, "Failed to shutdown, hr %#x.\n", hr); }
+static void test_serial_queue(void) +{ + static const DWORD queue_ids[] = + { + MFASYNC_CALLBACK_QUEUE_STANDARD, + MFASYNC_CALLBACK_QUEUE_RT, + MFASYNC_CALLBACK_QUEUE_IO, + MFASYNC_CALLBACK_QUEUE_TIMER, + MFASYNC_CALLBACK_QUEUE_MULTITHREADED, + MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION, + }; + DWORD queue, serial_queue; + unsigned int i; + HRESULT hr; + + if (!pMFAllocateSerialWorkQueue) + { + skip("Serial queues are not supported.\n"); + return; + } + + hr = MFStartup(MF_VERSION, MFSTARTUP_FULL); + ok(hr == S_OK, "Failed to start up, hr %#x.\n", hr); + + for (i = 0; i < ARRAY_SIZE(queue_ids); ++i) + { + BOOL broken_types = queue_ids[i] == MFASYNC_CALLBACK_QUEUE_TIMER || + queue_ids[i] == MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION; + + hr = pMFAllocateSerialWorkQueue(queue_ids[i], &serial_queue); + ok(hr == S_OK || broken(broken_types && hr == E_INVALIDARG) /* Win8 */, + "%u: failed to allocate a queue, hr %#x.\n", i, hr); + + if (SUCCEEDED(hr)) + { + hr = MFUnlockWorkQueue(serial_queue); + ok(hr == S_OK, "%u: failed to unlock the queue, hr %#x.\n", i, hr); + } + } + + /* Chain them together. */ + hr = pMFAllocateSerialWorkQueue(MFASYNC_CALLBACK_QUEUE_STANDARD, &serial_queue); + ok(hr == S_OK, "Failed to allocate a queue, hr %#x.\n", hr); + + hr = pMFAllocateSerialWorkQueue(serial_queue, &queue); + ok(hr == S_OK, "Failed to allocate a queue, hr %#x.\n", hr); + + hr = MFUnlockWorkQueue(serial_queue); + ok(hr == S_OK, "Failed to unlock the queue, hr %#x.\n", hr); + + hr = MFUnlockWorkQueue(queue); + ok(hr == S_OK, "Failed to unlock the queue, hr %#x.\n", hr); + + hr = MFShutdown(); + ok(hr == S_OK, "Failed to shut down, hr %#x.\n", hr); +} + START_TEST(mfplat) { CoInitialize(NULL); @@ -1114,6 +1173,7 @@ START_TEST(mfplat) test_MFCreateCollection(); test_MFHeapAlloc(); test_scheduled_items(); + test_serial_queue();
CoUninitialize(); }
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com> --- dlls/mfplat/mfplat.spec | 1 + dlls/mfplat/queue.c | 288 +++++++++++++++++++++++++++++-------- dlls/mfplat/tests/mfplat.c | 5 +- 3 files changed, 233 insertions(+), 61 deletions(-)
diff --git a/dlls/mfplat/mfplat.spec b/dlls/mfplat/mfplat.spec index a218087f6a..c43746ec7b 100644 --- a/dlls/mfplat/mfplat.spec +++ b/dlls/mfplat/mfplat.spec @@ -120,6 +120,7 @@ @ stub MFJoinIoPort @ stdcall MFLockPlatform() @ stdcall MFLockWorkQueue(long) +@ stdcall MFPutWaitingWorkItem(long long ptr ptr) @ stdcall MFPutWorkItem(long ptr ptr) @ stdcall MFPutWorkItemEx(long ptr) @ stub MFRecordError diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c index 8c7cf195e5..1434645f49 100644 --- a/dlls/mfplat/queue.c +++ b/dlls/mfplat/queue.c @@ -25,6 +25,7 @@
#include "wine/debug.h" #include "wine/heap.h" +#include "wine/list.h"
#include "mfplat_private.h"
@@ -33,16 +34,34 @@ WINE_DEFAULT_DEBUG_CHANNEL(mfplat); #define FIRST_USER_QUEUE_HANDLE 5 #define MAX_USER_QUEUE_HANDLES 124
+#define WAIT_ITEM_KEY_MASK (0x82000000) + +static LONG next_item_key; + +static MFWORKITEM_KEY generate_item_key(DWORD mask) +{ + return ((MFWORKITEM_KEY)mask << 32) | InterlockedIncrement(&next_item_key); +} + struct work_item { + struct list entry; LONG refcount; IMFAsyncResult *result; + struct queue *queue; + MFWORKITEM_KEY key; + union + { + TP_WAIT *wait_object; + } u; };
struct queue { TP_POOL *pool; TP_CALLBACK_ENVIRON env; + CRITICAL_SECTION cs; + struct list pending_items; };
struct queue_handle @@ -121,6 +140,8 @@ static struct work_item * alloc_work_item(struct queue *queue, IMFAsyncResult *r item->result = result; IMFAsyncResult_AddRef(item->result); item->refcount = 1; + item->queue = queue; + list_init(&item->entry);
return item; } @@ -143,6 +164,8 @@ static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *que queue->env.Pool = queue->pool; queue->env.CleanupGroup = CreateThreadpoolCleanupGroup(); queue->env.CleanupGroupCancelCallback = standard_queue_cleanup_callback; + list_init(&queue->pending_items); + InitializeCriticalSection(&queue->cs);
max_thread = (queue_type == MF_STANDARD_WORKQUEUE || queue_type == MF_WINDOW_WORKQUEUE) ? 1 : 4;
@@ -210,14 +233,68 @@ void init_system_queues(void) LeaveCriticalSection(&queues_section); }
+static HRESULT lock_user_queue(DWORD queue) +{ + HRESULT hr = MF_E_INVALID_WORKQUEUE; + struct queue_handle *entry; + + if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK)) + return S_OK; + + EnterCriticalSection(&queues_section); + entry = get_queue_obj(queue); + if (entry && entry->refcount) + { + entry->refcount++; + hr = S_OK; + } + LeaveCriticalSection(&queues_section); + return hr; +} + +static HRESULT unlock_user_queue(DWORD queue) +{ + HRESULT hr = MF_E_INVALID_WORKQUEUE; + struct queue_handle *entry; + + if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK)) + return S_OK; + + EnterCriticalSection(&queues_section); + entry = get_queue_obj(queue); + if (entry && entry->refcount) + { + if (--entry->refcount == 0) + { + entry->obj = next_free_user_queue; + next_free_user_queue = entry; + } + hr = S_OK; + } + LeaveCriticalSection(&queues_section); + return hr; +} + static void shutdown_queue(struct queue *queue) { + struct work_item *item, *item2; + if (!queue->pool) return;
CloseThreadpoolCleanupGroupMembers(queue->env.CleanupGroup, TRUE, NULL); CloseThreadpool(queue->pool); queue->pool = NULL; + + EnterCriticalSection(&queue->cs); + LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry) + { + list_remove(&item->entry); + release_work_item(item); + } + LeaveCriticalSection(&queue->cs); + + DeleteCriticalSection(&queue->cs); }
void shutdown_system_queues(void) @@ -234,6 +311,11 @@ void shutdown_system_queues(void) LeaveCriticalSection(&queues_section); }
+static void grab_work_item(struct work_item *item) +{ + InterlockedIncrement(&item->refcount); +} + static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work) { struct work_item *item = context; @@ -275,6 +357,124 @@ static HRESULT queue_put_work_item(DWORD queue_id, IMFAsyncResult *result) return hr; }
+static HRESULT invoke_async_callback(IMFAsyncResult *result) +{ + MFASYNCRESULT *result_data = (MFASYNCRESULT *)result; + DWORD queue = MFASYNC_CALLBACK_QUEUE_STANDARD, flags; + HRESULT hr; + + if (FAILED(IMFAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue))) + queue = MFASYNC_CALLBACK_QUEUE_STANDARD; + + if (FAILED(lock_user_queue(queue))) + queue = MFASYNC_CALLBACK_QUEUE_STANDARD; + + hr = queue_put_work_item(queue, result); + + unlock_user_queue(queue); + + return hr; +} + +static void queue_release_pending_item(struct work_item *item) +{ + EnterCriticalSection(&item->queue->cs); + if (item->key) + { + list_remove(&item->entry); + item->key = 0; + release_work_item(item); + } + LeaveCriticalSection(&item->queue->cs); +} + +static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait, + TP_WAIT_RESULT wait_result) +{ + struct work_item *item = context; + + TRACE("result object %p.\n", item->result); + + invoke_async_callback(item->result); + + release_work_item(item); +} + +static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait, + TP_WAIT_RESULT wait_result) +{ + struct work_item *item = context; + + TRACE("result object %p.\n", item->result); + + queue_release_pending_item(item); + + invoke_async_callback(item->result); + + release_work_item(item); +} + +static void queue_mark_item_pending(DWORD mask, struct work_item *item, MFWORKITEM_KEY *key) +{ + *key = generate_item_key(mask); + item->key = *key; + + EnterCriticalSection(&item->queue->cs); + list_add_tail(&item->queue->pending_items, &item->entry); + grab_work_item(item); + LeaveCriticalSection(&item->queue->cs); +} + +static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IMFAsyncResult *result, + MFWORKITEM_KEY *key) +{ + PTP_WAIT_CALLBACK callback; + struct work_item *item; + + if (!(item = alloc_work_item(queue, result))) + return E_OUTOFMEMORY; + + if 
(key) + { + queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key); + callback = waiting_item_cancelable_callback; + } + else + callback = waiting_item_callback; + + item->u.wait_object = CreateThreadpoolWait(callback, item, &queue->env); + SetThreadpoolWait(item->u.wait_object, event, NULL); + + TRACE("dispatched %p.\n", result); + + return S_OK; +} + +static HRESULT queue_cancel_item(struct queue *queue, MFWORKITEM_KEY key) +{ + HRESULT hr = MF_E_NOT_FOUND; + struct work_item *item; + + EnterCriticalSection(&queue->cs); + LIST_FOR_EACH_ENTRY(item, &queue->pending_items, struct work_item, entry) + { + if (item->key == key) + { + key >>= 32; + if ((key & WAIT_ITEM_KEY_MASK) == WAIT_ITEM_KEY_MASK) + CloseThreadpoolWait(item->u.wait_object); + else + WARN("Unknown item key mask %#x.\n", (DWORD)key); + queue_release_pending_item(item); + hr = S_OK; + break; + } + } + LeaveCriticalSection(&queue->cs); + + return hr; +} + static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_id) { struct queue_handle *entry; @@ -313,48 +513,6 @@ static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_ return S_OK; }
-static HRESULT lock_user_queue(DWORD queue) -{ - HRESULT hr = MF_E_INVALID_WORKQUEUE; - struct queue_handle *entry; - - if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK)) - return S_OK; - - EnterCriticalSection(&queues_section); - entry = get_queue_obj(queue); - if (entry && entry->refcount) - { - entry->refcount++; - hr = S_OK; - } - LeaveCriticalSection(&queues_section); - return hr; -} - -static HRESULT unlock_user_queue(DWORD queue) -{ - HRESULT hr = MF_E_INVALID_WORKQUEUE; - struct queue_handle *entry; - - if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK)) - return S_OK; - - EnterCriticalSection(&queues_section); - entry = get_queue_obj(queue); - if (entry && entry->refcount) - { - if (--entry->refcount == 0) - { - entry->obj = next_free_user_queue; - next_free_user_queue = entry; - } - hr = S_OK; - } - LeaveCriticalSection(&queues_section); - return hr; -} - struct async_result { MFASYNCRESULT result; @@ -606,23 +764,9 @@ HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result) */ HRESULT WINAPI MFInvokeCallback(IMFAsyncResult *result) { - MFASYNCRESULT *result_data = (MFASYNCRESULT *)result; - DWORD queue = MFASYNC_CALLBACK_QUEUE_STANDARD, flags; - HRESULT hr; - TRACE("%p.\n", result);
- if (FAILED(IMFAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue))) - queue = MFASYNC_CALLBACK_QUEUE_STANDARD; - - if (FAILED(MFLockWorkQueue(queue))) - queue = MFASYNC_CALLBACK_QUEUE_STANDARD; - - hr = MFPutWorkItemEx(queue, result); - - MFUnlockWorkQueue(queue); - - return hr; + return invoke_async_callback(result); }
static HRESULT schedule_work_item(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key) @@ -662,12 +806,38 @@ HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, I return hr; }
+/*********************************************************************** + * MFPutWaitingWorkItem (mfplat.@) + */ +HRESULT WINAPI MFPutWaitingWorkItem(HANDLE event, LONG priority, IMFAsyncResult *result, MFWORKITEM_KEY *key) +{ + struct queue *queue; + HRESULT hr; + + TRACE("%p, %d, %p, %p.\n", event, priority, result, key); + + if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue))) + return hr; + + hr = queue_submit_wait(queue, event, priority, result, key); + + return hr; +} + /*********************************************************************** * MFCancelWorkItem (mfplat.@) */ HRESULT WINAPI MFCancelWorkItem(MFWORKITEM_KEY key) { - FIXME("%s.\n", wine_dbgstr_longlong(key)); + struct queue *queue; + HRESULT hr;
- return E_NOTIMPL; + TRACE("%s.\n", wine_dbgstr_longlong(key)); + + if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue))) + return hr; + + hr = queue_cancel_item(queue, key); + + return hr; } diff --git a/dlls/mfplat/tests/mfplat.c b/dlls/mfplat/tests/mfplat.c index e5e074ee82..915af01822 100644 --- a/dlls/mfplat/tests/mfplat.c +++ b/dlls/mfplat/tests/mfplat.c @@ -1058,12 +1058,11 @@ todo_wine ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr);
hr = MFCancelWorkItem(key); -todo_wine ok(hr == MF_E_NOT_FOUND || broken(hr == S_OK) /* < win10 */, "Unexpected hr %#x.\n", hr);
if (!pMFPutWaitingWorkItem) { - skip("Waiting items are not supported.\n"); + win_skip("Waiting items are not supported.\n"); return; }
@@ -1085,9 +1084,11 @@ todo_wine IMFAsyncResult_Release(result);
hr = MFScheduleWorkItem(&callback, NULL, -5000, &key); +todo_wine ok(hr == S_OK, "Failed to schedule item, hr %#x.\n", hr);
hr = MFCancelWorkItem(key); +todo_wine ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr);
hr = MFShutdown();
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com>
---
 dlls/mfplat/queue.c | 70 ++++++++++++++++++++++++++++++++++++--
 dlls/mfplat/tests/mfplat.c | 6 +---
 2 files changed, 69 insertions(+), 7 deletions(-)
diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c index 1434645f49..c793caa4e9 100644 --- a/dlls/mfplat/queue.c +++ b/dlls/mfplat/queue.c @@ -35,6 +35,7 @@ WINE_DEFAULT_DEBUG_CHANNEL(mfplat); #define MAX_USER_QUEUE_HANDLES 124
#define WAIT_ITEM_KEY_MASK (0x82000000) +#define SCHEDULED_ITEM_KEY_MASK (0x80000000)
static LONG next_item_key;
@@ -53,6 +54,7 @@ struct work_item union { TP_WAIT *wait_object; + TP_TIMER *timer_object; } u; };
@@ -414,6 +416,30 @@ static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *inst release_work_item(item); }
+static void CALLBACK scheduled_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer) +{ + struct work_item *item = context; + + TRACE("result object %p.\n", item->result); + + invoke_async_callback(item->result); + + release_work_item(item); +} + +static void CALLBACK scheduled_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer) +{ + struct work_item *item = context; + + TRACE("result object %p.\n", item->result); + + queue_release_pending_item(item); + + invoke_async_callback(item->result); + + release_work_item(item); +} + static void queue_mark_item_pending(DWORD mask, struct work_item *item, MFWORKITEM_KEY *key) { *key = generate_item_key(mask); @@ -450,6 +476,36 @@ static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priorit return S_OK; }
+static HRESULT queue_submit_timer(struct queue *queue, IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key) +{ + PTP_TIMER_CALLBACK callback; + struct work_item *item; + FILETIME filetime; + LARGE_INTEGER t; + + if (!(item = alloc_work_item(queue, result))) + return E_OUTOFMEMORY; + + if (key) + { + queue_mark_item_pending(SCHEDULED_ITEM_KEY_MASK, item, key); + callback = scheduled_item_cancelable_callback; + } + else + callback = scheduled_item_callback; + + t.QuadPart = timeout * 1000 * 10; + filetime.dwLowDateTime = t.u.LowPart; + filetime.dwHighDateTime = t.u.HighPart; + + item->u.timer_object = CreateThreadpoolTimer(callback, item, &queue->env); + SetThreadpoolTimer(item->u.timer_object, &filetime, 0, 0); + + TRACE("dispatched %p.\n", result); + + return S_OK; +} + static HRESULT queue_cancel_item(struct queue *queue, MFWORKITEM_KEY key) { HRESULT hr = MF_E_NOT_FOUND; @@ -463,6 +519,8 @@ static HRESULT queue_cancel_item(struct queue *queue, MFWORKITEM_KEY key) key >>= 32; if ((key & WAIT_ITEM_KEY_MASK) == WAIT_ITEM_KEY_MASK) CloseThreadpoolWait(item->u.wait_object); + else if ((key & SCHEDULED_ITEM_KEY_MASK) == SCHEDULED_ITEM_KEY_MASK) + CloseThreadpoolTimer(item->u.timer_object); else WARN("Unknown item key mask %#x.\n", (DWORD)key); queue_release_pending_item(item); @@ -771,9 +829,17 @@ HRESULT WINAPI MFInvokeCallback(IMFAsyncResult *result)
static HRESULT schedule_work_item(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key) { - FIXME("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key); + struct queue *queue; + HRESULT hr; + + if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue))) + return hr; + + TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key); + + hr = queue_submit_timer(queue, result, timeout, key);
- return E_NOTIMPL; + return hr; }
/*********************************************************************** diff --git a/dlls/mfplat/tests/mfplat.c b/dlls/mfplat/tests/mfplat.c index 915af01822..fa779299d3 100644 --- a/dlls/mfplat/tests/mfplat.c +++ b/dlls/mfplat/tests/mfplat.c @@ -1050,11 +1050,9 @@ static void test_scheduled_items(void) ok(hr == S_OK, "Failed to start up, hr %#x.\n", hr);
hr = MFScheduleWorkItem(&callback, NULL, -5000, &key); -todo_wine ok(hr == S_OK, "Failed to schedule item, hr %#x.\n", hr);
hr = MFCancelWorkItem(key); -todo_wine ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr);
hr = MFCancelWorkItem(key); @@ -1084,15 +1082,13 @@ todo_wine IMFAsyncResult_Release(result);
hr = MFScheduleWorkItem(&callback, NULL, -5000, &key); -todo_wine ok(hr == S_OK, "Failed to schedule item, hr %#x.\n", hr);
hr = MFCancelWorkItem(key); -todo_wine ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr);
hr = MFShutdown(); - ok(hr == S_OK, "Failed to shutdown, hr %#x.\n", hr); + ok(hr == S_OK, "Failed to shut down, hr %#x.\n", hr); }
static void test_serial_queue(void)
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com>
---
 dlls/mfplat/queue.c | 46 +++++++++++++++++++++++----------------------
 1 file changed, 24 insertions(+), 22 deletions(-)
diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c index c793caa4e9..23e54a5cbf 100644 --- a/dlls/mfplat/queue.c +++ b/dlls/mfplat/queue.c @@ -254,6 +254,28 @@ static HRESULT lock_user_queue(DWORD queue) return hr; }
+static void shutdown_queue(struct queue *queue) +{ + struct work_item *item, *item2; + + if (!queue->pool) + return; + + CloseThreadpoolCleanupGroupMembers(queue->env.CleanupGroup, TRUE, NULL); + CloseThreadpool(queue->pool); + queue->pool = NULL; + + EnterCriticalSection(&queue->cs); + LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry) + { + list_remove(&item->entry); + release_work_item(item); + } + LeaveCriticalSection(&queue->cs); + + DeleteCriticalSection(&queue->cs); +} + static HRESULT unlock_user_queue(DWORD queue) { HRESULT hr = MF_E_INVALID_WORKQUEUE; @@ -268,6 +290,8 @@ static HRESULT unlock_user_queue(DWORD queue) { if (--entry->refcount == 0) { + shutdown_queue((struct queue *)entry->obj); + heap_free(entry->obj); entry->obj = next_free_user_queue; next_free_user_queue = entry; } @@ -277,28 +301,6 @@ static HRESULT unlock_user_queue(DWORD queue) return hr; }
-static void shutdown_queue(struct queue *queue) -{ - struct work_item *item, *item2; - - if (!queue->pool) - return; - - CloseThreadpoolCleanupGroupMembers(queue->env.CleanupGroup, TRUE, NULL); - CloseThreadpool(queue->pool); - queue->pool = NULL; - - EnterCriticalSection(&queue->cs); - LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry) - { - list_remove(&item->entry); - release_work_item(item); - } - LeaveCriticalSection(&queue->cs); - - DeleteCriticalSection(&queue->cs); -} - void shutdown_system_queues(void) { unsigned int i;
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com>
---
 dlls/mfplat/main.c | 5 ++++
 dlls/mfplat/mfplat_private.h | 1 +
 dlls/mfplat/queue.c | 3 ++
 dlls/mfplat/tests/mfplat.c | 53 +++++++++++++++++++++++++++++++++++-
 4 files changed, 61 insertions(+), 1 deletion(-)
diff --git a/dlls/mfplat/main.c b/dlls/mfplat/main.c index 2d82589dfb..a8963ca0d8 100644 --- a/dlls/mfplat/main.c +++ b/dlls/mfplat/main.c @@ -521,6 +521,11 @@ HRESULT WINAPI MFUnlockPlatform(void) return S_OK; }
+BOOL is_platform_locked(void) +{ + return platform_lock > 0; +} + /*********************************************************************** * MFCopyImage (mfplat.@) */ diff --git a/dlls/mfplat/mfplat_private.h b/dlls/mfplat/mfplat_private.h index 2d9e7d5906..ac35ff2dca 100644 --- a/dlls/mfplat/mfplat_private.h +++ b/dlls/mfplat/mfplat_private.h @@ -18,3 +18,4 @@
extern void init_system_queues(void) DECLSPEC_HIDDEN; extern void shutdown_system_queues(void) DECLSPEC_HIDDEN; +extern BOOL is_platform_locked(void) DECLSPEC_HIDDEN; diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c index 23e54a5cbf..f470cb44b5 100644 --- a/dlls/mfplat/queue.c +++ b/dlls/mfplat/queue.c @@ -543,6 +543,9 @@ static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_
*queue_id = MFASYNC_CALLBACK_QUEUE_UNDEFINED;
+ if (!is_platform_locked()) + return MF_E_SHUTDOWN; + queue = heap_alloc_zero(sizeof(*queue)); if (!queue) return E_OUTOFMEMORY; diff --git a/dlls/mfplat/tests/mfplat.c b/dlls/mfplat/tests/mfplat.c index fa779299d3..2a2189b4d3 100644 --- a/dlls/mfplat/tests/mfplat.c +++ b/dlls/mfplat/tests/mfplat.c @@ -837,6 +837,7 @@ static void test_MFCreateAsyncResult(void)
static void test_startup(void) { + DWORD queue; HRESULT hr;
hr = MFStartup(MAKELONG(MF_API_VERSION, 0xdead), MFSTARTUP_FULL); @@ -845,8 +846,58 @@ static void test_startup(void) hr = MFStartup(MF_VERSION, MFSTARTUP_FULL); ok(hr == S_OK, "Failed to start up, hr %#x.\n", hr);
+ hr = MFAllocateWorkQueue(&queue); + ok(hr == S_OK, "Failed to allocate a queue, hr %#x.\n", hr); + hr = MFUnlockWorkQueue(queue); + ok(hr == S_OK, "Failed to unlock the queue, hr %#x.\n", hr); + hr = MFShutdown(); - ok(hr == S_OK, "Failed to shutdown, hr %#x.\n", hr); + ok(hr == S_OK, "Failed to shut down, hr %#x.\n", hr); + + hr = MFAllocateWorkQueue(&queue); + ok(hr == MF_E_SHUTDOWN, "Unexpected hr %#x.\n", hr); + + /* Already shut down, has no effect. */ + hr = MFShutdown(); + ok(hr == S_OK, "Failed to shut down, hr %#x.\n", hr); + + hr = MFStartup(MF_VERSION, MFSTARTUP_FULL); + ok(hr == S_OK, "Failed to start up, hr %#x.\n", hr); + + hr = MFAllocateWorkQueue(&queue); + ok(hr == S_OK, "Failed to allocate a queue, hr %#x.\n", hr); + hr = MFUnlockWorkQueue(queue); + ok(hr == S_OK, "Failed to unlock the queue, hr %#x.\n", hr); + + hr = MFShutdown(); + ok(hr == S_OK, "Failed to shut down, hr %#x.\n", hr); + + /* Platform lock. */ + hr = MFStartup(MF_VERSION, MFSTARTUP_FULL); + ok(hr == S_OK, "Failed to start up, hr %#x.\n", hr); + + hr = MFAllocateWorkQueue(&queue); + ok(hr == S_OK, "Failed to allocate a queue, hr %#x.\n", hr); + hr = MFUnlockWorkQueue(queue); + ok(hr == S_OK, "Failed to unlock the queue, hr %#x.\n", hr); + + /* Unlocking implies shutdown. */ + hr = MFUnlockPlatform(); + ok(hr == S_OK, "Failed to unlock, %#x.\n", hr); + + hr = MFAllocateWorkQueue(&queue); + ok(hr == MF_E_SHUTDOWN, "Unexpected hr %#x.\n", hr); + + hr = MFLockPlatform(); + ok(hr == S_OK, "Failed to lock, %#x.\n", hr); + + hr = MFAllocateWorkQueue(&queue); + ok(hr == S_OK, "Failed to allocate a queue, hr %#x.\n", hr); + hr = MFUnlockWorkQueue(queue); + ok(hr == S_OK, "Failed to unlock the queue, hr %#x.\n", hr); + + hr = MFShutdown(); + ok(hr == S_OK, "Failed to shut down, hr %#x.\n", hr); }
static void test_allocate_queue(void)