From: Rémi Bernon rbernon@codeweavers.com
--- dlls/wmvcore/async_reader.c | 281 ++++++++++++++++++++++++++++++++--- dlls/wmvcore/tests/wmvcore.c | 4 - 2 files changed, 260 insertions(+), 25 deletions(-)
diff --git a/dlls/wmvcore/async_reader.c b/dlls/wmvcore/async_reader.c index a51018260b1..39dec5a873c 100644 --- a/dlls/wmvcore/async_reader.c +++ b/dlls/wmvcore/async_reader.c @@ -65,6 +65,18 @@ struct sample WORD stream; };
+struct stream +{ + struct async_reader *reader; + WORD number; + + HANDLE read_thread; + bool read_requested; + CONDITION_VARIABLE read_cv; + struct sample *next_sample; + HRESULT read_result; +}; + struct async_reader { IWMReader IWMReader_iface; @@ -78,6 +90,7 @@ struct async_reader LONG refcount;
IWMSyncReader2 *reader; + IWMProfile3 *profile;
CRITICAL_SECTION cs;
@@ -93,6 +106,9 @@ struct async_reader CRITICAL_SECTION callback_cs; CONDITION_VARIABLE callback_cv;
+ DWORD stream_count; + struct stream *streams; + bool running; struct list async_ops;
@@ -293,30 +309,195 @@ static void async_reader_deliver_sample(struct async_reader *reader, struct samp
     TRACE("Callback returned %#lx.\n", hr);

     INSSBuffer_Release(sample->buffer);
+    free(sample);
+}
+
+/* Request the next sample from the stream's read thread; callers hold callback_cs. */
+static void stream_request_read(struct stream *stream)
+{
+    struct async_reader *reader = stream->reader;
+    WMT_STREAM_SELECTION selection;
+
+    /* stream is not selected, or already has a sample waiting to be delivered */
+    if (FAILED(IWMSyncReader2_GetStreamSelected(reader->reader, stream->number, &selection))
+            || selection == WMT_OFF || stream->next_sample)
+        return;
+
+    stream->read_result = E_PENDING;
+    stream->read_requested = true;
+    WakeConditionVariable(&stream->read_cv);
+}
+
+/* Per-stream thread: serves read requests, dropping callback_cs around GetNextSample(). */
+static DWORD WINAPI stream_read_thread(void *arg)
+{
+    struct stream *stream = arg;
+    struct async_reader *reader = stream->reader;
+    struct sample *sample;
+    HRESULT hr;
+
+    TRACE("reader %p, number %u\n", reader, stream->number);
+
+    EnterCriticalSection(&reader->callback_cs);
+
+    while (reader->running)
+    {
+        if (!stream->read_requested)
+        {
+            SleepConditionVariableCS(&stream->read_cv, &reader->callback_cs, INFINITE);
+            continue;
+        }
+
+        if (!(sample = calloc(1, sizeof(*sample))))
+        {
+            WARN("Failed to allocate memory for sample.\n");
+            /* Fail the request instead of leaving it pending, otherwise this loop
+             * would retry immediately without ever releasing callback_cs. */
+            stream->read_requested = false;
+            stream->read_result = E_OUTOFMEMORY;
+            WakeConditionVariable(&reader->callback_cv);
+            continue;
+        }
+
+        while (stream->read_requested)
+        {
+            stream->read_requested = false;
+
+            if (sample->buffer)
+                INSSBuffer_Release(sample->buffer);
+            sample->buffer = NULL;
+
+            LeaveCriticalSection(&reader->callback_cs);
+            hr = IWMSyncReader2_GetNextSample(reader->reader, stream->number,
+                    &sample->buffer, &sample->pts, &sample->duration,
+                    &sample->flags, &sample->output, &sample->stream);
+            EnterCriticalSection(&reader->callback_cs);
+        }
+
+        if (SUCCEEDED(stream->read_result = hr))
+        {
+            TRACE("Got stream %u buffer with pts %I64d.\n", stream->number, sample->pts);
+            stream->next_sample = sample;
+        }
+        else
+        {
+            WARN("Failed to get stream %u sample, hr %#lx.\n", stream->number, stream->read_result);
+            free(sample);
+        }
+
+        WakeConditionVariable(&reader->callback_cv);
+    }
+
+    LeaveCriticalSection(&reader->callback_cs);
+
+    TRACE("Reader is stopping; exiting.\n");
+    return 0;
+}
+
+static void stream_flush_samples(struct stream *stream)
+{
+    struct sample *sample;
+
+    if ((sample = stream->next_sample))
+    {
+        stream->next_sample = NULL;
+        INSSBuffer_Release(sample->buffer);
+        free(sample);
+    }
+}
+
+static void stream_close(struct stream *stream)
+{
+    if (stream->read_thread)
+    {
+        WakeConditionVariable(&stream->read_cv);
+        WaitForSingleObject(stream->read_thread, INFINITE);
+        CloseHandle(stream->read_thread);
+        stream->read_thread = NULL;
+    }
+
+    stream_flush_samples(stream);
+}
+
+static HRESULT stream_open(struct stream *stream, struct async_reader *reader, WORD number)
+{
+    stream->number = number;
+    stream->reader = reader;
+
+    if (!(stream->read_thread = CreateThread(NULL, 0, stream_read_thread, stream, 0, NULL)))
+        return E_OUTOFMEMORY;
+
+    return S_OK;
+}
+
+/* Take the queued sample with the smallest pts among all streams. Returns
+ * E_PENDING while a stream still has a read in flight, or
+ * NS_E_NO_MORE_SAMPLES when no stream has a sample queued. */
+static HRESULT async_reader_get_next_sample(struct async_reader *reader,
+        struct stream **out_stream, struct sample **out_sample)
+{
+    struct sample *sample, *first_sample = NULL;
+    struct stream *stream, *first_stream = NULL;
+    DWORD i;
+
+    for (i = 0; i < reader->stream_count; ++i)
+    {
+        stream = reader->streams + i;
+
+        if (!(sample = stream->next_sample))
+        {
+            /* stream has a pending read request, wait for it */
+            if (stream->read_result == E_PENDING)
+                return E_PENDING;
+            continue;
+        }
+
+        if (!first_sample || first_sample->pts > sample->pts)
+        {
+            first_stream = stream;
+            first_sample = sample;
+        }
+    }
+
+    if (!first_sample)
+        return NS_E_NO_MORE_SAMPLES;
+
+    TRACE("Found first stream %u with pts %I64d.\n", first_stream->number, first_sample->pts);
+
+    first_stream->next_sample = NULL;
+    *out_sample = first_sample;
+    *out_stream = first_stream;
+    return S_OK;
 }

-static void callback_thread_run(struct async_reader *reader)
+static void async_reader_deliver_samples(struct async_reader *reader)
 {
+    static const DWORD zero;
+
     IWMReaderCallbackAdvanced *callback_advanced = reader->callback_advanced;
     IWMReaderCallback *callback = reader->callback;
-    static const DWORD zero;
     HRESULT hr = S_OK;

+    TRACE("reader %p\n", reader);
+
     while (reader->running && list_empty(&reader->async_ops))
     {
-        struct sample sample;
+        struct sample *sample;
+        struct stream *stream;

-        LeaveCriticalSection(&reader->callback_cs);
-        hr = IWMSyncReader2_GetNextSample(reader->reader, 0, &sample.buffer, &sample.pts,
-                &sample.duration, &sample.flags, &sample.output, &sample.stream);
-        EnterCriticalSection(&reader->callback_cs);
-        if (hr != S_OK)
+        if (FAILED(hr = async_reader_get_next_sample(reader, &stream, &sample)))
             break;

-        if (async_reader_wait_pts(reader, sample.pts))
-            async_reader_deliver_sample(reader, &sample);
-        else
-            INSSBuffer_Release(sample.buffer);
+        stream_request_read(stream);
+
+        if (async_reader_wait_pts(reader, sample->pts))
+            async_reader_deliver_sample(reader, sample);
+        else
+        {
+            /* async_reader_deliver_sample() was not called, so release the sample here */
+            INSSBuffer_Release(sample->buffer);
+            free(sample);
+        }
     }

     if (hr == NS_E_NO_MORE_SAMPLES)
@@ -324,6 +505,8 @@ static void callback_thread_run(struct async_reader *reader)
         BOOL user_clock = reader->user_clock;
         QWORD user_time = reader->user_time;

+        TRACE("No more streams samples, sending EOF notifications.\n");
+
         LeaveCriticalSection(&reader->callback_cs);

         IWMReaderCallback_OnStatus(callback, WMT_END_OF_STREAMING, S_OK,
@@ -341,25 +524,28 @@ static void callback_thread_run(struct async_reader *reader)
         }

         EnterCriticalSection(&reader->callback_cs);
-
-        TRACE("Reached end of stream; exiting.\n");
     }
-    else if (hr != S_OK)
+    else if (hr == E_PENDING)
     {
-        ERR("Failed to get sample, hr %#lx.\n", hr);
+        TRACE("Waiting for more streams samples.\n");
     }
 }
static DWORD WINAPI async_reader_callback_thread(void *arg) { - struct async_reader *reader = arg; static const DWORD zero; + + struct async_reader *reader = arg; + BOOL started = false; struct list *entry; HRESULT hr = S_OK; + DWORD i;
IWMReaderCallback_OnStatus(reader->callback, WMT_OPENED, S_OK, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context);
+ TRACE("reader %p\n", reader); + EnterCriticalSection(&reader->callback_cs);
while (reader->running) @@ -378,19 +548,29 @@ static DWORD WINAPI async_reader_callback_thread(void *arg) if (SUCCEEDED(hr)) hr = IWMSyncReader2_SetRange(reader->reader, op->u.start.start, op->u.start.duration); if (SUCCEEDED(hr)) + { reader->clock_start = get_current_time(reader); + started = true; + + for (i = 0; i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + stream_flush_samples(stream); + stream_request_read(stream); + } + }
LeaveCriticalSection(&reader->callback_cs); IWMReaderCallback_OnStatus(reader->callback, WMT_STARTED, hr, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); EnterCriticalSection(&reader->callback_cs); - - if (SUCCEEDED(hr)) - callback_thread_run(reader); break; }
case ASYNC_OP_STOP: + if (SUCCEEDED(hr)) + started = false; + LeaveCriticalSection(&reader->callback_cs); IWMReaderCallback_OnStatus(reader->callback, WMT_STOPPED, hr, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); @@ -411,6 +591,9 @@ static DWORD WINAPI async_reader_callback_thread(void *arg) free(op); }
+ if (started) + async_reader_deliver_samples(reader); + if (reader->running && list_empty(&reader->async_ops)) SleepConditionVariableCS(&reader->callback_cv, &reader->callback_cs, INFINITE); } @@ -424,6 +607,7 @@ static DWORD WINAPI async_reader_callback_thread(void *arg) static void async_reader_close(struct async_reader *reader) { struct async_op *op, *next; + DWORD i;
if (reader->callback_thread) { @@ -438,6 +622,15 @@ static void async_reader_close(struct async_reader *reader) free(op); }
+ for (i = 0; reader->streams && i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + stream_close(stream); + } + free(reader->streams); + reader->streams = NULL; + reader->stream_count = 0; + if (reader->allocator) IWMReaderAllocatorEx_Release(reader->allocator); reader->allocator = NULL; @@ -455,6 +648,7 @@ static void async_reader_close(struct async_reader *reader) static HRESULT async_reader_open(struct async_reader *reader, IWMReaderCallback *callback, void *context) { HRESULT hr = E_OUTOFMEMORY; + DWORD i;
IWMReaderCallback_AddRef((reader->callback = callback)); reader->context = context; @@ -469,7 +663,23 @@ static HRESULT async_reader_open(struct async_reader *reader, IWMReaderCallback reader->callback_advanced = NULL; }
+ if (FAILED(hr = IWMProfile3_GetStreamCount(reader->profile, &reader->stream_count))) + goto error; + + if (!(reader->streams = calloc(reader->stream_count, sizeof(*reader->streams)))) + { + hr = E_OUTOFMEMORY; + goto error; + } + reader->running = true; + for (i = 0; i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + if (FAILED(hr = stream_open(stream, reader, i + 1))) + goto error; + } + if (!(reader->callback_thread = CreateThread(NULL, 0, async_reader_callback_thread, reader, 0, NULL))) goto error;
@@ -833,11 +1043,36 @@ static HRESULT WINAPI WMReaderAdvanced_SetStreamsSelected(IWMReaderAdvanced6 *if WORD count, WORD *stream_numbers, WMT_STREAM_SELECTION *selections) { struct async_reader *reader = impl_from_IWMReaderAdvanced6(iface); + HRESULT hr; + DWORD i;
TRACE("reader %p, count %u, stream_numbers %p, selections %p.\n", reader, count, stream_numbers, selections);
- return IWMSyncReader2_SetStreamsSelected(reader->reader, count, stream_numbers, selections); + if (FAILED(hr = IWMSyncReader2_SetStreamsSelected(reader->reader, count, stream_numbers, selections))) + return hr; + + EnterCriticalSection(&reader->cs); + + if (!reader->streams) + { + LeaveCriticalSection(&reader->cs); + return E_UNEXPECTED; + } + + EnterCriticalSection(&reader->callback_cs); + + for (i = 0; i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + stream_request_read(stream); + } + + LeaveCriticalSection(&reader->callback_cs); + + LeaveCriticalSection(&reader->cs); + + return S_OK; }
static HRESULT WINAPI WMReaderAdvanced_GetStreamSelected(IWMReaderAdvanced6 *iface, @@ -1921,6 +2156,10 @@ static HRESULT WINAPI async_reader_create(IWMReader **reader) (void **)&object->reader))) goto failed; IWMReader_Release(&object->IWMReader_iface); + if (FAILED(hr = IUnknown_QueryInterface(object->reader_inner, &IID_IWMProfile3, + (void **)&object->profile))) + goto failed; + IWMReader_Release(&object->IWMReader_iface);
InitializeCriticalSection(&object->cs); object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": async_reader.cs"); diff --git a/dlls/wmvcore/tests/wmvcore.c b/dlls/wmvcore/tests/wmvcore.c index 401856eb744..4d8ca1afe2e 100644 --- a/dlls/wmvcore/tests/wmvcore.c +++ b/dlls/wmvcore/tests/wmvcore.c @@ -2138,9 +2138,7 @@ static HRESULT WINAPI callback_advanced_AllocateForStream(IWMReaderCallbackAdvan trace("%lu: %04lx: IWMReaderCallbackAdvanced::AllocateForStream(output %u, size %lu)\n", GetTickCount(), GetCurrentThreadId(), stream_number, size);
- todo_wine ok(callback->callback_tid != GetCurrentThreadId(), "got wrong thread\n"); - todo_wine_if(callback->output_tid[stream_number - 1]) ok(callback->output_tid[stream_number - 1] != GetCurrentThreadId(), "got wrong thread\n");
ok(callback->read_compressed, "AllocateForStream() should only be called when reading compressed samples.\n"); @@ -2176,9 +2174,7 @@ static HRESULT WINAPI callback_advanced_AllocateForOutput(IWMReaderCallbackAdvan trace("%lu: %04lx: IWMReaderCallbackAdvanced::AllocateForOutput(output %lu, size %lu)\n", GetTickCount(), GetCurrentThreadId(), output, size);
- todo_wine ok(callback->callback_tid != GetCurrentThreadId(), "got wrong thread\n"); - todo_wine_if(callback->output_tid[output]) ok(callback->output_tid[output] != GetCurrentThreadId(), "got wrong thread\n");
if (!callback->read_compressed)