From: Derek Lesho dlesho@codeweavers.com
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com ---
v4:
- changed decoder calls logic; - added support for external samples; - now pulling all samples right on EOS event.
v3:
- moved decoding calls to event handler, queue now always contains processed samples; - removed draining logic for now, I believe we can do that explicitly on ReadSample() or on EndOfStream instead. Same helper could be used for sync/async read and EOS event; - added a check for output transform flags;
dlls/mfreadwrite/main.c | 128 ++++++++++++++++++++++++++++++++++------ 1 file changed, 110 insertions(+), 18 deletions(-)
diff --git a/dlls/mfreadwrite/main.c b/dlls/mfreadwrite/main.c index d3c10a4d11..aa46ac188b 100644 --- a/dlls/mfreadwrite/main.c +++ b/dlls/mfreadwrite/main.c @@ -366,6 +366,96 @@ static ULONG WINAPI source_reader_stream_events_callback_Release(IMFAsyncCallbac return IMFSourceReader_Release(&reader->IMFSourceReader_iface); }
+static void source_reader_queue_sample(struct media_stream *stream, IMFSample *sample) +{ + struct sample *pending_sample; + + if (!sample) + return; + + pending_sample = heap_alloc(sizeof(*pending_sample)); + pending_sample->sample = sample; + IMFSample_AddRef(pending_sample->sample); + + list_add_tail(&stream->samples, &pending_sample->entry); +} + +static HRESULT source_reader_pull_stream_samples(struct media_stream *stream) +{ + MFT_OUTPUT_STREAM_INFO stream_info = { 0 }; + MFT_OUTPUT_DATA_BUFFER out_buffer; + IMFMediaBuffer *buffer; + DWORD status; + HRESULT hr; + + if (FAILED(hr = IMFTransform_GetOutputStreamInfo(stream->decoder, 0, &stream_info))) + { + WARN("Failed to get output stream info, hr %#x.\n", hr); + return hr; + } + + for (;;) + { + memset(&out_buffer, 0, sizeof(out_buffer)); + + if (!(stream_info.dwFlags & MFT_OUTPUT_STREAM_PROVIDES_SAMPLES)) + { + if (FAILED(hr = MFCreateSample(&out_buffer.pSample))) + break; + + if (FAILED(hr = MFCreateAlignedMemoryBuffer(stream_info.cbSize, stream_info.cbAlignment, &buffer))) + { + IMFSample_Release(out_buffer.pSample); + break; + } + + IMFSample_AddBuffer(out_buffer.pSample, buffer); + IMFMediaBuffer_Release(buffer); + } + + if (FAILED(hr = IMFTransform_ProcessOutput(stream->decoder, 0, 1, &out_buffer, &status))) + break; + + source_reader_queue_sample(stream, out_buffer.pSample); + if (out_buffer.pSample) + IMFSample_Release(out_buffer.pSample); + if (out_buffer.pEvents) + IMFCollection_Release(out_buffer.pEvents); + } + + return hr; +} + +static HRESULT source_reader_process_sample(struct media_stream *stream, IMFSample *sample) +{ + HRESULT hr; + + if (!stream->decoder) + { + source_reader_queue_sample(stream, sample); + return S_OK; + } + + /* It's assumed that decoder has 1 input and 1 output, both id's are 0. */ + + hr = source_reader_pull_stream_samples(stream); + if (hr == MF_E_TRANSFORM_NEED_MORE_INPUT) + { + if (FAILED(hr = IMFTransform_ProcessInput(stream->decoder, 0, sample, 0))) + { + WARN("Transform failed to process input, hr %#x.\n", hr); + return hr; + } + + if ((hr = source_reader_pull_stream_samples(stream)) == MF_E_TRANSFORM_NEED_MORE_INPUT) + return S_OK; + } + else + WARN("Transform failed to process output, hr %#x.\n", hr); + + return hr; +} + static HRESULT source_reader_media_sample_handler(struct source_reader *reader, IMFMediaStream *stream, IMFMediaEvent *event) { @@ -393,21 +483,14 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader, { if (id == reader->streams[i].id) { - struct sample *pending_sample; - - if (!(pending_sample = heap_alloc(sizeof(*pending_sample)))) - { - hr = E_OUTOFMEMORY; - goto failed; - } + EnterCriticalSection(&reader->streams[i].cs);
- pending_sample->sample = sample; - IMFSample_AddRef(pending_sample->sample); + hr = source_reader_process_sample(&reader->streams[i], sample);
- EnterCriticalSection(&reader->streams[i].cs); - list_add_tail(&reader->streams[i].samples, &pending_sample->entry); LeaveCriticalSection(&reader->streams[i].cs);
+ /* FIXME: propagate processing errors? */ + WakeAllConditionVariable(&reader->streams[i].sample_event);
break; @@ -417,7 +500,6 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader, if (i == reader->stream_count) WARN("Stream with id %#x was not present in presentation descriptor.\n", id);
-failed: IMFSample_Release(sample);
return hr; @@ -438,26 +520,36 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re
for (i = 0; i < reader->stream_count; ++i) { - if (id == reader->streams[i].id) + struct media_stream *stream = &reader->streams[i]; + + if (id == stream->id) { - EnterCriticalSection(&reader->streams[i].cs); + EnterCriticalSection(&stream->cs);
switch (event) { case MEEndOfStream: - reader->streams[i].state = STREAM_STATE_EOS; + stream->state = STREAM_STATE_EOS; + + if (stream->decoder && SUCCEEDED(IMFTransform_ProcessMessage(stream->decoder, + MFT_MESSAGE_COMMAND_DRAIN, 0))) + { + if ((hr = source_reader_pull_stream_samples(stream)) != MF_E_TRANSFORM_NEED_MORE_INPUT) + WARN("Failed to pull pending samples, hr %#x.\n", hr); + } + break; case MEStreamSeeked: case MEStreamStarted: - reader->streams[i].state = STREAM_STATE_READY; + stream->state = STREAM_STATE_READY; break; default: ; }
- LeaveCriticalSection(&reader->streams[i].cs); + LeaveCriticalSection(&stream->cs);
- WakeAllConditionVariable(&reader->streams[i].sample_event); + WakeAllConditionVariable(&stream->sample_event);
break; }
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/mfreadwrite/main.c | 104 ++++++++++++++++++++++++---------------- 1 file changed, 63 insertions(+), 41 deletions(-)
diff --git a/dlls/mfreadwrite/main.c b/dlls/mfreadwrite/main.c index aa46ac188b..9a2e542b1e 100644 --- a/dlls/mfreadwrite/main.c +++ b/dlls/mfreadwrite/main.c @@ -72,9 +72,13 @@ HRESULT WINAPI DllUnregisterServer(void) return __wine_unregister_resources( mfinstance ); }
-struct sample +struct stream_response { struct list entry; + HRESULT status; + DWORD stream_index; + DWORD stream_flags; + LONGLONG timestamp; IMFSample *sample; };
@@ -96,9 +100,10 @@ struct media_stream IMFMediaType *current; IMFTransform *decoder; DWORD id; + unsigned int index; CRITICAL_SECTION cs; CONDITION_VARIABLE sample_event; - struct list samples; + struct list responses; enum media_stream_state state; BOOL selected; BOOL presented; @@ -366,18 +371,21 @@ static ULONG WINAPI source_reader_stream_events_callback_Release(IMFAsyncCallbac return IMFSourceReader_Release(&reader->IMFSourceReader_iface); }
-static void source_reader_queue_sample(struct media_stream *stream, IMFSample *sample) +static void source_reader_queue_response(struct media_stream *stream, HRESULT status, DWORD stream_index, + DWORD stream_flags, LONGLONG timestamp, IMFSample *sample) { - struct sample *pending_sample; - - if (!sample) - return; + struct stream_response *response;
- pending_sample = heap_alloc(sizeof(*pending_sample)); - pending_sample->sample = sample; - IMFSample_AddRef(pending_sample->sample); + response = heap_alloc_zero(sizeof(*response)); + response->status = status; + response->stream_index = stream_index; + response->stream_flags = stream_flags; + response->timestamp = timestamp; + response->sample = sample; + if (response->sample) + IMFSample_AddRef(response->sample);
- list_add_tail(&stream->samples, &pending_sample->entry); + list_add_tail(&stream->responses, &response->entry); }
static HRESULT source_reader_pull_stream_samples(struct media_stream *stream) @@ -385,6 +393,7 @@ static HRESULT source_reader_pull_stream_samples(struct media_stream *stream) MFT_OUTPUT_STREAM_INFO stream_info = { 0 }; MFT_OUTPUT_DATA_BUFFER out_buffer; IMFMediaBuffer *buffer; + LONGLONG timestamp; DWORD status; HRESULT hr;
@@ -416,7 +425,11 @@ static HRESULT source_reader_pull_stream_samples(struct media_stream *stream) if (FAILED(hr = IMFTransform_ProcessOutput(stream->decoder, 0, 1, &out_buffer, &status))) break;
- source_reader_queue_sample(stream, out_buffer.pSample); + timestamp = 0; + if (FAILED(IMFSample_GetSampleTime(out_buffer.pSample, ×tamp))) + WARN("Sample time wasn't set.\n"); + + source_reader_queue_response(stream, S_OK /* FIXME */, stream->index, 0, timestamp, out_buffer.pSample); if (out_buffer.pSample) IMFSample_Release(out_buffer.pSample); if (out_buffer.pEvents) @@ -428,11 +441,16 @@ static HRESULT source_reader_pull_stream_samples(struct media_stream *stream)
static HRESULT source_reader_process_sample(struct media_stream *stream, IMFSample *sample) { + LONGLONG timestamp; HRESULT hr;
if (!stream->decoder) { - source_reader_queue_sample(stream, sample); + timestamp = 0; + if (FAILED(IMFSample_GetSampleTime(sample, ×tamp))) + WARN("Sample time wasn't set.\n"); + + source_reader_queue_response(stream, S_OK, stream->index, 0, timestamp, sample); return S_OK; }
@@ -663,7 +681,7 @@ static ULONG WINAPI src_reader_Release(IMFSourceReader *iface) for (i = 0; i < reader->stream_count; ++i) { struct media_stream *stream = &reader->streams[i]; - struct sample *ptr, *next; + struct stream_response *ptr, *next;
if (stream->stream) IMFMediaStream_Release(stream->stream); @@ -673,9 +691,10 @@ static ULONG WINAPI src_reader_Release(IMFSourceReader *iface) IMFTransform_Release(stream->decoder); DeleteCriticalSection(&stream->cs);
- LIST_FOR_EACH_ENTRY_SAFE(ptr, next, &stream->samples, struct sample, entry) + LIST_FOR_EACH_ENTRY_SAFE(ptr, next, &stream->responses, struct stream_response, entry) { - IMFSample_Release(ptr->sample); + if (ptr->sample) + IMFSample_Release(ptr->sample); list_remove(&ptr->entry); heap_free(ptr); } @@ -1085,22 +1104,18 @@ static HRESULT WINAPI src_reader_SetCurrentPosition(IMFSourceReader *iface, REFG return IMFMediaSource_Start(reader->source, reader->descriptor, format, position); }
-static IMFSample *media_stream_pop_sample(struct media_stream *stream, DWORD *stream_flags) +static struct stream_response *media_stream_pop_response(struct media_stream *stream) { - IMFSample *ret = NULL; + struct stream_response *response = NULL; struct list *head;
- if ((head = list_head(&stream->samples))) + if ((head = list_head(&stream->responses))) { - struct sample *pending_sample = LIST_ENTRY(head, struct sample, entry); - ret = pending_sample->sample; - list_remove(&pending_sample->entry); - heap_free(pending_sample); + response = LIST_ENTRY(head, struct stream_response, entry); + list_remove(&response->entry); }
- *stream_flags = stream->state == STREAM_STATE_EOS ? MF_SOURCE_READERF_ENDOFSTREAM : 0; - - return ret; + return response; }
static HRESULT source_reader_start_source(struct source_reader *reader) @@ -1141,6 +1156,7 @@ static HRESULT source_reader_start_source(struct source_reader *reader) static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD index, DWORD flags, DWORD *actual_index, DWORD *stream_flags, LONGLONG *timestamp, IMFSample **sample) { + struct stream_response *response; struct media_stream *stream; DWORD stream_index; HRESULT hr = S_OK; @@ -1151,6 +1167,9 @@ static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD ind
*sample = NULL;
+ if (timestamp) + *timestamp = 0; + switch (index) { case MF_SOURCE_READER_FIRST_VIDEO_STREAM: @@ -1175,8 +1194,6 @@ static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD ind *stream_flags = MF_SOURCE_READERF_ERROR; if (actual_index) *actual_index = index; - if (timestamp) - *timestamp = 0; return hr; }
@@ -1191,7 +1208,7 @@ static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD ind { if (!(flags & MF_SOURCE_READER_CONTROLF_DRAIN)) { - while (list_empty(&stream->samples) && stream->state != STREAM_STATE_EOS) + while (list_empty(&stream->responses) && stream->state != STREAM_STATE_EOS) { if (stream->stream) { @@ -1202,21 +1219,25 @@ static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD ind } }
- *sample = media_stream_pop_sample(stream, stream_flags); + if ((response = media_stream_pop_response(stream))) + { + *stream_flags = response->stream_flags; + if (timestamp) + *timestamp = response->timestamp; + *sample = response->sample; + if (*sample) + IMFSample_AddRef(*sample); + } + else + { + *stream_flags = list_empty(&stream->responses) && stream->state == STREAM_STATE_EOS ? + MF_SOURCE_READERF_ENDOFSTREAM : 0; + } }
LeaveCriticalSection(&stream->cs);
- TRACE("Got sample %p.\n", *sample); - - if (timestamp) - { - /* TODO: it's possible timestamp has to be set for some events. - For MEEndOfStream it's correct to return 0. */ - *timestamp = 0; - if (*sample) - IMFSample_GetSampleTime(*sample, timestamp); - } + TRACE("Got sample %p, flags %#x.\n", *sample, *stream_flags);
return hr; } @@ -1462,9 +1483,10 @@ static HRESULT create_source_reader_from_source(IMFMediaSource *source, IMFAttri if (FAILED(hr)) break;
+ object->streams[i].index = i; InitializeCriticalSection(&object->streams[i].cs); InitializeConditionVariable(&object->streams[i].sample_event); - list_init(&object->streams[i].samples); + list_init(&object->streams[i].responses); }
if (FAILED(hr))
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/mfreadwrite/main.c | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-)
diff --git a/dlls/mfreadwrite/main.c b/dlls/mfreadwrite/main.c index 9a2e542b1e..315ff8332b 100644 --- a/dlls/mfreadwrite/main.c +++ b/dlls/mfreadwrite/main.c @@ -524,12 +524,17 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader, }
static HRESULT source_reader_media_stream_state_handler(struct source_reader *reader, IMFMediaStream *stream, - MediaEventType event) + IMFMediaEvent *event) { + MediaEventType event_type; + LONGLONG timestamp; + PROPVARIANT value; unsigned int i; HRESULT hr; DWORD id;
+ IMFMediaEvent_GetType(event, &event_type); + if (FAILED(hr = media_stream_get_id(stream, &id))) { WARN("Unidentified stream %p, hr %#x.\n", stream, hr); @@ -544,7 +549,7 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re { EnterCriticalSection(&stream->cs);
- switch (event) + switch (event_type) { case MEEndOfStream: stream->state = STREAM_STATE_EOS; @@ -561,6 +566,16 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re case MEStreamStarted: stream->state = STREAM_STATE_READY; break; + case MEStreamTick: + value.vt = VT_EMPTY; + hr = SUCCEEDED(IMFMediaEvent_GetValue(event, &value)) && value.vt == VT_I8 ? S_OK : E_UNEXPECTED; + timestamp = SUCCEEDED(hr) ? value.u.hVal.QuadPart : 0; + PropVariantClear(&value); + + source_reader_queue_response(stream, hr, stream->index, MF_SOURCE_READERF_STREAMTICK, timestamp, NULL); + + WakeAllConditionVariable(&stream->sample_event); + break; default: ; } @@ -602,8 +617,9 @@ static HRESULT WINAPI source_reader_stream_events_callback_Invoke(IMFAsyncCallba break; case MEStreamSeeked: case MEStreamStarted: + case MEStreamTick: case MEEndOfStream: - hr = source_reader_media_stream_state_handler(reader, stream, event_type); + hr = source_reader_media_stream_state_handler(reader, stream, event); break; default: ; @@ -660,6 +676,13 @@ static ULONG WINAPI src_reader_AddRef(IMFSourceReader *iface) return refcount; }
+static void source_reader_release_response(struct stream_response *response) +{ + if (response->sample) + IMFSample_Release(response->sample); + heap_free(response); +} + static ULONG WINAPI src_reader_Release(IMFSourceReader *iface) { struct source_reader *reader = impl_from_IMFSourceReader(iface); @@ -693,10 +716,8 @@ static ULONG WINAPI src_reader_Release(IMFSourceReader *iface)
LIST_FOR_EACH_ENTRY_SAFE(ptr, next, &stream->responses, struct stream_response, entry) { - if (ptr->sample) - IMFSample_Release(ptr->sample); list_remove(&ptr->entry); - heap_free(ptr); + source_reader_release_response(ptr); } } heap_free(reader->streams); @@ -1227,6 +1248,8 @@ static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD ind *sample = response->sample; if (*sample) IMFSample_AddRef(*sample); + hr = response->status; + source_reader_release_response(response); } else {
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/mfreadwrite/main.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/dlls/mfreadwrite/main.c b/dlls/mfreadwrite/main.c index 315ff8332b..5cb3af84f5 100644 --- a/dlls/mfreadwrite/main.c +++ b/dlls/mfreadwrite/main.c @@ -94,6 +94,11 @@ enum media_source_state SOURCE_STATE_STARTED, };
+enum media_stream_flags +{ + STREAM_FLAG_SAMPLE_REQUESTED = 0x1, +}; + struct media_stream { IMFMediaStream *stream; @@ -107,6 +112,7 @@ struct media_stream enum media_stream_state state; BOOL selected; BOOL presented; + DWORD flags; };
struct source_reader @@ -503,6 +509,7 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader, { EnterCriticalSection(&reader->streams[i].cs);
+ reader->streams[i].flags &= ~STREAM_FLAG_SAMPLE_REQUESTED; hr = source_reader_process_sample(&reader->streams[i], sample);
LeaveCriticalSection(&reader->streams[i].cs); @@ -1231,10 +1238,12 @@ static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD ind { while (list_empty(&stream->responses) && stream->state != STREAM_STATE_EOS) { - if (stream->stream) + if (stream->stream && !(stream->flags & STREAM_FLAG_SAMPLE_REQUESTED)) { if (FAILED(hr = IMFMediaStream_RequestSample(stream->stream, NULL))) WARN("Sample request failed, hr %#x.\n", hr); + else + stream->flags |= STREAM_FLAG_SAMPLE_REQUESTED; } SleepConditionVariableCS(&stream->sample_event, &stream->cs, INFINITE); }
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/mfreadwrite/main.c | 616 +++++++++++++++++++++++++++++----------- 1 file changed, 447 insertions(+), 169 deletions(-)
diff --git a/dlls/mfreadwrite/main.c b/dlls/mfreadwrite/main.c index 5cb3af84f5..fd5c7d3ebc 100644 --- a/dlls/mfreadwrite/main.c +++ b/dlls/mfreadwrite/main.c @@ -113,6 +113,23 @@ struct media_stream BOOL selected; BOOL presented; DWORD flags; + unsigned int requests; +}; + +enum source_reader_async_op +{ + SOURCE_READER_ASYNC_READ, + SOURCE_READER_ASYNC_FLUSH, + SOURCE_READER_ASYNC_SAMPLE_READY, +}; + +struct source_reader_async_command +{ + IUnknown IUnknown_iface; + LONG refcount; + enum source_reader_async_op op; + unsigned int flags; + unsigned int stream_index; };
struct source_reader @@ -120,6 +137,7 @@ struct source_reader IMFSourceReader IMFSourceReader_iface; IMFAsyncCallback source_events_callback; IMFAsyncCallback stream_events_callback; + IMFAsyncCallback async_commands_callback; LONG refcount; IMFMediaSource *source; IMFPresentationDescriptor *descriptor; @@ -154,11 +172,74 @@ static struct source_reader *impl_from_stream_callback_IMFAsyncCallback(IMFAsync return CONTAINING_RECORD(iface, struct source_reader, stream_events_callback); }
+static struct source_reader *impl_from_async_commands_callback_IMFAsyncCallback(IMFAsyncCallback *iface) +{ + return CONTAINING_RECORD(iface, struct source_reader, async_commands_callback); +} + +static struct source_reader_async_command *impl_from_async_command_IUnknown(IUnknown *iface) +{ + return CONTAINING_RECORD(iface, struct source_reader_async_command, IUnknown_iface); +} + static inline struct sink_writer *impl_from_IMFSinkWriter(IMFSinkWriter *iface) { return CONTAINING_RECORD(iface, struct sink_writer, IMFSinkWriter_iface); }
+static HRESULT WINAPI source_reader_async_command_QueryInterface(IUnknown *iface, REFIID riid, void **obj) +{ + if (IsEqualIID(riid, &IID_IUnknown)) + { + *obj = iface; + IUnknown_AddRef(iface); + return S_OK; + } + + WARN("Unsupported interface %s.\n", debugstr_guid(riid)); + *obj = NULL; + return E_NOINTERFACE; +} + +static ULONG WINAPI source_reader_async_command_AddRef(IUnknown *iface) +{ + struct source_reader_async_command *command = impl_from_async_command_IUnknown(iface); + return InterlockedIncrement(&command->refcount); +} + +static ULONG WINAPI source_reader_async_command_Release(IUnknown *iface) +{ + struct source_reader_async_command *command = impl_from_async_command_IUnknown(iface); + ULONG refcount = InterlockedIncrement(&command->refcount); + + if (!refcount) + heap_free(command); + + return refcount; +} + +static const IUnknownVtbl source_reader_async_command_vtbl = +{ + source_reader_async_command_QueryInterface, + source_reader_async_command_AddRef, + source_reader_async_command_Release, +}; + +static HRESULT source_reader_create_async_op(enum source_reader_async_op op, struct source_reader_async_command **ret) +{ + struct source_reader_async_command *command; + + if (!(command = heap_alloc_zero(sizeof(*command)))) + return E_OUTOFMEMORY; + + command->IUnknown_iface.lpVtbl = &source_reader_async_command_vtbl; + command->op = op; + + *ret = command; + + return S_OK; +} + static HRESULT media_event_get_object(IMFMediaEvent *event, REFIID riid, void **obj) { PROPVARIANT value; @@ -203,7 +284,7 @@ static HRESULT media_stream_get_id(IMFMediaStream *stream, DWORD *id) return hr; }
-static HRESULT WINAPI source_reader_source_events_callback_QueryInterface(IMFAsyncCallback *iface, +static HRESULT WINAPI source_reader_callback_QueryInterface(IMFAsyncCallback *iface, REFIID riid, void **obj) { TRACE("%p, %s, %p.\n", iface, debugstr_guid(riid), obj); @@ -233,12 +314,65 @@ static ULONG WINAPI source_reader_source_events_callback_Release(IMFAsyncCallbac return IMFSourceReader_Release(&reader->IMFSourceReader_iface); }
-static HRESULT WINAPI source_reader_source_events_callback_GetParameters(IMFAsyncCallback *iface, +static HRESULT WINAPI source_reader_callback_GetParameters(IMFAsyncCallback *iface, DWORD *flags, DWORD *queue) { return E_NOTIMPL; }
+static void source_reader_queue_response(struct source_reader *reader, struct media_stream *stream, HRESULT status, + DWORD stream_flags, LONGLONG timestamp, IMFSample *sample) +{ + struct source_reader_async_command *command; + struct stream_response *response; + HRESULT hr; + + response = heap_alloc_zero(sizeof(*response)); + response->status = status; + response->stream_index = stream->index; + response->stream_flags = stream_flags; + response->timestamp = timestamp; + response->sample = sample; + if (response->sample) + IMFSample_AddRef(response->sample); + + list_add_tail(&stream->responses, &response->entry); + + if (stream->requests) + { + if (reader->async_callback) + { + if (SUCCEEDED(source_reader_create_async_op(SOURCE_READER_ASYNC_SAMPLE_READY, &command))) + { + command->stream_index = stream->index; + if (FAILED(hr = MFPutWorkItem(MFASYNC_CALLBACK_QUEUE_STANDARD, &reader->async_commands_callback, + &command->IUnknown_iface))) + WARN("Failed to submit async result, hr %#x.\n", hr); + IUnknown_Release(&command->IUnknown_iface); + } + } + else + WakeAllConditionVariable(&stream->sample_event); + + stream->requests--; + } +} + +static HRESULT source_reader_request_sample(struct source_reader *reader, struct media_stream *stream) +{ + HRESULT hr = S_OK; + + if (stream->stream && !(stream->flags & STREAM_FLAG_SAMPLE_REQUESTED)) + { + if (FAILED(hr = IMFMediaStream_RequestSample(stream->stream, NULL))) + WARN("Sample request failed, hr %#x.\n", hr); + else + stream->flags |= STREAM_FLAG_SAMPLE_REQUESTED; + } + + return hr; +} + static HRESULT source_reader_new_stream_handler(struct source_reader *reader, IMFMediaEvent *event) { IMFMediaStream *stream; @@ -274,8 +408,10 @@ static HRESULT source_reader_new_stream_handler(struct source_reader *reader, IM WARN("Failed to subscribe to stream events, hr %#x.\n", hr); }
- /* Wake so any waiting ReadSample() calls have a chance to make requests. */ - WakeAllConditionVariable(&reader->streams[i].sample_event); + EnterCriticalSection(&reader->streams[i].cs); + if (reader->streams[i].requests) + source_reader_request_sample(reader, &reader->streams[i]); + LeaveCriticalSection(&reader->streams[i].cs); } break; } @@ -358,10 +494,10 @@ static HRESULT WINAPI source_reader_source_events_callback_Invoke(IMFAsyncCallba
static const IMFAsyncCallbackVtbl source_events_callback_vtbl = { - source_reader_source_events_callback_QueryInterface, + source_reader_callback_QueryInterface, source_reader_source_events_callback_AddRef, source_reader_source_events_callback_Release, - source_reader_source_events_callback_GetParameters, + source_reader_callback_GetParameters, source_reader_source_events_callback_Invoke, };
@@ -377,24 +513,7 @@ static ULONG WINAPI source_reader_stream_events_callback_Release(IMFAsyncCallbac return IMFSourceReader_Release(&reader->IMFSourceReader_iface); }
-static void source_reader_queue_response(struct media_stream *stream, HRESULT status, DWORD stream_index, - DWORD stream_flags, LONGLONG timestamp, IMFSample *sample) -{ - struct stream_response *response; - - response = heap_alloc_zero(sizeof(*response)); - response->status = status; - response->stream_index = stream_index; - response->stream_flags = stream_flags; - response->timestamp = timestamp; - response->sample = sample; - if (response->sample) - IMFSample_AddRef(response->sample); - - list_add_tail(&stream->responses, &response->entry); -} - -static HRESULT source_reader_pull_stream_samples(struct media_stream *stream) +static HRESULT source_reader_pull_stream_samples(struct source_reader *reader, struct media_stream *stream) { MFT_OUTPUT_STREAM_INFO stream_info = { 0 }; MFT_OUTPUT_DATA_BUFFER out_buffer; @@ -435,7 +554,7 @@ static HRESULT source_reader_pull_stream_samples(struct media_stream *stream) if (FAILED(IMFSample_GetSampleTime(out_buffer.pSample, ×tamp))) WARN("Sample time wasn't set.\n");
- source_reader_queue_response(stream, S_OK /* FIXME */, stream->index, 0, timestamp, out_buffer.pSample); + source_reader_queue_response(reader, stream, S_OK /* FIXME */, 0, timestamp, out_buffer.pSample); if (out_buffer.pSample) IMFSample_Release(out_buffer.pSample); if (out_buffer.pEvents) @@ -445,7 +564,8 @@ static HRESULT source_reader_pull_stream_samples(struct media_stream *stream) return hr; }
-static HRESULT source_reader_process_sample(struct media_stream *stream, IMFSample *sample) +static HRESULT source_reader_process_sample(struct source_reader *reader, struct media_stream *stream, + IMFSample *sample) { LONGLONG timestamp; HRESULT hr; @@ -456,13 +576,13 @@ static HRESULT source_reader_process_sample(struct media_stream *stream, IMFSamp if (FAILED(IMFSample_GetSampleTime(sample, ×tamp))) WARN("Sample time wasn't set.\n");
- source_reader_queue_response(stream, S_OK, stream->index, 0, timestamp, sample); + source_reader_queue_response(reader, stream, S_OK, 0, timestamp, sample); return S_OK; }
/* It's assumed that decoder has 1 input and 1 output, both id's are 0. */
- hr = source_reader_pull_stream_samples(stream); + hr = source_reader_pull_stream_samples(reader, stream); if (hr == MF_E_TRANSFORM_NEED_MORE_INPUT) { if (FAILED(hr = IMFTransform_ProcessInput(stream->decoder, 0, sample, 0))) @@ -471,7 +591,7 @@ static HRESULT source_reader_process_sample(struct media_stream *stream, IMFSamp return hr; }
- if ((hr = source_reader_pull_stream_samples(stream)) == MF_E_TRANSFORM_NEED_MORE_INPUT) + if ((hr = source_reader_pull_stream_samples(reader, stream)) == MF_E_TRANSFORM_NEED_MORE_INPUT) return S_OK; } else @@ -507,17 +627,17 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader, { if (id == reader->streams[i].id) { + /* FIXME: propagate processing errors? */ + EnterCriticalSection(&reader->streams[i].cs);
reader->streams[i].flags &= ~STREAM_FLAG_SAMPLE_REQUESTED; - hr = source_reader_process_sample(&reader->streams[i], sample); + hr = source_reader_process_sample(reader, &reader->streams[i], sample); + if (reader->streams[i].requests) + source_reader_request_sample(reader, &reader->streams[i]);
LeaveCriticalSection(&reader->streams[i].cs);
- /* FIXME: propagate processing errors? */ - - WakeAllConditionVariable(&reader->streams[i].sample_event); - break; } } @@ -564,10 +684,12 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re if (stream->decoder && SUCCEEDED(IMFTransform_ProcessMessage(stream->decoder, MFT_MESSAGE_COMMAND_DRAIN, 0))) { - if ((hr = source_reader_pull_stream_samples(stream)) != MF_E_TRANSFORM_NEED_MORE_INPUT) + if ((hr = source_reader_pull_stream_samples(reader, stream)) != MF_E_TRANSFORM_NEED_MORE_INPUT) WARN("Failed to pull pending samples, hr %#x.\n", hr); }
+ source_reader_queue_response(reader, stream, S_OK, MF_SOURCE_READERF_ENDOFSTREAM, 0, NULL); + break; case MEStreamSeeked: case MEStreamStarted: @@ -579,9 +701,8 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re timestamp = SUCCEEDED(hr) ? value.u.hVal.QuadPart : 0; PropVariantClear(&value);
- source_reader_queue_response(stream, hr, stream->index, MF_SOURCE_READERF_STREAMTICK, timestamp, NULL); + source_reader_queue_response(reader, stream, hr, MF_SOURCE_READERF_STREAMTICK, timestamp, NULL);
- WakeAllConditionVariable(&stream->sample_event); break; default: ; @@ -589,8 +710,6 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re
LeaveCriticalSection(&stream->cs);
- WakeAllConditionVariable(&stream->sample_event); - break; } } @@ -644,13 +763,248 @@ static HRESULT WINAPI source_reader_stream_events_callback_Invoke(IMFAsyncCallba
static const IMFAsyncCallbackVtbl stream_events_callback_vtbl = { - source_reader_source_events_callback_QueryInterface, + source_reader_callback_QueryInterface, source_reader_stream_events_callback_AddRef, source_reader_stream_events_callback_Release, - source_reader_source_events_callback_GetParameters, + source_reader_callback_GetParameters, source_reader_stream_events_callback_Invoke, };
+static ULONG WINAPI source_reader_async_commands_callback_AddRef(IMFAsyncCallback *iface) +{ + struct source_reader *reader = impl_from_async_commands_callback_IMFAsyncCallback(iface); + return IMFSourceReader_AddRef(&reader->IMFSourceReader_iface); +} + +static ULONG WINAPI source_reader_async_commands_callback_Release(IMFAsyncCallback *iface) +{ + struct source_reader *reader = impl_from_async_commands_callback_IMFAsyncCallback(iface); + return IMFSourceReader_Release(&reader->IMFSourceReader_iface); +} + +static struct stream_response *media_stream_pop_response(struct media_stream *stream) +{ + struct stream_response *response = NULL; + struct list *head; + + if ((head = list_head(&stream->responses))) + { + response = LIST_ENTRY(head, struct stream_response, entry); + list_remove(&response->entry); + } + + return response; +} + +static void source_reader_release_response(struct stream_response *response) +{ + if (response->sample) + IMFSample_Release(response->sample); + heap_free(response); +} + +static HRESULT source_reader_get_stream_selection(const struct source_reader *reader, DWORD index, BOOL *selected) +{ + IMFStreamDescriptor *sd; + + if (FAILED(IMFPresentationDescriptor_GetStreamDescriptorByIndex(reader->descriptor, index, selected, &sd))) + return MF_E_INVALIDSTREAMNUMBER; + IMFStreamDescriptor_Release(sd); + + return S_OK; +} + +static HRESULT source_reader_start_source(struct source_reader *reader) +{ + BOOL selection_changed = FALSE; + PROPVARIANT position; + HRESULT hr = S_OK; + DWORD i; + + if (reader->source_state == SOURCE_STATE_STARTED) + { + for (i = 0; i < reader->stream_count; ++i) + { + if (FAILED(hr = source_reader_get_stream_selection(reader, i, &reader->streams[i].selected))) + return hr; + selection_changed = reader->streams[i].selected ^ reader->streams[i].presented; + if (selection_changed) + break; + } + } + + position.u.hVal.QuadPart = 0; + if (reader->source_state != SOURCE_STATE_STARTED || selection_changed) + { + position.vt = reader->source_state == SOURCE_STATE_STARTED ? VT_EMPTY : VT_I8; + + /* Update cached stream selection if descriptor was accepted. */ + if (SUCCEEDED(hr = IMFMediaSource_Start(reader->source, reader->descriptor, &GUID_NULL, &position))) + { + for (i = 0; i < reader->stream_count; ++i) + reader->streams[i].presented = reader->streams[i].selected; + } + } + + return hr; +} + +static BOOL source_reader_get_read_result(struct source_reader *reader, struct media_stream *stream, DWORD flags, + HRESULT *status, DWORD *stream_index, DWORD *stream_flags, LONGLONG *timestamp, IMFSample **sample) +{ + struct stream_response *response = NULL; + BOOL request_sample = FALSE; + + if (list_empty(&stream->responses)) + { + *status = S_OK; + *stream_index = stream->index; + *timestamp = 0; + *sample = NULL; + + if (stream->state == STREAM_STATE_EOS) + { + *stream_flags = MF_SOURCE_READERF_ENDOFSTREAM; + } + else + { + request_sample = !(flags & MF_SOURCE_READER_CONTROLF_DRAIN); + *stream_flags = 0; + } + } + else + { + response = media_stream_pop_response(stream); + + *status = response->status; + *stream_index = stream->index; + *stream_flags = response->stream_flags; + *timestamp = response->timestamp; + *sample = response->sample; + if (*sample) + IMFSample_AddRef(*sample); + + source_reader_release_response(response); + } + + return request_sample; +} + +static HRESULT source_reader_get_stream_read_index(struct source_reader *reader, DWORD index, DWORD *stream_index) +{ + BOOL selected; + HRESULT hr; + + switch (index) + { + case MF_SOURCE_READER_FIRST_VIDEO_STREAM: + *stream_index = reader->first_video_stream_index; + break; + case MF_SOURCE_READER_FIRST_AUDIO_STREAM: + *stream_index = reader->first_audio_stream_index; + break; + case MF_SOURCE_READER_ANY_STREAM: + FIXME("Non-specific requests are not supported.\n"); + return E_NOTIMPL; + default: + *stream_index = index; + } + + /* Can't read from deselected streams. */ + if (SUCCEEDED(hr = source_reader_get_stream_selection(reader, *stream_index, &selected)) && !selected) + hr = MF_E_INVALIDREQUEST; + + return hr; +} + +static HRESULT WINAPI source_reader_async_commands_callback_Invoke(IMFAsyncCallback *iface, IMFAsyncResult *result) +{ + struct source_reader *reader = impl_from_async_commands_callback_IMFAsyncCallback(iface); + struct source_reader_async_command *command; + struct stream_response *response; + DWORD stream_index, stream_flags; + BOOL request_sample = FALSE; + struct media_stream *stream; + IMFSample *sample = NULL; + LONGLONG timestamp = 0; + HRESULT hr, status; + IUnknown *state; + + if (FAILED(hr = IMFAsyncResult_GetState(result, &state))) + return hr; + + command = impl_from_async_command_IUnknown(state); + + switch (command->op) + { + case SOURCE_READER_ASYNC_READ: + if (FAILED(hr = source_reader_get_stream_read_index(reader, command->stream_index, &stream_index))) + return hr; + + stream = &reader->streams[stream_index]; + + EnterCriticalSection(&stream->cs); + + if (SUCCEEDED(hr = source_reader_start_source(reader))) + { + request_sample = source_reader_get_read_result(reader, stream, command->flags, &status, &stream_index, + &stream_flags, ×tamp, &sample); + + if (request_sample) + { + stream->requests++; + source_reader_request_sample(reader, stream); + /* FIXME: set error stream/reader state on request failure */ + } + } + + LeaveCriticalSection(&stream->cs); + + if (!request_sample) + IMFSourceReaderCallback_OnReadSample(reader->async_callback, status, stream_index, stream_flags, + timestamp, sample); + + if (sample) + IMFSample_Release(sample); + + break; + + case SOURCE_READER_ASYNC_SAMPLE_READY: + stream = &reader->streams[command->stream_index]; + + EnterCriticalSection(&stream->cs); + response = media_stream_pop_response(stream); + LeaveCriticalSection(&stream->cs); + + if (response) + { + IMFSourceReaderCallback_OnReadSample(reader->async_callback, response->status, response->stream_index, + response->stream_flags, response->timestamp, response->sample); + source_reader_release_response(response); + } + + break; + case SOURCE_READER_ASYNC_FLUSH: + FIXME("Async flushing is not implemented.\n"); + break; + default: + ; + } + + IUnknown_Release(state); + + return S_OK; +} + +static const IMFAsyncCallbackVtbl async_commands_callback_vtbl = +{ + source_reader_callback_QueryInterface, + source_reader_async_commands_callback_AddRef, + source_reader_async_commands_callback_Release, + source_reader_callback_GetParameters, + source_reader_async_commands_callback_Invoke, +}; + static HRESULT WINAPI src_reader_QueryInterface(IMFSourceReader *iface, REFIID riid, void **out) { struct source_reader *reader = impl_from_IMFSourceReader(iface); @@ -683,13 +1037,6 @@ static ULONG WINAPI src_reader_AddRef(IMFSourceReader *iface) return refcount; }
-static void source_reader_release_response(struct stream_response *response) -{ - if (response->sample) - IMFSample_Release(response->sample); - heap_free(response); -} - static ULONG WINAPI src_reader_Release(IMFSourceReader *iface) { struct source_reader *reader = impl_from_IMFSourceReader(iface); @@ -735,17 +1082,6 @@ static ULONG WINAPI src_reader_Release(IMFSourceReader *iface) return refcount; }
-static HRESULT source_reader_get_stream_selection(const struct source_reader *reader, DWORD index, BOOL *selected) -{ - IMFStreamDescriptor *sd; - - if (FAILED(IMFPresentationDescriptor_GetStreamDescriptorByIndex(reader->descriptor, index, selected, &sd))) - return MF_E_INVALIDSTREAMNUMBER; - IMFStreamDescriptor_Release(sd); - - return S_OK; -} - static HRESULT WINAPI src_reader_GetStreamSelection(IMFSourceReader *iface, DWORD index, BOOL *selected) { struct source_reader *reader = impl_from_IMFSourceReader(iface); @@ -1132,101 +1468,36 @@ static HRESULT WINAPI src_reader_SetCurrentPosition(IMFSourceReader *iface, REFG return IMFMediaSource_Start(reader->source, reader->descriptor, format, position); }
-static struct stream_response *media_stream_pop_response(struct media_stream *stream) -{ - struct stream_response *response = NULL; - struct list *head; - - if ((head = list_head(&stream->responses))) - { - response = LIST_ENTRY(head, struct stream_response, entry); - list_remove(&response->entry); - } - - return response; -} - -static HRESULT source_reader_start_source(struct source_reader *reader) -{ - BOOL selection_changed = FALSE; - PROPVARIANT position; - HRESULT hr = S_OK; - DWORD i; - - if (reader->source_state == SOURCE_STATE_STARTED) - { - for (i = 0; i < reader->stream_count; ++i) - { - if (FAILED(hr = source_reader_get_stream_selection(reader, i, &reader->streams[i].selected))) - return hr; - selection_changed = reader->streams[i].selected ^ reader->streams[i].presented; - if (selection_changed) - break; - } - } - - position.u.hVal.QuadPart = 0; - if (reader->source_state != SOURCE_STATE_STARTED || selection_changed) - { - position.vt = reader->source_state == SOURCE_STATE_STARTED ? VT_EMPTY : VT_I8; - - /* Update cached stream selection if descriptor was accepted. */ - if (SUCCEEDED(hr = IMFMediaSource_Start(reader->source, reader->descriptor, &GUID_NULL, &position))) - { - for (i = 0; i < reader->stream_count; ++i) - reader->streams[i].presented = reader->streams[i].selected; - } - } - - return hr; -} - static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD index, DWORD flags, DWORD *actual_index, DWORD *stream_flags, LONGLONG *timestamp, IMFSample **sample) { - struct stream_response *response; + unsigned int actual_index_tmp; struct media_stream *stream; + LONGLONG timestamp_tmp; + BOOL request_sample; DWORD stream_index; HRESULT hr = S_OK; - BOOL selected;
if (!stream_flags || !sample) return E_POINTER;
*sample = NULL;
- if (timestamp) - *timestamp = 0; - - switch (index) - { - case MF_SOURCE_READER_FIRST_VIDEO_STREAM: - stream_index = reader->first_video_stream_index; - break; - case MF_SOURCE_READER_FIRST_AUDIO_STREAM: - stream_index = reader->first_audio_stream_index; - break; - case MF_SOURCE_READER_ANY_STREAM: - FIXME("Non-specific requests are not supported.\n"); - return E_NOTIMPL; - default: - stream_index = index; - } + if (!timestamp) + timestamp = ×tamp_tmp;
- /* Can't read from deselected streams. */ - if (SUCCEEDED(hr = source_reader_get_stream_selection(reader, stream_index, &selected)) && !selected) - hr = MF_E_INVALIDREQUEST; + if (!actual_index) + actual_index = &actual_index_tmp;
- if (FAILED(hr)) + if (FAILED(hr = source_reader_get_stream_read_index(reader, index, &stream_index))) { + *actual_index = index; *stream_flags = MF_SOURCE_READERF_ERROR; - if (actual_index) - *actual_index = index; + *timestamp = 0; return hr; }
- if (actual_index) - *actual_index = stream_index; + *actual_index = stream_index;
stream = &reader->streams[stream_index];
@@ -1234,57 +1505,36 @@ static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD ind
if (SUCCEEDED(hr = source_reader_start_source(reader))) { - if (!(flags & MF_SOURCE_READER_CONTROLF_DRAIN)) + request_sample = source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags, + timestamp, sample); + + if (request_sample) { while (list_empty(&stream->responses) && stream->state != STREAM_STATE_EOS) { - if (stream->stream && !(stream->flags & STREAM_FLAG_SAMPLE_REQUESTED)) - { - if (FAILED(hr = IMFMediaStream_RequestSample(stream->stream, NULL))) - WARN("Sample request failed, hr %#x.\n", hr); - else - stream->flags |= STREAM_FLAG_SAMPLE_REQUESTED; - } + stream->requests++; + if (FAILED(hr = source_reader_request_sample(reader, stream))) + WARN("Failed to request a sample, hr %#x.\n", hr); SleepConditionVariableCS(&stream->sample_event, &stream->cs, INFINITE); } - }
- if ((response = media_stream_pop_response(stream))) - { - *stream_flags = response->stream_flags; - if (timestamp) - *timestamp = response->timestamp; - *sample = response->sample; - if (*sample) - IMFSample_AddRef(*sample); - hr = response->status; - source_reader_release_response(response); - } - else - { - *stream_flags = list_empty(&stream->responses) && stream->state == STREAM_STATE_EOS ? - MF_SOURCE_READERF_ENDOFSTREAM : 0; + source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags, + timestamp, sample); } }
LeaveCriticalSection(&stream->cs);
- TRACE("Got sample %p, flags %#x.\n", *sample, *stream_flags); + TRACE("Stream %u, got sample %p, flags %#x.\n", *actual_index, *sample, *stream_flags);
return hr; }
-static HRESULT source_reader_read_sample_async(struct source_reader *reader, DWORD index, DWORD flags) -{ - FIXME("Async mode is not implemented.\n"); - - return E_NOTIMPL; -} - static HRESULT WINAPI src_reader_ReadSample(IMFSourceReader *iface, DWORD index, DWORD flags, DWORD *actual_index, DWORD *stream_flags, LONGLONG *timestamp, IMFSample **sample) { struct source_reader *reader = impl_from_IMFSourceReader(iface); + struct source_reader_async_command *command; HRESULT hr;
TRACE("%p, %#x, %#x, %p, %p, %p, %p\n", iface, index, flags, actual_index, stream_flags, timestamp, sample); @@ -1294,7 +1544,14 @@ static HRESULT WINAPI src_reader_ReadSample(IMFSourceReader *iface, DWORD index, if (actual_index || stream_flags || timestamp || sample) return E_INVALIDARG;
- hr = source_reader_read_sample_async(reader, index, flags); + if (FAILED(hr = source_reader_create_async_op(SOURCE_READER_ASYNC_READ, &command))) + return hr; + + command->stream_index = index; + command->flags = flags; + + hr = MFPutWorkItem(MFASYNC_CALLBACK_QUEUE_STANDARD, &reader->async_commands_callback, &command->IUnknown_iface); + IUnknown_Release(&command->IUnknown_iface); } else hr = source_reader_read_sample(reader, index, flags, actual_index, stream_flags, timestamp, sample); @@ -1304,9 +1561,29 @@ static HRESULT WINAPI src_reader_ReadSample(IMFSourceReader *iface, DWORD index,
static HRESULT WINAPI src_reader_Flush(IMFSourceReader *iface, DWORD index) { + struct source_reader *reader = impl_from_IMFSourceReader(iface); + struct source_reader_async_command *command; + HRESULT hr; + FIXME("%p, %#x.\n", iface, index);
- return E_NOTIMPL; + if (reader->async_callback) + { + if (FAILED(hr = source_reader_create_async_op(SOURCE_READER_ASYNC_FLUSH, &command))) + return hr; + + command->stream_index = index; + + hr = MFPutWorkItem(MFASYNC_CALLBACK_QUEUE_MULTITHREADED, &reader->async_commands_callback, + &command->IUnknown_iface); + IUnknown_Release(&command->IUnknown_iface); + } + else + { + hr = E_NOTIMPL; + } + + return hr; }
static HRESULT WINAPI src_reader_GetServiceForStream(IMFSourceReader *iface, DWORD index, REFGUID service, @@ -1466,6 +1743,7 @@ static HRESULT create_source_reader_from_source(IMFMediaSource *source, IMFAttri object->IMFSourceReader_iface.lpVtbl = &srcreader_vtbl; object->source_events_callback.lpVtbl = &source_events_callback_vtbl; object->stream_events_callback.lpVtbl = &stream_events_callback_vtbl; + object->async_commands_callback.lpVtbl = &async_commands_callback_vtbl; object->refcount = 1; object->source = source; IMFMediaSource_AddRef(object->source);
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/mfreadwrite/main.c | 70 ++++++++++++++++++++++++++++++++++------- 1 file changed, 59 insertions(+), 11 deletions(-)
diff --git a/dlls/mfreadwrite/main.c b/dlls/mfreadwrite/main.c index fd5c7d3ebc..1648f1b266 100644 --- a/dlls/mfreadwrite/main.c +++ b/dlls/mfreadwrite/main.c @@ -917,6 +917,59 @@ static HRESULT source_reader_get_stream_read_index(struct source_reader *reader, return hr; }
+static void source_reader_release_responses(struct media_stream *stream) +{ + struct stream_response *ptr, *next; + + LIST_FOR_EACH_ENTRY_SAFE(ptr, next, &stream->responses, struct stream_response, entry) + { + list_remove(&ptr->entry); + source_reader_release_response(ptr); + } +} + +static void source_reader_flush_stream(struct source_reader *reader, DWORD stream_index) +{ + struct media_stream *stream = &reader->streams[stream_index]; + + EnterCriticalSection(&stream->cs); + + source_reader_release_responses(stream); + if (stream->decoder) + IMFTransform_ProcessMessage(stream->decoder, MFT_MESSAGE_COMMAND_FLUSH, 0); + stream->requests = 0; + + LeaveCriticalSection(&stream->cs); +} + +static HRESULT source_reader_flush(struct source_reader *reader, unsigned int index) +{ + unsigned int stream_index; + + switch (index) + { + case MF_SOURCE_READER_FIRST_VIDEO_STREAM: + stream_index = reader->first_video_stream_index; + break; + case MF_SOURCE_READER_FIRST_AUDIO_STREAM: + stream_index = reader->first_audio_stream_index; + break; + case MF_SOURCE_READER_ALL_STREAMS: + for (stream_index = 0; stream_index < reader->stream_count; ++stream_index) + { + source_reader_flush_stream(reader, stream_index); + } + + break; + default: + stream_index = index; + } + + source_reader_flush_stream(reader, stream_index); + + return S_OK; +} + static HRESULT WINAPI source_reader_async_commands_callback_Invoke(IMFAsyncCallback *iface, IMFAsyncResult *result) { struct source_reader *reader = impl_from_async_commands_callback_IMFAsyncCallback(iface); @@ -985,7 +1038,9 @@ static HRESULT WINAPI source_reader_async_commands_callback_Invoke(IMFAsyncCallb
break; case SOURCE_READER_ASYNC_FLUSH: - FIXME("Async flushing is not implemented.\n"); + source_reader_flush(reader, command->stream_index); + + IMFSourceReaderCallback_OnFlush(reader->async_callback, command->stream_index); break; default: ; @@ -1058,7 +1113,6 @@ static ULONG WINAPI src_reader_Release(IMFSourceReader *iface) for (i = 0; i < reader->stream_count; ++i) { struct media_stream *stream = &reader->streams[i]; - struct stream_response *ptr, *next;
if (stream->stream) IMFMediaStream_Release(stream->stream); @@ -1068,11 +1122,7 @@ static ULONG WINAPI src_reader_Release(IMFSourceReader *iface) IMFTransform_Release(stream->decoder); DeleteCriticalSection(&stream->cs);
- LIST_FOR_EACH_ENTRY_SAFE(ptr, next, &stream->responses, struct stream_response, entry) - { - list_remove(&ptr->entry); - source_reader_release_response(ptr); - } + source_reader_release_responses(stream); } heap_free(reader->streams); DeleteCriticalSection(&reader->cs); @@ -1565,7 +1615,7 @@ static HRESULT WINAPI src_reader_Flush(IMFSourceReader *iface, DWORD index) struct source_reader_async_command *command; HRESULT hr;
- FIXME("%p, %#x.\n", iface, index); + TRACE("%p, %#x.\n", iface, index);
if (reader->async_callback) { @@ -1579,9 +1629,7 @@ static HRESULT WINAPI src_reader_Flush(IMFSourceReader *iface, DWORD index) IUnknown_Release(&command->IUnknown_iface); } else - { - hr = E_NOTIMPL; - } + hr = source_reader_flush(reader, index);
return hr; }
On 2020-03-23 08:22, Nikolay Sivov wrote:
From: Derek Lesho dlesho@codeweavers.com
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com
v4:
- changed decoder calls logic;
- added support for external samples;
- now pulling all samples right on EOS event.
v3:
moved decoding calls to event handler, queue now always contains processed samples;
removed draining logic for now, I believe we can do that explicitly on ReadSample() or on EndOfStream instead. Same helper could be used for sync/async read and EOS event;
added a check for output transform flags;
dlls/mfreadwrite/main.c | 128 ++++++++++++++++++++++++++++++++++------ 1 file changed, 110 insertions(+), 18 deletions(-)
diff --git a/dlls/mfreadwrite/main.c b/dlls/mfreadwrite/main.c index d3c10a4d11..aa46ac188b 100644 --- a/dlls/mfreadwrite/main.c +++ b/dlls/mfreadwrite/main.c @@ -366,6 +366,96 @@ static ULONG WINAPI source_reader_stream_events_callback_Release(IMFAsyncCallbac return IMFSourceReader_Release(&reader->IMFSourceReader_iface); }
+static void source_reader_queue_sample(struct media_stream *stream, IMFSample *sample) +{
- struct sample *pending_sample;
- if (!sample)
return;
- pending_sample = heap_alloc(sizeof(*pending_sample));
- pending_sample->sample = sample;
- IMFSample_AddRef(pending_sample->sample);
- list_add_tail(&stream->samples, &pending_sample->entry);
+}
+static HRESULT source_reader_pull_stream_samples(struct media_stream *stream) +{
- MFT_OUTPUT_STREAM_INFO stream_info = { 0 };
- MFT_OUTPUT_DATA_BUFFER out_buffer;
- IMFMediaBuffer *buffer;
- DWORD status;
- HRESULT hr;
- if (FAILED(hr = IMFTransform_GetOutputStreamInfo(stream->decoder, 0, &stream_info)))
- {
WARN("Failed to get output stream info, hr %#x.\n", hr);
return hr;
- }
- for (;;)
- {
memset(&out_buffer, 0, sizeof(out_buffer));
if (!(stream_info.dwFlags & MFT_OUTPUT_STREAM_PROVIDES_SAMPLES))
{
if (FAILED(hr = MFCreateSample(&out_buffer.pSample)))
break;
if (FAILED(hr = MFCreateAlignedMemoryBuffer(stream_info.cbSize, stream_info.cbAlignment, &buffer)))
{
IMFSample_Release(out_buffer.pSample);
break;
}
IMFSample_AddBuffer(out_buffer.pSample, buffer);
IMFMediaBuffer_Release(buffer);
}
if (FAILED(hr = IMFTransform_ProcessOutput(stream->decoder, 0, 1, &out_buffer, &status)))
break;
It's probably important to free the sample we provide after ProcessOutput returns MF_E_TRANSFORM_NEEDS_MORE_INPUT
source_reader_queue_sample(stream, out_buffer.pSample);
if (out_buffer.pSample)
IMFSample_Release(out_buffer.pSample);
if (out_buffer.pEvents)
IMFCollection_Release(out_buffer.pEvents);
- }
- return hr;
+}
+static HRESULT source_reader_process_sample(struct media_stream *stream, IMFSample *sample) +{
- HRESULT hr;
- if (!stream->decoder)
- {
source_reader_queue_sample(stream, sample);
return S_OK;
- }
- /* It's assumed that decoder has 1 input and 1 output, both id's are 0. */
- hr = source_reader_pull_stream_samples(stream);
- if (hr == MF_E_TRANSFORM_NEED_MORE_INPUT)
- {
if (FAILED(hr = IMFTransform_ProcessInput(stream->decoder, 0, sample, 0)))
{
WARN("Transform failed to process input, hr %#x.\n", hr);
return hr;
}
if ((hr = source_reader_pull_stream_samples(stream)) == MF_E_TRANSFORM_NEED_MORE_INPUT)
return S_OK;
- }
- else
WARN("Transform failed to process output, hr %#x.\n", hr);
- return hr;
+}
- static HRESULT source_reader_media_sample_handler(struct source_reader *reader, IMFMediaStream *stream, IMFMediaEvent *event) {
@@ -393,21 +483,14 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader, { if (id == reader->streams[i].id) {
struct sample *pending_sample;
if (!(pending_sample = heap_alloc(sizeof(*pending_sample))))
{
hr = E_OUTOFMEMORY;
goto failed;
}
EnterCriticalSection(&reader->streams[i].cs);
pending_sample->sample = sample;
IMFSample_AddRef(pending_sample->sample);
hr = source_reader_process_sample(&reader->streams[i], sample);
EnterCriticalSection(&reader->streams[i].cs);
list_add_tail(&reader->streams[i].samples, &pending_sample->entry); LeaveCriticalSection(&reader->streams[i].cs);
/* FIXME: propagate processing errors? */
WakeAllConditionVariable(&reader->streams[i].sample_event); break;
@@ -417,7 +500,6 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader, if (i == reader->stream_count) WARN("Stream with id %#x was not present in presentation descriptor.\n", id);
-failed: IMFSample_Release(sample);
return hr;
@@ -438,26 +520,36 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re
for (i = 0; i < reader->stream_count; ++i) {
if (id == reader->streams[i].id)
struct media_stream *stream = &reader->streams[i];
if (id == stream->id) {
EnterCriticalSection(&reader->streams[i].cs);
EnterCriticalSection(&stream->cs); switch (event) { case MEEndOfStream:
reader->streams[i].state = STREAM_STATE_EOS;
stream->state = STREAM_STATE_EOS;
ReadSample uses this state to set MF_SOURCE_READERF_ENDOFSTREAM. We don't want this flag to be set until the drain is complete.
if (stream->decoder && SUCCEEDED(IMFTransform_ProcessMessage(stream->decoder,
MFT_MESSAGE_COMMAND_DRAIN, 0)))
{
if ((hr = source_reader_pull_stream_samples(stream)) != MF_E_TRANSFORM_NEED_MORE_INPUT)
WARN("Failed to pull pending samples, hr %#x.\n", hr);
}
break; case MEStreamSeeked: case MEStreamStarted:
reader->streams[i].state = STREAM_STATE_READY;
stream->state = STREAM_STATE_READY; break; default: ; }
LeaveCriticalSection(&reader->streams[i].cs);
LeaveCriticalSection(&stream->cs);
WakeAllConditionVariable(&reader->streams[i].sample_event);
WakeAllConditionVariable(&stream->sample_event); break; }
On 3/23/20 6:28 PM, Derek Lesho wrote:
On 2020-03-23 08:22, Nikolay Sivov wrote:
From: Derek Lesho dlesho@codeweavers.com
Signed-off-by: Nikolay Sivov nsivov@codeweavers.com
v4:
- changed decoder calls logic;
- added support for external samples;
- now pulling all samples right on EOS event.
v3:
- moved decoding calls to event handler, queue now always contains
processed samples;
- removed draining logic for now, I believe we can do that explicitly
on ReadSample() or on EndOfStream instead. Same helper could be used for sync/async read and EOS event;
- added a check for output transform flags;
dlls/mfreadwrite/main.c | 128 ++++++++++++++++++++++++++++++++++------ 1 file changed, 110 insertions(+), 18 deletions(-)
diff --git a/dlls/mfreadwrite/main.c b/dlls/mfreadwrite/main.c index d3c10a4d11..aa46ac188b 100644 --- a/dlls/mfreadwrite/main.c +++ b/dlls/mfreadwrite/main.c @@ -366,6 +366,96 @@ static ULONG WINAPI source_reader_stream_events_callback_Release(IMFAsyncCallbac return IMFSourceReader_Release(&reader->IMFSourceReader_iface); } +static void source_reader_queue_sample(struct media_stream *stream, IMFSample *sample) +{ + struct sample *pending_sample;
+ if (!sample) + return;
+ pending_sample = heap_alloc(sizeof(*pending_sample)); + pending_sample->sample = sample; + IMFSample_AddRef(pending_sample->sample);
+ list_add_tail(&stream->samples, &pending_sample->entry); +}
+static HRESULT source_reader_pull_stream_samples(struct media_stream *stream) +{ + MFT_OUTPUT_STREAM_INFO stream_info = { 0 }; + MFT_OUTPUT_DATA_BUFFER out_buffer; + IMFMediaBuffer *buffer; + DWORD status; + HRESULT hr;
+ if (FAILED(hr = IMFTransform_GetOutputStreamInfo(stream->decoder, 0, &stream_info))) + { + WARN("Failed to get output stream info, hr %#x.\n", hr); + return hr; + }
+ for (;;) + { + memset(&out_buffer, 0, sizeof(out_buffer));
+ if (!(stream_info.dwFlags & MFT_OUTPUT_STREAM_PROVIDES_SAMPLES)) + { + if (FAILED(hr = MFCreateSample(&out_buffer.pSample))) + break;
+ if (FAILED(hr = MFCreateAlignedMemoryBuffer(stream_info.cbSize, stream_info.cbAlignment, &buffer))) + { + IMFSample_Release(out_buffer.pSample); + break; + }
+ IMFSample_AddBuffer(out_buffer.pSample, buffer); + IMFMediaBuffer_Release(buffer); + }
+ if (FAILED(hr = IMFTransform_ProcessOutput(stream->decoder, 0, 1, &out_buffer, &status))) + break;
It's probably important to free the sample we provide after ProcessOutput returns MF_E_TRANSFORM_NEEDS_MORE_INPUT
Right.
+ source_reader_queue_sample(stream, out_buffer.pSample); + if (out_buffer.pSample) + IMFSample_Release(out_buffer.pSample); + if (out_buffer.pEvents) + IMFCollection_Release(out_buffer.pEvents); + }
+ return hr; +}
+static HRESULT source_reader_process_sample(struct media_stream *stream, IMFSample *sample) +{ + HRESULT hr;
+ if (!stream->decoder) + { + source_reader_queue_sample(stream, sample); + return S_OK; + }
+ /* It's assumed that decoder has 1 input and 1 output, both id's are 0. */
+ hr = source_reader_pull_stream_samples(stream); + if (hr == MF_E_TRANSFORM_NEED_MORE_INPUT) + { + if (FAILED(hr = IMFTransform_ProcessInput(stream->decoder, 0, sample, 0))) + { + WARN("Transform failed to process input, hr %#x.\n", hr); + return hr; + }
+ if ((hr = source_reader_pull_stream_samples(stream)) == MF_E_TRANSFORM_NEED_MORE_INPUT) + return S_OK; + } + else + WARN("Transform failed to process output, hr %#x.\n", hr);
+ return hr; +}
static HRESULT source_reader_media_sample_handler(struct source_reader *reader, IMFMediaStream *stream, IMFMediaEvent *event) { @@ -393,21 +483,14 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader, { if (id == reader->streams[i].id) { - struct sample *pending_sample;
- if (!(pending_sample = heap_alloc(sizeof(*pending_sample)))) - { - hr = E_OUTOFMEMORY; - goto failed; - }
- EnterCriticalSection(&reader->streams[i].cs);
- pending_sample->sample = sample; - IMFSample_AddRef(pending_sample->sample); + hr = source_reader_process_sample(&reader->streams[i], sample); - EnterCriticalSection(&reader->streams[i].cs); - list_add_tail(&reader->streams[i].samples, &pending_sample->entry); LeaveCriticalSection(&reader->streams[i].cs); + /* FIXME: propagate processing errors? */
WakeAllConditionVariable(&reader->streams[i].sample_event); break; @@ -417,7 +500,6 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader, if (i == reader->stream_count) WARN("Stream with id %#x was not present in presentation descriptor.\n", id); -failed: IMFSample_Release(sample); return hr; @@ -438,26 +520,36 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re for (i = 0; i < reader->stream_count; ++i) { - if (id == reader->streams[i].id) + struct media_stream *stream = &reader->streams[i];
+ if (id == stream->id) {
- EnterCriticalSection(&reader->streams[i].cs);
+ EnterCriticalSection(&stream->cs); switch (event) { case MEEndOfStream: - reader->streams[i].state = STREAM_STATE_EOS; + stream->state = STREAM_STATE_EOS;
ReadSample uses this state to set MF_SOURCE_READERF_ENDOFSTREAM. We don't want this flag to be set until the drain is complete.
Order doesn't matter, because both segments are protected with same section.
+ if (stream->decoder && SUCCEEDED(IMFTransform_ProcessMessage(stream->decoder, + MFT_MESSAGE_COMMAND_DRAIN, 0))) + { + if ((hr = source_reader_pull_stream_samples(stream)) != MF_E_TRANSFORM_NEED_MORE_INPUT) + WARN("Failed to pull pending samples, hr %#x.\n", hr); + }
break; case MEStreamSeeked: case MEStreamStarted: - reader->streams[i].state = STREAM_STATE_READY; + stream->state = STREAM_STATE_READY; break; default: ; } - LeaveCriticalSection(&reader->streams[i].cs); + LeaveCriticalSection(&stream->cs); - WakeAllConditionVariable(&reader->streams[i].sample_event);
- WakeAllConditionVariable(&stream->sample_event);
break; }