-- v2: winegstreamer: Check for already opened stream in wm_reader_open*. winegstreamer: Use a dedicated CS to serialize async reader commands. winegstreamer: Introduce a new async_reader_deliver_sample helper. winegstreamer: Wake thread when async reader user clock is modified.
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/winegstreamer/wm_asyncreader.c | 1 + 1 file changed, 1 insertion(+)
diff --git a/dlls/winegstreamer/wm_asyncreader.c b/dlls/winegstreamer/wm_asyncreader.c index b4750592778..b882d205220 100644 --- a/dlls/winegstreamer/wm_asyncreader.c +++ b/dlls/winegstreamer/wm_asyncreader.c @@ -560,6 +560,7 @@ static HRESULT WINAPI WMReaderAdvanced_SetUserProvidedClock(IWMReaderAdvanced6 * EnterCriticalSection(&reader->callback_cs); reader->user_clock = !!user_clock; LeaveCriticalSection(&reader->callback_cs); + WakeConditionVariable(&reader->callback_cv); return S_OK; }
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/winegstreamer/wm_asyncreader.c | 67 +++++++++++++++++++---------- 1 file changed, 45 insertions(+), 22 deletions(-)
diff --git a/dlls/winegstreamer/wm_asyncreader.c b/dlls/winegstreamer/wm_asyncreader.c index b882d205220..27195f2e094 100644 --- a/dlls/winegstreamer/wm_asyncreader.c +++ b/dlls/winegstreamer/wm_asyncreader.c @@ -44,6 +44,14 @@ struct async_op struct list entry; };
+struct sample +{ + INSSBuffer *buffer; + QWORD pts, duration; + DWORD flags; + WORD stream; +}; + struct async_reader { struct wm_reader reader; @@ -121,42 +129,57 @@ static bool async_reader_wait_pts(struct async_reader *reader, QWORD pts) return false; }
+static void async_reader_deliver_sample(struct async_reader *reader, struct sample *sample) +{ + IWMReaderCallbackAdvanced *callback_advanced = reader->callback_advanced; + IWMReaderCallback *callback = reader->callback; + struct wm_stream *stream; + BOOL read_compressed; + HRESULT hr; + + TRACE("reader %p, stream %u, pts %s, duration %s, flags %#lx, buffer %p.\n", + reader, sample->stream, debugstr_time(sample->pts), debugstr_time(sample->duration), + sample->flags, sample->buffer); + + stream = wm_reader_get_stream_by_stream_number(&reader->reader, sample->stream); + read_compressed = stream->read_compressed; + + LeaveCriticalSection(&reader->callback_cs); + if (read_compressed) + hr = IWMReaderCallbackAdvanced_OnStreamSample(callback_advanced, sample->stream, + sample->pts, sample->duration, sample->flags, sample->buffer, reader->context); + else + hr = IWMReaderCallback_OnSample(callback, sample->stream - 1, sample->pts, sample->duration, + sample->flags, sample->buffer, reader->context); + EnterCriticalSection(&reader->callback_cs); + + TRACE("Callback returned %#lx.\n", hr); + + INSSBuffer_Release(sample->buffer); +} + static void callback_thread_run(struct async_reader *reader) { IWMReaderCallbackAdvanced *callback_advanced = reader->callback_advanced; IWMReaderCallback *callback = reader->callback; static const DWORD zero; - QWORD pts, duration; - WORD stream_number; - INSSBuffer *sample; HRESULT hr = S_OK; - DWORD flags;
while (reader->running && list_empty(&reader->async_ops)) { + struct sample sample; + LeaveCriticalSection(&reader->callback_cs); - hr = wm_reader_get_stream_sample(&reader->reader, callback_advanced, 0, &sample, &pts, &duration, &flags, &stream_number); + hr = wm_reader_get_stream_sample(&reader->reader, callback_advanced, 0, &sample.buffer, + &sample.pts, &sample.duration, &sample.flags, &sample.stream); EnterCriticalSection(&reader->callback_cs); if (hr != S_OK) break;
- if (async_reader_wait_pts(reader, pts)) - { - struct wm_stream *stream = wm_reader_get_stream_by_stream_number(&reader->reader, stream_number); - - LeaveCriticalSection(&reader->callback_cs); - if (stream->read_compressed) - hr = IWMReaderCallbackAdvanced_OnStreamSample(callback_advanced, - stream_number, pts, duration, flags, sample, reader->context); - else - hr = IWMReaderCallback_OnSample(callback, stream_number - 1, pts, duration, - flags, sample, reader->context); - EnterCriticalSection(&reader->callback_cs); - - TRACE("Callback returned %#lx.\n", hr); - } - - INSSBuffer_Release(sample); + if (async_reader_wait_pts(reader, sample.pts)) + async_reader_deliver_sample(reader, &sample); + else + INSSBuffer_Release(sample.buffer); }
if (hr == NS_E_NO_MORE_SAMPLES)
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/winegstreamer/wm_asyncreader.c | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-)
diff --git a/dlls/winegstreamer/wm_asyncreader.c b/dlls/winegstreamer/wm_asyncreader.c index 27195f2e094..54c601d109a 100644 --- a/dlls/winegstreamer/wm_asyncreader.c +++ b/dlls/winegstreamer/wm_asyncreader.c @@ -64,6 +64,8 @@ struct async_reader IWMReaderTypeNegotiation IWMReaderTypeNegotiation_iface; IReferenceClock IReferenceClock_iface;
+ CRITICAL_SECTION cs; + IWMReaderCallbackAdvanced *callback_advanced; IWMReaderCallback *callback; void *context; @@ -390,11 +392,13 @@ static HRESULT WINAPI WMReader_Open(IWMReader *iface, const WCHAR *url, TRACE("reader %p, url %s, callback %p, context %p.\n", reader, debugstr_w(url), callback, context);
+ EnterCriticalSection(&reader->cs); EnterCriticalSection(&reader->reader.cs);
if (reader->reader.wg_parser) { LeaveCriticalSection(&reader->reader.cs); + LeaveCriticalSection(&reader->cs); WARN("Stream is already open; returning E_UNEXPECTED.\n"); return E_UNEXPECTED; } @@ -404,6 +408,7 @@ static HRESULT WINAPI WMReader_Open(IWMReader *iface, const WCHAR *url, wm_reader_close(&reader->reader);
LeaveCriticalSection(&reader->reader.cs); + LeaveCriticalSection(&reader->cs); return hr; }
@@ -414,7 +419,7 @@ static HRESULT WINAPI WMReader_Close(IWMReader *iface)
TRACE("reader %p.\n", reader);
- EnterCriticalSection(&reader->reader.cs); + EnterCriticalSection(&reader->cs);
if (SUCCEEDED(hr = async_reader_queue_op(reader, ASYNC_OP_CLOSE, NULL))) { @@ -422,7 +427,7 @@ static HRESULT WINAPI WMReader_Close(IWMReader *iface) hr = wm_reader_close(&reader->reader); }
- LeaveCriticalSection(&reader->reader.cs); + LeaveCriticalSection(&reader->cs);
return hr; } @@ -489,14 +494,14 @@ static HRESULT WINAPI WMReader_Start(IWMReader *iface, if (rate != 1.0f) FIXME("Ignoring rate %.8e.\n", rate);
- EnterCriticalSection(&reader->reader.cs); + EnterCriticalSection(&reader->cs);
if (!reader->callback_thread) hr = NS_E_INVALID_REQUEST; else hr = async_reader_queue_op(reader, ASYNC_OP_START, &data);
- LeaveCriticalSection(&reader->reader.cs); + LeaveCriticalSection(&reader->cs);
return hr; } @@ -508,14 +513,14 @@ static HRESULT WINAPI WMReader_Stop(IWMReader *iface)
TRACE("reader %p.\n", reader);
- EnterCriticalSection(&reader->reader.cs); + EnterCriticalSection(&reader->cs);
if (!reader->callback_thread) hr = E_UNEXPECTED; else hr = async_reader_queue_op(reader, ASYNC_OP_STOP, NULL);
- LeaveCriticalSection(&reader->reader.cs); + LeaveCriticalSection(&reader->cs);
return hr; } @@ -867,11 +872,13 @@ static HRESULT WINAPI WMReaderAdvanced2_OpenStream(IWMReaderAdvanced6 *iface,
TRACE("reader %p, stream %p, callback %p, context %p.\n", reader, stream, callback, context);
+ EnterCriticalSection(&reader->cs); EnterCriticalSection(&reader->reader.cs);
if (reader->reader.wg_parser) { LeaveCriticalSection(&reader->reader.cs); + LeaveCriticalSection(&reader->cs); WARN("Stream is already open; returning E_UNEXPECTED.\n"); return E_UNEXPECTED; } @@ -881,6 +888,7 @@ static HRESULT WINAPI WMReaderAdvanced2_OpenStream(IWMReaderAdvanced6 *iface, wm_reader_close(&reader->reader);
LeaveCriticalSection(&reader->reader.cs); + LeaveCriticalSection(&reader->cs); return hr; }
@@ -1742,6 +1750,8 @@ static void async_reader_destroy(struct wm_reader *iface)
reader->callback_cs.DebugInfo->Spare[0] = 0; DeleteCriticalSection(&reader->callback_cs); + reader->cs.DebugInfo->Spare[0] = 0; + DeleteCriticalSection(&reader->cs);
wm_reader_close(&reader->reader);
@@ -1774,6 +1784,8 @@ HRESULT WINAPI winegstreamer_create_wm_async_reader(IWMReader **reader) object->IWMReaderStreamClock_iface.lpVtbl = &WMReaderStreamClockVtbl; object->IWMReaderTypeNegotiation_iface.lpVtbl = &WMReaderTypeNegotiationVtbl;
+ InitializeCriticalSection(&object->cs); + object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": async_reader.cs"); InitializeCriticalSection(&object->callback_cs); object->callback_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": async_reader.callback_cs");
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/winegstreamer/wm_asyncreader.c | 20 -------------------- dlls/winegstreamer/wm_reader.c | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 20 deletions(-)
diff --git a/dlls/winegstreamer/wm_asyncreader.c b/dlls/winegstreamer/wm_asyncreader.c index 54c601d109a..5b09bc616a6 100644 --- a/dlls/winegstreamer/wm_asyncreader.c +++ b/dlls/winegstreamer/wm_asyncreader.c @@ -393,21 +393,11 @@ static HRESULT WINAPI WMReader_Open(IWMReader *iface, const WCHAR *url, reader, debugstr_w(url), callback, context);
EnterCriticalSection(&reader->cs); - EnterCriticalSection(&reader->reader.cs); - - if (reader->reader.wg_parser) - { - LeaveCriticalSection(&reader->reader.cs); - LeaveCriticalSection(&reader->cs); - WARN("Stream is already open; returning E_UNEXPECTED.\n"); - return E_UNEXPECTED; - }
if (SUCCEEDED(hr = wm_reader_open_file(&reader->reader, url)) && FAILED(hr = async_reader_open(reader, callback, context))) wm_reader_close(&reader->reader);
- LeaveCriticalSection(&reader->reader.cs); LeaveCriticalSection(&reader->cs); return hr; } @@ -873,21 +863,11 @@ static HRESULT WINAPI WMReaderAdvanced2_OpenStream(IWMReaderAdvanced6 *iface, TRACE("reader %p, stream %p, callback %p, context %p.\n", reader, stream, callback, context);
EnterCriticalSection(&reader->cs); - EnterCriticalSection(&reader->reader.cs); - - if (reader->reader.wg_parser) - { - LeaveCriticalSection(&reader->reader.cs); - LeaveCriticalSection(&reader->cs); - WARN("Stream is already open; returning E_UNEXPECTED.\n"); - return E_UNEXPECTED; - }
if (SUCCEEDED(hr = wm_reader_open_stream(&reader->reader, stream)) && FAILED(hr = async_reader_open(reader, callback, context))) wm_reader_close(&reader->reader);
- LeaveCriticalSection(&reader->reader.cs); LeaveCriticalSection(&reader->cs); return hr; } diff --git a/dlls/winegstreamer/wm_reader.c b/dlls/winegstreamer/wm_reader.c index 156f9d79446..243ebb20335 100644 --- a/dlls/winegstreamer/wm_reader.c +++ b/dlls/winegstreamer/wm_reader.c @@ -1558,6 +1558,13 @@ HRESULT wm_reader_open_stream(struct wm_reader *reader, IStream *stream)
EnterCriticalSection(&reader->cs);
+ if (reader->wg_parser) + { + LeaveCriticalSection(&reader->cs); + WARN("Stream is already open; returning E_UNEXPECTED.\n"); + return E_UNEXPECTED; + } + IStream_AddRef(reader->source_stream = stream); if (FAILED(hr = init_stream(reader, stat.cbSize.QuadPart))) { @@ -1591,6 +1598,14 @@ HRESULT wm_reader_open_file(struct wm_reader *reader, const WCHAR *filename)
EnterCriticalSection(&reader->cs);
+ if (reader->wg_parser) + { + LeaveCriticalSection(&reader->cs); + WARN("Stream is already open; returning E_UNEXPECTED.\n"); + CloseHandle(file); + return E_UNEXPECTED; + } + reader->file = file;
if (FAILED(hr = init_stream(reader, size.QuadPart)))
I find it generally more idiomatic when input-only parameters aren't freed by default; this would also obviate that "else" below.
This will eventually be called from a different place, and samples be reference counted. I think it's best for the helper to take the ownership.
v2: Left the async open aside, I'm not sure why it was useful anymore. It's definitely how native works though and it's possible (though unpredictable) to test it by calling `Open` multiple times in a short sequence.
This merge request was approved by Zebediah Figura.