Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- configure | 2 ++ configure.ac | 1 + dlls/rtworkq/Makefile.in | 4 ++++ dlls/rtworkq/rtworkq.spec | 37 +++++++++++++++++++++++++++++++++++++ 4 files changed, 44 insertions(+) create mode 100644 dlls/rtworkq/Makefile.in create mode 100644 dlls/rtworkq/rtworkq.spec
diff --git a/configure b/configure index 80235dd995..9f23d268e8 100755 --- a/configure +++ b/configure @@ -1539,6 +1539,7 @@ enable_rsabase enable_rsaenh enable_rstrtmgr enable_rtutils +enable_rtworkq enable_samlib enable_sane_ds enable_sapi @@ -20798,6 +20799,7 @@ wine_fn_config_makefile dlls/rsaenh enable_rsaenh wine_fn_config_makefile dlls/rsaenh/tests enable_tests wine_fn_config_makefile dlls/rstrtmgr enable_rstrtmgr wine_fn_config_makefile dlls/rtutils enable_rtutils +wine_fn_config_makefile dlls/rtworkq enable_rtworkq wine_fn_config_makefile dlls/samlib enable_samlib wine_fn_config_makefile dlls/sane.ds enable_sane_ds wine_fn_config_makefile dlls/sapi enable_sapi diff --git a/configure.ac b/configure.ac index 40374210bb..e06743c2ea 100644 --- a/configure.ac +++ b/configure.ac @@ -3606,6 +3606,7 @@ WINE_CONFIG_MAKEFILE(dlls/rsaenh) WINE_CONFIG_MAKEFILE(dlls/rsaenh/tests) WINE_CONFIG_MAKEFILE(dlls/rstrtmgr) WINE_CONFIG_MAKEFILE(dlls/rtutils) +WINE_CONFIG_MAKEFILE(dlls/rtworkq) WINE_CONFIG_MAKEFILE(dlls/samlib) WINE_CONFIG_MAKEFILE(dlls/sane.ds) WINE_CONFIG_MAKEFILE(dlls/sapi) diff --git a/dlls/rtworkq/Makefile.in b/dlls/rtworkq/Makefile.in new file mode 100644 index 0000000000..d6d7d6f715 --- /dev/null +++ b/dlls/rtworkq/Makefile.in @@ -0,0 +1,4 @@ +MODULE = rtworkq.dll +IMPORTLIB = rtworkq + +EXTRADLLFLAGS = -mno-cygwin diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec new file mode 100644 index 0000000000..29e56846c6 --- /dev/null +++ b/dlls/rtworkq/rtworkq.spec @@ -0,0 +1,37 @@ +@ stub RtwqAddPeriodicCallback +@ stub RtwqAllocateSerialWorkQueue +@ stub RtwqAllocateWorkQueue +@ stub RtwqBeginRegisterWorkQueueWithMMCSS +@ stub RtwqBeginUnregisterWorkQueueWithMMCSS +@ stub RtwqCancelDeadline +@ stub RtwqCancelMultipleWaitingWorkItem +@ stub RtwqCancelWorkItem +@ stub RtwqCreateAsyncResult +@ stub RtwqEndRegisterWorkQueueWithMMCSS +@ stub RtwqEndUnregisterWorkQueueWithMMCSS +@ stub RtwqGetPlatform +@ stub RtwqGetWorkQueueMMCSSClass +@ stub RtwqGetWorkQueueMMCSSPriority +@ stub RtwqGetWorkQueueMMCSSTaskId +@ stub RtwqInvokeCallback +@ stub RtwqJoinWorkQueue +@ stub RtwqLockPlatform +@ stub RtwqLockSharedWorkQueue +@ stub RtwqLockWorkQueue +@ stub RtwqPutMultipleWaitingWorkItem +@ stub RtwqPutWaitingWorkItem +@ stub RtwqPutWorkItem +@ stub RtwqRegisterPlatformEvents +@ stub RtwqRegisterPlatformWithMMCSS +@ stub RtwqRemovePeriodicCallback +@ stub RtwqScheduleWorkItem +@ stub RtwqSetDeadline +@ stub RtwqSetDeadline2 +@ stub RtwqSetLongRunning +@ stub RtwqShutdown +@ stub RtwqStartup +@ stub RtwqUnjoinWorkQueue +@ stub RtwqUnlockPlatform +@ stub RtwqUnlockWorkQueue +@ stub RtwqUnregisterPlatformEvents +@ stub RtwqUnregisterPlatformFromMMCSS
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/rtworkq/Makefile.in | 3 + dlls/rtworkq/queue.c | 219 ++++++++++++++++++++++++++++++++++++++ dlls/rtworkq/rtworkq.spec | 6 +- include/Makefile.in | 1 + include/rtworkq.idl | 62 +++++++++++ 5 files changed, 288 insertions(+), 3 deletions(-) create mode 100644 dlls/rtworkq/queue.c create mode 100644 include/rtworkq.idl
diff --git a/dlls/rtworkq/Makefile.in b/dlls/rtworkq/Makefile.in index d6d7d6f715..a224198986 100644 --- a/dlls/rtworkq/Makefile.in +++ b/dlls/rtworkq/Makefile.in @@ -2,3 +2,6 @@ MODULE = rtworkq.dll IMPORTLIB = rtworkq
EXTRADLLFLAGS = -mno-cygwin + +C_SRCS = \ + queue.c diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c new file mode 100644 index 0000000000..7897ef5dc9 --- /dev/null +++ b/dlls/rtworkq/queue.c @@ -0,0 +1,219 @@ +/* + * Copyright 2019-2020 Nikolay Sivov for CodeWeavers + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA + */ + +#define COBJMACROS +#define NONAMELESSUNION + +#include "initguid.h" +#include "rtworkq.h" +#include "wine/debug.h" +#include "wine/heap.h" + +WINE_DEFAULT_DEBUG_CHANNEL(mfplat); + +static LONG platform_lock; + +struct async_result +{ + RTWQASYNCRESULT result; + LONG refcount; + IUnknown *object; + IUnknown *state; +}; + +static struct async_result *impl_from_IRtwqAsyncResult(IRtwqAsyncResult *iface) +{ + return CONTAINING_RECORD(iface, struct async_result, result.AsyncResult); +} + +static HRESULT WINAPI async_result_QueryInterface(IRtwqAsyncResult *iface, REFIID riid, void **obj) +{ + TRACE("%p, %s, %p.\n", iface, debugstr_guid(riid), obj); + + if (IsEqualIID(riid, &IID_IRtwqAsyncResult) || + IsEqualIID(riid, &IID_IUnknown)) + { + *obj = iface; + IRtwqAsyncResult_AddRef(iface); + return S_OK; + } + + *obj = NULL; + WARN("Unsupported interface %s.\n", debugstr_guid(riid)); + return E_NOINTERFACE; +} + +static ULONG WINAPI async_result_AddRef(IRtwqAsyncResult *iface) +{ + struct async_result *result = impl_from_IRtwqAsyncResult(iface); + ULONG refcount = InterlockedIncrement(&result->refcount); + + TRACE("%p, %u.\n", iface, refcount); + + return refcount; +} + +static ULONG WINAPI async_result_Release(IRtwqAsyncResult *iface) +{ + struct async_result *result = impl_from_IRtwqAsyncResult(iface); + ULONG refcount = InterlockedDecrement(&result->refcount); + + TRACE("%p, %u.\n", iface, refcount); + + if (!refcount) + { + if (result->result.pCallback) + IRtwqAsyncCallback_Release(result->result.pCallback); + if (result->object) + IUnknown_Release(result->object); + if (result->state) + IUnknown_Release(result->state); + if (result->result.hEvent) + CloseHandle(result->result.hEvent); + heap_free(result); + + RtwqUnlockPlatform(); + } + + return refcount; +} + +static HRESULT WINAPI async_result_GetState(IRtwqAsyncResult *iface, IUnknown **state) +{ + struct async_result *result = impl_from_IRtwqAsyncResult(iface); + + TRACE("%p, %p.\n", iface, state); + + if (!result->state) + return E_POINTER; + + *state = result->state; + IUnknown_AddRef(*state); + + return S_OK; +} + +static HRESULT WINAPI async_result_GetStatus(IRtwqAsyncResult *iface) +{ + struct async_result *result = impl_from_IRtwqAsyncResult(iface); + + TRACE("%p.\n", iface); + + return result->result.hrStatusResult; +} + +static HRESULT WINAPI async_result_SetStatus(IRtwqAsyncResult *iface, HRESULT status) +{ + struct async_result *result = impl_from_IRtwqAsyncResult(iface); + + TRACE("%p, %#x.\n", iface, status); + + result->result.hrStatusResult = status; + + return S_OK; +} + +static HRESULT WINAPI async_result_GetObject(IRtwqAsyncResult *iface, IUnknown **object) +{ + struct async_result *result = impl_from_IRtwqAsyncResult(iface); + + TRACE("%p, %p.\n", iface, object); + + if (!result->object) + return E_POINTER; + + *object = result->object; + IUnknown_AddRef(*object); + + return S_OK; +} + +static IUnknown * WINAPI async_result_GetStateNoAddRef(IRtwqAsyncResult *iface) +{ + struct async_result *result = impl_from_IRtwqAsyncResult(iface); + + TRACE("%p.\n", iface); + + return result->state; +} + +static const IRtwqAsyncResultVtbl async_result_vtbl = +{ + async_result_QueryInterface, + async_result_AddRef, + async_result_Release, + async_result_GetState, + async_result_GetStatus, + async_result_SetStatus, + async_result_GetObject, + async_result_GetStateNoAddRef, +}; + +static HRESULT create_async_result(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **out) +{ + struct async_result *result; + + if (!out) + return E_INVALIDARG; + + result = heap_alloc_zero(sizeof(*result)); + if (!result) + return E_OUTOFMEMORY; + + RtwqLockPlatform(); + + result->result.AsyncResult.lpVtbl = &async_result_vtbl; + result->refcount = 1; + result->object = object; + if (result->object) + IUnknown_AddRef(result->object); + result->result.pCallback = callback; + if (result->result.pCallback) + IRtwqAsyncCallback_AddRef(result->result.pCallback); + result->state = state; + if (result->state) + IUnknown_AddRef(result->state); + + *out = &result->result.AsyncResult; + + TRACE("Created async result object %p.\n", *out); + + return S_OK; +} + +HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, + IRtwqAsyncResult **out) +{ + TRACE("%p, %p, %p, %p.\n", object, callback, state, out); + + return create_async_result(object, callback, state, out); +} + +HRESULT WINAPI RtwqLockPlatform(void) +{ + InterlockedIncrement(&platform_lock); + + return S_OK; +} + +HRESULT WINAPI RtwqUnlockPlatform(void) +{ + InterlockedDecrement(&platform_lock); + + return S_OK; +} diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec index 29e56846c6..900aa5f230 100644 --- a/dlls/rtworkq/rtworkq.spec +++ b/dlls/rtworkq/rtworkq.spec @@ -6,7 +6,7 @@ @ stub RtwqCancelDeadline @ stub RtwqCancelMultipleWaitingWorkItem @ stub RtwqCancelWorkItem -@ stub RtwqCreateAsyncResult +@ stdcall RtwqCreateAsyncResult(ptr ptr ptr ptr) @ stub RtwqEndRegisterWorkQueueWithMMCSS @ stub RtwqEndUnregisterWorkQueueWithMMCSS @ stub RtwqGetPlatform @@ -15,7 +15,7 @@ @ stub RtwqGetWorkQueueMMCSSTaskId @ stub RtwqInvokeCallback @ stub RtwqJoinWorkQueue -@ stub RtwqLockPlatform +@ stdcall RtwqLockPlatform() @ stub RtwqLockSharedWorkQueue @ stub RtwqLockWorkQueue @ stub RtwqPutMultipleWaitingWorkItem @@ -31,7 +31,7 @@ @ stub RtwqShutdown @ stub RtwqStartup @ stub RtwqUnjoinWorkQueue -@ stub RtwqUnlockPlatform +@ stdcall RtwqUnlockPlatform() @ stub RtwqUnlockWorkQueue @ stub RtwqUnregisterPlatformEvents @ stub RtwqUnregisterPlatformFromMMCSS diff --git a/include/Makefile.in b/include/Makefile.in index 5f967b616d..a91372eb4d 100644 --- a/include/Makefile.in +++ b/include/Makefile.in @@ -579,6 +579,7 @@ SOURCES = \ rstloc.idl \ rstnot.idl \ rtutils.h \ + rtworkq.idl \ sal.h \ sapi.idl \ sapiaut.idl \ diff --git a/include/rtworkq.idl b/include/rtworkq.idl new file mode 100644 index 0000000000..d22b09ca50 --- /dev/null +++ b/include/rtworkq.idl @@ -0,0 +1,62 @@ +/* + * Copyright 2020 Nikolay Sivov for CodeWeavers + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA + */ + +import "unknwn.idl"; + +[ + object, + uuid(ac6b7889-0740-4d51-8619-905994a55cc6), + local +] +interface IRtwqAsyncResult : IUnknown +{ + HRESULT GetState([out] IUnknown **state); + HRESULT GetStatus(); + HRESULT SetStatus([in] HRESULT status); + HRESULT GetObject([out] IUnknown **object); + IUnknown *GetStateNoAddRef(); +} + +[ + object, + uuid(a27003cf-2354-4f2a-8d6a-ab7cff15437e), + local +] +interface IRtwqAsyncCallback : IUnknown +{ + HRESULT GetParameters([out] DWORD *flags, [out] DWORD *queue); + HRESULT Invoke([in] IRtwqAsyncResult *result); +} + +cpp_quote("#ifdef __WINESRC__") +cpp_quote("typedef struct tagRTWQASYNCRESULT") +cpp_quote("{") +cpp_quote(" IRtwqAsyncResult AsyncResult;") +cpp_quote("#else") +cpp_quote("typedef struct tagRTWQASYNCRESULT : public IRtwqAsyncResult {") +cpp_quote("#endif") +cpp_quote(" OVERLAPPED overlapped;") +cpp_quote(" IRtwqAsyncCallback *pCallback;") +cpp_quote(" HRESULT hrStatusResult;") +cpp_quote(" DWORD dwBytesTransferred;") +cpp_quote(" HANDLE hEvent;") +cpp_quote("} RTWQASYNCRESULT;") + +cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);") +cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);") +cpp_quote("HRESULT WINAPI RtwqUnlockPlatform(void);")
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/rtworkq/queue.c | 173 ++++++++++++++++++++++++++++++++++++++ dlls/rtworkq/rtworkq.spec | 4 +- include/rtworkq.idl | 11 +++ 3 files changed, 186 insertions(+), 2 deletions(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c index 7897ef5dc9..31a17c68ae 100644 --- a/dlls/rtworkq/queue.c +++ b/dlls/rtworkq/queue.c @@ -23,11 +23,108 @@ #include "rtworkq.h" #include "wine/debug.h" #include "wine/heap.h" +#include "wine/list.h"
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
+static CRITICAL_SECTION queues_section; +static CRITICAL_SECTION_DEBUG queues_critsect_debug = +{ + 0, 0, &queues_section, + { &queues_critsect_debug.ProcessLocksList, &queues_critsect_debug.ProcessLocksList }, + 0, 0, { (DWORD_PTR)(__FILE__ ": queues_section") } +}; +static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0, 0 }; + static LONG platform_lock;
+enum system_queue_index +{ + SYS_QUEUE_STANDARD = 0, + SYS_QUEUE_RT, + SYS_QUEUE_IO, + SYS_QUEUE_TIMER, + SYS_QUEUE_MULTITHREADED, + SYS_QUEUE_DO_NOT_USE, + SYS_QUEUE_LONG_FUNCTION, + SYS_QUEUE_COUNT, +}; + +struct work_item +{ + struct list entry; + LONG refcount; + IRtwqAsyncResult *result; + struct queue *queue; + RTWQWORKITEM_KEY key; + union + { + TP_WAIT *wait_object; + TP_TIMER *timer_object; + } u; +}; + +static const TP_CALLBACK_PRIORITY priorities[] = +{ + TP_CALLBACK_PRIORITY_HIGH, + TP_CALLBACK_PRIORITY_NORMAL, + TP_CALLBACK_PRIORITY_LOW, +}; + +struct queue +{ + TP_POOL *pool; + TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)]; + CRITICAL_SECTION cs; + struct list pending_items; +}; + +static struct queue system_queues[SYS_QUEUE_COUNT]; + +static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data) +{ +} + +static void release_work_item(struct work_item *item) +{ + if (InterlockedDecrement(&item->refcount) == 0) + { + IRtwqAsyncResult_Release(item->result); + heap_free(item); + } +} + +static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue) +{ + TP_CALLBACK_ENVIRON_V3 env; + unsigned int max_thread, i; + + queue->pool = CreateThreadpool(NULL); + + memset(&env, 0, sizeof(env)); + env.Version = 3; + env.Size = sizeof(env); + env.Pool = queue->pool; + env.CleanupGroup = CreateThreadpoolCleanupGroup(); + env.CleanupGroupCancelCallback = standard_queue_cleanup_callback; + env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL; + for (i = 0; i < ARRAY_SIZE(queue->envs); ++i) + { + queue->envs[i] = env; + queue->envs[i].CallbackPriority = priorities[i]; + } + list_init(&queue->pending_items); + InitializeCriticalSection(&queue->cs); + + max_thread = (queue_type == RTWQ_STANDARD_WORKQUEUE || queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4; + + SetThreadpoolThreadMinimum(queue->pool, 1); + SetThreadpoolThreadMaximum(queue->pool, max_thread); + + if (queue_type == RTWQ_WINDOW_WORKQUEUE) + FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n"); +} + struct async_result { RTWQASYNCRESULT result; @@ -217,3 +314,79 @@ HRESULT WINAPI RtwqUnlockPlatform(void)
return S_OK; } + +static void init_system_queues(void) +{ + /* Always initialize standard queue, keep the rest lazy. */ + + EnterCriticalSection(&queues_section); + + if (system_queues[SYS_QUEUE_STANDARD].pool) + { + LeaveCriticalSection(&queues_section); + return; + } + + init_work_queue(RTWQ_STANDARD_WORKQUEUE, &system_queues[SYS_QUEUE_STANDARD]); + + LeaveCriticalSection(&queues_section); +} + +HRESULT WINAPI RtwqStartup(void) +{ + if (InterlockedIncrement(&platform_lock) == 1) + { + init_system_queues(); + } + + return S_OK; +} + +static void shutdown_queue(struct queue *queue) +{ + struct work_item *item, *item2; + + if (!queue->pool) + return; + + CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL); + CloseThreadpool(queue->pool); + queue->pool = NULL; + + EnterCriticalSection(&queue->cs); + LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry) + { + list_remove(&item->entry); + release_work_item(item); + } + LeaveCriticalSection(&queue->cs); + + DeleteCriticalSection(&queue->cs); +} + +static void shutdown_system_queues(void) +{ + unsigned int i; + + EnterCriticalSection(&queues_section); + + for (i = 0; i < ARRAY_SIZE(system_queues); ++i) + { + shutdown_queue(&system_queues[i]); + } + + LeaveCriticalSection(&queues_section); +} + +HRESULT WINAPI RtwqShutdown(void) +{ + if (platform_lock <= 0) + return S_OK; + + if (InterlockedExchangeAdd(&platform_lock, -1) == 1) + { + shutdown_system_queues(); + } + + return S_OK; +} diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec index 900aa5f230..8c352593ff 100644 --- a/dlls/rtworkq/rtworkq.spec +++ b/dlls/rtworkq/rtworkq.spec @@ -28,8 +28,8 @@ @ stub RtwqSetDeadline @ stub RtwqSetDeadline2 @ stub RtwqSetLongRunning -@ stub RtwqShutdown -@ stub RtwqStartup +@ stdcall RtwqShutdown() +@ stdcall RtwqStartup() @ stub RtwqUnjoinWorkQueue @ stdcall RtwqUnlockPlatform() @ stub RtwqUnlockWorkQueue diff --git a/include/rtworkq.idl b/include/rtworkq.idl index d22b09ca50..c2e8237bba 100644 --- a/include/rtworkq.idl +++ b/include/rtworkq.idl @@ -18,6 +18,15 @@
import "unknwn.idl";
+typedef enum +{ + RTWQ_STANDARD_WORKQUEUE = 0, + RTWQ_WINDOW_WORKQUEUE = 1, + RTWQ_MULTITHREADED_WORKQUEUE = 2, +} RTWQ_WORKQUEUE_TYPE; + +typedef unsigned __int64 RTWQWORKITEM_KEY; + [ object, uuid(ac6b7889-0740-4d51-8619-905994a55cc6), @@ -59,4 +68,6 @@ cpp_quote("} RTWQASYNCRESULT;")
cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);") cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);") +cpp_quote("HRESULT WINAPI RtwqShutdown(void);") +cpp_quote("HRESULT WINAPI RtwqStartup(void);") cpp_quote("HRESULT WINAPI RtwqUnlockPlatform(void);")
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/rtworkq/queue.c | 391 +++++++++++++++++++++++++++++++++++--- dlls/rtworkq/rtworkq.spec | 6 +- include/rtworkq.idl | 12 ++ 3 files changed, 384 insertions(+), 25 deletions(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c index 31a17c68ae..7d949e8e62 100644 --- a/dlls/rtworkq/queue.c +++ b/dlls/rtworkq/queue.c @@ -27,6 +27,34 @@
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
+#define FIRST_USER_QUEUE_HANDLE 5 +#define MAX_USER_QUEUE_HANDLES 124 + +#define WAIT_ITEM_KEY_MASK (0x82000000) +#define SCHEDULED_ITEM_KEY_MASK (0x80000000) + +static LONG next_item_key; + +static RTWQWORKITEM_KEY get_item_key(DWORD mask, DWORD key) +{ + return ((RTWQWORKITEM_KEY)mask << 32) | key; +} + +static RTWQWORKITEM_KEY generate_item_key(DWORD mask) +{ + return get_item_key(mask, InterlockedIncrement(&next_item_key)); +} + +struct queue_handle +{ + void *obj; + LONG refcount; + WORD generation; +}; + +static struct queue_handle user_queues[MAX_USER_QUEUE_HANDLES]; +static struct queue_handle *next_free_user_queue; + static CRITICAL_SECTION queues_section; static CRITICAL_SECTION_DEBUG queues_critsect_debug = { @@ -38,6 +66,33 @@ static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0,
static LONG platform_lock;
+static struct queue_handle *get_queue_obj(DWORD handle) +{ + unsigned int idx = HIWORD(handle) - FIRST_USER_QUEUE_HANDLE; + + if (idx < MAX_USER_QUEUE_HANDLES && user_queues[idx].refcount) + { + if (LOWORD(handle) == user_queues[idx].generation) + return &user_queues[idx]; + } + + return NULL; +} + +/* Should be kept in sync with corresponding MFASYNC_CALLBACK_ constants. */ +enum rtwq_callback_queue_id +{ + RTWQ_CALLBACK_QUEUE_UNDEFINED = 0x00000000, + RTWQ_CALLBACK_QUEUE_STANDARD = 0x00000001, + RTWQ_CALLBACK_QUEUE_RT = 0x00000002, + RTWQ_CALLBACK_QUEUE_IO = 0x00000003, + RTWQ_CALLBACK_QUEUE_TIMER = 0x00000004, + RTWQ_CALLBACK_QUEUE_MULTITHREADED = 0x00000005, + RTWQ_CALLBACK_QUEUE_LONG_FUNCTION = 0x00000007, + RTWQ_CALLBACK_QUEUE_PRIVATE_MASK = 0xffff0000, + RTWQ_CALLBACK_QUEUE_ALL = 0xffffffff, +}; + enum system_queue_index { SYS_QUEUE_STANDARD = 0, @@ -81,10 +136,40 @@ struct queue
static struct queue system_queues[SYS_QUEUE_COUNT];
+static struct queue *get_system_queue(DWORD queue_id) +{ + switch (queue_id) + { + case RTWQ_CALLBACK_QUEUE_STANDARD: + case RTWQ_CALLBACK_QUEUE_RT: + case RTWQ_CALLBACK_QUEUE_IO: + case RTWQ_CALLBACK_QUEUE_TIMER: + case RTWQ_CALLBACK_QUEUE_MULTITHREADED: + case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION: + return &system_queues[queue_id - 1]; + default: + return NULL; + } +} + static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data) { }
+static struct work_item * alloc_work_item(struct queue *queue, IRtwqAsyncResult *result) +{ + struct work_item *item; + + item = heap_alloc_zero(sizeof(*item)); + item->result = result; + IRtwqAsyncResult_AddRef(item->result); + item->refcount = 1; + item->queue = queue; + list_init(&item->entry); + + return item; +} + static void release_work_item(struct work_item *item) { if (InterlockedDecrement(&item->refcount) == 0) @@ -94,6 +179,12 @@ static void release_work_item(struct work_item *item) } }
+static struct work_item *grab_work_item(struct work_item *item) +{ + InterlockedIncrement(&item->refcount); + return item; +} + static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue) { TP_CALLBACK_ENVIRON_V3 env; @@ -125,6 +216,255 @@ static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue) FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n"); }
+static HRESULT grab_queue(DWORD queue_id, struct queue **ret) +{ + struct queue *queue = get_system_queue(queue_id); + RTWQ_WORKQUEUE_TYPE queue_type; + struct queue_handle *entry; + + *ret = NULL; + + if (!system_queues[SYS_QUEUE_STANDARD].pool) + return RTWQ_E_SHUTDOWN; + + if (queue && queue->pool) + { + *ret = queue; + return S_OK; + } + else if (queue) + { + EnterCriticalSection(&queues_section); + switch (queue_id) + { + case RTWQ_CALLBACK_QUEUE_IO: + case RTWQ_CALLBACK_QUEUE_MULTITHREADED: + case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION: + queue_type = RTWQ_MULTITHREADED_WORKQUEUE; + break; + default: + queue_type = RTWQ_STANDARD_WORKQUEUE; + } + init_work_queue(queue_type, queue); + LeaveCriticalSection(&queues_section); + *ret = queue; + return S_OK; + } + + /* Handles user queues. */ + if ((entry = get_queue_obj(queue_id))) + *ret = entry->obj; + + return *ret ? S_OK : RTWQ_E_INVALID_WORKQUEUE; +} + +static HRESULT lock_user_queue(DWORD queue) +{ + HRESULT hr = RTWQ_E_INVALID_WORKQUEUE; + struct queue_handle *entry; + + if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK)) + return S_OK; + + EnterCriticalSection(&queues_section); + entry = get_queue_obj(queue); + if (entry && entry->refcount) + { + entry->refcount++; + hr = S_OK; + } + LeaveCriticalSection(&queues_section); + return hr; +} + +static void shutdown_queue(struct queue *queue) +{ + struct work_item *item, *item2; + + if (!queue->pool) + return; + + CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL); + CloseThreadpool(queue->pool); + queue->pool = NULL; + + EnterCriticalSection(&queue->cs); + LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry) + { + list_remove(&item->entry); + release_work_item(item); + } + LeaveCriticalSection(&queue->cs); + + DeleteCriticalSection(&queue->cs); +} + +static HRESULT unlock_user_queue(DWORD queue) +{ + HRESULT hr = RTWQ_E_INVALID_WORKQUEUE; + struct queue_handle *entry; + + if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK)) + return S_OK; + + EnterCriticalSection(&queues_section); + entry = get_queue_obj(queue); + if (entry && entry->refcount) + { + if (--entry->refcount == 0) + { + shutdown_queue((struct queue *)entry->obj); + heap_free(entry->obj); + entry->obj = next_free_user_queue; + next_free_user_queue = entry; + } + hr = S_OK; + } + LeaveCriticalSection(&queues_section); + return hr; +} + +static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work) +{ + struct work_item *item = context; + RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result; + + TRACE("result object %p.\n", result); + + IRtwqAsyncCallback_Invoke(result->pCallback, item->result); + + release_work_item(item); +} + +static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result) +{ + TP_CALLBACK_PRIORITY callback_priority; + struct work_item *item; + TP_WORK *work_object; + + if (!(item = alloc_work_item(queue, result))) + return E_OUTOFMEMORY; + + if (priority == 0) + callback_priority = TP_CALLBACK_PRIORITY_NORMAL; + else if (priority < 0) + callback_priority = TP_CALLBACK_PRIORITY_LOW; + else + callback_priority = TP_CALLBACK_PRIORITY_HIGH; + work_object = CreateThreadpoolWork(standard_queue_worker, item, + (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]); + SubmitThreadpoolWork(work_object); + + TRACE("dispatched %p.\n", result); + + return S_OK; +} + +static HRESULT queue_put_work_item(DWORD queue_id, LONG priority, IRtwqAsyncResult *result) +{ + struct queue *queue; + HRESULT hr; + + if (FAILED(hr = grab_queue(queue_id, &queue))) + return hr; + + return queue_submit_item(queue, priority, result); +} + +static HRESULT invoke_async_callback(IRtwqAsyncResult *result) +{ + RTWQASYNCRESULT *result_data = (RTWQASYNCRESULT *)result; + DWORD queue = RTWQ_CALLBACK_QUEUE_STANDARD, flags; + HRESULT hr; + + if (FAILED(IRtwqAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue))) + queue = RTWQ_CALLBACK_QUEUE_STANDARD; + + if (FAILED(lock_user_queue(queue))) + queue = RTWQ_CALLBACK_QUEUE_STANDARD; + + hr = queue_put_work_item(queue, 0, result); + + unlock_user_queue(queue); + + return hr; +} + +static void queue_release_pending_item(struct work_item *item) +{ + EnterCriticalSection(&item->queue->cs); + if (item->key) + { + list_remove(&item->entry); + item->key = 0; + release_work_item(item); + } + LeaveCriticalSection(&item->queue->cs); +} + +static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait, + TP_WAIT_RESULT wait_result) +{ + struct work_item *item = context; + + TRACE("result object %p.\n", item->result); + + invoke_async_callback(item->result); + + release_work_item(item); +} + +static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait, + TP_WAIT_RESULT wait_result) +{ + struct work_item *item = context; + + TRACE("result object %p.\n", item->result); + + queue_release_pending_item(item); + + invoke_async_callback(item->result); + + release_work_item(item); +} + +static void queue_mark_item_pending(DWORD mask, struct work_item *item, RTWQWORKITEM_KEY *key) +{ + *key = generate_item_key(mask); + item->key = *key; + + EnterCriticalSection(&item->queue->cs); + list_add_tail(&item->queue->pending_items, &item->entry); + grab_work_item(item); + LeaveCriticalSection(&item->queue->cs); +} + +static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IRtwqAsyncResult *result, + RTWQWORKITEM_KEY *key) +{ + PTP_WAIT_CALLBACK callback; + struct work_item *item; + + if (!(item = alloc_work_item(queue, result))) + return E_OUTOFMEMORY; + + if (key) + { + queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key); + callback = waiting_item_cancelable_callback; + } + else + callback = waiting_item_callback; + + item->u.wait_object = CreateThreadpoolWait(callback, item, + (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]); + SetThreadpoolWait(item->u.wait_object, event, NULL); + + TRACE("dispatched %p.\n", result); + + return S_OK; +} + struct async_result { RTWQASYNCRESULT result; @@ -342,28 +682,6 @@ HRESULT WINAPI RtwqStartup(void) return S_OK; }
-static void shutdown_queue(struct queue *queue) -{ - struct work_item *item, *item2; - - if (!queue->pool) - return; - - CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL); - CloseThreadpool(queue->pool); - queue->pool = NULL; - - EnterCriticalSection(&queue->cs); - LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry) - { - list_remove(&item->entry); - release_work_item(item); - } - LeaveCriticalSection(&queue->cs); - - DeleteCriticalSection(&queue->cs); -} - static void shutdown_system_queues(void) { unsigned int i; @@ -390,3 +708,32 @@ HRESULT WINAPI RtwqShutdown(void)
return S_OK; } + +HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key) +{ + struct queue *queue; + HRESULT hr; + + TRACE("%p, %d, %p, %p.\n", event, priority, result, key); + + if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue))) + return hr; + + hr = queue_submit_wait(queue, event, priority, result, key); + + return hr; +} + +HRESULT WINAPI RtwqLockWorkQueue(DWORD queue) +{ + TRACE("%#x.\n", queue); + + return lock_user_queue(queue); +} + +HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue) +{ + TRACE("%#x.\n", queue); + + return unlock_user_queue(queue); +} diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec index 8c352593ff..47e1c1b2ae 100644 --- a/dlls/rtworkq/rtworkq.spec +++ b/dlls/rtworkq/rtworkq.spec @@ -17,9 +17,9 @@ @ stub RtwqJoinWorkQueue @ stdcall RtwqLockPlatform() @ stub RtwqLockSharedWorkQueue -@ stub RtwqLockWorkQueue +@ stdcall RtwqLockWorkQueue(long) @ stub RtwqPutMultipleWaitingWorkItem -@ stub RtwqPutWaitingWorkItem +@ stdcall RtwqPutWaitingWorkItem(long long ptr ptr) @ stub RtwqPutWorkItem @ stub RtwqRegisterPlatformEvents @ stub RtwqRegisterPlatformWithMMCSS @@ -32,6 +32,6 @@ @ stdcall RtwqStartup() @ stub RtwqUnjoinWorkQueue @ stdcall RtwqUnlockPlatform() -@ stub RtwqUnlockWorkQueue +@ stdcall RtwqUnlockWorkQueue(long) @ stub RtwqUnregisterPlatformEvents @ stub RtwqUnregisterPlatformFromMMCSS diff --git a/include/rtworkq.idl b/include/rtworkq.idl index c2e8237bba..a899376e92 100644 --- a/include/rtworkq.idl +++ b/include/rtworkq.idl @@ -52,6 +52,15 @@ interface IRtwqAsyncCallback : IUnknown HRESULT Invoke([in] IRtwqAsyncResult *result); }
+cpp_quote("#define RTWQ_E_ERROR(x) ((HRESULT)(0xc00d0000L+x))") +cpp_quote("#define RTWQ_E_BUFFERTOOSMALL RTWQ_E_ERROR(14001)") +cpp_quote("#define RTWQ_E_NOT_INITIALIZED RTWQ_E_ERROR(14006)") +cpp_quote("#define RTWQ_E_UNEXPECTED RTWQ_E_ERROR(14011)") +cpp_quote("#define RTWQ_E_NOT_FOUND RTWQ_E_ERROR(14037)") +cpp_quote("#define RTWQ_E_OPERATION_CANCELLED RTWQ_E_ERROR(14061)") +cpp_quote("#define RTWQ_E_INVALID_WORKQUEUE RTWQ_E_ERROR(14079)") +cpp_quote("#define RTWQ_E_SHUTDOWN RTWQ_E_ERROR(16005)") + cpp_quote("#ifdef __WINESRC__") cpp_quote("typedef struct tagRTWQASYNCRESULT") cpp_quote("{") @@ -68,6 +77,9 @@ cpp_quote("} RTWQASYNCRESULT;")
cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);") cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);") +cpp_quote("HRESULT WINAPI RtwqLockWorkQueue(DWORD queue);") +cpp_quote("HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key);") cpp_quote("HRESULT WINAPI RtwqShutdown(void);") cpp_quote("HRESULT WINAPI RtwqStartup(void);") cpp_quote("HRESULT WINAPI RtwqUnlockPlatform(void);") +cpp_quote("HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue);")
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/rtworkq/queue.c | 40 +++++++++++++++++++++++++++++++++++++++ dlls/rtworkq/rtworkq.spec | 2 +- include/rtworkq.idl | 1 + 3 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c index 7d949e8e62..2797f292ce 100644 --- a/dlls/rtworkq/queue.c +++ b/dlls/rtworkq/queue.c @@ -465,6 +465,33 @@ static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priorit return S_OK; }
+static HRESULT queue_cancel_item(struct queue *queue, RTWQWORKITEM_KEY key) +{ + HRESULT hr = RTWQ_E_NOT_FOUND; + struct work_item *item; + + EnterCriticalSection(&queue->cs); + LIST_FOR_EACH_ENTRY(item, &queue->pending_items, struct work_item, entry) + { + if (item->key == key) + { + key >>= 32; + if ((key & WAIT_ITEM_KEY_MASK) == WAIT_ITEM_KEY_MASK) + CloseThreadpoolWait(item->u.wait_object); + else if ((key & SCHEDULED_ITEM_KEY_MASK) == SCHEDULED_ITEM_KEY_MASK) + CloseThreadpoolTimer(item->u.timer_object); + else + WARN("Unknown item key mask %#x.\n", (DWORD)key); + queue_release_pending_item(item); + hr = S_OK; + break; + } + } + LeaveCriticalSection(&queue->cs); + + return hr; +} + struct async_result { RTWQASYNCRESULT result; @@ -724,6 +751,19 @@ HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncRes return hr; }
+HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key) +{ + struct queue *queue; + HRESULT hr; + + TRACE("%s.\n", wine_dbgstr_longlong(key)); + + if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue))) + return hr; + + return queue_cancel_item(queue, key); +} + HRESULT WINAPI RtwqLockWorkQueue(DWORD queue) { TRACE("%#x.\n", queue); diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec index 47e1c1b2ae..04da2a9573 100644 --- a/dlls/rtworkq/rtworkq.spec +++ b/dlls/rtworkq/rtworkq.spec @@ -5,7 +5,7 @@ @ stub RtwqBeginUnregisterWorkQueueWithMMCSS @ stub RtwqCancelDeadline @ stub RtwqCancelMultipleWaitingWorkItem -@ stub RtwqCancelWorkItem +@ stdcall RtwqCancelWorkItem(int64) @ stdcall RtwqCreateAsyncResult(ptr ptr ptr ptr) @ stub RtwqEndRegisterWorkQueueWithMMCSS @ stub RtwqEndUnregisterWorkQueueWithMMCSS diff --git a/include/rtworkq.idl b/include/rtworkq.idl index a899376e92..8cc205a7eb 100644 --- a/include/rtworkq.idl +++ b/include/rtworkq.idl @@ -75,6 +75,7 @@ cpp_quote(" DWORD dwBytesTransferred;") cpp_quote(" HANDLE hEvent;") cpp_quote("} RTWQASYNCRESULT;")
+cpp_quote("HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key);") cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);") cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);") cpp_quote("HRESULT WINAPI RtwqLockWorkQueue(DWORD queue);")
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/rtworkq/queue.c | 94 +++++++++++++++++++++++++++++++++++++++ dlls/rtworkq/rtworkq.spec | 4 +- include/rtworkq.idl | 2 + 3 files changed, 98 insertions(+), 2 deletions(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c index 2797f292ce..dc6f6efeb1 100644 --- a/dlls/rtworkq/queue.c +++ b/dlls/rtworkq/queue.c @@ -428,6 +428,39 @@ static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *inst release_work_item(item); }
+static void CALLBACK scheduled_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer) +{ + struct work_item *item = context; + + TRACE("result object %p.\n", item->result); + + invoke_async_callback(item->result); + + release_work_item(item); +} + +static void CALLBACK scheduled_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer) +{ + struct work_item *item = context; + + TRACE("result object %p.\n", item->result); + + queue_release_pending_item(item); + + invoke_async_callback(item->result); + + release_work_item(item); +} + +static void CALLBACK periodic_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer) +{ + struct work_item *item = grab_work_item(context); + + invoke_async_callback(item->result); + + release_work_item(item); +} + static void queue_mark_item_pending(DWORD mask, struct work_item *item, RTWQWORKITEM_KEY *key) { *key = generate_item_key(mask); @@ -465,6 +498,40 @@ static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priorit return S_OK; }
+static HRESULT queue_submit_timer(struct queue *queue, IRtwqAsyncResult *result, INT64 timeout, DWORD period, + RTWQWORKITEM_KEY *key) +{ + PTP_TIMER_CALLBACK callback; + struct work_item *item; + FILETIME filetime; + LARGE_INTEGER t; + + if (!(item = alloc_work_item(queue, result))) + return E_OUTOFMEMORY; + + if (key) + { + queue_mark_item_pending(SCHEDULED_ITEM_KEY_MASK, item, key); + } + + if (period) + callback = periodic_item_callback; + else + callback = key ? scheduled_item_cancelable_callback : scheduled_item_callback; + + t.QuadPart = timeout * 1000 * 10; + filetime.dwLowDateTime = t.u.LowPart; + filetime.dwHighDateTime = t.u.HighPart; + + item->u.timer_object = CreateThreadpoolTimer(callback, item, + (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]); + SetThreadpoolTimer(item->u.timer_object, &filetime, period, 0); + + TRACE("dispatched %p.\n", result); + + return S_OK; +} + static HRESULT queue_cancel_item(struct queue *queue, RTWQWORKITEM_KEY key) { HRESULT hr = RTWQ_E_NOT_FOUND; @@ -751,6 +818,26 @@ HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncRes return hr; }
+static HRESULT schedule_work_item(IRtwqAsyncResult *result, INT64 timeout, RTWQWORKITEM_KEY *key) +{ + struct queue *queue; + HRESULT hr; + + if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue))) + return hr; + + TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key); + + return queue_submit_timer(queue, result, timeout, 0, key); +} + +HRESULT WINAPI RtwqScheduleWorkItem(IRtwqAsyncResult *result, INT64 timeout, RTWQWORKITEM_KEY *key) +{ + TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key); + + return schedule_work_item(result, timeout, key); +} + HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key) { struct queue *queue; @@ -764,6 +851,13 @@ HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key) return queue_cancel_item(queue, key); }
+HRESULT WINAPI RtwqInvokeCallback(IRtwqAsyncResult *result) +{ + TRACE("%p.\n", result); + + return invoke_async_callback(result); +} + HRESULT WINAPI RtwqLockWorkQueue(DWORD queue) { TRACE("%#x.\n", queue); diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec index 04da2a9573..eb3d73d8e7 100644 --- a/dlls/rtworkq/rtworkq.spec +++ b/dlls/rtworkq/rtworkq.spec @@ -13,7 +13,7 @@ @ stub RtwqGetWorkQueueMMCSSClass @ stub RtwqGetWorkQueueMMCSSPriority @ stub RtwqGetWorkQueueMMCSSTaskId -@ stub RtwqInvokeCallback +@ stdcall RtwqInvokeCallback(ptr) @ stub RtwqJoinWorkQueue @ stdcall RtwqLockPlatform() @ stub RtwqLockSharedWorkQueue @@ -24,7 +24,7 @@ @ stub RtwqRegisterPlatformEvents @ stub RtwqRegisterPlatformWithMMCSS @ stub RtwqRemovePeriodicCallback -@ stub RtwqScheduleWorkItem +@ stdcall RtwqScheduleWorkItem(ptr int64 ptr) @ stub RtwqSetDeadline @ stub RtwqSetDeadline2 @ stub RtwqSetLongRunning diff --git a/include/rtworkq.idl b/include/rtworkq.idl index 8cc205a7eb..36468304b4 100644 --- a/include/rtworkq.idl +++ b/include/rtworkq.idl @@ -77,9 +77,11 @@ cpp_quote("} RTWQASYNCRESULT;")
cpp_quote("HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key);") cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);") +cpp_quote("HRESULT WINAPI RtwqInvokeCallback(IRtwqAsyncResult *result);") cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);") cpp_quote("HRESULT WINAPI RtwqLockWorkQueue(DWORD queue);") cpp_quote("HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key);") +cpp_quote("HRESULT WINAPI RtwqScheduleWorkItem(IRtwqAsyncResult *result, INT64 timeout, RTWQWORKITEM_KEY *key);") cpp_quote("HRESULT WINAPI RtwqShutdown(void);") cpp_quote("HRESULT WINAPI RtwqStartup(void);") cpp_quote("HRESULT WINAPI RtwqUnlockPlatform(void);")
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/rtworkq/queue.c | 141 ++++++++++++++++++++++++++++++++++++++ dlls/rtworkq/rtworkq.spec | 4 +- include/rtworkq.idl | 4 ++ 3 files changed, 147 insertions(+), 2 deletions(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c index dc6f6efeb1..16411bfc49 100644 --- a/dlls/rtworkq/queue.c +++ b/dlls/rtworkq/queue.c @@ -838,6 +838,147 @@ HRESULT WINAPI RtwqScheduleWorkItem(IRtwqAsyncResult *result, INT64 timeout, RTW return schedule_work_item(result, timeout, key); }
+struct periodic_callback +{ + IRtwqAsyncCallback IRtwqAsyncCallback_iface; + LONG refcount; + RTWQPERIODICCALLBACK callback; +}; + +static struct periodic_callback *impl_from_IRtwqAsyncCallback(IRtwqAsyncCallback *iface) +{ + return CONTAINING_RECORD(iface, struct periodic_callback, IRtwqAsyncCallback_iface); +} + +static HRESULT WINAPI periodic_callback_QueryInterface(IRtwqAsyncCallback *iface, REFIID riid, void **obj) +{ + if (IsEqualIID(riid, &IID_IRtwqAsyncCallback) || + IsEqualIID(riid, &IID_IUnknown)) + { + *obj = iface; + IRtwqAsyncCallback_AddRef(iface); + return S_OK; + } + + *obj = NULL; + return E_NOINTERFACE; +} + +static ULONG WINAPI periodic_callback_AddRef(IRtwqAsyncCallback *iface) +{ + struct periodic_callback *callback = impl_from_IRtwqAsyncCallback(iface); + ULONG refcount = InterlockedIncrement(&callback->refcount); + + TRACE("%p, %u.\n", iface, refcount); + + return refcount; +} + +static ULONG WINAPI periodic_callback_Release(IRtwqAsyncCallback *iface) +{ + struct periodic_callback *callback = impl_from_IRtwqAsyncCallback(iface); + ULONG refcount = InterlockedDecrement(&callback->refcount); + + TRACE("%p, %u.\n", iface, refcount); + + if (!refcount) + heap_free(callback); + + return refcount; +} + +static HRESULT WINAPI periodic_callback_GetParameters(IRtwqAsyncCallback *iface, DWORD *flags, DWORD *queue) +{ + return E_NOTIMPL; +} + +static HRESULT WINAPI periodic_callback_Invoke(IRtwqAsyncCallback *iface, IRtwqAsyncResult *result) +{ + struct periodic_callback *callback = impl_from_IRtwqAsyncCallback(iface); + IUnknown *context = NULL; + + if (FAILED(IRtwqAsyncResult_GetObject(result, &context))) + WARN("Expected object to be set for result object.\n"); + + callback->callback(context); + + if (context) + IUnknown_Release(context); + + return S_OK; +} + +static const IRtwqAsyncCallbackVtbl periodic_callback_vtbl = +{ + periodic_callback_QueryInterface, + periodic_callback_AddRef, + periodic_callback_Release, + periodic_callback_GetParameters, + periodic_callback_Invoke, +}; + +static HRESULT create_periodic_callback_obj(RTWQPERIODICCALLBACK callback, IRtwqAsyncCallback **out) +{ + struct periodic_callback *object; + + object = heap_alloc(sizeof(*object)); + if (!object) + return E_OUTOFMEMORY; + + object->IRtwqAsyncCallback_iface.lpVtbl = &periodic_callback_vtbl; + object->refcount = 1; + object->callback = callback; + + *out = &object->IRtwqAsyncCallback_iface; + + return S_OK; +} + +HRESULT WINAPI RtwqAddPeriodicCallback(RTWQPERIODICCALLBACK callback, IUnknown *context, DWORD *key) +{ + IRtwqAsyncCallback *periodic_callback; + RTWQWORKITEM_KEY workitem_key; + IRtwqAsyncResult *result; + struct queue *queue; + HRESULT hr; + + TRACE("%p, %p, %p.\n", callback, context, key); + + if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue))) + return hr; + + if (FAILED(hr = create_periodic_callback_obj(callback, &periodic_callback))) + return hr; + + hr = create_async_result(context, periodic_callback, NULL, &result); + IRtwqAsyncCallback_Release(periodic_callback); + if (FAILED(hr)) + return hr; + + /* Same period MFGetTimerPeriodicity() returns. */ + hr = queue_submit_timer(queue, result, 0, 10, key ? &workitem_key : NULL); + + IRtwqAsyncResult_Release(result); + + if (key) + *key = workitem_key; + + return S_OK; +} + +HRESULT WINAPI RtwqRemovePeriodicCallback(DWORD key) +{ + struct queue *queue; + HRESULT hr; + + TRACE("%#x.\n", key); + + if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue))) + return hr; + + return queue_cancel_item(queue, get_item_key(SCHEDULED_ITEM_KEY_MASK, key)); +} + HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key) { struct queue *queue; diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec index eb3d73d8e7..8846047d46 100644 --- a/dlls/rtworkq/rtworkq.spec +++ b/dlls/rtworkq/rtworkq.spec @@ -1,4 +1,4 @@ -@ stub RtwqAddPeriodicCallback +@ stdcall RtwqAddPeriodicCallback(ptr ptr ptr) @ stub RtwqAllocateSerialWorkQueue @ stub RtwqAllocateWorkQueue @ stub RtwqBeginRegisterWorkQueueWithMMCSS @@ -23,7 +23,7 @@ @ stub RtwqPutWorkItem @ stub RtwqRegisterPlatformEvents @ stub RtwqRegisterPlatformWithMMCSS -@ stub RtwqRemovePeriodicCallback +@ stdcall RtwqRemovePeriodicCallback(long) @ stdcall RtwqScheduleWorkItem(ptr int64 ptr) @ stub RtwqSetDeadline @ stub RtwqSetDeadline2 diff --git a/include/rtworkq.idl b/include/rtworkq.idl index 36468304b4..3c167460c8 100644 --- a/include/rtworkq.idl +++ b/include/rtworkq.idl @@ -75,12 +75,16 @@ cpp_quote(" DWORD dwBytesTransferred;") cpp_quote(" HANDLE hEvent;") cpp_quote("} RTWQASYNCRESULT;")
+cpp_quote("typedef void (WINAPI *RTWQPERIODICCALLBACK)(IUnknown *context);") + +cpp_quote("HRESULT WINAPI RtwqAddPeriodicCallback(RTWQPERIODICCALLBACK callback, IUnknown *context, DWORD *key);") cpp_quote("HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key);") cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);") cpp_quote("HRESULT WINAPI RtwqInvokeCallback(IRtwqAsyncResult *result);") cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);") cpp_quote("HRESULT WINAPI RtwqLockWorkQueue(DWORD queue);") cpp_quote("HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key);") +cpp_quote("HRESULT WINAPI RtwqRemovePeriodicCallback(DWORD key);") cpp_quote("HRESULT WINAPI RtwqScheduleWorkItem(IRtwqAsyncResult *result, INT64 timeout, RTWQWORKITEM_KEY *key);") cpp_quote("HRESULT WINAPI RtwqShutdown(void);") cpp_quote("HRESULT WINAPI RtwqStartup(void);")
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/rtworkq/queue.c | 59 +++++++++++++++++++++++++++++++++++++++ dlls/rtworkq/rtworkq.spec | 4 +-- include/rtworkq.idl | 2 ++ 3 files changed, 63 insertions(+), 2 deletions(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c index 16411bfc49..a6aac6abda 100644 --- a/dlls/rtworkq/queue.c +++ b/dlls/rtworkq/queue.c @@ -54,6 +54,8 @@ struct queue_handle
static struct queue_handle user_queues[MAX_USER_QUEUE_HANDLES]; static struct queue_handle *next_free_user_queue; +static struct queue_handle *next_unused_user_queue = user_queues; +static WORD queue_generation;
static CRITICAL_SECTION queues_section; static CRITICAL_SECTION_DEBUG queues_critsect_debug = @@ -559,6 +561,49 @@ static HRESULT queue_cancel_item(struct queue *queue, RTWQWORKITEM_KEY key) return hr; }
+static HRESULT alloc_user_queue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queue_id) +{ + struct queue_handle *entry; + struct queue *queue; + unsigned int idx; + + *queue_id = RTWQ_CALLBACK_QUEUE_UNDEFINED; + + if (platform_lock <= 0) + return RTWQ_E_SHUTDOWN; + + queue = heap_alloc_zero(sizeof(*queue)); + if (!queue) + return E_OUTOFMEMORY; + init_work_queue(queue_type, queue); + + EnterCriticalSection(&queues_section); + + entry = next_free_user_queue; + if (entry) + next_free_user_queue = entry->obj; + else if (next_unused_user_queue < user_queues + MAX_USER_QUEUE_HANDLES) + entry = next_unused_user_queue++; + else + { + LeaveCriticalSection(&queues_section); + heap_free(queue); + WARN("Out of user queue handles.\n"); + return E_OUTOFMEMORY; + } + + entry->refcount = 1; + entry->obj = queue; + if (++queue_generation == 0xffff) queue_generation = 1; + entry->generation = queue_generation; + idx = entry - user_queues + FIRST_USER_QUEUE_HANDLE; + *queue_id = (idx << 16) | entry->generation; + + LeaveCriticalSection(&queues_section); + + return S_OK; +} + struct async_result { RTWQASYNCRESULT result; @@ -999,6 +1044,20 @@ HRESULT WINAPI RtwqInvokeCallback(IRtwqAsyncResult *result) return invoke_async_callback(result); }
+HRESULT WINAPI RtwqPutWorkItem(DWORD queue, LONG priority, IRtwqAsyncResult *result) +{ + TRACE("%d, %d, %p.\n", queue, priority, result); + + return queue_put_work_item(queue, priority, result); +} + +HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queue) +{ + TRACE("%d, %p.\n", queue_type, queue); + + return alloc_user_queue(queue_type, queue); +} + HRESULT WINAPI RtwqLockWorkQueue(DWORD queue) { TRACE("%#x.\n", queue); diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec index 8846047d46..48bc17cbfc 100644 --- a/dlls/rtworkq/rtworkq.spec +++ b/dlls/rtworkq/rtworkq.spec @@ -1,6 +1,6 @@ @ stdcall RtwqAddPeriodicCallback(ptr ptr ptr) @ stub RtwqAllocateSerialWorkQueue -@ stub RtwqAllocateWorkQueue +@ stdcall RtwqAllocateWorkQueue(long ptr) @ stub RtwqBeginRegisterWorkQueueWithMMCSS @ stub RtwqBeginUnregisterWorkQueueWithMMCSS @ stub RtwqCancelDeadline @@ -20,7 +20,7 @@ @ stdcall RtwqLockWorkQueue(long) @ stub RtwqPutMultipleWaitingWorkItem @ stdcall RtwqPutWaitingWorkItem(long long ptr ptr) -@ stub RtwqPutWorkItem +@ stdcall RtwqPutWorkItem(long long ptr) @ stub RtwqRegisterPlatformEvents @ stub RtwqRegisterPlatformWithMMCSS @ stdcall RtwqRemovePeriodicCallback(long) diff --git a/include/rtworkq.idl b/include/rtworkq.idl index 3c167460c8..001cf2c3e9 100644 --- a/include/rtworkq.idl +++ b/include/rtworkq.idl @@ -78,12 +78,14 @@ cpp_quote("} RTWQASYNCRESULT;") cpp_quote("typedef void (WINAPI *RTWQPERIODICCALLBACK)(IUnknown *context);")
cpp_quote("HRESULT WINAPI RtwqAddPeriodicCallback(RTWQPERIODICCALLBACK callback, IUnknown *context, DWORD *key);") +cpp_quote("HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queue);") cpp_quote("HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key);") cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);") cpp_quote("HRESULT WINAPI RtwqInvokeCallback(IRtwqAsyncResult *result);") cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);") cpp_quote("HRESULT WINAPI RtwqLockWorkQueue(DWORD queue);") cpp_quote("HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key);") +cpp_quote("HRESULT WINAPI RtwqPutWorkItem(DWORD queue, LONG priority, IRtwqAsyncResult *result);") cpp_quote("HRESULT WINAPI RtwqRemovePeriodicCallback(DWORD key);") cpp_quote("HRESULT WINAPI RtwqScheduleWorkItem(IRtwqAsyncResult *result, INT64 timeout, RTWQWORKITEM_KEY *key);") cpp_quote("HRESULT WINAPI RtwqShutdown(void);")
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/mfplat/Makefile.in | 2 +- dlls/mfplat/main.c | 44 +- dlls/mfplat/mfplat.spec | 18 +- dlls/mfplat/mfplat_private.h | 4 - dlls/mfplat/queue.c | 1041 +--------------------------------- dlls/rtworkq/queue.c | 11 + 6 files changed, 48 insertions(+), 1072 deletions(-)
diff --git a/dlls/mfplat/Makefile.in b/dlls/mfplat/Makefile.in index f3484fa611..cdb2b81334 100644 --- a/dlls/mfplat/Makefile.in +++ b/dlls/mfplat/Makefile.in @@ -1,6 +1,6 @@ MODULE = mfplat.dll IMPORTLIB = mfplat -IMPORTS = advapi32 ole32 mfuuid propsys +IMPORTS = advapi32 ole32 mfuuid propsys rtworkq
EXTRADLLFLAGS = -mno-cygwin
diff --git a/dlls/mfplat/main.c b/dlls/mfplat/main.c index c8beeda973..e96a972909 100644 --- a/dlls/mfplat/main.c +++ b/dlls/mfplat/main.c @@ -40,6 +40,7 @@ #include "mfreadwrite.h" #include "propvarutil.h" #include "strsafe.h" +#include "rtworkq.h"
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
@@ -64,8 +65,6 @@ static HRESULT heap_strdupW(const WCHAR *str, WCHAR **dest) return hr; }
-static LONG platform_lock; - struct local_handler { struct list entry; @@ -1128,10 +1127,7 @@ HRESULT WINAPI MFStartup(ULONG version, DWORD flags) if (version != MF_VERSION_XP && version != MF_VERSION_WIN7) return MF_E_BAD_STARTUP_VERSION;
- if (InterlockedIncrement(&platform_lock) == 1) - { - init_system_queues(); - } + RtwqStartup();
return S_OK; } @@ -1143,45 +1139,11 @@ HRESULT WINAPI MFShutdown(void) { TRACE("\n");
- if (platform_lock <= 0) - return S_OK; - - if (InterlockedExchangeAdd(&platform_lock, -1) == 1) - { - shutdown_system_queues(); - } - - return S_OK; -} - -/*********************************************************************** - * MFLockPlatform (mfplat.@) - */ -HRESULT WINAPI MFLockPlatform(void) -{ - InterlockedIncrement(&platform_lock); - - return S_OK; -} - -/*********************************************************************** - * MFUnlockPlatform (mfplat.@) - */ -HRESULT WINAPI MFUnlockPlatform(void) -{ - if (InterlockedDecrement(&platform_lock) == 0) - { - shutdown_system_queues(); - } + RtwqShutdown();
return S_OK; }
-BOOL is_platform_locked(void) -{ - return platform_lock > 0; -} - /*********************************************************************** * MFCopyImage (mfplat.@) */ diff --git a/dlls/mfplat/mfplat.spec b/dlls/mfplat/mfplat.spec index a13c192caf..ba835b694a 100644 --- a/dlls/mfplat/mfplat.spec +++ b/dlls/mfplat/mfplat.spec @@ -15,9 +15,9 @@ @ stub GetAMSubtypeFromD3DFormat @ stub GetD3DFormatFromMFSubtype @ stub LFGetGlobalPool -@ stdcall MFAddPeriodicCallback(ptr ptr ptr) +@ stdcall MFAddPeriodicCallback(ptr ptr ptr) rtworkq.RtwqAddPeriodicCallback @ stdcall MFAllocateWorkQueue(ptr) -@ stdcall MFAllocateWorkQueueEx(long ptr) +@ stdcall MFAllocateWorkQueueEx(long ptr) rtworkq.RtwqAllocateWorkQueue @ stub MFAppendCollection @ stub MFAverageTimePerFrameToFrameRate @ stdcall MFBeginCreateFile(long long long wstr ptr ptr ptr) @@ -28,7 +28,7 @@ @ stub MFCalculateBitmapImageSize @ stdcall MFCalculateImageSize(ptr long long ptr) @ stdcall MFCancelCreateFile(ptr) -@ stdcall MFCancelWorkItem(int64) +@ stdcall MFCancelWorkItem(int64) rtworkq.RtwqCancelWorkItem @ stdcall MFCompareFullToPartialMediaType(ptr ptr) @ stub MFCompareSockaddrAddresses @ stub MFConvertColorInfoFromDXVA @@ -121,9 +121,9 @@ @ stub MFInitVideoFormat_RGB @ stdcall MFInvokeCallback(ptr) @ stub MFJoinIoPort -@ stdcall MFLockPlatform() -@ stdcall MFLockWorkQueue(long) -@ stdcall MFPutWaitingWorkItem(long long ptr ptr) +@ stdcall MFLockPlatform() rtworkq.RtwqLockPlatform +@ stdcall MFLockWorkQueue(long) rtworkq.RtwqLockWorkQueue +@ stdcall MFPutWaitingWorkItem(long long ptr ptr) rtworkq.RtwqPutWaitingWorkItem @ stdcall MFPutWorkItem(long ptr ptr) @ stdcall MFPutWorkItem2(long long ptr ptr) @ stdcall MFPutWorkItemEx(long ptr) @@ -131,7 +131,7 @@ @ stub MFRecordError @ stdcall MFRegisterLocalByteStreamHandler(wstr wstr ptr) @ stdcall MFRegisterLocalSchemeHandler(wstr ptr) -@ stdcall MFRemovePeriodicCallback(long) +@ stdcall MFRemovePeriodicCallback(long) rtworkq.RtwqRemovePeriodicCallback @ stdcall MFScheduleWorkItem(ptr ptr int64 ptr) @ stdcall MFScheduleWorkItemEx(ptr int64 ptr) @ stub MFSerializeAttributesToStream @@ -154,8 +154,8 @@ @ stub MFTraceError @ stub MFTraceFuncEnter @ stub MFUnblockThread -@ stdcall MFUnlockPlatform() -@ stdcall MFUnlockWorkQueue(long) +@ stdcall MFUnlockPlatform() rtworkq.RtwqUnlockPlatform +@ stdcall MFUnlockWorkQueue(long) rtworkq.RtwqUnlockWorkQueue @ stdcall MFUnwrapMediaType(ptr ptr) @ stub MFValidateMediaTypeSize @ stdcall MFWrapMediaType(ptr ptr ptr ptr) diff --git a/dlls/mfplat/mfplat_private.h b/dlls/mfplat/mfplat_private.h index 6418afcb97..af8583daee 100644 --- a/dlls/mfplat/mfplat_private.h +++ b/dlls/mfplat/mfplat_private.h @@ -87,10 +87,6 @@ extern HRESULT attributes_GetItemByIndex(struct attributes *object, UINT32 index PROPVARIANT *value) DECLSPEC_HIDDEN; extern HRESULT attributes_CopyAllItems(struct attributes *object, IMFAttributes *dest) DECLSPEC_HIDDEN;
-extern void init_system_queues(void) DECLSPEC_HIDDEN; -extern void shutdown_system_queues(void) DECLSPEC_HIDDEN; -extern BOOL is_platform_locked(void) DECLSPEC_HIDDEN; - static inline BOOL mf_array_reserve(void **elements, size_t *capacity, size_t count, size_t size) { size_t new_capacity, max_capacity; diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c index 31b281a9a3..c0279f1744 100644 --- a/dlls/mfplat/queue.c +++ b/dlls/mfplat/queue.c @@ -25,781 +25,10 @@ #include "wine/list.h"
#include "mfplat_private.h" +#include "rtworkq.h"
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
-#define FIRST_USER_QUEUE_HANDLE 5 -#define MAX_USER_QUEUE_HANDLES 124 - -#define WAIT_ITEM_KEY_MASK (0x82000000) -#define SCHEDULED_ITEM_KEY_MASK (0x80000000) - -static LONG next_item_key; - -static MFWORKITEM_KEY get_item_key(DWORD mask, DWORD key) -{ - return ((MFWORKITEM_KEY)mask << 32) | key; -} - -static MFWORKITEM_KEY generate_item_key(DWORD mask) -{ - return get_item_key(mask, InterlockedIncrement(&next_item_key)); -} - -struct work_item -{ - struct list entry; - LONG refcount; - IMFAsyncResult *result; - struct queue *queue; - MFWORKITEM_KEY key; - union - { - TP_WAIT *wait_object; - TP_TIMER *timer_object; - } u; -}; - -static const TP_CALLBACK_PRIORITY priorities[] = -{ - TP_CALLBACK_PRIORITY_HIGH, - TP_CALLBACK_PRIORITY_NORMAL, - TP_CALLBACK_PRIORITY_LOW, -}; - -struct queue -{ - TP_POOL *pool; - TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)]; - CRITICAL_SECTION cs; - struct list pending_items; -}; - -struct queue_handle -{ - void *obj; - LONG refcount; - WORD generation; -}; - -static struct queue_handle user_queues[MAX_USER_QUEUE_HANDLES]; -static struct queue_handle *next_free_user_queue; -static struct queue_handle *next_unused_user_queue = user_queues; -static WORD queue_generation; - -static CRITICAL_SECTION queues_section; -static CRITICAL_SECTION_DEBUG queues_critsect_debug = -{ - 0, 0, &queues_section, - { &queues_critsect_debug.ProcessLocksList, &queues_critsect_debug.ProcessLocksList }, - 0, 0, { (DWORD_PTR)(__FILE__ ": queues_section") } -}; -static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0, 0 }; - -static struct queue_handle *get_queue_obj(DWORD handle) -{ - unsigned int idx = HIWORD(handle) - FIRST_USER_QUEUE_HANDLE; - - if (idx < MAX_USER_QUEUE_HANDLES && user_queues[idx].refcount) - { - if (LOWORD(handle) == user_queues[idx].generation) - return &user_queues[idx]; - } - - return NULL; -} - -enum system_queue_index -{ - SYS_QUEUE_STANDARD = 0, - SYS_QUEUE_RT, - SYS_QUEUE_IO, - SYS_QUEUE_TIMER, - SYS_QUEUE_MULTITHREADED, - SYS_QUEUE_DO_NOT_USE, - SYS_QUEUE_LONG_FUNCTION, - SYS_QUEUE_COUNT, -}; - -static struct queue system_queues[SYS_QUEUE_COUNT]; - -static struct queue *get_system_queue(DWORD queue_id) -{ - switch (queue_id) - { - case MFASYNC_CALLBACK_QUEUE_STANDARD: - case MFASYNC_CALLBACK_QUEUE_RT: - case MFASYNC_CALLBACK_QUEUE_IO: - case MFASYNC_CALLBACK_QUEUE_TIMER: - case MFASYNC_CALLBACK_QUEUE_MULTITHREADED: - case MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION: - return &system_queues[queue_id - 1]; - default: - return NULL; - } -} - -static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data) -{ -} - -static struct work_item * alloc_work_item(struct queue *queue, IMFAsyncResult *result) -{ - struct work_item *item; - - item = heap_alloc_zero(sizeof(*item)); - item->result = result; - IMFAsyncResult_AddRef(item->result); - item->refcount = 1; - item->queue = queue; - list_init(&item->entry); - - return item; -} - -static void release_work_item(struct work_item *item) -{ - if (InterlockedDecrement(&item->refcount) == 0) - { - IMFAsyncResult_Release(item->result); - heap_free(item); - } -} - -static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *queue) -{ - TP_CALLBACK_ENVIRON_V3 env; - unsigned int max_thread, i; - - queue->pool = CreateThreadpool(NULL); - - memset(&env, 0, sizeof(env)); - env.Version = 3; - env.Size = sizeof(env); - env.Pool = queue->pool; - env.CleanupGroup = CreateThreadpoolCleanupGroup(); - env.CleanupGroupCancelCallback = standard_queue_cleanup_callback; - env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL; - for (i = 0; i < ARRAY_SIZE(queue->envs); ++i) - { - queue->envs[i] = env; - queue->envs[i].CallbackPriority = priorities[i]; - } - list_init(&queue->pending_items); - InitializeCriticalSection(&queue->cs); - - max_thread = (queue_type == MF_STANDARD_WORKQUEUE || queue_type == MF_WINDOW_WORKQUEUE) ? 1 : 4; - - SetThreadpoolThreadMinimum(queue->pool, 1); - SetThreadpoolThreadMaximum(queue->pool, max_thread); - - if (queue_type == MF_WINDOW_WORKQUEUE) - FIXME("MF_WINDOW_WORKQUEUE is not supported.\n"); -} - -static HRESULT grab_queue(DWORD queue_id, struct queue **ret) -{ - struct queue *queue = get_system_queue(queue_id); - MFASYNC_WORKQUEUE_TYPE queue_type; - struct queue_handle *entry; - - *ret = NULL; - - if (!system_queues[SYS_QUEUE_STANDARD].pool) - return MF_E_SHUTDOWN; - - if (queue && queue->pool) - { - *ret = queue; - return S_OK; - } - else if (queue) - { - EnterCriticalSection(&queues_section); - switch (queue_id) - { - case MFASYNC_CALLBACK_QUEUE_IO: - case MFASYNC_CALLBACK_QUEUE_MULTITHREADED: - case MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION: - queue_type = MF_MULTITHREADED_WORKQUEUE; - break; - default: - queue_type = MF_STANDARD_WORKQUEUE; - } - init_work_queue(queue_type, queue); - LeaveCriticalSection(&queues_section); - *ret = queue; - return S_OK; - } - - /* Handles user queues. */ - if ((entry = get_queue_obj(queue_id))) - *ret = entry->obj; - - return *ret ? S_OK : MF_E_INVALID_WORKQUEUE; -} - -void init_system_queues(void) -{ - /* Always initialize standard queue, keep the rest lazy. */ - - EnterCriticalSection(&queues_section); - - if (system_queues[SYS_QUEUE_STANDARD].pool) - { - LeaveCriticalSection(&queues_section); - return; - } - - init_work_queue(MF_STANDARD_WORKQUEUE, &system_queues[SYS_QUEUE_STANDARD]); - - LeaveCriticalSection(&queues_section); -} - -static HRESULT lock_user_queue(DWORD queue) -{ - HRESULT hr = MF_E_INVALID_WORKQUEUE; - struct queue_handle *entry; - - if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK)) - return S_OK; - - EnterCriticalSection(&queues_section); - entry = get_queue_obj(queue); - if (entry && entry->refcount) - { - entry->refcount++; - hr = S_OK; - } - LeaveCriticalSection(&queues_section); - return hr; -} - -static void shutdown_queue(struct queue *queue) -{ - struct work_item *item, *item2; - - if (!queue->pool) - return; - - CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL); - CloseThreadpool(queue->pool); - queue->pool = NULL; - - EnterCriticalSection(&queue->cs); - LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry) - { - list_remove(&item->entry); - release_work_item(item); - } - LeaveCriticalSection(&queue->cs); - - DeleteCriticalSection(&queue->cs); -} - -static HRESULT unlock_user_queue(DWORD queue) -{ - HRESULT hr = MF_E_INVALID_WORKQUEUE; - struct queue_handle *entry; - - if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK)) - return S_OK; - - EnterCriticalSection(&queues_section); - entry = get_queue_obj(queue); - if (entry && entry->refcount) - { - if (--entry->refcount == 0) - { - shutdown_queue((struct queue *)entry->obj); - heap_free(entry->obj); - entry->obj = next_free_user_queue; - next_free_user_queue = entry; - } - hr = S_OK; - } - LeaveCriticalSection(&queues_section); - return hr; -} - -void shutdown_system_queues(void) -{ - unsigned int i; - - EnterCriticalSection(&queues_section); - - for (i = 0; i < ARRAY_SIZE(system_queues); ++i) - { - shutdown_queue(&system_queues[i]); - } - - LeaveCriticalSection(&queues_section); -} - -static struct work_item *grab_work_item(struct work_item *item) -{ - InterlockedIncrement(&item->refcount); - return item; -} - -static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work) -{ - struct work_item *item = context; - MFASYNCRESULT *result = (MFASYNCRESULT *)item->result; - - TRACE("result object %p.\n", result); - - IMFAsyncCallback_Invoke(result->pCallback, item->result); - - release_work_item(item); -} - -static HRESULT queue_submit_item(struct queue *queue, LONG priority, IMFAsyncResult *result) -{ - TP_CALLBACK_PRIORITY callback_priority; - struct work_item *item; - TP_WORK *work_object; - - if (!(item = alloc_work_item(queue, result))) - return E_OUTOFMEMORY; - - if (priority == 0) - callback_priority = TP_CALLBACK_PRIORITY_NORMAL; - else if (priority < 0) - callback_priority = TP_CALLBACK_PRIORITY_LOW; - else - callback_priority = TP_CALLBACK_PRIORITY_HIGH; - work_object = CreateThreadpoolWork(standard_queue_worker, item, - (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]); - SubmitThreadpoolWork(work_object); - - TRACE("dispatched %p.\n", result); - - return S_OK; -} - -static HRESULT queue_put_work_item(DWORD queue_id, LONG priority, IMFAsyncResult *result) -{ - struct queue *queue; - HRESULT hr; - - if (FAILED(hr = grab_queue(queue_id, &queue))) - return hr; - - hr = queue_submit_item(queue, priority, result); - - return hr; -} - -static HRESULT invoke_async_callback(IMFAsyncResult *result) -{ - MFASYNCRESULT *result_data = (MFASYNCRESULT *)result; - DWORD queue = MFASYNC_CALLBACK_QUEUE_STANDARD, flags; - HRESULT hr; - - if (FAILED(IMFAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue))) - queue = MFASYNC_CALLBACK_QUEUE_STANDARD; - - if (FAILED(lock_user_queue(queue))) - queue = MFASYNC_CALLBACK_QUEUE_STANDARD; - - hr = queue_put_work_item(queue, 0, result); - - unlock_user_queue(queue); - - return hr; -} - -static void queue_release_pending_item(struct work_item *item) -{ - EnterCriticalSection(&item->queue->cs); - if (item->key) - { - list_remove(&item->entry); - item->key = 0; - release_work_item(item); - } - LeaveCriticalSection(&item->queue->cs); -} - -static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait, - TP_WAIT_RESULT wait_result) -{ - struct work_item *item = context; - - TRACE("result object %p.\n", item->result); - - invoke_async_callback(item->result); - - release_work_item(item); -} - -static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait, - TP_WAIT_RESULT wait_result) -{ - struct work_item *item = context; - - TRACE("result object %p.\n", item->result); - - queue_release_pending_item(item); - - invoke_async_callback(item->result); - - release_work_item(item); -} - -static void CALLBACK scheduled_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer) -{ - struct work_item *item = context; - - TRACE("result object %p.\n", item->result); - - invoke_async_callback(item->result); - - release_work_item(item); -} - -static void CALLBACK scheduled_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer) -{ - struct work_item *item = context; - - TRACE("result object %p.\n", item->result); - - queue_release_pending_item(item); - - invoke_async_callback(item->result); - - release_work_item(item); -} - -static void CALLBACK periodic_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer) -{ - struct work_item *item = grab_work_item(context); - - invoke_async_callback(item->result); - - release_work_item(item); -} - -static void queue_mark_item_pending(DWORD mask, struct work_item *item, MFWORKITEM_KEY *key) -{ - *key = generate_item_key(mask); - item->key = *key; - - EnterCriticalSection(&item->queue->cs); - list_add_tail(&item->queue->pending_items, &item->entry); - grab_work_item(item); - LeaveCriticalSection(&item->queue->cs); -} - -static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IMFAsyncResult *result, - MFWORKITEM_KEY *key) -{ - PTP_WAIT_CALLBACK callback; - struct work_item *item; - - if (!(item = alloc_work_item(queue, result))) - return E_OUTOFMEMORY; - - if (key) - { - queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key); - callback = waiting_item_cancelable_callback; - } - else - callback = waiting_item_callback; - - item->u.wait_object = CreateThreadpoolWait(callback, item, - (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]); - SetThreadpoolWait(item->u.wait_object, event, NULL); - - TRACE("dispatched %p.\n", result); - - return S_OK; -} - -static HRESULT queue_submit_timer(struct queue *queue, IMFAsyncResult *result, INT64 timeout, DWORD period, - MFWORKITEM_KEY *key) -{ - PTP_TIMER_CALLBACK callback; - struct work_item *item; - FILETIME filetime; - LARGE_INTEGER t; - - if (!(item = alloc_work_item(queue, result))) - return E_OUTOFMEMORY; - - if (key) - { - queue_mark_item_pending(SCHEDULED_ITEM_KEY_MASK, item, key); - } - - if (period) - callback = periodic_item_callback; - else - callback = key ? scheduled_item_cancelable_callback : scheduled_item_callback; - - t.QuadPart = timeout * 1000 * 10; - filetime.dwLowDateTime = t.u.LowPart; - filetime.dwHighDateTime = t.u.HighPart; - - item->u.timer_object = CreateThreadpoolTimer(callback, item, - (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]); - SetThreadpoolTimer(item->u.timer_object, &filetime, period, 0); - - TRACE("dispatched %p.\n", result); - - return S_OK; -} - -static HRESULT queue_cancel_item(struct queue *queue, MFWORKITEM_KEY key) -{ - HRESULT hr = MF_E_NOT_FOUND; - struct work_item *item; - - EnterCriticalSection(&queue->cs); - LIST_FOR_EACH_ENTRY(item, &queue->pending_items, struct work_item, entry) - { - if (item->key == key) - { - key >>= 32; - if ((key & WAIT_ITEM_KEY_MASK) == WAIT_ITEM_KEY_MASK) - CloseThreadpoolWait(item->u.wait_object); - else if ((key & SCHEDULED_ITEM_KEY_MASK) == SCHEDULED_ITEM_KEY_MASK) - CloseThreadpoolTimer(item->u.timer_object); - else - WARN("Unknown item key mask %#x.\n", (DWORD)key); - queue_release_pending_item(item); - hr = S_OK; - break; - } - } - LeaveCriticalSection(&queue->cs); - - return hr; -} - -static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_id) -{ - struct queue_handle *entry; - struct queue *queue; - unsigned int idx; - - *queue_id = MFASYNC_CALLBACK_QUEUE_UNDEFINED; - - if (!is_platform_locked()) - return MF_E_SHUTDOWN; - - queue = heap_alloc_zero(sizeof(*queue)); - if (!queue) - return E_OUTOFMEMORY; - init_work_queue(queue_type, queue); - - EnterCriticalSection(&queues_section); - - entry = next_free_user_queue; - if (entry) - next_free_user_queue = entry->obj; - else if (next_unused_user_queue < user_queues + MAX_USER_QUEUE_HANDLES) - entry = next_unused_user_queue++; - else - { - LeaveCriticalSection(&queues_section); - heap_free(queue); - WARN("Out of user queue handles.\n"); - return E_OUTOFMEMORY; - } - - entry->refcount = 1; - entry->obj = queue; - if (++queue_generation == 0xffff) queue_generation = 1; - entry->generation = queue_generation; - idx = entry - user_queues + FIRST_USER_QUEUE_HANDLE; - *queue_id = (idx << 16) | entry->generation; - - LeaveCriticalSection(&queues_section); - - return S_OK; -} - -struct async_result -{ - MFASYNCRESULT result; - LONG refcount; - IUnknown *object; - IUnknown *state; -}; - -static struct async_result *impl_from_IMFAsyncResult(IMFAsyncResult *iface) -{ - return CONTAINING_RECORD(iface, struct async_result, result.AsyncResult); -} - -static HRESULT WINAPI async_result_QueryInterface(IMFAsyncResult *iface, REFIID riid, void **obj) -{ - TRACE("%p, %s, %p.\n", iface, debugstr_guid(riid), obj); - - if (IsEqualIID(riid, &IID_IMFAsyncResult) || - IsEqualIID(riid, &IID_IUnknown)) - { - *obj = iface; - IMFAsyncResult_AddRef(iface); - return S_OK; - } - - *obj = NULL; - WARN("Unsupported interface %s.\n", debugstr_guid(riid)); - return E_NOINTERFACE; -} - -static ULONG WINAPI async_result_AddRef(IMFAsyncResult *iface) -{ - struct async_result *result = impl_from_IMFAsyncResult(iface); - ULONG refcount = InterlockedIncrement(&result->refcount); - - TRACE("%p, %u.\n", iface, refcount); - - return refcount; -} - -static ULONG WINAPI async_result_Release(IMFAsyncResult *iface) -{ - struct async_result *result = impl_from_IMFAsyncResult(iface); - ULONG refcount = InterlockedDecrement(&result->refcount); - - TRACE("%p, %u.\n", iface, refcount); - - if (!refcount) - { - if (result->result.pCallback) - IMFAsyncCallback_Release(result->result.pCallback); - if (result->object) - IUnknown_Release(result->object); - if (result->state) - IUnknown_Release(result->state); - if (result->result.hEvent) - CloseHandle(result->result.hEvent); - heap_free(result); - - MFUnlockPlatform(); - } - - return refcount; -} - -static HRESULT WINAPI async_result_GetState(IMFAsyncResult *iface, IUnknown **state) -{ - struct async_result *result = impl_from_IMFAsyncResult(iface); - - TRACE("%p, %p.\n", iface, state); - - if (!result->state) - return E_POINTER; - - *state = result->state; - IUnknown_AddRef(*state); - - return S_OK; -} - -static HRESULT WINAPI async_result_GetStatus(IMFAsyncResult *iface) -{ - struct async_result *result = impl_from_IMFAsyncResult(iface); - - TRACE("%p.\n", iface); - - return result->result.hrStatusResult; -} - -static HRESULT WINAPI async_result_SetStatus(IMFAsyncResult *iface, HRESULT status) -{ - struct async_result *result = impl_from_IMFAsyncResult(iface); - - TRACE("%p, %#x.\n", iface, status); - - result->result.hrStatusResult = status; - - return S_OK; -} - -static HRESULT WINAPI async_result_GetObject(IMFAsyncResult *iface, IUnknown **object) -{ - struct async_result *result = impl_from_IMFAsyncResult(iface); - - TRACE("%p, %p.\n", iface, object); - - if (!result->object) - return E_POINTER; - - *object = result->object; - IUnknown_AddRef(*object); - - return S_OK; -} - -static IUnknown * WINAPI async_result_GetStateNoAddRef(IMFAsyncResult *iface) -{ - struct async_result *result = impl_from_IMFAsyncResult(iface); - - TRACE("%p.\n", iface); - - return result->state; -} - -static const IMFAsyncResultVtbl async_result_vtbl = -{ - async_result_QueryInterface, - async_result_AddRef, - async_result_Release, - async_result_GetState, - async_result_GetStatus, - async_result_SetStatus, - async_result_GetObject, - async_result_GetStateNoAddRef, -}; - -static HRESULT create_async_result(IUnknown *object, IMFAsyncCallback *callback, IUnknown *state, IMFAsyncResult **out) -{ - struct async_result *result; - - if (!out) - return E_INVALIDARG; - - result = heap_alloc_zero(sizeof(*result)); - if (!result) - return E_OUTOFMEMORY; - - MFLockPlatform(); - - result->result.AsyncResult.lpVtbl = &async_result_vtbl; - result->refcount = 1; - result->object = object; - if (result->object) - IUnknown_AddRef(result->object); - result->result.pCallback = callback; - if (result->result.pCallback) - IMFAsyncCallback_AddRef(result->result.pCallback); - result->state = state; - if (result->state) - IUnknown_AddRef(result->state); - - *out = &result->result.AsyncResult; - - TRACE("Created async result object %p.\n", *out); - - return S_OK; -} - -/*********************************************************************** - * MFCreateAsyncResult (mfplat.@) - */ -HRESULT WINAPI MFCreateAsyncResult(IUnknown *object, IMFAsyncCallback *callback, IUnknown *state, IMFAsyncResult **out) -{ - TRACE("%p, %p, %p, %p.\n", object, callback, state, out); - - return create_async_result(object, callback, state, out); -} - /*********************************************************************** * MFAllocateWorkQueue (mfplat.@) */ @@ -807,37 +36,7 @@ HRESULT WINAPI MFAllocateWorkQueue(DWORD *queue) { TRACE("%p.\n", queue);
- return alloc_user_queue(MF_STANDARD_WORKQUEUE, queue); -} - -/*********************************************************************** - * MFAllocateWorkQueueEx (mfplat.@) - */ -HRESULT WINAPI MFAllocateWorkQueueEx(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue) -{ - TRACE("%d, %p.\n", queue_type, queue); - - return alloc_user_queue(queue_type, queue); -} - -/*********************************************************************** - * MFLockWorkQueue (mfplat.@) - */ -HRESULT WINAPI MFLockWorkQueue(DWORD queue) -{ - TRACE("%#x.\n", queue); - - return lock_user_queue(queue); -} - -/*********************************************************************** - * MFUnlockWorkQueue (mfplat.@) - */ -HRESULT WINAPI MFUnlockWorkQueue(DWORD queue) -{ - TRACE("%#x.\n", queue); - - return unlock_user_queue(queue); + return RtwqAllocateWorkQueue(RTWQ_STANDARD_WORKQUEUE, queue); }
/*********************************************************************** @@ -853,7 +52,7 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown * if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result))) return hr;
- hr = queue_put_work_item(queue, 0, result); + hr = MFPutWorkItemEx2(queue, 0, result);
IMFAsyncResult_Release(result);
@@ -873,7 +72,7 @@ HRESULT WINAPI MFPutWorkItem2(DWORD queue, LONG priority, IMFAsyncCallback *call if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result))) return hr;
- hr = queue_put_work_item(queue, priority, result); + hr = MFPutWorkItemEx2(queue, priority, result);
IMFAsyncResult_Release(result);
@@ -887,7 +86,7 @@ HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result) { TRACE("%#x, %p\n", queue, result);
- return queue_put_work_item(queue, 0, result); + return MFPutWorkItemEx2(queue, 0, result); }
/*********************************************************************** @@ -897,46 +96,11 @@ HRESULT WINAPI MFPutWorkItemEx2(DWORD queue, LONG priority, IMFAsyncResult *resu { TRACE("%#x, %d, %p\n", queue, priority, result);
- return queue_put_work_item(queue, priority, result); + return RtwqPutWorkItem(queue, priority, (IRtwqAsyncResult *)result); }
/*********************************************************************** - * MFInvokeCallback (mfplat.@) - */ -HRESULT WINAPI MFInvokeCallback(IMFAsyncResult *result) -{ - TRACE("%p.\n", result); - - return invoke_async_callback(result); -} - -static HRESULT schedule_work_item(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key) -{ - struct queue *queue; - HRESULT hr; - - if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue))) - return hr; - - TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key); - - hr = queue_submit_timer(queue, result, timeout, 0, key); - - return hr; -} - -/*********************************************************************** - * MFScheduleWorkItemEx (mfplat.@) - */ -HRESULT WINAPI MFScheduleWorkItemEx(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key) -{ - TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key); - - return schedule_work_item(result, timeout, key); -} - -/*********************************************************************** - * MFScheduleWorkItemEx (mfplat.@) + * MFScheduleWorkItem (mfplat.@) */ HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, INT64 timeout, MFWORKITEM_KEY *key) { @@ -948,7 +112,7 @@ HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, I if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result))) return hr;
- hr = schedule_work_item(result, timeout, key); + hr = MFScheduleWorkItemEx(result, timeout, key);
IMFAsyncResult_Release(result);
@@ -956,200 +120,43 @@ HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, I }
/*********************************************************************** - * MFPutWaitingWorkItem (mfplat.@) + * MFScheduleWorkItemEx (mfplat.@) */ -HRESULT WINAPI MFPutWaitingWorkItem(HANDLE event, LONG priority, IMFAsyncResult *result, MFWORKITEM_KEY *key) +HRESULT WINAPI MFScheduleWorkItemEx(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key) { - struct queue *queue; - HRESULT hr; - - TRACE("%p, %d, %p, %p.\n", event, priority, result, key); - - if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue))) - return hr; - - hr = queue_submit_wait(queue, event, priority, result, key); + TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key);
- return hr; + return RtwqScheduleWorkItem((IRtwqAsyncResult *)result, timeout, key); }
/*********************************************************************** - * MFCancelWorkItem (mfplat.@) + * MFInvokeCallback (mfplat.@) */ -HRESULT WINAPI MFCancelWorkItem(MFWORKITEM_KEY key) +HRESULT WINAPI MFInvokeCallback(IMFAsyncResult *result) { - struct queue *queue; - HRESULT hr; - - TRACE("%s.\n", wine_dbgstr_longlong(key)); - - if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue))) - return hr; - - hr = queue_cancel_item(queue, key); - - return hr; -} + TRACE("%p.\n", result);
-static DWORD get_timer_period(void) -{ - return 10; + return RtwqInvokeCallback((IRtwqAsyncResult *)result); }
/*********************************************************************** - * MFGetTimerPeriodicity (mfplat.@) + * MFCreateAsyncResult (mfplat.@) */ -HRESULT WINAPI MFGetTimerPeriodicity(DWORD *period) -{ - TRACE("%p.\n", period); - - *period = get_timer_period(); - - return S_OK; -} - -struct periodic_callback -{ - IMFAsyncCallback IMFAsyncCallback_iface; - LONG refcount; - MFPERIODICCALLBACK callback; -}; - -static struct periodic_callback *impl_from_IMFAsyncCallback(IMFAsyncCallback *iface) -{ - return CONTAINING_RECORD(iface, struct periodic_callback, IMFAsyncCallback_iface); -} - -static HRESULT WINAPI periodic_callback_QueryInterface(IMFAsyncCallback *iface, REFIID riid, void **obj) -{ - if (IsEqualIID(riid, &IID_IMFAsyncCallback) || - IsEqualIID(riid, &IID_IUnknown)) - { - *obj = iface; - IMFAsyncCallback_AddRef(iface); - return S_OK; - } - - *obj = NULL; - return E_NOINTERFACE; -} - -static ULONG WINAPI periodic_callback_AddRef(IMFAsyncCallback *iface) -{ - struct periodic_callback *callback = impl_from_IMFAsyncCallback(iface); - ULONG refcount = InterlockedIncrement(&callback->refcount); - - TRACE("%p, %u.\n", iface, refcount); - - return refcount; -} - -static ULONG WINAPI periodic_callback_Release(IMFAsyncCallback *iface) -{ - struct periodic_callback *callback = impl_from_IMFAsyncCallback(iface); - ULONG refcount = InterlockedDecrement(&callback->refcount); - - TRACE("%p, %u.\n", iface, refcount); - - if (!refcount) - heap_free(callback); - - return refcount; -} - -static HRESULT WINAPI periodic_callback_GetParameters(IMFAsyncCallback *iface, DWORD *flags, DWORD *queue) -{ - return E_NOTIMPL; -} - -static HRESULT WINAPI periodic_callback_Invoke(IMFAsyncCallback *iface, IMFAsyncResult *result) -{ - struct periodic_callback *callback = impl_from_IMFAsyncCallback(iface); - IUnknown *context = NULL; - - if (FAILED(IMFAsyncResult_GetObject(result, &context))) - WARN("Expected object to be set for result object.\n"); - - callback->callback(context); - - if (context) - IUnknown_Release(context); - - return S_OK; -} - -static const IMFAsyncCallbackVtbl periodic_callback_vtbl = -{ - periodic_callback_QueryInterface, - periodic_callback_AddRef, - periodic_callback_Release, - periodic_callback_GetParameters, - periodic_callback_Invoke, -}; - -static HRESULT create_periodic_callback_obj(MFPERIODICCALLBACK callback, IMFAsyncCallback **out) +HRESULT WINAPI MFCreateAsyncResult(IUnknown *object, IMFAsyncCallback *callback, IUnknown *state, IMFAsyncResult **out) { - struct periodic_callback *object; - - object = heap_alloc(sizeof(*object)); - if (!object) - return E_OUTOFMEMORY; - - object->IMFAsyncCallback_iface.lpVtbl = &periodic_callback_vtbl; - object->refcount = 1; - object->callback = callback; - - *out = &object->IMFAsyncCallback_iface; + TRACE("%p, %p, %p, %p.\n", object, callback, state, out);
- return S_OK; + return RtwqCreateAsyncResult(object, (IRtwqAsyncCallback *)callback, state, (IRtwqAsyncResult **)out); }
/*********************************************************************** - * MFAddPeriodicCallback (mfplat.@) + * MFGetTimerPeriodicity (mfplat.@) */ -HRESULT WINAPI MFAddPeriodicCallback(MFPERIODICCALLBACK callback, IUnknown *context, DWORD *key) +HRESULT WINAPI MFGetTimerPeriodicity(DWORD *period) { - IMFAsyncCallback *periodic_callback; - MFWORKITEM_KEY workitem_key; - IMFAsyncResult *result; - struct queue *queue; - HRESULT hr; - - TRACE("%p, %p, %p.\n", callback, context, key); - - if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue))) - return hr; - - if (FAILED(hr = create_periodic_callback_obj(callback, &periodic_callback))) - return hr; - - hr = create_async_result(context, periodic_callback, NULL, &result); - IMFAsyncCallback_Release(periodic_callback); - if (FAILED(hr)) - return hr; - - hr = queue_submit_timer(queue, result, 0, get_timer_period(), key ? &workitem_key : NULL); - - IMFAsyncResult_Release(result); + TRACE("%p.\n", period);
- if (key) - *key = workitem_key; + *period = 10;
return S_OK; } - -/*********************************************************************** - * MFRemovePeriodicCallback (mfplat.@) - */ -HRESULT WINAPI MFRemovePeriodicCallback(DWORD key) -{ - struct queue *queue; - HRESULT hr; - - TRACE("%#x.\n", key); - - if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue))) - return hr; - - return queue_cancel_item(queue, get_item_key(SCHEDULED_ITEM_KEY_MASK, key)); -} diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c index a6aac6abda..ffe2fd1ae0 100644 --- a/dlls/rtworkq/queue.c +++ b/dlls/rtworkq/queue.c @@ -1071,3 +1071,14 @@ HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue)
return unlock_user_queue(queue); } + +BOOL WINAPI DllMain(HINSTANCE hinstDLL, DWORD reason, void *reserved) +{ + switch (reason) + { + case DLL_PROCESS_ATTACH: + DisableThreadLibraryCalls(hinstDLL); + break; + } + return TRUE; +}