-- v2: wmvcore: Add descriptions to wmvcore async reader threads. wmvcore: Implement DedicatedDeliveryThread async reader output setting. wmvcore: Spawn stream read threads for async reader streams. wmvcore: Move async reader from winegstreamer/wm_asyncreader.c.
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/winegstreamer/Makefile.in | 1 - dlls/winegstreamer/gst_private.h | 2 -- dlls/winegstreamer/winegstreamer.spec | 1 - dlls/wmvcore/Makefile.in | 1 + .../async_reader.c} | 31 +++++++++++++++++-- dlls/wmvcore/wmvcore_main.c | 27 +++++----------- dlls/wmvcore/{wmvcore.h => wmvcore_private.h} | 30 ++++++++++++++++-- dlls/wmvcore/writer.c | 2 +- 8 files changed, 66 insertions(+), 29 deletions(-) rename dlls/{winegstreamer/wm_asyncreader.c => wmvcore/async_reader.c} (99%) rename dlls/wmvcore/{wmvcore.h => wmvcore_private.h} (57%)
diff --git a/dlls/winegstreamer/Makefile.in b/dlls/winegstreamer/Makefile.in index 3daa929b36d..064a8b68343 100644 --- a/dlls/winegstreamer/Makefile.in +++ b/dlls/winegstreamer/Makefile.in @@ -22,7 +22,6 @@ C_SRCS = \ wg_parser.c \ wg_sample.c \ wg_transform.c \ - wm_asyncreader.c \ wm_reader.c \ wma_decoder.c \ wmv_decoder.c diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 523e2711edb..ee5e6dae3cf 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -147,6 +147,4 @@ HRESULT aac_decoder_create(REFIID riid, void **ret); HRESULT h264_decoder_create(REFIID riid, void **ret); HRESULT video_processor_create(REFIID riid, void **ret);
-HRESULT WINAPI winegstreamer_create_wm_sync_reader(IUnknown *outer, void **out); - #endif /* __GST_PRIVATE_INCLUDED__ */ diff --git a/dlls/winegstreamer/winegstreamer.spec b/dlls/winegstreamer/winegstreamer.spec index 911689210b7..9804e324044 100644 --- a/dlls/winegstreamer/winegstreamer.spec +++ b/dlls/winegstreamer/winegstreamer.spec @@ -2,5 +2,4 @@ @ stdcall -private DllGetClassObject(ptr ptr ptr) @ stdcall -private DllRegisterServer() @ stdcall -private DllUnregisterServer() -@ stdcall winegstreamer_create_wm_async_reader(ptr) @ stdcall winegstreamer_create_wm_sync_reader(ptr ptr) diff --git a/dlls/wmvcore/Makefile.in b/dlls/wmvcore/Makefile.in index 6aed828abae..6f4f01bec40 100644 --- a/dlls/wmvcore/Makefile.in +++ b/dlls/wmvcore/Makefile.in @@ -7,6 +7,7 @@ EXTRADLLFLAGS = -Wb,--prefer-native
C_SRCS = \ wmvcore_main.c \ + async_reader.c \ writer.c
RC_SRCS = version.rc diff --git a/dlls/winegstreamer/wm_asyncreader.c b/dlls/wmvcore/async_reader.c similarity index 99% rename from dlls/winegstreamer/wm_asyncreader.c rename to dlls/wmvcore/async_reader.c index 409ebeae1af..a51018260b1 100644 --- a/dlls/winegstreamer/wm_asyncreader.c +++ b/dlls/wmvcore/async_reader.c @@ -16,8 +16,21 @@ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA */
-#include "gst_private.h" +#include <stdarg.h> +#include <stddef.h> +#include <stdbool.h>
+#define COBJMACROS +#include "windef.h" +#include "winbase.h" + +#define EXTERN_GUID DEFINE_GUID +#include "initguid.h" +#include "wmvcore_private.h" + +#include "wmsdk.h" + +#include "wine/debug.h" #include "wine/list.h"
WINE_DEFAULT_DEBUG_CHANNEL(wmvcore); @@ -1881,7 +1894,7 @@ static const IReferenceClockVtbl ReferenceClockVtbl = refclock_Unadvise };
-HRESULT WINAPI winegstreamer_create_wm_async_reader(IWMReader **reader) +static HRESULT WINAPI async_reader_create(IWMReader **reader) { struct async_reader *object; HRESULT hr; @@ -1927,3 +1940,17 @@ failed: free(object); return hr; } + +HRESULT WINAPI WMCreateReader(IUnknown *reserved, DWORD rights, IWMReader **reader) +{ + TRACE("reserved %p, rights %#lx, reader %p.\n", reserved, rights, reader); + + return async_reader_create(reader); +} + +HRESULT WINAPI WMCreateReaderPriv(IWMReader **reader) +{ + TRACE("reader %p.\n", reader); + + return async_reader_create(reader); +} diff --git a/dlls/wmvcore/wmvcore_main.c b/dlls/wmvcore/wmvcore_main.c index bae37cd1cff..016287b1b78 100644 --- a/dlls/wmvcore/wmvcore_main.c +++ b/dlls/wmvcore/wmvcore_main.c @@ -16,32 +16,21 @@ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA */
-#include "wmvcore.h" +#include <stdarg.h> +#include <stddef.h>
-#include "initguid.h" +#define COBJMACROS +#include "windef.h" +#include "winbase.h" + +#include "wmvcore_private.h" #include "wmsdk.h" + #include "wine/debug.h" #include "wine/heap.h"
WINE_DEFAULT_DEBUG_CHANNEL(wmvcore);
-HRESULT WINAPI winegstreamer_create_wm_async_reader(IWMReader **reader); -HRESULT WINAPI winegstreamer_create_wm_sync_reader(IUnknown *outer, void **out); - -HRESULT WINAPI WMCreateReader(IUnknown *reserved, DWORD rights, IWMReader **reader) -{ - TRACE("reserved %p, rights %#lx, reader %p.\n", reserved, rights, reader); - - return winegstreamer_create_wm_async_reader(reader); -} - -HRESULT WINAPI WMCreateReaderPriv(IWMReader **reader) -{ - TRACE("reader %p.\n", reader); - - return winegstreamer_create_wm_async_reader(reader); -} - HRESULT WINAPI WMCreateSyncReader(IUnknown *reserved, DWORD rights, IWMSyncReader **reader) { TRACE("reserved %p, rights %#lx, reader %p.\n", reserved, rights, reader); diff --git a/dlls/wmvcore/wmvcore.h b/dlls/wmvcore/wmvcore_private.h similarity index 57% rename from dlls/wmvcore/wmvcore.h rename to dlls/wmvcore/wmvcore_private.h index e17547052de..a5225d79da9 100644 --- a/dlls/wmvcore/wmvcore.h +++ b/dlls/wmvcore/wmvcore_private.h @@ -17,10 +17,34 @@ */
#include <stdarg.h> +#include <stddef.h>
#define COBJMACROS - -#define EXTERN_GUID DEFINE_GUID - #include "windef.h" #include "winbase.h" +#include "objidl.h" + +#include "wine/debug.h" + +HRESULT WINAPI winegstreamer_create_wm_sync_reader(IUnknown *outer, void **out); + +static inline const char *debugstr_time(LONGLONG time) +{ + ULONGLONG abstime = time >= 0 ? time : -time; + unsigned int i = 0, j = 0; + char buffer[23], rev[23]; + + while (abstime || i <= 8) + { + buffer[i++] = '0' + (abstime % 10); + abstime /= 10; + if (i == 7) buffer[i++] = '.'; + } + if (time < 0) buffer[i++] = '-'; + + while (i--) rev[j++] = buffer[i]; + while (rev[j-1] == '0' && rev[j-2] != '.') --j; + rev[j] = 0; + + return wine_dbg_sprintf("%s", rev); +} diff --git a/dlls/wmvcore/writer.c b/dlls/wmvcore/writer.c index 5c62cd35143..f34e40f0e47 100644 --- a/dlls/wmvcore/writer.c +++ b/dlls/wmvcore/writer.c @@ -16,7 +16,7 @@ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA */
-#include "wmvcore.h" +#include "wmvcore_private.h" #include "wmsdkidl.h"
#include "wine/debug.h"
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/wmvcore/async_reader.c | 281 ++++++++++++++++++++++++++++++++--- dlls/wmvcore/tests/wmvcore.c | 4 - 2 files changed, 260 insertions(+), 25 deletions(-)
diff --git a/dlls/wmvcore/async_reader.c b/dlls/wmvcore/async_reader.c index a51018260b1..39dec5a873c 100644 --- a/dlls/wmvcore/async_reader.c +++ b/dlls/wmvcore/async_reader.c @@ -65,6 +65,18 @@ struct sample WORD stream; };
+struct stream +{ + struct async_reader *reader; + WORD number; + + HANDLE read_thread; + bool read_requested; + CONDITION_VARIABLE read_cv; + struct sample *next_sample; + HRESULT read_result; +}; + struct async_reader { IWMReader IWMReader_iface; @@ -78,6 +90,7 @@ struct async_reader LONG refcount;
IWMSyncReader2 *reader; + IWMProfile3 *profile;
CRITICAL_SECTION cs;
@@ -93,6 +106,9 @@ struct async_reader CRITICAL_SECTION callback_cs; CONDITION_VARIABLE callback_cv;
+ DWORD stream_count; + struct stream *streams; + bool running; struct list async_ops;
@@ -293,30 +309,179 @@ static void async_reader_deliver_sample(struct async_reader *reader, struct samp TRACE("Callback returned %#lx.\n", hr);
INSSBuffer_Release(sample->buffer); + free(sample); +} + +static void stream_request_read(struct stream *stream) +{ + struct async_reader *reader = stream->reader; + WMT_STREAM_SELECTION selection; + + /* stream is not selected, or already has a sample waiting to be delivered */ + if (FAILED(IWMSyncReader2_GetStreamSelected(reader->reader, stream->number, &selection)) + || selection == WMT_OFF || stream->next_sample) + return; + + stream->read_result = E_PENDING; + stream->read_requested = true; + WakeConditionVariable(&stream->read_cv); +} + +static DWORD WINAPI stream_read_thread(void *arg) +{ + struct stream *stream = arg; + struct async_reader *reader = stream->reader; + struct sample *sample; + HRESULT hr; + + TRACE("reader %p, number %u\n", reader, stream->number); + + EnterCriticalSection(&reader->callback_cs); + + while (reader->running) + { + if (!stream->read_requested) + { + SleepConditionVariableCS(&stream->read_cv, &reader->callback_cs, INFINITE); + continue; + } + + if (!(sample = calloc(1, sizeof(*sample)))) + { + WARN("Failed to allocate memory for sample.\n"); + continue; + } + + while (stream->read_requested) + { + stream->read_requested = false; + + if (sample->buffer) + INSSBuffer_Release(sample->buffer); + sample->buffer = NULL; + + LeaveCriticalSection(&reader->callback_cs); + hr = IWMSyncReader2_GetNextSample(reader->reader, stream->number, + &sample->buffer, &sample->pts, &sample->duration, + &sample->flags, &sample->output, &sample->stream); + EnterCriticalSection(&reader->callback_cs); + } + + if (SUCCEEDED(stream->read_result = hr)) + { + TRACE("Got stream %u buffer with pts %I64d.\n", stream->number, sample->pts); + stream->next_sample = sample; + } + else + { + WARN("Failed to get stream %u sample, hr %#lx.\n", stream->number, stream->read_result); + free(sample); + } + + WakeConditionVariable(&reader->callback_cv); + } + + LeaveCriticalSection(&reader->callback_cs); + + TRACE("Reader is stopping; exiting.\n"); + return 0; +} + +static void stream_flush_samples(struct stream *stream) +{ + struct sample *sample; + + if ((sample = stream->next_sample)) + { + stream->next_sample = NULL; + INSSBuffer_Release(sample->buffer); + free(sample); + } +} + +static void stream_close(struct stream *stream) +{ + if (stream->read_thread) + { + WakeConditionVariable(&stream->read_cv); + WaitForSingleObject(stream->read_thread, INFINITE); + CloseHandle(stream->read_thread); + stream->read_thread = NULL; + } + + stream_flush_samples(stream); +} + +static HRESULT stream_open(struct stream *stream, struct async_reader *reader, WORD number) +{ + stream->number = number; + stream->reader = reader; + + if (!(stream->read_thread = CreateThread(NULL, 0, stream_read_thread, stream, 0, NULL))) + return E_OUTOFMEMORY; + + return S_OK; +} + +static HRESULT async_reader_get_next_sample(struct async_reader *reader, + struct stream **out_stream, struct sample **out_sample) +{ + struct sample *sample, *first_sample = NULL; + struct stream *stream, *first_stream = NULL; + DWORD i; + + for (i = 0; i < reader->stream_count; ++i) + { + stream = reader->streams + i; + + if (!(sample = stream->next_sample)) + { + /* stream has a pending read request, wait for it */ + if (stream->read_result == E_PENDING) + return E_PENDING; + continue; + } + + if (!first_sample || first_sample->pts > sample->pts) + { + first_stream = stream; + first_sample = sample; + } + } + + if (!first_sample) + return NS_E_NO_MORE_SAMPLES; + + TRACE("Found first stream %u with pts %I64d.\n", first_stream->number, first_sample->pts); + + first_stream->next_sample = NULL; + *out_sample = first_sample; + *out_stream = first_stream; + return S_OK; }
-static void callback_thread_run(struct async_reader *reader) +static void async_reader_deliver_samples(struct async_reader *reader) { + static const DWORD zero; + IWMReaderCallbackAdvanced *callback_advanced = reader->callback_advanced; IWMReaderCallback *callback = reader->callback; - static const DWORD zero; HRESULT hr = S_OK;
+ TRACE("reader %p\n", reader); + while (reader->running && list_empty(&reader->async_ops)) { - struct sample sample; + struct sample *sample; + struct stream *stream;
- LeaveCriticalSection(&reader->callback_cs); - hr = IWMSyncReader2_GetNextSample(reader->reader, 0, &sample.buffer, &sample.pts, - &sample.duration, &sample.flags, &sample.output, &sample.stream); - EnterCriticalSection(&reader->callback_cs); - if (hr != S_OK) + if (FAILED(hr = async_reader_get_next_sample(reader, &stream, &sample))) break;
- if (async_reader_wait_pts(reader, sample.pts)) - async_reader_deliver_sample(reader, &sample); - else - INSSBuffer_Release(sample.buffer); + stream_request_read(stream); + + if (async_reader_wait_pts(reader, sample->pts)) + async_reader_deliver_sample(reader, sample); }
if (hr == NS_E_NO_MORE_SAMPLES) @@ -324,6 +489,8 @@ static void callback_thread_run(struct async_reader *reader) BOOL user_clock = reader->user_clock; QWORD user_time = reader->user_time;
+ TRACE("No more streams samples, sending EOF notifications.\n"); + LeaveCriticalSection(&reader->callback_cs);
IWMReaderCallback_OnStatus(callback, WMT_END_OF_STREAMING, S_OK, @@ -341,25 +508,28 @@ static void callback_thread_run(struct async_reader *reader) }
EnterCriticalSection(&reader->callback_cs); - - TRACE("Reached end of stream; exiting.\n"); } - else if (hr != S_OK) + else if (hr == E_PENDING) { - ERR("Failed to get sample, hr %#lx.\n", hr); + TRACE("Waiting for more streams samples.\n"); } }
static DWORD WINAPI async_reader_callback_thread(void *arg) { - struct async_reader *reader = arg; static const DWORD zero; + + struct async_reader *reader = arg; + BOOL started = false; struct list *entry; HRESULT hr = S_OK; + DWORD i;
IWMReaderCallback_OnStatus(reader->callback, WMT_OPENED, S_OK, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context);
+ TRACE("reader %p\n", reader); + EnterCriticalSection(&reader->callback_cs);
while (reader->running) @@ -378,19 +548,29 @@ static DWORD WINAPI async_reader_callback_thread(void *arg) if (SUCCEEDED(hr)) hr = IWMSyncReader2_SetRange(reader->reader, op->u.start.start, op->u.start.duration); if (SUCCEEDED(hr)) + { reader->clock_start = get_current_time(reader); + started = true; + + for (i = 0; i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + stream_flush_samples(stream); + stream_request_read(stream); + } + }
LeaveCriticalSection(&reader->callback_cs); IWMReaderCallback_OnStatus(reader->callback, WMT_STARTED, hr, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); EnterCriticalSection(&reader->callback_cs); - - if (SUCCEEDED(hr)) - callback_thread_run(reader); break; }
case ASYNC_OP_STOP: + if (SUCCEEDED(hr)) + started = false; + LeaveCriticalSection(&reader->callback_cs); IWMReaderCallback_OnStatus(reader->callback, WMT_STOPPED, hr, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); @@ -411,6 +591,9 @@ static DWORD WINAPI async_reader_callback_thread(void *arg) free(op); }
+ if (started) + async_reader_deliver_samples(reader); + if (reader->running && list_empty(&reader->async_ops)) SleepConditionVariableCS(&reader->callback_cv, &reader->callback_cs, INFINITE); } @@ -424,6 +607,7 @@ static DWORD WINAPI async_reader_callback_thread(void *arg) static void async_reader_close(struct async_reader *reader) { struct async_op *op, *next; + DWORD i;
if (reader->callback_thread) { @@ -438,6 +622,15 @@ static void async_reader_close(struct async_reader *reader) free(op); }
+ for (i = 0; reader->streams && i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + stream_close(stream); + } + free(reader->streams); + reader->streams = NULL; + reader->stream_count = 0; + if (reader->allocator) IWMReaderAllocatorEx_Release(reader->allocator); reader->allocator = NULL; @@ -455,6 +648,7 @@ static void async_reader_close(struct async_reader *reader) static HRESULT async_reader_open(struct async_reader *reader, IWMReaderCallback *callback, void *context) { HRESULT hr = E_OUTOFMEMORY; + DWORD i;
IWMReaderCallback_AddRef((reader->callback = callback)); reader->context = context; @@ -469,7 +663,23 @@ static HRESULT async_reader_open(struct async_reader *reader, IWMReaderCallback reader->callback_advanced = NULL; }
+ if (FAILED(hr = IWMProfile3_GetStreamCount(reader->profile, &reader->stream_count))) + goto error; + + if (!(reader->streams = calloc(reader->stream_count, sizeof(*reader->streams)))) + { + hr = E_OUTOFMEMORY; + goto error; + } + reader->running = true; + for (i = 0; i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + if (FAILED(hr = stream_open(stream, reader, i + 1))) + goto error; + } + if (!(reader->callback_thread = CreateThread(NULL, 0, async_reader_callback_thread, reader, 0, NULL))) goto error;
@@ -833,11 +1043,36 @@ static HRESULT WINAPI WMReaderAdvanced_SetStreamsSelected(IWMReaderAdvanced6 *if WORD count, WORD *stream_numbers, WMT_STREAM_SELECTION *selections) { struct async_reader *reader = impl_from_IWMReaderAdvanced6(iface); + HRESULT hr; + DWORD i;
TRACE("reader %p, count %u, stream_numbers %p, selections %p.\n", reader, count, stream_numbers, selections);
- return IWMSyncReader2_SetStreamsSelected(reader->reader, count, stream_numbers, selections); + if (FAILED(hr = IWMSyncReader2_SetStreamsSelected(reader->reader, count, stream_numbers, selections))) + return hr; + + EnterCriticalSection(&reader->cs); + + if (!reader->streams) + { + LeaveCriticalSection(&reader->cs); + return E_UNEXPECTED; + } + + EnterCriticalSection(&reader->callback_cs); + + for (i = 0; i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + stream_request_read(stream); + } + + LeaveCriticalSection(&reader->callback_cs); + + LeaveCriticalSection(&reader->cs); + + return S_OK; }
static HRESULT WINAPI WMReaderAdvanced_GetStreamSelected(IWMReaderAdvanced6 *iface, @@ -1921,6 +2156,10 @@ static HRESULT WINAPI async_reader_create(IWMReader **reader) (void **)&object->reader))) goto failed; IWMReader_Release(&object->IWMReader_iface); + if (FAILED(hr = IUnknown_QueryInterface(object->reader_inner, &IID_IWMProfile3, + (void **)&object->profile))) + goto failed; + IWMReader_Release(&object->IWMReader_iface);
InitializeCriticalSection(&object->cs); object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": async_reader.cs"); diff --git a/dlls/wmvcore/tests/wmvcore.c b/dlls/wmvcore/tests/wmvcore.c index 401856eb744..4d8ca1afe2e 100644 --- a/dlls/wmvcore/tests/wmvcore.c +++ b/dlls/wmvcore/tests/wmvcore.c @@ -2138,9 +2138,7 @@ static HRESULT WINAPI callback_advanced_AllocateForStream(IWMReaderCallbackAdvan trace("%lu: %04lx: IWMReaderCallbackAdvanced::AllocateForStream(output %u, size %lu)\n", GetTickCount(), GetCurrentThreadId(), stream_number, size);
- todo_wine ok(callback->callback_tid != GetCurrentThreadId(), "got wrong thread\n"); - todo_wine_if(callback->output_tid[stream_number - 1]) ok(callback->output_tid[stream_number - 1] != GetCurrentThreadId(), "got wrong thread\n");
ok(callback->read_compressed, "AllocateForStream() should only be called when reading compressed samples.\n"); @@ -2176,9 +2174,7 @@ static HRESULT WINAPI callback_advanced_AllocateForOutput(IWMReaderCallbackAdvan trace("%lu: %04lx: IWMReaderCallbackAdvanced::AllocateForOutput(output %lu, size %lu)\n", GetTickCount(), GetCurrentThreadId(), output, size);
- todo_wine ok(callback->callback_tid != GetCurrentThreadId(), "got wrong thread\n"); - todo_wine_if(callback->output_tid[output]) ok(callback->output_tid[output] != GetCurrentThreadId(), "got wrong thread\n");
if (!callback->read_compressed)
From: Rémi Bernon rbernon@codeweavers.com
This restores the todo_wine in the allocator callback as we're now allocating samples in the delivering thread as well. We'll need to create new threads for allocation to match native behavior. --- dlls/wmvcore/async_reader.c | 118 +++++++++++++++++++++++++++++++++-- dlls/wmvcore/tests/wmvcore.c | 16 +---- 2 files changed, 113 insertions(+), 21 deletions(-)
diff --git a/dlls/wmvcore/async_reader.c b/dlls/wmvcore/async_reader.c index 39dec5a873c..8049288e94b 100644 --- a/dlls/wmvcore/async_reader.c +++ b/dlls/wmvcore/async_reader.c @@ -59,6 +59,7 @@ struct async_op
struct sample { + struct list entry; INSSBuffer *buffer; QWORD pts, duration; DWORD flags, output; @@ -75,6 +76,11 @@ struct stream CONDITION_VARIABLE read_cv; struct sample *next_sample; HRESULT read_result; + + bool dedicated_delivery_thread; + HANDLE deliver_thread; + struct list deliver_samples; + CONDITION_VARIABLE deliver_cv; };
struct async_reader @@ -327,6 +333,48 @@ static void stream_request_read(struct stream *stream) WakeConditionVariable(&stream->read_cv); }
+static void stream_request_deliver(struct async_reader *reader, struct sample *sample) +{ + struct stream *stream = reader->streams + sample->output; + + list_add_tail(&stream->deliver_samples, &sample->entry); + WakeConditionVariable(&stream->deliver_cv); +} + +static DWORD WINAPI stream_deliver_thread(void *arg) +{ + struct stream *stream = arg; + struct async_reader *reader = stream->reader; + struct list *entry; + + TRACE("reader %p, number %u\n", reader, stream->number); + + EnterCriticalSection(&reader->callback_cs); + + while (reader->running) + { + if (list_empty(&stream->deliver_samples)) + { + SleepConditionVariableCS(&stream->deliver_cv, &reader->callback_cs, INFINITE); + continue; + } + + while ((entry = list_head(&stream->deliver_samples))) + { + struct sample *sample = LIST_ENTRY(entry, struct sample, entry); + list_remove(&sample->entry); + async_reader_deliver_sample(reader, sample); + } + + WakeConditionVariable(&reader->callback_cv); + } + + LeaveCriticalSection(&reader->callback_cs); + + TRACE("Reader is stopping; exiting.\n"); + return 0; +} + static DWORD WINAPI stream_read_thread(void *arg) { struct stream *stream = arg; @@ -389,7 +437,7 @@ static DWORD WINAPI stream_read_thread(void *arg)
static void stream_flush_samples(struct stream *stream) { - struct sample *sample; + struct sample *sample, *next;
if ((sample = stream->next_sample)) { @@ -397,6 +445,13 @@ static void stream_flush_samples(struct stream *stream) INSSBuffer_Release(sample->buffer); free(sample); } + + LIST_FOR_EACH_ENTRY_SAFE(sample, next, &stream->deliver_samples, struct sample, entry) + { + list_remove(&sample->entry); + INSSBuffer_Release(sample->buffer); + free(sample); + } }
static void stream_close(struct stream *stream) @@ -409,6 +464,14 @@ static void stream_close(struct stream *stream) stream->read_thread = NULL; }
+ if (stream->deliver_thread) + { + WakeConditionVariable(&stream->deliver_cv); + WaitForSingleObject(stream->deliver_thread, INFINITE); + CloseHandle(stream->deliver_thread); + stream->deliver_thread = NULL; + } + stream_flush_samples(stream); }
@@ -416,10 +479,17 @@ static HRESULT stream_open(struct stream *stream, struct async_reader *reader, W { stream->number = number; stream->reader = reader; + list_init(&stream->deliver_samples);
if (!(stream->read_thread = CreateThread(NULL, 0, stream_read_thread, stream, 0, NULL))) return E_OUTOFMEMORY;
+ if (!(stream->deliver_thread = CreateThread(NULL, 0, stream_deliver_thread, stream, 0, NULL))) + { + stream_close(stream); + return E_OUTOFMEMORY; + } + return S_OK; }
@@ -428,12 +498,15 @@ static HRESULT async_reader_get_next_sample(struct async_reader *reader, { struct sample *sample, *first_sample = NULL; struct stream *stream, *first_stream = NULL; + BOOL pending = FALSE; DWORD i;
for (i = 0; i < reader->stream_count; ++i) { stream = reader->streams + i;
+ if (!list_empty(&stream->deliver_samples)) + pending = TRUE; if (!(sample = stream->next_sample)) { /* stream has a pending read request, wait for it */ @@ -450,7 +523,7 @@ static HRESULT async_reader_get_next_sample(struct async_reader *reader, }
if (!first_sample) - return NS_E_NO_MORE_SAMPLES; + return pending ? E_PENDING : NS_E_NO_MORE_SAMPLES;
TRACE("Found first stream %u with pts %I64d.\n", first_stream->number, first_sample->pts);
@@ -481,7 +554,12 @@ static void async_reader_deliver_samples(struct async_reader *reader) stream_request_read(stream);
if (async_reader_wait_pts(reader, sample->pts)) - async_reader_deliver_sample(reader, sample); + { + if (!stream->dedicated_delivery_thread) + async_reader_deliver_sample(reader, sample); + else + stream_request_deliver(reader, sample); + } }
if (hr == NS_E_NO_MORE_SAMPLES) @@ -1276,9 +1354,37 @@ static HRESULT WINAPI WMReaderAdvanced2_GetOutputSetting(IWMReaderAdvanced6 *ifa static HRESULT WINAPI WMReaderAdvanced2_SetOutputSetting(IWMReaderAdvanced6 *iface, DWORD output_num, const WCHAR *name, WMT_ATTR_DATATYPE type, const BYTE *value, WORD length) { - struct async_reader *This = impl_from_IWMReaderAdvanced6(iface); - FIXME("(%p)->(%lu %s %#x %p %u)\n", This, output_num, debugstr_w(name), type, value, length); - return E_NOTIMPL; + struct async_reader *reader = impl_from_IWMReaderAdvanced6(iface); + struct stream *stream; + HRESULT hr = E_NOTIMPL; + + TRACE("reader %p, output_num %lu, name %s, type %u, value %p, length %u semi-stub!\n", + reader, output_num, debugstr_w(name), type, value, length); + + EnterCriticalSection(&reader->cs); + + if (!reader->streams) + { + LeaveCriticalSection(&reader->cs); + return E_UNEXPECTED; + } + + stream = reader->streams + output_num; + + EnterCriticalSection(&reader->callback_cs); + + if (!wcscmp(name, L"DedicatedDeliveryThread")) + { + stream->dedicated_delivery_thread = *(BOOL *)value; + hr = S_OK; + } + else FIXME("Setting %s not implemented!\n", debugstr_w(name)); + + LeaveCriticalSection(&reader->callback_cs); + + LeaveCriticalSection(&reader->cs); + + return hr; }
static HRESULT WINAPI WMReaderAdvanced2_Preroll(IWMReaderAdvanced6 *iface, QWORD start, QWORD duration, float rate) diff --git a/dlls/wmvcore/tests/wmvcore.c b/dlls/wmvcore/tests/wmvcore.c index 4d8ca1afe2e..ebcda39e298 100644 --- a/dlls/wmvcore/tests/wmvcore.c +++ b/dlls/wmvcore/tests/wmvcore.c @@ -1991,10 +1991,7 @@ static HRESULT WINAPI callback_OnSample(IWMReaderCallback *iface, DWORD output, GetTickCount(), GetCurrentThreadId(), output, time, duration, flags);
if (callback->dedicated_threads) - { - todo_wine ok(callback->callback_tid != GetCurrentThreadId(), "got wrong thread\n"); - } else ok(callback->callback_tid == GetCurrentThreadId(), "got wrong thread\n");
@@ -2004,10 +2001,7 @@ static HRESULT WINAPI callback_OnSample(IWMReaderCallback *iface, DWORD output, ok(callback->output_tid[output] == GetCurrentThreadId(), "got wrong thread\n");
if (callback->dedicated_threads && callback->output_tid[1 - output]) - { - todo_wine ok(callback->output_tid[1 - output] != GetCurrentThreadId(), "got wrong thread\n"); - }
ok(context == (void *)callback->expect_context, "Got unexpected context %p.\n", context);
@@ -2063,10 +2057,7 @@ static HRESULT WINAPI callback_advanced_OnStreamSample(IWMReaderCallbackAdvanced GetTickCount(), GetCurrentThreadId(), stream_number, pts, duration, flags);
if (callback->dedicated_threads) - { - todo_wine ok(callback->callback_tid != GetCurrentThreadId(), "got wrong thread\n"); - } else { ok(callback->callback_tid == GetCurrentThreadId(), "got wrong thread\n"); @@ -2080,10 +2071,7 @@ static HRESULT WINAPI callback_advanced_OnStreamSample(IWMReaderCallbackAdvanced ok(callback->output_tid[stream_number - 1] == GetCurrentThreadId(), "got wrong thread\n");
if (callback->dedicated_threads && callback->output_tid[2 - stream_number]) - { - todo_wine ok(callback->output_tid[2 - stream_number] != GetCurrentThreadId(), "got wrong thread\n"); - }
ok(context == (void *)callback->expect_context, "Got unexpected context %p.\n", context);
@@ -2434,7 +2422,7 @@ static void check_async_set_output_setting(IWMReaderAdvanced2 *reader, DWORD out size = sizeof(DWORD);
hr = IWMReaderAdvanced2_SetOutputSetting(reader, output, name, type, (BYTE *)&value, size); - todo_wine + todo_wine_if(wcscmp(name, L"DedicatedDeliveryThread")) ok(hr == expect_hr, "Got hr %#lx.\n", hr);
winetest_pop_context(); @@ -3639,8 +3627,6 @@ START_TEST(wmvcore) if(hr != S_OK) return;
- winetest_mute_threshold = 3; /* FIXME: thread tests print too many "got wrong thread" todos */ - test_wmreader_interfaces(); test_wmsyncreader_interfaces(); test_wmwriter_interfaces();
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/wmvcore/async_reader.c | 10 ++++++++++ 1 file changed, 10 insertions(+)
diff --git a/dlls/wmvcore/async_reader.c b/dlls/wmvcore/async_reader.c index 8049288e94b..c42b20a3cc2 100644 --- a/dlls/wmvcore/async_reader.c +++ b/dlls/wmvcore/async_reader.c @@ -346,9 +346,13 @@ static DWORD WINAPI stream_deliver_thread(void *arg) struct stream *stream = arg; struct async_reader *reader = stream->reader; struct list *entry; + WCHAR buffer[256];
TRACE("reader %p, number %u\n", reader, stream->number);
+ swprintf(buffer, ARRAY_SIZE(buffer), L"wine_wmreader_stream_%u_deliver", stream->number); + SetThreadDescription(GetCurrentThread(), buffer); + EnterCriticalSection(&reader->callback_cs);
while (reader->running) @@ -380,10 +384,14 @@ static DWORD WINAPI stream_read_thread(void *arg) struct stream *stream = arg; struct async_reader *reader = stream->reader; struct sample *sample; + WCHAR buffer[256]; HRESULT hr;
TRACE("reader %p, number %u\n", reader, stream->number);
+ swprintf(buffer, ARRAY_SIZE(buffer), L"wine_wmreader_stream_%u_read", stream->number); + SetThreadDescription(GetCurrentThread(), buffer); + EnterCriticalSection(&reader->callback_cs);
while (reader->running) @@ -603,6 +611,8 @@ static DWORD WINAPI async_reader_callback_thread(void *arg) HRESULT hr = S_OK; DWORD i;
+ SetThreadDescription(GetCurrentThread(), L"wine_wmreader_callback"); + IWMReaderCallback_OnStatus(reader->callback, WMT_OPENED, S_OK, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context);
Hi,
It looks like your patch introduced the new failures shown below. Please investigate and fix them before resubmitting your patch. If they are not new, fixing them anyway would help a lot. Otherwise please ask for the known failures list to be updated.
The tests also ran into some preexisting test failures. If you know how to fix them that would be helpful. See the TestBot job for the details:
The full results can be found at: https://testbot.winehq.org/JobDetails.pl?Key=126137
Your paranoid android.
=== debian11 (32 bit report) ===
crypt32: cert.c:4191: Test failed: success cert.c:4192: Test failed: got 00000000 cert.c:4193: Test failed: got 00000000
qasf: asfreader.c:943: Test failed: blocking 0: Wait timed out. asfreader.c:955: Test failed: blocking 0: got wrong thread asfreader: Timeout
=== debian11 (32 bit de report) ===
wmvcore: wmvcore.c:2981: Test failed: Wait timed out.