Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/mfreadwrite/reader.c | 228 ++++++++++++++++++++++---------------- 1 file changed, 130 insertions(+), 98 deletions(-)
diff --git a/dlls/mfreadwrite/reader.c b/dlls/mfreadwrite/reader.c index 9a7c5e5fd7..b8e1ab0106 100644 --- a/dlls/mfreadwrite/reader.c +++ b/dlls/mfreadwrite/reader.c @@ -108,9 +108,6 @@ struct media_stream IMFTransform *decoder; DWORD id; unsigned int index; - CRITICAL_SECTION cs; - CONDITION_VARIABLE sample_event; - struct list responses; enum media_stream_state state; BOOL selected; BOOL presented; @@ -150,7 +147,9 @@ struct source_reader enum media_source_state source_state; struct media_stream *streams; DWORD stream_count; + struct list responses; CRITICAL_SECTION cs; + CONDITION_VARIABLE sample_event; };
static inline struct source_reader *impl_from_IMFSourceReader(IMFSourceReader *iface) @@ -327,7 +326,7 @@ static void source_reader_queue_response(struct source_reader *reader, struct me if (response->sample) IMFSample_AddRef(response->sample);
- list_add_tail(&stream->responses, &response->entry); + list_add_tail(&reader->responses, &response->entry);
if (stream->requests) { @@ -343,7 +342,7 @@ static void source_reader_queue_response(struct source_reader *reader, struct me } } else - WakeAllConditionVariable(&stream->sample_event); + WakeAllConditionVariable(&reader->sample_event);
stream->requests--; } @@ -386,12 +385,15 @@ static HRESULT source_reader_new_stream_handler(struct source_reader *reader, IM return hr; }
+ EnterCriticalSection(&reader->cs); + for (i = 0; i < reader->stream_count; ++i) { if (id == reader->streams[i].id) { - if (!InterlockedCompareExchangePointer((void **)&reader->streams[i].stream, stream, NULL)) + if (!reader->streams[i].stream) { + reader->streams[i].stream = stream; IMFMediaStream_AddRef(reader->streams[i].stream); if (FAILED(hr = IMFMediaStream_BeginGetEvent(stream, &reader->stream_events_callback, (IUnknown *)stream))) @@ -399,10 +401,8 @@ static HRESULT source_reader_new_stream_handler(struct source_reader *reader, IM WARN("Failed to subscribe to stream events, hr %#x.\n", hr); }
- EnterCriticalSection(&reader->streams[i].cs); if (reader->streams[i].requests) source_reader_request_sample(reader, &reader->streams[i]); - LeaveCriticalSection(&reader->streams[i].cs); } break; } @@ -411,6 +411,8 @@ static HRESULT source_reader_new_stream_handler(struct source_reader *reader, IM if (i == reader->stream_count) WARN("Stream with id %#x was not present in presentation descriptor.\n", id);
+ LeaveCriticalSection(&reader->cs); + IMFMediaStream_Release(stream);
return hr; @@ -630,21 +632,19 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader, return hr; }
+ EnterCriticalSection(&reader->cs); + for (i = 0; i < reader->stream_count; ++i) { if (id == reader->streams[i].id) { /* FIXME: propagate processing errors? */
- EnterCriticalSection(&reader->streams[i].cs); - reader->streams[i].flags &= ~STREAM_FLAG_SAMPLE_REQUESTED; hr = source_reader_process_sample(reader, &reader->streams[i], sample); if (reader->streams[i].requests) source_reader_request_sample(reader, &reader->streams[i]);
- LeaveCriticalSection(&reader->streams[i].cs); - break; } } @@ -652,6 +652,8 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader, if (i == reader->stream_count) WARN("Stream with id %#x was not present in presentation descriptor.\n", id);
+ LeaveCriticalSection(&reader->cs); + IMFSample_Release(sample);
return hr; @@ -675,14 +677,14 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re return hr; }
+ EnterCriticalSection(&reader->cs); + for (i = 0; i < reader->stream_count; ++i) { struct media_stream *stream = &reader->streams[i];
if (id == stream->id) { - EnterCriticalSection(&stream->cs); - switch (event_type) { case MEEndOfStream: @@ -716,12 +718,12 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re ; }
- LeaveCriticalSection(&stream->cs); - break; } }
+ LeaveCriticalSection(&reader->cs); + return S_OK; }
@@ -790,18 +792,33 @@ static ULONG WINAPI source_reader_async_commands_callback_Release(IMFAsyncCallba return IMFSourceReader_Release(&reader->IMFSourceReader_iface); }
-static struct stream_response *media_stream_pop_response(struct media_stream *stream) +static struct stream_response *media_stream_pop_response(struct source_reader *reader, struct media_stream *stream) { - struct stream_response *response = NULL; + struct stream_response *response; struct list *head;
- if ((head = list_head(&stream->responses))) + if (stream) { - response = LIST_ENTRY(head, struct stream_response, entry); - list_remove(&response->entry); + LIST_FOR_EACH_ENTRY(response, &reader->responses, struct stream_response, entry) + { + if (response->stream_index == stream->index) + { + list_remove(&response->entry); + return response; + } + } + } + else + { + if ((head = list_head(&reader->responses))) + { + response = LIST_ENTRY(head, struct stream_response, entry); + list_remove(&response->entry); + return response; + } }
- return response; + return NULL; }
static void source_reader_release_response(struct stream_response *response) @@ -857,13 +874,38 @@ static HRESULT source_reader_start_source(struct source_reader *reader) return hr; }
+static BOOL source_reader_got_response_for_stream(struct source_reader *reader, struct media_stream *stream) +{ + struct stream_response *response; + + LIST_FOR_EACH_ENTRY(response, &reader->responses, struct stream_response, entry) + { + if (response->stream_index == stream->index) + return TRUE; + } + + return FALSE; +} + static BOOL source_reader_get_read_result(struct source_reader *reader, struct media_stream *stream, DWORD flags, HRESULT *status, DWORD *stream_index, DWORD *stream_flags, LONGLONG *timestamp, IMFSample **sample) { struct stream_response *response = NULL; BOOL request_sample = FALSE;
- if (list_empty(&stream->responses)) + if ((response = media_stream_pop_response(reader, stream))) + { + *status = response->status; + *stream_index = stream->index; + *stream_flags = response->stream_flags; + *timestamp = response->timestamp; + *sample = response->sample; + if (*sample) + IMFSample_AddRef(*sample); + + source_reader_release_response(response); + } + else { *status = S_OK; *stream_index = stream->index; @@ -880,20 +922,6 @@ static BOOL source_reader_get_read_result(struct source_reader *reader, struct m *stream_flags = 0; } } - else - { - response = media_stream_pop_response(stream); - - *status = response->status; - *stream_index = stream->index; - *stream_flags = response->stream_flags; - *timestamp = response->timestamp; - *sample = response->sample; - if (*sample) - IMFSample_AddRef(*sample); - - source_reader_release_response(response); - }
return !request_sample; } @@ -925,12 +953,19 @@ static HRESULT source_reader_get_stream_read_index(struct source_reader *reader, return hr; }
-static void source_reader_release_responses(struct media_stream *stream) +static void source_reader_release_responses(struct source_reader *reader, struct media_stream *stream) { struct stream_response *ptr, *next;
- LIST_FOR_EACH_ENTRY_SAFE(ptr, next, &stream->responses, struct stream_response, entry) + LIST_FOR_EACH_ENTRY_SAFE(ptr, next, &reader->responses, struct stream_response, entry) { + if (stream && stream->index != ptr->stream_index && + ptr->stream_index != MF_SOURCE_READER_FIRST_VIDEO_STREAM && + ptr->stream_index != MF_SOURCE_READER_FIRST_AUDIO_STREAM && + ptr->stream_index != MF_SOURCE_READER_ANY_STREAM) + { + continue; + } list_remove(&ptr->entry); source_reader_release_response(ptr); } @@ -938,42 +973,42 @@ static void source_reader_release_responses(struct media_stream *stream)
static void source_reader_flush_stream(struct source_reader *reader, DWORD stream_index) { - struct media_stream *stream = &reader->streams[stream_index]; - - EnterCriticalSection(&stream->cs); + struct media_stream *stream = stream_index == MF_SOURCE_READER_ALL_STREAMS ? NULL : &reader->streams[stream_index];
- source_reader_release_responses(stream); + source_reader_release_responses(reader, stream); if (stream->decoder) IMFTransform_ProcessMessage(stream->decoder, MFT_MESSAGE_COMMAND_FLUSH, 0); stream->requests = 0; - - LeaveCriticalSection(&stream->cs); }
static HRESULT source_reader_flush(struct source_reader *reader, unsigned int index) { unsigned int stream_index;
- switch (index) + EnterCriticalSection(&reader->cs); + + if (index == MF_SOURCE_READER_ALL_STREAMS) { - case MF_SOURCE_READER_FIRST_VIDEO_STREAM: - stream_index = reader->first_video_stream_index; - break; - case MF_SOURCE_READER_FIRST_AUDIO_STREAM: - stream_index = reader->first_audio_stream_index; - break; - case MF_SOURCE_READER_ALL_STREAMS: - for (stream_index = 0; stream_index < reader->stream_count; ++stream_index) - { - source_reader_flush_stream(reader, stream_index); - } + source_reader_flush_stream(reader, index); + } + else + { + switch (index) + { + case MF_SOURCE_READER_FIRST_VIDEO_STREAM: + stream_index = reader->first_video_stream_index; + break; + case MF_SOURCE_READER_FIRST_AUDIO_STREAM: + stream_index = reader->first_audio_stream_index; + break; + default: + stream_index = index; + }
- break; - default: - stream_index = index; + source_reader_flush_stream(reader, stream_index); }
- source_reader_flush_stream(reader, stream_index); + LeaveCriticalSection(&reader->cs);
return S_OK; } @@ -1002,9 +1037,9 @@ static HRESULT WINAPI source_reader_async_commands_callback_Invoke(IMFAsyncCallb if (FAILED(hr = source_reader_get_stream_read_index(reader, command->stream_index, &stream_index))) return hr;
- stream = &reader->streams[stream_index]; + EnterCriticalSection(&reader->cs);
- EnterCriticalSection(&stream->cs); + stream = &reader->streams[stream_index];
if (SUCCEEDED(hr = source_reader_start_source(reader))) { @@ -1017,7 +1052,7 @@ static HRESULT WINAPI source_reader_async_commands_callback_Invoke(IMFAsyncCallb } }
- LeaveCriticalSection(&stream->cs); + LeaveCriticalSection(&reader->cs);
if (report_sample) IMFSourceReaderCallback_OnReadSample(reader->async_callback, status, stream_index, stream_flags, @@ -1029,11 +1064,10 @@ static HRESULT WINAPI source_reader_async_commands_callback_Invoke(IMFAsyncCallb break;
case SOURCE_READER_ASYNC_SAMPLE_READY: - stream = &reader->streams[command->stream_index];
- EnterCriticalSection(&stream->cs); - response = media_stream_pop_response(stream); - LeaveCriticalSection(&stream->cs); + EnterCriticalSection(&reader->cs); + response = media_stream_pop_response(reader, NULL); + LeaveCriticalSection(&reader->cs);
if (response) { @@ -1126,10 +1160,8 @@ static ULONG WINAPI src_reader_Release(IMFSourceReader *iface) IMFMediaType_Release(stream->current); if (stream->decoder) IMFTransform_Release(stream->decoder); - DeleteCriticalSection(&stream->cs); - - source_reader_release_responses(stream); } + source_reader_release_responses(reader, NULL); heap_free(reader->streams); DeleteCriticalSection(&reader->cs); heap_free(reader); @@ -1544,39 +1576,40 @@ static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD ind if (!actual_index) actual_index = &actual_index_tmp;
- if (FAILED(hr = source_reader_get_stream_read_index(reader, index, &stream_index))) - { - *actual_index = index; - *stream_flags = MF_SOURCE_READERF_ERROR; - *timestamp = 0; - return hr; - } - - *actual_index = stream_index; - - stream = &reader->streams[stream_index]; - - EnterCriticalSection(&stream->cs); + EnterCriticalSection(&reader->cs);
if (SUCCEEDED(hr = source_reader_start_source(reader))) { - if (!source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags, - timestamp, sample)) + if (SUCCEEDED(hr = source_reader_get_stream_read_index(reader, index, &stream_index))) { - while (list_empty(&stream->responses) && stream->state != STREAM_STATE_EOS) + *actual_index = stream_index; + + stream = &reader->streams[stream_index]; + + if (!source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags, + timestamp, sample)) { - stream->requests++; - if (FAILED(hr = source_reader_request_sample(reader, stream))) - WARN("Failed to request a sample, hr %#x.\n", hr); - SleepConditionVariableCS(&stream->sample_event, &stream->cs, INFINITE); - } + while (!source_reader_got_response_for_stream(reader, stream) && stream->state != STREAM_STATE_EOS) + { + stream->requests++; + if (FAILED(hr = source_reader_request_sample(reader, stream))) + WARN("Failed to request a sample, hr %#x.\n", hr); + SleepConditionVariableCS(&reader->sample_event, &reader->cs, INFINITE); + }
- source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags, - timestamp, sample); + source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags, + timestamp, sample); + } + } + else + { + *actual_index = index; + *stream_flags = MF_SOURCE_READERF_ERROR; + *timestamp = 0; } }
- LeaveCriticalSection(&stream->cs); + LeaveCriticalSection(&reader->cs);
TRACE("Stream %u, got sample %p, flags %#x.\n", *actual_index, *sample, *stream_flags);
@@ -1796,9 +1829,11 @@ static HRESULT create_source_reader_from_source(IMFMediaSource *source, IMFAttri object->stream_events_callback.lpVtbl = &stream_events_callback_vtbl; object->async_commands_callback.lpVtbl = &async_commands_callback_vtbl; object->refcount = 1; + list_init(&object->responses); object->source = source; IMFMediaSource_AddRef(object->source); InitializeCriticalSection(&object->cs); + InitializeConditionVariable(&object->sample_event);
if (FAILED(hr = IMFMediaSource_CreatePresentationDescriptor(object->source, &object->descriptor))) goto failed; @@ -1845,9 +1880,6 @@ static HRESULT create_source_reader_from_source(IMFMediaSource *source, IMFAttri break;
object->streams[i].index = i; - InitializeCriticalSection(&object->streams[i].cs); - InitializeConditionVariable(&object->streams[i].sample_event); - list_init(&object->streams[i].responses); }
if (FAILED(hr))