From: Rémi Bernon rbernon@codeweavers.com
--- dlls/winegstreamer/Makefile.in | 1 - dlls/winegstreamer/gst_private.h | 2 - dlls/winegstreamer/winegstreamer.spec | 1 - dlls/wmvcore/Makefile.in | 3 +- .../async_reader.c} | 46 +++++++++++++++---- dlls/wmvcore/wmvcore.h | 30 ++++++++++-- dlls/wmvcore/wmvcore_main.c | 27 ++++------- 7 files changed, 74 insertions(+), 36 deletions(-) rename dlls/{winegstreamer/wm_asyncreader.c => wmvcore/async_reader.c} (98%)
diff --git a/dlls/winegstreamer/Makefile.in b/dlls/winegstreamer/Makefile.in index 3daa929b36d..064a8b68343 100644 --- a/dlls/winegstreamer/Makefile.in +++ b/dlls/winegstreamer/Makefile.in @@ -22,7 +22,6 @@ C_SRCS = \ wg_parser.c \ wg_sample.c \ wg_transform.c \ - wm_asyncreader.c \ wm_reader.c \ wma_decoder.c \ wmv_decoder.c diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 523e2711edb..ee5e6dae3cf 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -147,6 +147,4 @@ HRESULT aac_decoder_create(REFIID riid, void **ret); HRESULT h264_decoder_create(REFIID riid, void **ret); HRESULT video_processor_create(REFIID riid, void **ret);
-HRESULT WINAPI winegstreamer_create_wm_sync_reader(IUnknown *outer, void **out); - #endif /* __GST_PRIVATE_INCLUDED__ */ diff --git a/dlls/winegstreamer/winegstreamer.spec b/dlls/winegstreamer/winegstreamer.spec index 911689210b7..9804e324044 100644 --- a/dlls/winegstreamer/winegstreamer.spec +++ b/dlls/winegstreamer/winegstreamer.spec @@ -2,5 +2,4 @@ @ stdcall -private DllGetClassObject(ptr ptr ptr) @ stdcall -private DllRegisterServer() @ stdcall -private DllUnregisterServer() -@ stdcall winegstreamer_create_wm_async_reader(ptr) @ stdcall winegstreamer_create_wm_sync_reader(ptr ptr) diff --git a/dlls/wmvcore/Makefile.in b/dlls/wmvcore/Makefile.in index 6aed828abae..0a02fd0c1de 100644 --- a/dlls/wmvcore/Makefile.in +++ b/dlls/wmvcore/Makefile.in @@ -1,12 +1,13 @@ MODULE = wmvcore.dll IMPORTLIB = wmvcore -IMPORTS = kernel32 +IMPORTS = kernel32 combase DELAYIMPORTS = winegstreamer
EXTRADLLFLAGS = -Wb,--prefer-native
C_SRCS = \ wmvcore_main.c \ + async_reader.c \ writer.c
RC_SRCS = version.rc diff --git a/dlls/winegstreamer/wm_asyncreader.c b/dlls/wmvcore/async_reader.c similarity index 98% rename from dlls/winegstreamer/wm_asyncreader.c rename to dlls/wmvcore/async_reader.c index 409ebeae1af..c97b0198f14 100644 --- a/dlls/winegstreamer/wm_asyncreader.c +++ b/dlls/wmvcore/async_reader.c @@ -16,8 +16,22 @@ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA */
-#include "gst_private.h" +#include <stdarg.h> +#include <stddef.h> +#include <stdbool.h>
+#define COBJMACROS +#include "windef.h" +#include "winbase.h" + +#define EXTERN_GUID DEFINE_GUID +#include "initguid.h" +#include "wmvcore.h" + +#include "dshow.h" +#include "wmsdk.h" + +#include "wine/debug.h" #include "wine/list.h"
WINE_DEFAULT_DEBUG_CHANNEL(wmvcore); @@ -669,8 +683,8 @@ static HRESULT WINAPI WMReader_Start(IWMReader *iface, struct async_reader *reader = impl_from_IWMReader(iface); HRESULT hr;
- TRACE("reader %p, start %s, duration %s, rate %.8e, context %p.\n", - reader, debugstr_time(start), debugstr_time(duration), rate, context); + TRACE("reader %p, start %I64d, duration %I64d, rate %.8e, context %p.\n", + reader, start, duration, rate, context);
if (rate != 1.0f) FIXME("Ignoring rate %.8e.\n", rate); @@ -784,7 +798,7 @@ static HRESULT WINAPI WMReaderAdvanced_DeliverTime(IWMReaderAdvanced6 *iface, QW { struct async_reader *reader = impl_from_IWMReaderAdvanced6(iface);
- TRACE("reader %p, time %s.\n", reader, debugstr_time(time)); + TRACE("reader %p, time %I64d.\n", reader, time);
EnterCriticalSection(&reader->callback_cs);
@@ -1844,8 +1858,8 @@ static HRESULT WINAPI refclock_AdviseTime(IReferenceClock *iface, REFERENCE_TIME { struct async_reader *reader = impl_from_IReferenceClock(iface);
- FIXME("reader %p, basetime %s, streamtime %s, event %#Ix, cookie %p, stub!\n", - reader, debugstr_time(basetime), debugstr_time(streamtime), event, cookie); + FIXME("reader %p, basetime %I64d, streamtime %I64d, event %#Ix, cookie %p, stub!\n", + reader, basetime, streamtime, event, cookie);
return E_NOTIMPL; } @@ -1855,8 +1869,8 @@ static HRESULT WINAPI refclock_AdvisePeriodic(IReferenceClock *iface, REFERENCE_ { struct async_reader *reader = impl_from_IReferenceClock(iface);
- FIXME("reader %p, starttime %s, period %s, semaphore %#Ix, cookie %p, stub!\n", - reader, debugstr_time(starttime), debugstr_time(period), semaphore, cookie); + FIXME("reader %p, starttime %I64d, period %I64d, semaphore %#Ix, cookie %p, stub!\n", + reader, starttime, period, semaphore, cookie);
return E_NOTIMPL; } @@ -1881,7 +1895,7 @@ static const IReferenceClockVtbl ReferenceClockVtbl = refclock_Unadvise };
-HRESULT WINAPI winegstreamer_create_wm_async_reader(IWMReader **reader) +static HRESULT WINAPI async_reader_create(IWMReader **reader) { struct async_reader *object; HRESULT hr; @@ -1927,3 +1941,17 @@ failed: free(object); return hr; } + +HRESULT WINAPI WMCreateReader(IUnknown *reserved, DWORD rights, IWMReader **reader) +{ + TRACE("reserved %p, rights %#lx, reader %p.\n", reserved, rights, reader); + + return async_reader_create(reader); +} + +HRESULT WINAPI WMCreateReaderPriv(IWMReader **reader) +{ + TRACE("reader %p.\n", reader); + + return async_reader_create(reader); +} diff --git a/dlls/wmvcore/wmvcore.h b/dlls/wmvcore/wmvcore.h index e17547052de..a5225d79da9 100644 --- a/dlls/wmvcore/wmvcore.h +++ b/dlls/wmvcore/wmvcore.h @@ -17,10 +17,34 @@ */
#include <stdarg.h> +#include <stddef.h>
#define COBJMACROS - -#define EXTERN_GUID DEFINE_GUID - #include "windef.h" #include "winbase.h" +#include "objidl.h" + +#include "wine/debug.h" + +HRESULT WINAPI winegstreamer_create_wm_sync_reader(IUnknown *outer, void **out); + +static inline const char *debugstr_time(LONGLONG time) +{ + ULONGLONG abstime = time >= 0 ? time : -time; + unsigned int i = 0, j = 0; + char buffer[23], rev[23]; + + while (abstime || i <= 8) + { + buffer[i++] = '0' + (abstime % 10); + abstime /= 10; + if (i == 7) buffer[i++] = '.'; + } + if (time < 0) buffer[i++] = '-'; + + while (i--) rev[j++] = buffer[i]; + while (rev[j-1] == '0' && rev[j-2] != '.') --j; + rev[j] = 0; + + return wine_dbg_sprintf("%s", rev); +} diff --git a/dlls/wmvcore/wmvcore_main.c b/dlls/wmvcore/wmvcore_main.c index bae37cd1cff..7d8e24f448b 100644 --- a/dlls/wmvcore/wmvcore_main.c +++ b/dlls/wmvcore/wmvcore_main.c @@ -16,32 +16,21 @@ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA */
-#include "wmvcore.h" +#include <stdarg.h> +#include <stddef.h> + +#define COBJMACROS +#include "windef.h" +#include "winbase.h"
-#include "initguid.h" +#include "wmvcore.h" #include "wmsdk.h" + #include "wine/debug.h" #include "wine/heap.h"
WINE_DEFAULT_DEBUG_CHANNEL(wmvcore);
-HRESULT WINAPI winegstreamer_create_wm_async_reader(IWMReader **reader); -HRESULT WINAPI winegstreamer_create_wm_sync_reader(IUnknown *outer, void **out); - -HRESULT WINAPI WMCreateReader(IUnknown *reserved, DWORD rights, IWMReader **reader) -{ - TRACE("reserved %p, rights %#lx, reader %p.\n", reserved, rights, reader); - - return winegstreamer_create_wm_async_reader(reader); -} - -HRESULT WINAPI WMCreateReaderPriv(IWMReader **reader) -{ - TRACE("reader %p.\n", reader); - - return winegstreamer_create_wm_async_reader(reader); -} - HRESULT WINAPI WMCreateSyncReader(IUnknown *reserved, DWORD rights, IWMSyncReader **reader) { TRACE("reserved %p, rights %#lx, reader %p.\n", reserved, rights, reader);
From: Rémi Bernon rbernon@codeweavers.com
The async reader already guarantees the synchronization and we'll need to call GetNextSample concurrently in each stream thread. --- dlls/winegstreamer/wm_reader.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/dlls/winegstreamer/wm_reader.c b/dlls/winegstreamer/wm_reader.c index 8ac852a99b0..ffaece21aef 100644 --- a/dlls/winegstreamer/wm_reader.c +++ b/dlls/winegstreamer/wm_reader.c @@ -1830,7 +1830,8 @@ static HRESULT WINAPI reader_GetNextSample(IWMSyncReader2 *iface, if (!stream_number && !output_number && !ret_stream_number) return E_INVALIDARG;
- EnterCriticalSection(&reader->cs); + if (reader->outer == &reader->IUnknown_inner) + EnterCriticalSection(&reader->cs);
if (!stream_number) stream = NULL; @@ -1858,7 +1859,8 @@ static HRESULT WINAPI reader_GetNextSample(IWMSyncReader2 *iface, if (ret_stream_number && (hr == S_OK || stream_number)) *ret_stream_number = stream_number;
- LeaveCriticalSection(&reader->cs); + if (reader->outer == &reader->IUnknown_inner) + LeaveCriticalSection(&reader->cs); return hr; }
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/wmvcore/async_reader.c | 254 ++++++++++++++++++++++++++++++++--- dlls/wmvcore/tests/wmvcore.c | 4 - 2 files changed, 234 insertions(+), 24 deletions(-)
diff --git a/dlls/wmvcore/async_reader.c b/dlls/wmvcore/async_reader.c index c97b0198f14..cad68ffcb40 100644 --- a/dlls/wmvcore/async_reader.c +++ b/dlls/wmvcore/async_reader.c @@ -60,12 +60,25 @@ struct async_op
struct sample { + struct list entry; INSSBuffer *buffer; QWORD pts, duration; DWORD flags, output; WORD stream; };
+struct stream +{ + struct async_reader *reader; + WORD number; + + HANDLE read_thread; + bool read_requested; + CONDITION_VARIABLE read_cv; + struct list read_samples; + HRESULT read_result; +}; + struct async_reader { IWMReader IWMReader_iface; @@ -79,6 +92,7 @@ struct async_reader LONG refcount;
IWMSyncReader2 *reader; + IWMProfile3 *profile;
CRITICAL_SECTION cs;
@@ -94,6 +108,9 @@ struct async_reader CRITICAL_SECTION callback_cs; CONDITION_VARIABLE callback_cv;
+ DWORD stream_count; + struct stream *streams; + bool running; struct list async_ops;
@@ -293,31 +310,180 @@ static void async_reader_deliver_sample(struct async_reader *reader, struct samp
TRACE("Callback returned %#lx.\n", hr);
+ list_remove(&sample->entry); INSSBuffer_Release(sample->buffer); + free(sample); +} + +static void stream_request_read(struct stream *stream) +{ + stream->read_result = E_PENDING; + stream->read_requested = true; + WakeConditionVariable(&stream->read_cv); +} + +static DWORD WINAPI stream_read_thread(void *arg) +{ + struct stream *stream = arg; + struct async_reader *reader = stream->reader; + struct sample *sample; + HRESULT hr; + + TRACE("reader %p, number %u\n", reader, stream->number); + + EnterCriticalSection(&reader->callback_cs); + + while (reader->running) + { + if (!stream->read_requested) + { + SleepConditionVariableCS(&stream->read_cv, &reader->callback_cs, INFINITE); + continue; + } + + if (!(sample = calloc(1, sizeof(*sample)))) + { + WARN("Failed to allocate memory for sample.\n"); + continue; + } + + while (stream->read_requested) + { + stream->read_requested = false; + + if (sample->buffer) + INSSBuffer_Release(sample->buffer); + sample->buffer = NULL; + + LeaveCriticalSection(&reader->callback_cs); + hr = IWMSyncReader2_GetNextSample(reader->reader, stream->number, + &sample->buffer, &sample->pts, &sample->duration, + &sample->flags, &sample->output, &sample->stream); + EnterCriticalSection(&reader->callback_cs); + } + + if (SUCCEEDED(stream->read_result = hr)) + { + TRACE("Got stream %u buffer with pts %I64d.\n", stream->number, sample->pts); + list_add_tail(&stream->read_samples, &sample->entry); + } + else + { + WARN("Failed to get stream %u sample, hr %#lx.\n", stream->number, stream->read_result); + free(sample); + } + + WakeConditionVariable(&reader->callback_cv); + } + + LeaveCriticalSection(&reader->callback_cs); + + TRACE("Reader is stopping; exiting.\n"); + return 0; +} + +static void stream_flush_samples(struct stream *stream) +{ + struct sample *sample, *next; + + LIST_FOR_EACH_ENTRY_SAFE(sample, next, &stream->read_samples, struct sample, entry) + { + list_remove(&sample->entry); + INSSBuffer_Release(sample->buffer); + free(sample); + 
} +} + +static void stream_close(struct stream *stream) +{ + if (stream->read_thread) + { + WakeConditionVariable(&stream->read_cv); + WaitForSingleObject(stream->read_thread, INFINITE); + CloseHandle(stream->read_thread); + stream->read_thread = NULL; + } + + stream_flush_samples(stream); +} + +static HRESULT stream_open(struct stream *stream, struct async_reader *reader, WORD number) +{ + if (stream->read_thread) + return S_OK; + + stream->number = number; + stream->reader = reader; + list_init(&stream->read_samples); + + if (!(stream->read_thread = CreateThread(NULL, 0, stream_read_thread, stream, 0, NULL))) + return E_OUTOFMEMORY; + + return S_OK; }
-static void callback_thread_run(struct async_reader *reader) +static HRESULT async_reader_get_next_sample(struct async_reader *reader, + struct stream **out_stream, struct sample **out_sample) { + struct sample *sample, *first_sample = NULL; + struct stream *stream, *first_stream = NULL; + WMT_STREAM_SELECTION selection; + struct list *entry; + DWORD i; + + for (i = 0; i < reader->stream_count; ++i) + { + stream = reader->streams + i; + + if (FAILED(IWMSyncReader2_GetStreamSelected(reader->reader, i + 1, &selection)) + || selection == WMT_OFF) + continue; + if (!(entry = list_head(&stream->read_samples))) + { + if (stream->read_result == E_PENDING) + return E_PENDING; + continue; + } + + sample = LIST_ENTRY(entry, struct sample, entry); + if (!first_sample || first_sample->pts > sample->pts) + { + first_stream = stream; + first_sample = sample; + } + } + + if (!first_sample) + return NS_E_NO_MORE_SAMPLES; + + TRACE("Found first stream %u with pts %I64d.\n", first_stream->number, first_sample->pts); + *out_sample = first_sample; + *out_stream = first_stream; + return S_OK; +} + +static void async_reader_deliver_samples(struct async_reader *reader) +{ + static const DWORD zero; + IWMReaderCallbackAdvanced *callback_advanced = reader->callback_advanced; IWMReaderCallback *callback = reader->callback; - static const DWORD zero; HRESULT hr = S_OK;
+ TRACE("reader %p\n", reader); + while (reader->running && list_empty(&reader->async_ops)) { - struct sample sample; + struct sample *sample; + struct stream *stream;
- LeaveCriticalSection(&reader->callback_cs); - hr = IWMSyncReader2_GetNextSample(reader->reader, 0, &sample.buffer, &sample.pts, - &sample.duration, &sample.flags, &sample.output, &sample.stream); - EnterCriticalSection(&reader->callback_cs); - if (hr != S_OK) + if (FAILED(hr = async_reader_get_next_sample(reader, &stream, &sample))) break;
- if (async_reader_wait_pts(reader, sample.pts)) - async_reader_deliver_sample(reader, &sample); - else - INSSBuffer_Release(sample.buffer); + stream_request_read(stream); + + if (async_reader_wait_pts(reader, sample->pts)) + async_reader_deliver_sample(reader, sample); }
if (hr == NS_E_NO_MORE_SAMPLES) @@ -325,6 +491,8 @@ static void callback_thread_run(struct async_reader *reader) BOOL user_clock = reader->user_clock; QWORD user_time = reader->user_time;
+ TRACE("No more streams samples, sending EOF notifications.\n"); + LeaveCriticalSection(&reader->callback_cs);
IWMReaderCallback_OnStatus(callback, WMT_END_OF_STREAMING, S_OK, @@ -342,25 +510,27 @@ static void callback_thread_run(struct async_reader *reader) }
EnterCriticalSection(&reader->callback_cs); - - TRACE("Reached end of stream; exiting.\n"); } - else if (hr != S_OK) + else if (hr == E_PENDING) { - ERR("Failed to get sample, hr %#lx.\n", hr); + TRACE("Waiting for more streams samples.\n"); } }
static DWORD WINAPI async_reader_callback_thread(void *arg) { - struct async_reader *reader = arg; static const DWORD zero; + + struct async_reader *reader = arg; struct list *entry; HRESULT hr = S_OK; + DWORD i;
IWMReaderCallback_OnStatus(reader->callback, WMT_OPENED, S_OK, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context);
+ TRACE("reader %p\n", reader); + EnterCriticalSection(&reader->callback_cs);
while (reader->running) @@ -379,19 +549,28 @@ static DWORD WINAPI async_reader_callback_thread(void *arg) if (SUCCEEDED(hr)) hr = IWMSyncReader2_SetRange(reader->reader, op->u.start.start, op->u.start.duration); if (SUCCEEDED(hr)) + { reader->clock_start = get_current_time(reader);
+ for (i = 0; i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + stream_flush_samples(stream); + stream_request_read(stream); + } + } + LeaveCriticalSection(&reader->callback_cs); IWMReaderCallback_OnStatus(reader->callback, WMT_STARTED, hr, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); EnterCriticalSection(&reader->callback_cs); - - if (SUCCEEDED(hr)) - callback_thread_run(reader); break; }
case ASYNC_OP_STOP: + if (SUCCEEDED(hr)) + reader->clock_start = 0; + LeaveCriticalSection(&reader->callback_cs); IWMReaderCallback_OnStatus(reader->callback, WMT_STOPPED, hr, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); @@ -412,6 +591,9 @@ static DWORD WINAPI async_reader_callback_thread(void *arg) free(op); }
+ if (reader->clock_start) + async_reader_deliver_samples(reader); + if (reader->running && list_empty(&reader->async_ops)) SleepConditionVariableCS(&reader->callback_cv, &reader->callback_cs, INFINITE); } @@ -425,6 +607,7 @@ static DWORD WINAPI async_reader_callback_thread(void *arg) static void async_reader_close(struct async_reader *reader) { struct async_op *op, *next; + int i;
if (reader->callback_thread) { @@ -439,6 +622,15 @@ static void async_reader_close(struct async_reader *reader) free(op); }
+ for (i = 0; reader->streams && i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + stream_close(stream); + } + free(reader->streams); + reader->streams = NULL; + reader->stream_count = 0; + if (reader->allocator) IWMReaderAllocatorEx_Release(reader->allocator); reader->allocator = NULL; @@ -456,6 +648,7 @@ static void async_reader_close(struct async_reader *reader) static HRESULT async_reader_open(struct async_reader *reader, IWMReaderCallback *callback, void *context) { HRESULT hr = E_OUTOFMEMORY; + DWORD i;
IWMReaderCallback_AddRef((reader->callback = callback)); reader->context = context; @@ -470,7 +663,24 @@ static HRESULT async_reader_open(struct async_reader *reader, IWMReaderCallback reader->callback_advanced = NULL; }
+ if (FAILED(hr = IWMProfile3_GetStreamCount(reader->profile, &reader->stream_count))) + goto error; + + if (!(reader->streams = calloc(reader->stream_count, sizeof(*reader->streams)))) + { + hr = E_OUTOFMEMORY; + goto error; + } + reader->running = true; + + for (i = 0; i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + if (FAILED(hr = stream_open(stream, reader, i + 1))) + goto error; + } + if (!(reader->callback_thread = CreateThread(NULL, 0, async_reader_callback_thread, reader, 0, NULL))) goto error;
@@ -1922,6 +2132,10 @@ static HRESULT WINAPI async_reader_create(IWMReader **reader) (void **)&object->reader))) goto failed; IWMReader_Release(&object->IWMReader_iface); + if (FAILED(hr = IUnknown_QueryInterface(object->reader_inner, &IID_IWMProfile3, + (void **)&object->profile))) + goto failed; + IWMReader_Release(&object->IWMReader_iface);
InitializeCriticalSection(&object->cs); object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": async_reader.cs"); diff --git a/dlls/wmvcore/tests/wmvcore.c b/dlls/wmvcore/tests/wmvcore.c index 401856eb744..4d8ca1afe2e 100644 --- a/dlls/wmvcore/tests/wmvcore.c +++ b/dlls/wmvcore/tests/wmvcore.c @@ -2138,9 +2138,7 @@ static HRESULT WINAPI callback_advanced_AllocateForStream(IWMReaderCallbackAdvan trace("%lu: %04lx: IWMReaderCallbackAdvanced::AllocateForStream(output %u, size %lu)\n", GetTickCount(), GetCurrentThreadId(), stream_number, size);
- todo_wine ok(callback->callback_tid != GetCurrentThreadId(), "got wrong thread\n"); - todo_wine_if(callback->output_tid[stream_number - 1]) ok(callback->output_tid[stream_number - 1] != GetCurrentThreadId(), "got wrong thread\n");
ok(callback->read_compressed, "AllocateForStream() should only be called when reading compressed samples.\n"); @@ -2176,9 +2174,7 @@ static HRESULT WINAPI callback_advanced_AllocateForOutput(IWMReaderCallbackAdvan trace("%lu: %04lx: IWMReaderCallbackAdvanced::AllocateForOutput(output %lu, size %lu)\n", GetTickCount(), GetCurrentThreadId(), output, size);
- todo_wine ok(callback->callback_tid != GetCurrentThreadId(), "got wrong thread\n"); - todo_wine_if(callback->output_tid[output]) ok(callback->output_tid[output] != GetCurrentThreadId(), "got wrong thread\n");
if (!callback->read_compressed)
From: Rémi Bernon rbernon@codeweavers.com
This restores the todo_wine in the allocator callback as we're now allocating samples in the delivering thread as well. We'll need to create new threads for allocation to match native behavior. --- dlls/wmvcore/async_reader.c | 114 +++++++++++++++++++++++++++++++++-- dlls/wmvcore/tests/wmvcore.c | 16 +---- 2 files changed, 109 insertions(+), 21 deletions(-)
diff --git a/dlls/wmvcore/async_reader.c b/dlls/wmvcore/async_reader.c index cad68ffcb40..d608c79b1bc 100644 --- a/dlls/wmvcore/async_reader.c +++ b/dlls/wmvcore/async_reader.c @@ -77,6 +77,11 @@ struct stream CONDITION_VARIABLE read_cv; struct list read_samples; HRESULT read_result; + + bool dedicated_delivery_thread; + HANDLE deliver_thread; + struct list deliver_samples; + CONDITION_VARIABLE deliver_cv; };
struct async_reader @@ -322,6 +327,48 @@ static void stream_request_read(struct stream *stream) WakeConditionVariable(&stream->read_cv); }
+static void stream_request_deliver(struct async_reader *reader, struct sample *sample) +{ + struct stream *stream = reader->streams + sample->output; + + list_remove(&sample->entry); + list_add_tail(&stream->deliver_samples, &sample->entry); + WakeConditionVariable(&stream->deliver_cv); +} + +static DWORD WINAPI stream_deliver_thread(void *arg) +{ + struct stream *stream = arg; + struct async_reader *reader = stream->reader; + struct list *entry; + + TRACE("reader %p, number %u\n", reader, stream->number); + + EnterCriticalSection(&reader->callback_cs); + + while (reader->running) + { + if (list_empty(&stream->deliver_samples)) + { + SleepConditionVariableCS(&stream->deliver_cv, &reader->callback_cs, INFINITE); + continue; + } + + while ((entry = list_head(&stream->deliver_samples))) + { + struct sample *sample = LIST_ENTRY(entry, struct sample, entry); + async_reader_deliver_sample(reader, sample); + } + + WakeConditionVariable(&reader->callback_cv); + } + + LeaveCriticalSection(&reader->callback_cs); + + TRACE("Reader is stopping; exiting.\n"); + return 0; +} + static DWORD WINAPI stream_read_thread(void *arg) { struct stream *stream = arg; @@ -392,6 +439,13 @@ static void stream_flush_samples(struct stream *stream) INSSBuffer_Release(sample->buffer); free(sample); } + + LIST_FOR_EACH_ENTRY_SAFE(sample, next, &stream->deliver_samples, struct sample, entry) + { + list_remove(&sample->entry); + INSSBuffer_Release(sample->buffer); + free(sample); + } }
static void stream_close(struct stream *stream) @@ -404,6 +458,14 @@ static void stream_close(struct stream *stream) stream->read_thread = NULL; }
+ if (stream->deliver_thread) + { + WakeConditionVariable(&stream->deliver_cv); + WaitForSingleObject(stream->deliver_thread, INFINITE); + CloseHandle(stream->deliver_thread); + stream->deliver_thread = NULL; + } + stream_flush_samples(stream); }
@@ -415,10 +477,17 @@ static HRESULT stream_open(struct stream *stream, struct async_reader *reader, W stream->number = number; stream->reader = reader; list_init(&stream->read_samples); + list_init(&stream->deliver_samples);
if (!(stream->read_thread = CreateThread(NULL, 0, stream_read_thread, stream, 0, NULL))) return E_OUTOFMEMORY;
+ if (!(stream->deliver_thread = CreateThread(NULL, 0, stream_deliver_thread, stream, 0, NULL))) + { + stream_close(stream); + return E_OUTOFMEMORY; + } + return S_OK; }
@@ -428,6 +497,7 @@ static HRESULT async_reader_get_next_sample(struct async_reader *reader, struct sample *sample, *first_sample = NULL; struct stream *stream, *first_stream = NULL; WMT_STREAM_SELECTION selection; + BOOL pending = FALSE; struct list *entry; DWORD i;
@@ -435,6 +505,8 @@ static HRESULT async_reader_get_next_sample(struct async_reader *reader, { stream = reader->streams + i;
+ if (!list_empty(&stream->deliver_samples)) + pending = TRUE; if (FAILED(IWMSyncReader2_GetStreamSelected(reader->reader, i + 1, &selection)) || selection == WMT_OFF) continue; @@ -454,7 +526,7 @@ static HRESULT async_reader_get_next_sample(struct async_reader *reader, }
if (!first_sample) - return NS_E_NO_MORE_SAMPLES; + return pending ? E_PENDING : NS_E_NO_MORE_SAMPLES;
TRACE("Found first stream %u with pts %I64d.\n", first_stream->number, first_sample->pts); *out_sample = first_sample; @@ -474,8 +546,8 @@ static void async_reader_deliver_samples(struct async_reader *reader)
while (reader->running && list_empty(&reader->async_ops)) { - struct sample *sample; struct stream *stream; + struct sample *sample;
if (FAILED(hr = async_reader_get_next_sample(reader, &stream, &sample))) break; @@ -483,7 +555,12 @@ static void async_reader_deliver_samples(struct async_reader *reader) stream_request_read(stream);
if (async_reader_wait_pts(reader, sample->pts)) - async_reader_deliver_sample(reader, sample); + { + if (!stream->dedicated_delivery_thread) + async_reader_deliver_sample(reader, sample); + else + stream_request_deliver(reader, sample); + } }
if (hr == NS_E_NO_MORE_SAMPLES) @@ -1252,9 +1329,34 @@ static HRESULT WINAPI WMReaderAdvanced2_GetOutputSetting(IWMReaderAdvanced6 *ifa static HRESULT WINAPI WMReaderAdvanced2_SetOutputSetting(IWMReaderAdvanced6 *iface, DWORD output_num, const WCHAR *name, WMT_ATTR_DATATYPE type, const BYTE *value, WORD length) { - struct async_reader *This = impl_from_IWMReaderAdvanced6(iface); - FIXME("(%p)->(%lu %s %#x %p %u)\n", This, output_num, debugstr_w(name), type, value, length); - return E_NOTIMPL; + struct async_reader *reader = impl_from_IWMReaderAdvanced6(iface); + struct stream *stream; + HRESULT hr = E_NOTIMPL; + + FIXME("reader %p, output_num %lu, name %s, type %u, value %p, length %u semi-stub!\n", + reader, output_num, debugstr_w(name), type, value, length); + + EnterCriticalSection(&reader->cs); + + if (!reader->streams) + { + LeaveCriticalSection(&reader->cs); + return E_UNEXPECTED; + } + + stream = reader->streams + output_num; + + EnterCriticalSection(&reader->callback_cs); + if (!wcscmp(name, L"DedicatedDeliveryThread")) + { + stream->dedicated_delivery_thread = *(BOOL *)value; + hr = S_OK; + } + LeaveCriticalSection(&reader->callback_cs); + + LeaveCriticalSection(&reader->cs); + + return hr; }
static HRESULT WINAPI WMReaderAdvanced2_Preroll(IWMReaderAdvanced6 *iface, QWORD start, QWORD duration, float rate) diff --git a/dlls/wmvcore/tests/wmvcore.c b/dlls/wmvcore/tests/wmvcore.c index 4d8ca1afe2e..ebcda39e298 100644 --- a/dlls/wmvcore/tests/wmvcore.c +++ b/dlls/wmvcore/tests/wmvcore.c @@ -1991,10 +1991,7 @@ static HRESULT WINAPI callback_OnSample(IWMReaderCallback *iface, DWORD output, GetTickCount(), GetCurrentThreadId(), output, time, duration, flags);
if (callback->dedicated_threads) - { - todo_wine ok(callback->callback_tid != GetCurrentThreadId(), "got wrong thread\n"); - } else ok(callback->callback_tid == GetCurrentThreadId(), "got wrong thread\n");
@@ -2004,10 +2001,7 @@ static HRESULT WINAPI callback_OnSample(IWMReaderCallback *iface, DWORD output, ok(callback->output_tid[output] == GetCurrentThreadId(), "got wrong thread\n");
if (callback->dedicated_threads && callback->output_tid[1 - output]) - { - todo_wine ok(callback->output_tid[1 - output] != GetCurrentThreadId(), "got wrong thread\n"); - }
ok(context == (void *)callback->expect_context, "Got unexpected context %p.\n", context);
@@ -2063,10 +2057,7 @@ static HRESULT WINAPI callback_advanced_OnStreamSample(IWMReaderCallbackAdvanced GetTickCount(), GetCurrentThreadId(), stream_number, pts, duration, flags);
if (callback->dedicated_threads) - { - todo_wine ok(callback->callback_tid != GetCurrentThreadId(), "got wrong thread\n"); - } else { ok(callback->callback_tid == GetCurrentThreadId(), "got wrong thread\n"); @@ -2080,10 +2071,7 @@ static HRESULT WINAPI callback_advanced_OnStreamSample(IWMReaderCallbackAdvanced ok(callback->output_tid[stream_number - 1] == GetCurrentThreadId(), "got wrong thread\n");
if (callback->dedicated_threads && callback->output_tid[2 - stream_number]) - { - todo_wine ok(callback->output_tid[2 - stream_number] != GetCurrentThreadId(), "got wrong thread\n"); - }
ok(context == (void *)callback->expect_context, "Got unexpected context %p.\n", context);
@@ -2434,7 +2422,7 @@ static void check_async_set_output_setting(IWMReaderAdvanced2 *reader, DWORD out size = sizeof(DWORD);
hr = IWMReaderAdvanced2_SetOutputSetting(reader, output, name, type, (BYTE *)&value, size); - todo_wine + todo_wine_if(wcscmp(name, L"DedicatedDeliveryThread")) ok(hr == expect_hr, "Got hr %#lx.\n", hr);
winetest_pop_context(); @@ -3639,8 +3627,6 @@ START_TEST(wmvcore) if(hr != S_OK) return;
- winetest_mute_threshold = 3; /* FIXME: thread tests print too many "got wrong thread" todos */ - test_wmreader_interfaces(); test_wmsyncreader_interfaces(); test_wmwriter_interfaces();
Hi,
It looks like your patch introduced the new failures shown below. Please investigate and fix them before resubmitting your patch. If they are not new, fixing them anyway would help a lot. Otherwise please ask for the known failures list to be updated.
The tests also ran into some preexisting test failures. If you know how to fix them that would be helpful. See the TestBot job for the details:
The full results can be found at: https://testbot.winehq.org/JobDetails.pl?Key=125936
Your paranoid android.
=== w10pro64_zh_CN (testbot log) ===
WineRunTask.pl:error: The previous 1 run(s) terminated abnormally
=== debian11b (64 bit WoW report) ===
wmvcore: wmvcore.c:1663: Test failed: Stream 0: Format 8: Got hr 0xc00d0041. wmvcore.c:1680: Test failed: Stream 0: Format 8: Media types didn't match.
Zebediah Figura (@zfigura) commented about dlls/wmvcore/async_reader.c:
struct async_reader *reader = impl_from_IWMReader(iface); HRESULT hr;
- TRACE("reader %p, start %s, duration %s, rate %.8e, context %p.\n",
-         reader, debugstr_time(start), debugstr_time(duration), rate, context);
+ TRACE("reader %p, start %I64d, duration %I64d, rate %.8e, context %p.\n",
+         reader, start, duration, rate, context);
Did you mean to remove the debugstr_time() from these?
Zebediah Figura (@zfigura) commented about dlls/wmvcore/async_reader.c:
*/
-#include "gst_private.h" +#include <stdarg.h> +#include <stddef.h> +#include <stdbool.h>
+#define COBJMACROS +#include "windef.h" +#include "winbase.h"
+#define EXTERN_GUID DEFINE_GUID +#include "initguid.h" +#include "wmvcore.h"
+#include "dshow.h"
I think this can be removed.
Zebediah Figura (@zfigura) commented about dlls/wmvcore/async_reader.c:
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
*/
-#include "gst_private.h" +#include <stdarg.h> +#include <stddef.h> +#include <stdbool.h>
+#define COBJMACROS +#include "windef.h" +#include "winbase.h"
+#define EXTERN_GUID DEFINE_GUID +#include "initguid.h" +#include "wmvcore.h"
Can we please call this "wmvcore_private.h"? Mostly because "wmvcore.h" looks too much like a public header name.
Zebediah Figura (@zfigura) commented about dlls/wmvcore/Makefile.in:
MODULE = wmvcore.dll IMPORTLIB = wmvcore -IMPORTS = kernel32 +IMPORTS = kernel32 combase
What's this for?
Zebediah Figura (@zfigura) commented about dlls/wmvcore/async_reader.c:
static void async_reader_close(struct async_reader *reader) { struct async_op *op, *next;
- int i;
This should be DWORD or unsigned int.
Zebediah Figura (@zfigura) commented about dlls/wmvcore/async_reader.c:
}
LeaveCriticalSection(&reader->callback_cs); IWMReaderCallback_OnStatus(reader->callback, WMT_STARTED, hr, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); EnterCriticalSection(&reader->callback_cs);
if (SUCCEEDED(hr))
callback_thread_run(reader); break; } case ASYNC_OP_STOP:
if (SUCCEEDED(hr))
reader->clock_start = 0;
Can we please use a separate variable for this, instead of overloading clock_start?
Zebediah Figura (@zfigura) commented about dlls/wmvcore/async_reader.c:
if (sample->buffer)
INSSBuffer_Release(sample->buffer);
sample->buffer = NULL;
LeaveCriticalSection(&reader->callback_cs);
hr = IWMSyncReader2_GetNextSample(reader->reader, stream->number,
&sample->buffer, &sample->pts, &sample->duration,
&sample->flags, &sample->output, &sample->stream);
EnterCriticalSection(&reader->callback_cs);
}
if (SUCCEEDED(stream->read_result = hr))
{
TRACE("Got stream %u buffer with pts %I64d.\n", stream->number, sample->pts);
list_add_tail(&stream->read_samples, &sample->entry);
Under normal circumstances we only buffer one sample at a time; why use a list?
Zebediah Figura (@zfigura) commented about dlls/winegstreamer/wm_reader.c:
if (!stream_number && !output_number && !ret_stream_number) return E_INVALIDARG;
- EnterCriticalSection(&reader->cs);
- if (reader->outer == &reader->IUnknown_inner)
EnterCriticalSection(&reader->cs);
How much of a performance difference does it make that GetNextSample() is not serialized?
Zebediah Figura (@zfigura) commented about dlls/wmvcore/async_reader.c:
static HRESULT WINAPI WMReaderAdvanced2_SetOutputSetting(IWMReaderAdvanced6 *iface, DWORD output_num, const WCHAR *name, WMT_ATTR_DATATYPE type, const BYTE *value, WORD length) {
- struct async_reader *This = impl_from_IWMReaderAdvanced6(iface);
- FIXME("(%p)->(%lu %s %#x %p %u)\n", This, output_num, debugstr_w(name), type, value, length);
- return E_NOTIMPL;
- struct async_reader *reader = impl_from_IWMReaderAdvanced6(iface);
- struct stream *stream;
- HRESULT hr = E_NOTIMPL;
- FIXME("reader %p, output_num %lu, name %s, type %u, value %p, length %u semi-stub!\n",
reader, output_num, debugstr_w(name), type, value, length);
This should probably be a TRACE, and we should FIXME if name is something other than DedicatedDeliveryThread.
Zebediah Figura (@zfigura) commented about dlls/wmvcore/async_reader.c:
+{
- if (stream->read_thread)
- {
WakeConditionVariable(&stream->read_cv);
WaitForSingleObject(stream->read_thread, INFINITE);
CloseHandle(stream->read_thread);
stream->read_thread = NULL;
- }
- stream_flush_samples(stream);
+}
+static HRESULT stream_open(struct stream *stream, struct async_reader *reader, WORD number) +{
- if (stream->read_thread)
return S_OK;
We just created the stream, so this can't be true.
Zebediah Figura (@zfigura) commented about dlls/wmvcore/async_reader.c:
+static HRESULT async_reader_get_next_sample(struct async_reader *reader,
struct stream **out_stream, struct sample **out_sample)
{
- struct sample *sample, *first_sample = NULL;
- struct stream *stream, *first_stream = NULL;
- WMT_STREAM_SELECTION selection;
- struct list *entry;
- DWORD i;
- for (i = 0; i < reader->stream_count; ++i)
- {
stream = reader->streams + i;
if (FAILED(IWMSyncReader2_GetStreamSelected(reader->reader, i + 1, &selection))
|| selection == WMT_OFF)
continue;
I notice we're still creating threads for unselected streams, which doesn't seem desirable.
Zebediah Figura (@zfigura) commented about dlls/wmvcore/async_reader.c:
stream->number = number; stream->reader = reader; list_init(&stream->read_samples);
list_init(&stream->deliver_samples);
if (!(stream->read_thread = CreateThread(NULL, 0, stream_read_thread, stream, 0, NULL))) return E_OUTOFMEMORY;
if (!(stream->deliver_thread = CreateThread(NULL, 0, stream_deliver_thread, stream, 0, NULL)))
{
stream_close(stream);
return E_OUTOFMEMORY;
}
Why are we using a separate delivery thread, instead of reusing read_thread?
Zebediah Figura (@zfigura) commented about dlls/wmvcore/async_reader.c:
{ stream = reader->streams + i;
if (!list_empty(&stream->deliver_samples))
pending = TRUE;
Should this be after the check for stream selection?
On Fri Nov 11 00:26:24 2022 +0000, Zebediah Figura wrote:
Should this be after the check for stream selection?
Maybe, though I'll try to avoid running de-selected streams instead.
On Fri Nov 11 00:26:23 2022 +0000, Zebediah Figura wrote:
This should probably be a TRACE, and we should FIXME if name is something other than DedicatedDeliveryThread.
Right.
On Fri Nov 11 00:26:23 2022 +0000, Zebediah Figura wrote:
We just created the stream, so this can't be true.
Indeed.
On Fri Nov 11 00:26:24 2022 +0000, Zebediah Figura wrote:
I notice we're still creating threads for unselected streams, which doesn't seem desirable.
Yeah, I'm a bit afraid of starting / stopping the stream threads dynamically, but I'll try.
On Fri Nov 11 00:26:24 2022 +0000, Zebediah Figura wrote:
Why are we using a separate delivery thread, instead of reusing read_thread?
First, because it's how it's supposed to be. Second, because `OnSample` callbacks may block, for instance on preroll, while waiting for each stream to get a sample.
We need the read threads to always be running and returning their next sample because we then deliver them in PTS order. If one read thread is blocked while delivering its sample, the callback thread would be waiting for it indefinitely and wouldn't be able to decide which stream is first.
On Fri Nov 11 00:26:23 2022 +0000, Zebediah Figura wrote:
How much of a performance difference does it make that GetNextSample() is not serialized?
It wasn't so much a matter of performance but rather to allow waiting on samples concurrently. `GetNextSample` will call the allocation callbacks, which may block.
I'm no longer completely sure in which case it was needed; maybe it was some pool size issue. I'll try removing it for now.
On Fri Nov 11 00:26:20 2022 +0000, Zebediah Figura wrote:
What's this for?
No reason; it's a leftover from when I tried different ways to instantiate the sync reader.
On Fri Nov 11 00:26:20 2022 +0000, Zebediah Figura wrote:
I think this can be removed.
Indeed.
On Fri Nov 11 00:26:19 2022 +0000, Zebediah Figura wrote:
Did you mean to remove the debugstr_time() from these?
I didn't; those are leftovers from before I moved the helper.
On Fri Nov 11 00:26:20 2022 +0000, Zebediah Figura wrote:
Can we please call this "wmvcore_private.h"? Mostly because "wmvcore.h" looks too much like a public header name.
Sure.
On Fri Nov 11 00:26:21 2022 +0000, Zebediah Figura wrote:
This should be DWORD or unsigned int.
Ok.
On Fri Nov 11 00:26:22 2022 +0000, Zebediah Figura wrote:
Can we please use a separate variable for this, instead of overloading clock_start?
Sure, I'll rename running too.
On Fri Nov 11 00:26:22 2022 +0000, Zebediah Figura wrote:
Under normal circumstances we only buffer one sample at a time; why use a list?
It seemed natural enough, and it went well with the `deliver_samples` list. I can probably use a single sample instead.
On Fri Nov 11 14:46:02 2022 +0000, Rémi Bernon wrote:
Yeah, I'm a bit afraid of starting / stopping the stream threads dynamically, but I'll try.
Hmm, I never tested selecting streams while running. If that's supposed to work it may be better just to leave it as is.
On Fri Nov 11 14:46:02 2022 +0000, Rémi Bernon wrote:
It wasn't so much a matter of performance but rather to allow waiting on samples concurrently. `GetNextSample` will call the allocation callbacks, which may block. I'm not completely sure anymore in which case it was needed, maybe it was some pool size issue. I'll try removing it for now.
Yeah, if we need to do this to prevent a deadlock, that sounds to me like something we need to solve on the sync reader side.
First because it's how it's supposed to be.
I initially thought that we weren't actually running any visible callbacks on these read threads, but I guess the allocation callbacks are coming from here. Windows really spawns two threads for each stream? Sheesh.
Side note that just occurred to me: we should probably add thread descriptions for any new threads, though I don't mind deferring that.
On Fri Nov 11 14:46:01 2022 +0000, Rémi Bernon wrote:
Maybe, though I'll try to avoid running de-selected streams instead.
I think we'd still need that GetStreamSelected() check, though, given we're looping over all the streams here?