From: Rémi Bernon rbernon@codeweavers.com
--- dlls/wmvcore/async_reader.c | 254 ++++++++++++++++++++++++++++++++--- dlls/wmvcore/tests/wmvcore.c | 4 - 2 files changed, 234 insertions(+), 24 deletions(-)
diff --git a/dlls/wmvcore/async_reader.c b/dlls/wmvcore/async_reader.c index c97b0198f14..cad68ffcb40 100644 --- a/dlls/wmvcore/async_reader.c +++ b/dlls/wmvcore/async_reader.c @@ -60,12 +60,25 @@ struct async_op
struct sample { + struct list entry; INSSBuffer *buffer; QWORD pts, duration; DWORD flags, output; WORD stream; };
+struct stream +{ + struct async_reader *reader; + WORD number; + + HANDLE read_thread; + bool read_requested; + CONDITION_VARIABLE read_cv; + struct list read_samples; + HRESULT read_result; +}; + struct async_reader { IWMReader IWMReader_iface; @@ -79,6 +92,7 @@ struct async_reader LONG refcount;
IWMSyncReader2 *reader; + IWMProfile3 *profile;
CRITICAL_SECTION cs;
@@ -94,6 +108,9 @@ struct async_reader CRITICAL_SECTION callback_cs; CONDITION_VARIABLE callback_cv;
+ DWORD stream_count; + struct stream *streams; + bool running; struct list async_ops;
@@ -293,31 +310,180 @@ static void async_reader_deliver_sample(struct async_reader *reader, struct samp
TRACE("Callback returned %#lx.\n", hr);
+ list_remove(&sample->entry); INSSBuffer_Release(sample->buffer); + free(sample); +} + +static void stream_request_read(struct stream *stream) +{ + stream->read_result = E_PENDING; + stream->read_requested = true; + WakeConditionVariable(&stream->read_cv); +} + +static DWORD WINAPI stream_read_thread(void *arg) +{ + struct stream *stream = arg; + struct async_reader *reader = stream->reader; + struct sample *sample; + HRESULT hr; + + TRACE("reader %p, number %u\n", reader, stream->number); + + EnterCriticalSection(&reader->callback_cs); + + while (reader->running) + { + if (!stream->read_requested) + { + SleepConditionVariableCS(&stream->read_cv, &reader->callback_cs, INFINITE); + continue; + } + + if (!(sample = calloc(1, sizeof(*sample)))) + { + WARN("Failed to allocate memory for sample.\n"); + continue; + } + + while (stream->read_requested) + { + stream->read_requested = false; + + if (sample->buffer) + INSSBuffer_Release(sample->buffer); + sample->buffer = NULL; + + LeaveCriticalSection(&reader->callback_cs); + hr = IWMSyncReader2_GetNextSample(reader->reader, stream->number, + &sample->buffer, &sample->pts, &sample->duration, + &sample->flags, &sample->output, &sample->stream); + EnterCriticalSection(&reader->callback_cs); + } + + if (SUCCEEDED(stream->read_result = hr)) + { + TRACE("Got stream %u buffer with pts %I64d.\n", stream->number, sample->pts); + list_add_tail(&stream->read_samples, &sample->entry); + } + else + { + WARN("Failed to get stream %u sample, hr %#lx.\n", stream->number, stream->read_result); + free(sample); + } + + WakeConditionVariable(&reader->callback_cv); + } + + LeaveCriticalSection(&reader->callback_cs); + + TRACE("Reader is stopping; exiting.\n"); + return 0; +} + +static void stream_flush_samples(struct stream *stream) +{ + struct sample *sample, *next; + + LIST_FOR_EACH_ENTRY_SAFE(sample, next, &stream->read_samples, struct sample, entry) + { + list_remove(&sample->entry); + INSSBuffer_Release(sample->buffer); + free(sample); + } +} + +static void stream_close(struct stream *stream) +{ + if (stream->read_thread) + { + WakeConditionVariable(&stream->read_cv); + WaitForSingleObject(stream->read_thread, INFINITE); + CloseHandle(stream->read_thread); + stream->read_thread = NULL; + } + + stream_flush_samples(stream); +} + +static HRESULT stream_open(struct stream *stream, struct async_reader *reader, WORD number) +{ + if (stream->read_thread) + return S_OK; + + stream->number = number; + stream->reader = reader; + list_init(&stream->read_samples); + + if (!(stream->read_thread = CreateThread(NULL, 0, stream_read_thread, stream, 0, NULL))) + return E_OUTOFMEMORY; + + return S_OK; }
-static void callback_thread_run(struct async_reader *reader) +static HRESULT async_reader_get_next_sample(struct async_reader *reader, + struct stream **out_stream, struct sample **out_sample) { + struct sample *sample, *first_sample = NULL; + struct stream *stream, *first_stream = NULL; + WMT_STREAM_SELECTION selection; + struct list *entry; + DWORD i; + + for (i = 0; i < reader->stream_count; ++i) + { + stream = reader->streams + i; + + if (FAILED(IWMSyncReader2_GetStreamSelected(reader->reader, i + 1, &selection)) + || selection == WMT_OFF) + continue; + if (!(entry = list_head(&stream->read_samples))) + { + if (stream->read_result == E_PENDING) + return E_PENDING; + continue; + } + + sample = LIST_ENTRY(entry, struct sample, entry); + if (!first_sample || first_sample->pts > sample->pts) + { + first_stream = stream; + first_sample = sample; + } + } + + if (!first_sample) + return NS_E_NO_MORE_SAMPLES; + + TRACE("Found first stream %u with pts %I64d.\n", first_stream->number, first_sample->pts); + *out_sample = first_sample; + *out_stream = first_stream; + return S_OK; +} + +static void async_reader_deliver_samples(struct async_reader *reader) +{ + static const DWORD zero; + IWMReaderCallbackAdvanced *callback_advanced = reader->callback_advanced; IWMReaderCallback *callback = reader->callback; - static const DWORD zero; HRESULT hr = S_OK;
+ TRACE("reader %p\n", reader); + while (reader->running && list_empty(&reader->async_ops)) { - struct sample sample; + struct sample *sample; + struct stream *stream;
- LeaveCriticalSection(&reader->callback_cs); - hr = IWMSyncReader2_GetNextSample(reader->reader, 0, &sample.buffer, &sample.pts, - &sample.duration, &sample.flags, &sample.output, &sample.stream); - EnterCriticalSection(&reader->callback_cs); - if (hr != S_OK) + if (FAILED(hr = async_reader_get_next_sample(reader, &stream, &sample))) break;
- if (async_reader_wait_pts(reader, sample.pts)) - async_reader_deliver_sample(reader, &sample); - else - INSSBuffer_Release(sample.buffer); + stream_request_read(stream); + + if (async_reader_wait_pts(reader, sample->pts)) + async_reader_deliver_sample(reader, sample); }
if (hr == NS_E_NO_MORE_SAMPLES) @@ -325,6 +491,8 @@ static void callback_thread_run(struct async_reader *reader) BOOL user_clock = reader->user_clock; QWORD user_time = reader->user_time;
+ TRACE("No more streams samples, sending EOF notifications.\n"); + LeaveCriticalSection(&reader->callback_cs);
IWMReaderCallback_OnStatus(callback, WMT_END_OF_STREAMING, S_OK, @@ -342,25 +510,27 @@ static void callback_thread_run(struct async_reader *reader) }
EnterCriticalSection(&reader->callback_cs); - - TRACE("Reached end of stream; exiting.\n"); } - else if (hr != S_OK) + else if (hr == E_PENDING) { - ERR("Failed to get sample, hr %#lx.\n", hr); + TRACE("Waiting for more streams samples.\n"); } }
static DWORD WINAPI async_reader_callback_thread(void *arg) { - struct async_reader *reader = arg; static const DWORD zero; + + struct async_reader *reader = arg; struct list *entry; HRESULT hr = S_OK; + DWORD i;
IWMReaderCallback_OnStatus(reader->callback, WMT_OPENED, S_OK, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context);
+ TRACE("reader %p\n", reader); + EnterCriticalSection(&reader->callback_cs);
while (reader->running) @@ -379,19 +549,28 @@ static DWORD WINAPI async_reader_callback_thread(void *arg) if (SUCCEEDED(hr)) hr = IWMSyncReader2_SetRange(reader->reader, op->u.start.start, op->u.start.duration); if (SUCCEEDED(hr)) + { reader->clock_start = get_current_time(reader);
+ for (i = 0; i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + stream_flush_samples(stream); + stream_request_read(stream); + } + } + LeaveCriticalSection(&reader->callback_cs); IWMReaderCallback_OnStatus(reader->callback, WMT_STARTED, hr, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); EnterCriticalSection(&reader->callback_cs); - - if (SUCCEEDED(hr)) - callback_thread_run(reader); break; }
case ASYNC_OP_STOP: + if (SUCCEEDED(hr)) + reader->clock_start = 0; + LeaveCriticalSection(&reader->callback_cs); IWMReaderCallback_OnStatus(reader->callback, WMT_STOPPED, hr, WMT_TYPE_DWORD, (BYTE *)&zero, reader->context); @@ -412,6 +591,9 @@ static DWORD WINAPI async_reader_callback_thread(void *arg) free(op); }
+ if (reader->clock_start) + async_reader_deliver_samples(reader); + if (reader->running && list_empty(&reader->async_ops)) SleepConditionVariableCS(&reader->callback_cv, &reader->callback_cs, INFINITE); } @@ -425,6 +607,7 @@ static DWORD WINAPI async_reader_callback_thread(void *arg) static void async_reader_close(struct async_reader *reader) { struct async_op *op, *next; + int i;
if (reader->callback_thread) { @@ -439,6 +622,15 @@ static void async_reader_close(struct async_reader *reader) free(op); }
+ for (i = 0; reader->streams && i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + stream_close(stream); + } + free(reader->streams); + reader->streams = NULL; + reader->stream_count = 0; + if (reader->allocator) IWMReaderAllocatorEx_Release(reader->allocator); reader->allocator = NULL; @@ -456,6 +648,7 @@ static void async_reader_close(struct async_reader *reader) static HRESULT async_reader_open(struct async_reader *reader, IWMReaderCallback *callback, void *context) { HRESULT hr = E_OUTOFMEMORY; + DWORD i;
IWMReaderCallback_AddRef((reader->callback = callback)); reader->context = context; @@ -470,7 +663,24 @@ static HRESULT async_reader_open(struct async_reader *reader, IWMReaderCallback reader->callback_advanced = NULL; }
+ if (FAILED(hr = IWMProfile3_GetStreamCount(reader->profile, &reader->stream_count))) + goto error; + + if (!(reader->streams = calloc(reader->stream_count, sizeof(*reader->streams)))) + { + hr = E_OUTOFMEMORY; + goto error; + } + reader->running = true; + + for (i = 0; i < reader->stream_count; ++i) + { + struct stream *stream = reader->streams + i; + if (FAILED(hr = stream_open(stream, reader, i + 1))) + goto error; + } + if (!(reader->callback_thread = CreateThread(NULL, 0, async_reader_callback_thread, reader, 0, NULL))) goto error;
@@ -1922,6 +2132,10 @@ static HRESULT WINAPI async_reader_create(IWMReader **reader) (void **)&object->reader))) goto failed; IWMReader_Release(&object->IWMReader_iface); + if (FAILED(hr = IUnknown_QueryInterface(object->reader_inner, &IID_IWMProfile3, + (void **)&object->profile))) + goto failed; + IWMReader_Release(&object->IWMReader_iface);
InitializeCriticalSection(&object->cs); object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": async_reader.cs"); diff --git a/dlls/wmvcore/tests/wmvcore.c b/dlls/wmvcore/tests/wmvcore.c index 401856eb744..4d8ca1afe2e 100644 --- a/dlls/wmvcore/tests/wmvcore.c +++ b/dlls/wmvcore/tests/wmvcore.c @@ -2138,9 +2138,7 @@ static HRESULT WINAPI callback_advanced_AllocateForStream(IWMReaderCallbackAdvan trace("%lu: %04lx: IWMReaderCallbackAdvanced::AllocateForStream(output %u, size %lu)\n", GetTickCount(), GetCurrentThreadId(), stream_number, size);
- todo_wine ok(callback->callback_tid != GetCurrentThreadId(), "got wrong thread\n"); - todo_wine_if(callback->output_tid[stream_number - 1]) ok(callback->output_tid[stream_number - 1] != GetCurrentThreadId(), "got wrong thread\n");
ok(callback->read_compressed, "AllocateForStream() should only be called when reading compressed samples.\n"); @@ -2176,9 +2174,7 @@ static HRESULT WINAPI callback_advanced_AllocateForOutput(IWMReaderCallbackAdvan trace("%lu: %04lx: IWMReaderCallbackAdvanced::AllocateForOutput(output %lu, size %lu)\n", GetTickCount(), GetCurrentThreadId(), output, size);
- todo_wine ok(callback->callback_tid != GetCurrentThreadId(), "got wrong thread\n"); - todo_wine_if(callback->output_tid[output]) ok(callback->output_tid[output] != GetCurrentThreadId(), "got wrong thread\n");
if (!callback->read_compressed)