From: Rémi Bernon rbernon@codeweavers.com
--- dlls/winegstreamer/wm_asyncreader.c | 248 +++++++++++++++++++--------- dlls/wmvcore/tests/wmvcore.c | 40 ++--- 2 files changed, 178 insertions(+), 110 deletions(-)
diff --git a/dlls/winegstreamer/wm_asyncreader.c b/dlls/winegstreamer/wm_asyncreader.c index a4dc077d341..be9f881b9c2 100644 --- a/dlls/winegstreamer/wm_asyncreader.c +++ b/dlls/winegstreamer/wm_asyncreader.c @@ -18,8 +18,22 @@
#include "gst_private.h"
+#include "wine/list.h" + WINE_DEFAULT_DEBUG_CHANNEL(wmvcore);
+struct async_op +{ + enum async_op_type + { + ASYNC_OP_START, + ASYNC_OP_STOP, + ASYNC_OP_CLOSE, + } type; + void *new_context; + struct list entry; +}; + struct async_reader { struct wm_reader reader; @@ -41,6 +55,7 @@ struct async_reader CONDITION_VARIABLE stream_cv;
bool running; + struct list async_ops;
bool user_clock; QWORD user_time; @@ -54,20 +69,9 @@ static REFERENCE_TIME get_current_time(const struct async_reader *reader) return (time.QuadPart * 1000) / reader->clock_frequency.QuadPart * 10000; }
-static void open_stream(struct async_reader *reader, IWMReaderCallback *callback, void *context) -{ - static const DWORD zero; - - IWMReaderCallback_AddRef(reader->callback = callback); - reader->context = context; - IWMReaderCallback_OnStatus(callback, WMT_OPENED, S_OK, WMT_TYPE_DWORD, (BYTE *)&zero, context); -} - -static DWORD WINAPI stream_thread(void *arg) +static void stream_thread_running(struct async_reader *reader, IWMReaderCallbackAdvanced *callback_advanced) { - struct async_reader *reader = arg; IWMReaderCallback *callback = reader->callback; - IWMReaderCallbackAdvanced *callback_advanced; REFERENCE_TIME start_time; struct wm_stream *stream; static const DWORD zero; @@ -79,14 +83,7 @@ static DWORD WINAPI stream_thread(void *arg)
start_time = get_current_time(reader);
- EnterCriticalSection(&reader->stream_cs); - - if (FAILED(hr = IWMReaderCallback_QueryInterface(callback, - &IID_IWMReaderCallbackAdvanced, (void **)&callback_advanced))) - callback_advanced = NULL; - TRACE("Querying for IWMReaderCallbackAdvanced returned %#lx.\n", hr); - - while (reader->running) + while (reader->running && list_empty(&reader->async_ops)) { LeaveCriticalSection(&reader->stream_cs); hr = wm_reader_get_stream_sample(&reader->reader, callback_advanced, 0, &sample, &pts, &duration, &flags, &stream_number); @@ -107,12 +104,12 @@ static DWORD WINAPI stream_thread(void *arg) EnterCriticalSection(&reader->stream_cs); }
- while (pts > reader->user_time && reader->running) + while (pts > reader->user_time && reader->running && list_empty(&reader->async_ops)) SleepConditionVariableCS(&reader->stream_cv, &reader->stream_cs, INFINITE); } else { - while (reader->running) + while (reader->running && list_empty(&reader->async_ops)) { REFERENCE_TIME current_time = get_current_time(reader);
@@ -124,7 +121,7 @@ static DWORD WINAPI stream_thread(void *arg) } }
- if (reader->running) + if (reader->running && list_empty(&reader->async_ops)) { LeaveCriticalSection(&reader->stream_cs); if (stream->read_compressed) @@ -141,52 +138,161 @@ static DWORD WINAPI stream_thread(void *arg) INSSBuffer_Release(sample); }
- LeaveCriticalSection(&reader->stream_cs); - if (hr == NS_E_NO_MORE_SAMPLES) { + BOOL user_clock = reader->user_clock; + QWORD user_time = reader->user_time; + + LeaveCriticalSection(&reader->stream_cs); + IWMReaderCallback_OnStatus(callback, WMT_END_OF_STREAMING, S_OK, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); IWMReaderCallback_OnStatus(callback, WMT_EOF, S_OK, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context);
- if (reader->user_clock && callback_advanced) + if (user_clock && callback_advanced) { /* We can only get here if user_time is greater than the PTS * of all samples, in which case we cannot have sent this * notification already. */ IWMReaderCallbackAdvanced_OnTime(callback_advanced, - reader->user_time, reader->context); + user_time, reader->context); }
+ EnterCriticalSection(&reader->stream_cs); + TRACE("Reached end of stream; exiting.\n"); } else if (hr != S_OK) { ERR("Failed to get sample, hr %#lx.\n", hr); } +} + +static DWORD WINAPI stream_thread(void *arg) +{ + struct async_reader *reader = arg; + IWMReaderCallback *callback = reader->callback; + IWMReaderCallbackAdvanced *callback_advanced; + static const DWORD zero; + struct list *entry; + HRESULT hr = S_OK; + + if (FAILED(hr = IWMReaderCallback_QueryInterface(callback, + &IID_IWMReaderCallbackAdvanced, (void **)&callback_advanced))) + callback_advanced = NULL; + TRACE("Querying for IWMReaderCallbackAdvanced returned %#lx.\n", hr); + + IWMReaderCallback_OnStatus(reader->callback, WMT_OPENED, S_OK, + WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); + + EnterCriticalSection(&reader->stream_cs); + + while (reader->running) + { + if ((entry = list_head(&reader->async_ops))) + { + struct async_op *op = LIST_ENTRY(entry, struct async_op, entry); + list_remove(&op->entry); + + if (op->new_context) + reader->context = op->new_context; + hr = list_empty(&reader->async_ops) ? 
S_OK : E_ABORT; + switch (op->type) + { + case ASYNC_OP_START: + { + LeaveCriticalSection(&reader->stream_cs); + IWMReaderCallback_OnStatus(reader->callback, WMT_STARTED, hr, + WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); + EnterCriticalSection(&reader->stream_cs); + + if (SUCCEEDED(hr)) + stream_thread_running(reader, callback_advanced); + break; + } + + case ASYNC_OP_STOP: + LeaveCriticalSection(&reader->stream_cs); + IWMReaderCallback_OnStatus(reader->callback, WMT_STOPPED, hr, + WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); + EnterCriticalSection(&reader->stream_cs); + break; + + case ASYNC_OP_CLOSE: + LeaveCriticalSection(&reader->stream_cs); + IWMReaderCallback_OnStatus(reader->callback, WMT_CLOSED, hr, + WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); + EnterCriticalSection(&reader->stream_cs); + + if (SUCCEEDED(hr)) + reader->running = false; + break; + } + + free(op); + } + + if (reader->running && list_empty(&reader->async_ops)) + SleepConditionVariableCS(&reader->stream_cv, &reader->stream_cs, INFINITE); + }
if (callback_advanced) IWMReaderCallbackAdvanced_Release(callback_advanced);
+ LeaveCriticalSection(&reader->stream_cs); + TRACE("Reader is stopping; exiting.\n"); return 0; }
-static void stop_streaming(struct async_reader *reader) +static HRESULT open_stream(struct async_reader *reader, IWMReaderCallback *callback, void *context) { - if (reader->stream_thread) + IWMReaderCallback_AddRef((reader->callback = callback)); + reader->context = context; + + reader->running = true; + if (!(reader->stream_thread = CreateThread(NULL, 0, stream_thread, reader, 0, NULL))) { - EnterCriticalSection(&reader->stream_cs); + IWMReaderCallback_Release(reader->callback); reader->running = false; - LeaveCriticalSection(&reader->stream_cs); - WakeConditionVariable(&reader->stream_cv); + reader->callback = NULL; + reader->context = NULL; + return E_OUTOFMEMORY; + } + + return S_OK; +} + +static void close_stream(struct async_reader *reader) +{ + if (reader->stream_thread) + { WaitForSingleObject(reader->stream_thread, INFINITE); CloseHandle(reader->stream_thread); reader->stream_thread = NULL; } }
+/* Queue an operation of the given type (with its new context) for the stream thread; E_OUTOFMEMORY if it cannot be allocated. */
+static HRESULT async_reader_queue_op(struct async_reader *reader, enum async_op_type type, void *context)
+{
+    struct async_op *op;
+
+    /* Allocate before taking stream_cs, so the failure path never returns with the lock held. */
+    if (!(op = calloc(1, sizeof(*op))))
+        return E_OUTOFMEMORY;
+    op->type = type;
+    op->new_context = context;
+
+    EnterCriticalSection(&reader->stream_cs);
+    list_add_tail(&reader->async_ops, &op->entry);
+    LeaveCriticalSection(&reader->stream_cs);
+    WakeConditionVariable(&reader->stream_cv);
+
+    return S_OK;
+}
+
 static struct async_reader *impl_from_IWMReader(IWMReader *iface)
 {
     return CONTAINING_RECORD(iface, struct async_reader, IWMReader_iface);
@@ -231,8 +337,9 @@ static HRESULT WINAPI WMReader_Open(IWMReader *iface, const WCHAR *url,
         return E_UNEXPECTED;
     }
- if (SUCCEEDED(hr = wm_reader_open_file(&reader->reader, url))) - open_stream(reader, callback, context); + if (SUCCEEDED(hr = wm_reader_open_file(&reader->reader, url)) + && FAILED(hr = open_stream(reader, callback, context))) + wm_reader_close(&reader->reader);
LeaveCriticalSection(&reader->reader.cs); return hr; @@ -241,22 +348,18 @@ static HRESULT WINAPI WMReader_Open(IWMReader *iface, const WCHAR *url, static HRESULT WINAPI WMReader_Close(IWMReader *iface) { struct async_reader *reader = impl_from_IWMReader(iface); - static const DWORD zero; HRESULT hr;
TRACE("reader %p.\n", reader);
EnterCriticalSection(&reader->reader.cs);
- stop_streaming(reader); + async_reader_queue_op(reader, ASYNC_OP_CLOSE, NULL); + close_stream(reader);
hr = wm_reader_close(&reader->reader); if (reader->callback) - { - IWMReaderCallback_OnStatus(reader->callback, WMT_CLOSED, S_OK, - WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); IWMReaderCallback_Release(reader->callback); - } reader->callback = NULL;
LeaveCriticalSection(&reader->reader.cs); @@ -317,7 +420,7 @@ static HRESULT WINAPI WMReader_Start(IWMReader *iface, QWORD start, QWORD duration, float rate, void *context) { struct async_reader *reader = impl_from_IWMReader(iface); - static const DWORD zero; + HRESULT hr;
TRACE("reader %p, start %s, duration %s, rate %.8e, context %p.\n", reader, debugstr_time(start), debugstr_time(duration), rate, context); @@ -327,56 +430,36 @@ static HRESULT WINAPI WMReader_Start(IWMReader *iface,
EnterCriticalSection(&reader->reader.cs);
- if (!reader->reader.wg_parser) - { - LeaveCriticalSection(&reader->reader.cs); - WARN("No stream is open; returning NS_E_INVALID_REQUEST.\n"); - return NS_E_INVALID_REQUEST; - } - - stop_streaming(reader); - - IWMReaderCallback_OnStatus(reader->callback, WMT_STARTED, S_OK, WMT_TYPE_DWORD, (BYTE *)&zero, context); - reader->context = context; - - wm_reader_seek(&reader->reader, start, duration); - - reader->running = true; - reader->user_time = 0; - - if (!(reader->stream_thread = CreateThread(NULL, 0, stream_thread, reader, 0, NULL))) + if (!reader->stream_thread) + hr = NS_E_INVALID_REQUEST; + else { - LeaveCriticalSection(&reader->reader.cs); - return E_OUTOFMEMORY; + wm_reader_seek(&reader->reader, start, duration); + hr = async_reader_queue_op(reader, ASYNC_OP_START, context); }
LeaveCriticalSection(&reader->reader.cs); - WakeConditionVariable(&reader->stream_cv);
- return S_OK; + return hr; }
static HRESULT WINAPI WMReader_Stop(IWMReader *iface) { struct async_reader *reader = impl_from_IWMReader(iface); - static const DWORD zero; + HRESULT hr;
TRACE("reader %p.\n", reader);
EnterCriticalSection(&reader->reader.cs);
- if (!reader->reader.wg_parser) - { - LeaveCriticalSection(&reader->reader.cs); - WARN("No stream is open; returning E_UNEXPECTED.\n"); - return E_UNEXPECTED; - } + if (!reader->stream_thread) + hr = E_UNEXPECTED; + else + hr = async_reader_queue_op(reader, ASYNC_OP_STOP, NULL);
- stop_streaming(reader); - IWMReaderCallback_OnStatus(reader->callback, WMT_STOPPED, S_OK, - WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); LeaveCriticalSection(&reader->reader.cs); - return S_OK; + + return hr; }
static HRESULT WINAPI WMReader_Pause(IWMReader *iface) @@ -734,8 +817,9 @@ static HRESULT WINAPI WMReaderAdvanced2_OpenStream(IWMReaderAdvanced6 *iface, return E_UNEXPECTED; }
- if (SUCCEEDED(hr = wm_reader_open_stream(&reader->reader, stream))) - open_stream(reader, callback, context); + if (SUCCEEDED(hr = wm_reader_open_stream(&reader->reader, stream)) + && FAILED(hr = open_stream(reader, callback, context))) + wm_reader_close(&reader->reader);
LeaveCriticalSection(&reader->reader.cs); return hr; @@ -1590,11 +1674,12 @@ static void async_reader_destroy(struct wm_reader *iface)
TRACE("reader %p.\n", reader);
- if (reader->stream_thread) - { - WaitForSingleObject(reader->stream_thread, INFINITE); - CloseHandle(reader->stream_thread); - } + EnterCriticalSection(&reader->stream_cs); + reader->running = false; + LeaveCriticalSection(&reader->stream_cs); + WakeConditionVariable(&reader->stream_cv); + + close_stream(reader);
reader->stream_cs.DebugInfo->Spare[0] = 0; DeleteCriticalSection(&reader->stream_cs); @@ -1637,6 +1722,7 @@ HRESULT WINAPI winegstreamer_create_wm_async_reader(IWMReader **reader) object->stream_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": async_reader.stream_cs");
QueryPerformanceFrequency(&object->clock_frequency); + list_init(&object->async_ops);
TRACE("Created async reader %p.\n", object); *reader = (IWMReader *)&object->IWMReader_iface; diff --git a/dlls/wmvcore/tests/wmvcore.c b/dlls/wmvcore/tests/wmvcore.c index ee2e685bc0a..fa58caeae35 100644 --- a/dlls/wmvcore/tests/wmvcore.c +++ b/dlls/wmvcore/tests/wmvcore.c @@ -1496,7 +1496,6 @@ static HRESULT WINAPI callback_OnStatus(IWMReaderCallback *iface, WMT_STATUS sta ok(!*(DWORD *)value, "Got value %#lx.\n", *(DWORD *)value); ok(context == (void *)0xdeadbeef, "Got unexpected context %p.\n", context); ret = WaitForSingleObject(callback->expect_opened, 100); - todo_wine ok(!ret, "Wait timed out.\n"); SetEvent(callback->got_opened); break; @@ -1506,7 +1505,6 @@ static HRESULT WINAPI callback_OnStatus(IWMReaderCallback *iface, WMT_STATUS sta ok(!*(DWORD *)value, "Got value %#lx.\n", *(DWORD *)value); ok(context == (void *)0xfacade, "Got unexpected context %p.\n", context); ret = WaitForSingleObject(callback->expect_started, 100); - todo_wine ok(!ret, "Wait timed out.\n"); callback->end_of_streaming_count = callback->eof_count = callback->sample_count = 0; ++callback->started_count; @@ -1519,7 +1517,6 @@ static HRESULT WINAPI callback_OnStatus(IWMReaderCallback *iface, WMT_STATUS sta ok(!*(DWORD *)value, "Got value %#lx.\n", *(DWORD *)value); ok(context == (void *)0xfacade, "Got unexpected context %p.\n", context); ret = WaitForSingleObject(callback->expect_stopped, 100); - todo_wine ok(!ret, "Wait timed out.\n"); SetEvent(callback->got_stopped); break; @@ -1698,7 +1695,6 @@ static HRESULT WINAPI callback_advanced_OnTime(IWMReaderCallbackAdvanced *iface, trace("%lu: %04lx: IWMReaderCallbackAdvanced::OnTime(time %I64u)\n", GetTickCount(), GetCurrentThreadId(), time);
- todo_wine_if(time == 0 && callback->expect_ontime) ok(time == callback->expect_ontime, "Got time %I64u.\n", time); ok(context == (void *)0xfacade, "Got unexpected context %p.\n", context); SetEvent(callback->ontime_event); @@ -1888,14 +1884,10 @@ static void wait_opened_callback_(int line, struct callback *callback) DWORD ret;
ret = WaitForSingleObject(callback->got_opened, 0); - todo_wine ok_(__FILE__, line)(ret == WAIT_TIMEOUT, "Got unexpected WMT_OPENED.\n"); - if (ret == WAIT_TIMEOUT) - { - SetEvent(callback->expect_opened); - ret = WaitForSingleObject(callback->got_opened, 1000); - ok_(__FILE__, line)(!ret, "Wait timed out.\n"); - } + SetEvent(callback->expect_opened); + ret = WaitForSingleObject(callback->got_opened, 1000); + ok_(__FILE__, line)(!ret, "Wait timed out.\n"); }
#define wait_started_callback(a) wait_started_callback_(__LINE__, a) @@ -1904,14 +1896,10 @@ static void wait_started_callback_(int line, struct callback *callback) DWORD ret;
ret = WaitForSingleObject(callback->got_started, 0); - todo_wine ok_(__FILE__, line)(ret == WAIT_TIMEOUT, "Got unexpected WMT_STARTED.\n"); - if (ret == WAIT_TIMEOUT) - { - SetEvent(callback->expect_started); - ret = WaitForSingleObject(callback->got_started, 1000); - ok_(__FILE__, line)(!ret, "Wait timed out.\n"); - } + SetEvent(callback->expect_started); + ret = WaitForSingleObject(callback->got_started, 1000); + ok_(__FILE__, line)(!ret, "Wait timed out.\n"); }
#define wait_stopped_callback(a) wait_stopped_callback_(__LINE__, a) @@ -1920,14 +1908,10 @@ static void wait_stopped_callback_(int line, struct callback *callback) DWORD ret;
ret = WaitForSingleObject(callback->got_stopped, 0); - todo_wine ok_(__FILE__, line)(ret == WAIT_TIMEOUT, "Got unexpected WMT_STOPPED.\n"); - if (ret == WAIT_TIMEOUT) - { - SetEvent(callback->expect_stopped); - ret = WaitForSingleObject(callback->got_stopped, 1000); - ok_(__FILE__, line)(!ret, "Wait timed out.\n"); - } + SetEvent(callback->expect_stopped); + ret = WaitForSingleObject(callback->got_stopped, 1000); + ok_(__FILE__, line)(!ret, "Wait timed out.\n"); }
#define wait_eof_callback(a) wait_eof_callback_(__LINE__, a) @@ -1967,8 +1951,7 @@ static void run_async_reader(IWMReader *reader, IWMReaderAdvanced2 *advanced, st wait_started_callback(callback); }
- if (callback->all_streams_off || strcmp(winetest_platform, "wine")) - wait_started_callback(callback); + wait_started_callback(callback);
hr = IWMReaderAdvanced2_SetUserProvidedClock(advanced, TRUE); ok(hr == S_OK, "Got hr %#lx.\n", hr); @@ -1982,8 +1965,7 @@ static void run_async_reader(IWMReader *reader, IWMReaderAdvanced2 *advanced, st hr = IWMReader_Stop(reader); ok(hr == S_OK, "Got hr %#lx.\n", hr); wait_stopped_callback(callback); - if (strcmp(winetest_platform, "wine")) - wait_stopped_callback(callback); + wait_stopped_callback(callback);
ok(!outstanding_buffers, "Got %ld outstanding buffers.\n", outstanding_buffers); }