Signed-off-by: Giovanni Mascellani gmascellani@codeweavers.com --- dlls/winegstreamer/media_source.c | 95 +++++++++++++++++++++++++++---- 1 file changed, 85 insertions(+), 10 deletions(-)
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 3c87bbb2146..496aab545e7 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -27,6 +27,12 @@
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
+struct pending_sample +{ + struct list entry; + IMFSample *sample; +}; + struct media_stream { IMFMediaStream IMFMediaStream_iface; @@ -34,6 +40,7 @@ struct media_stream struct media_source *parent_source; IMFMediaEventQueue *event_queue; IMFStreamDescriptor *descriptor; + struct list pending_samples;
struct wg_parser_stream *wg_stream;
@@ -50,6 +57,7 @@ struct media_stream enum source_async_op { SOURCE_ASYNC_START, + SOURCE_ASYNC_PAUSE, SOURCE_ASYNC_STOP, SOURCE_ASYNC_REQUEST_SAMPLE, }; @@ -96,6 +104,7 @@ struct media_source { SOURCE_OPENING, SOURCE_STOPPED, + SOURCE_PAUSED, SOURCE_RUNNING, SOURCE_SHUTDOWN, } state; @@ -257,6 +266,23 @@ static IMFStreamDescriptor *stream_descriptor_from_id(IMFPresentationDescriptor return NULL; }
+static void flush_pending_queue(struct media_stream *stream, BOOL send_events) +{ + struct pending_sample *pending, *pending2; + + LIST_FOR_EACH_ENTRY_SAFE(pending, pending2, &stream->pending_samples, struct pending_sample, entry) + { + if (send_events) + { + IMFMediaEventQueue_QueueEventParamUnk(stream->event_queue, MEMediaSample, + &GUID_NULL, S_OK, (IUnknown *)pending->sample); + } + IMFSample_Release(pending->sample); + list_remove(&pending->entry); + free(pending); + } +} + static void start_pipeline(struct media_source *source, struct source_async_command *command) { PROPVARIANT *position = &command->u.start.position; @@ -269,7 +295,8 @@ static void start_pipeline(struct media_source *source, struct source_async_comm position->vt = VT_I8; position->hVal.QuadPart = 0; } - source->start_time = position->hVal.QuadPart; + if (position->vt != VT_EMPTY) + source->start_time = position->hVal.QuadPart;
for (i = 0; i < source->stream_count; i++) { @@ -318,6 +345,8 @@ static void start_pipeline(struct media_source *source, struct source_async_comm
IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, seek_message ? MEStreamSeeked : MEStreamStarted, &GUID_NULL, S_OK, position); + + flush_pending_queue(stream, TRUE); } }
@@ -327,11 +356,30 @@ static void start_pipeline(struct media_source *source, struct source_async_comm
source->state = SOURCE_RUNNING;
- unix_funcs->wg_parser_stream_seek(source->streams[0]->wg_stream, 1.0, - position->hVal.QuadPart, 0, AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning); + if (position->vt != VT_EMPTY) + unix_funcs->wg_parser_stream_seek(source->streams[0]->wg_stream, 1.0, + position->hVal.QuadPart, 0, AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning); unix_funcs->wg_parser_end_flush(source->wg_parser); }
+static void pause_pipeline(struct media_source *source) +{ + unsigned int i; + + for (i = 0; i < source->stream_count; i++) + { + struct media_stream *stream = source->streams[i]; + if (stream->state != STREAM_INACTIVE) + { + IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, MEStreamPaused, &GUID_NULL, S_OK, NULL); + } + } + + IMFMediaEventQueue_QueueEventParamVar(source->event_queue, MESourcePaused, &GUID_NULL, S_OK, NULL); + + source->state = SOURCE_PAUSED; +} + static void stop_pipeline(struct media_source *source) { unsigned int i; @@ -343,6 +391,7 @@ static void stop_pipeline(struct media_source *source) struct media_stream *stream = source->streams[i]; if (stream->state != STREAM_INACTIVE) { + flush_pending_queue(stream, FALSE); IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, MEStreamStopped, &GUID_NULL, S_OK, NULL); unix_funcs->wg_parser_stream_disable(stream->wg_stream); } @@ -437,12 +486,26 @@ static void send_buffer(struct media_stream *stream, const struct wg_parser_even if (token) IMFSample_SetUnknown(sample, &MFSampleExtension_Token, token);
- IMFMediaEventQueue_QueueEventParamUnk(stream->event_queue, MEMediaSample, - &GUID_NULL, S_OK, (IUnknown *)sample); + if (stream->parent_source->state == SOURCE_PAUSED) + { + struct pending_sample *pending = malloc(sizeof(*pending)); + if (!pending) + { + ERR("Cannot allocate pending sample\n"); + goto out; + } + pending->sample = sample; + sample = NULL; + list_add_tail(&stream->pending_samples, &pending->entry); + } + else + IMFMediaEventQueue_QueueEventParamUnk(stream->event_queue, MEMediaSample, + &GUID_NULL, S_OK, (IUnknown *)sample);
out: IMFMediaBuffer_Release(buffer); - IMFSample_Release(sample); + if (sample) + IMFSample_Release(sample); }
static void wait_on_sample(struct media_stream *stream, IUnknown *token) @@ -501,6 +564,9 @@ static HRESULT WINAPI source_async_commands_Invoke(IMFAsyncCallback *iface, IMFA case SOURCE_ASYNC_START: start_pipeline(source, command); break; + case SOURCE_ASYNC_PAUSE: + pause_pipeline(source); + break; case SOURCE_ASYNC_STOP: stop_pipeline(source); break; @@ -597,6 +663,7 @@ static ULONG WINAPI media_stream_Release(IMFMediaStream *iface) { if (stream->event_queue) IMFMediaEventQueue_Release(stream->event_queue); + flush_pending_queue(stream, FALSE); free(stream); }
@@ -699,7 +766,6 @@ static HRESULT WINAPI media_stream_RequestSample(IMFMediaStream *iface, IUnknown IUnknown_AddRef(token); command->u.request_sample.token = token;
- /* Once pause support is added, this will need to put into a stream queue, and synchronization will need to be added*/ hr = MFPutWorkItem(stream->parent_source->async_commands_queue, &stream->parent_source->async_commands_callback, &command->IUnknown_iface); }
@@ -738,6 +804,7 @@ static HRESULT new_media_stream(struct media_source *source, object->state = STREAM_INACTIVE; object->eos = FALSE; object->wg_stream = wg_stream; + list_init(&object->pending_samples);
if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) goto fail; @@ -1123,7 +1190,7 @@ static HRESULT WINAPI media_source_GetCharacteristics(IMFMediaSource *iface, DWO if (source->state == SOURCE_SHUTDOWN) return MF_E_SHUTDOWN;
- *characteristics = MFMEDIASOURCE_CAN_SEEK; + *characteristics = MFMEDIASOURCE_CAN_SEEK | MFMEDIASOURCE_CAN_PAUSE;
return S_OK; } @@ -1187,13 +1254,21 @@ static HRESULT WINAPI media_source_Stop(IMFMediaSource *iface) static HRESULT WINAPI media_source_Pause(IMFMediaSource *iface) { struct media_source *source = impl_from_IMFMediaSource(iface); + struct source_async_command *command; + HRESULT hr;
- FIXME("(%p): stub\n", source); + TRACE("(%p)\n", source);
if (source->state == SOURCE_SHUTDOWN) return MF_E_SHUTDOWN;
- return E_NOTIMPL; + if (source->state != SOURCE_RUNNING) + return MF_E_INVALID_STATE_TRANSITION; + + if (SUCCEEDED(hr = source_create_async_op(SOURCE_ASYNC_PAUSE, &command))) + hr = MFPutWorkItem(source->async_commands_queue, &source->async_commands_callback, &command->IUnknown_iface); + + return hr; }
static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
After end of presentation, the session should return to the stopped state, which means that can be restarted again. END_OF_PRESENTATION flags should not remain set in this case.
Signed-off-by: Giovanni Mascellani gmascellani@codeweavers.com --- dlls/mf/session.c | 6 ++++++ 1 file changed, 6 insertions(+)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index 9f233295773..ef2d15c9ef5 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -1061,6 +1061,7 @@ static void session_set_stopped(struct media_session *session, HRESULT status) { MediaEventType event_type; IMFMediaEvent *event; + struct media_source *source;;
session->state = SESSION_STATE_STOPPED; event_type = session->presentation.flags & SESSION_FLAG_END_OF_PRESENTATION ? MESessionEnded : MESessionStopped; @@ -1071,6 +1072,11 @@ static void session_set_stopped(struct media_session *session, HRESULT status) IMFMediaEventQueue_QueueEvent(session->event_queue, event); IMFMediaEvent_Release(event); } + session->presentation.flags &= ~SESSION_FLAG_END_OF_PRESENTATION; + LIST_FOR_EACH_ENTRY(source, &session->presentation.sources, struct media_source, entry) + { + source->flags &= ~SOURCE_FLAG_END_OF_PRESENTATION; + } session_command_complete(session); }
On 6/10/21 6:33 PM, Giovanni Mascellani wrote:
After end of presentation, the session should return to the stopped state, which means that can be restarted again. END_OF_PRESENTATION flags should not remain set in this case.
Just making sure, to reproduce this I need to let it play to the end, and then calling Start(0) doesn't replay?
Hi,
Il 11/06/21 08:04, Nikolay Sivov ha scritto:
Just making sure, to reproduce this I need to let it play to the end, and then calling Start(0) doesn't replay?
Yes, you reproduce to the end, then restart with Start(0); the session restarts, but with the SESSION_FLAG_END_OF_PRESENTATION flag (incorrectly) set. This means, for example, that if you later call Close it will do the wrong thing for example at [1]. This ultimately leads at not delivering the MESessionClosed event, which causes a freeze in Deep Rock Galactic.
[1] https://source.winehq.org/git/wine.git/blob/refs/heads/master:/dlls/mf/sessi...
Giovanni.
On 6/10/21 11:33 AM, Giovanni Mascellani wrote:
Signed-off-by: Giovanni Mascellani gmascellani@codeweavers.com
dlls/winegstreamer/media_source.c | 95 +++++++++++++++++++++++++++---- 1 file changed, 85 insertions(+), 10 deletions(-)
This patch probably warrants a few tests. I would test at least
- what happens to ::RequestSample calls when a stream is paused.
- whether MESourcePaused or the MEStreamPaused events come first.
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 3c87bbb2146..496aab545e7 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -27,6 +27,12 @@
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
+struct pending_sample +{
- struct list entry;
- IMFSample *sample;
+};
- struct media_stream { IMFMediaStream IMFMediaStream_iface;
@@ -34,6 +40,7 @@ struct media_stream struct media_source *parent_source; IMFMediaEventQueue *event_queue; IMFStreamDescriptor *descriptor;
struct list pending_samples;
struct wg_parser_stream *wg_stream;
@@ -50,6 +57,7 @@ struct media_stream enum source_async_op { SOURCE_ASYNC_START,
- SOURCE_ASYNC_PAUSE, SOURCE_ASYNC_STOP, SOURCE_ASYNC_REQUEST_SAMPLE, };
@@ -96,6 +104,7 @@ struct media_source { SOURCE_OPENING, SOURCE_STOPPED,
SOURCE_PAUSED, SOURCE_RUNNING, SOURCE_SHUTDOWN, } state;
@@ -257,6 +266,23 @@ static IMFStreamDescriptor *stream_descriptor_from_id(IMFPresentationDescriptor return NULL; }
+static void flush_pending_queue(struct media_stream *stream, BOOL send_events) +{
- struct pending_sample *pending, *pending2;
- LIST_FOR_EACH_ENTRY_SAFE(pending, pending2, &stream->pending_samples, struct pending_sample, entry)
- {
if (send_events)
{
IMFMediaEventQueue_QueueEventParamUnk(stream->event_queue, MEMediaSample,
&GUID_NULL, S_OK, (IUnknown *)pending->sample);
}
IMFSample_Release(pending->sample);
list_remove(&pending->entry);
free(pending);
- }
+}
- static void start_pipeline(struct media_source *source, struct source_async_command *command) { PROPVARIANT *position = &command->u.start.position;
@@ -269,7 +295,8 @@ static void start_pipeline(struct media_source *source, struct source_async_comm position->vt = VT_I8; position->hVal.QuadPart = 0; }
- source->start_time = position->hVal.QuadPart;
if (position->vt != VT_EMPTY)
source->start_time = position->hVal.QuadPart; for (i = 0; i < source->stream_count; i++) {
@@ -318,6 +345,8 @@ static void start_pipeline(struct media_source *source, struct source_async_comm
IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, seek_message ? MEStreamSeeked : MEStreamStarted, &GUID_NULL, S_OK, position);
flush_pending_queue(stream, TRUE); } }
@@ -327,11 +356,30 @@ static void start_pipeline(struct media_source *source, struct source_async_comm
source->state = SOURCE_RUNNING;
- unix_funcs->wg_parser_stream_seek(source->streams[0]->wg_stream, 1.0,
position->hVal.QuadPart, 0, AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning);
- if (position->vt != VT_EMPTY)
unix_funcs->wg_parser_stream_seek(source->streams[0]->wg_stream, 1.0,
}position->hVal.QuadPart, 0, AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning); unix_funcs->wg_parser_end_flush(source->wg_parser);
+static void pause_pipeline(struct media_source *source) +{
- unsigned int i;
- for (i = 0; i < source->stream_count; i++)
- {
struct media_stream *stream = source->streams[i];
if (stream->state != STREAM_INACTIVE)
{
IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, MEStreamPaused, &GUID_NULL, S_OK, NULL);
}
- }
- IMFMediaEventQueue_QueueEventParamVar(source->event_queue, MESourcePaused, &GUID_NULL, S_OK, NULL);
- source->state = SOURCE_PAUSED;
+}
- static void stop_pipeline(struct media_source *source) { unsigned int i;
@@ -343,6 +391,7 @@ static void stop_pipeline(struct media_source *source) struct media_stream *stream = source->streams[i]; if (stream->state != STREAM_INACTIVE) {
flush_pending_queue(stream, FALSE); IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, MEStreamStopped, &GUID_NULL, S_OK, NULL); unix_funcs->wg_parser_stream_disable(stream->wg_stream); }
@@ -437,12 +486,26 @@ static void send_buffer(struct media_stream *stream, const struct wg_parser_even if (token) IMFSample_SetUnknown(sample, &MFSampleExtension_Token, token);
- IMFMediaEventQueue_QueueEventParamUnk(stream->event_queue, MEMediaSample,
&GUID_NULL, S_OK, (IUnknown *)sample);
- if (stream->parent_source->state == SOURCE_PAUSED)
- {
struct pending_sample *pending = malloc(sizeof(*pending));
if (!pending)
{
ERR("Cannot allocate pending sample\n");
goto out;
}
pending->sample = sample;
sample = NULL;
list_add_tail(&stream->pending_samples, &pending->entry);
Since our media source outputs uncompressed samples, if it's common behavior for an application to request a significant amount of samples while the source is paused, we might instead want to add a separate sample request queue. (Lest we use too much memory) I think this is what I was referring to in my old comment.
}
else
IMFMediaEventQueue_QueueEventParamUnk(stream->event_queue, MEMediaSample,
&GUID_NULL, S_OK, (IUnknown *)sample);
out: IMFMediaBuffer_Release(buffer);
- IMFSample_Release(sample);
if (sample)
IMFSample_Release(sample);
}
static void wait_on_sample(struct media_stream *stream, IUnknown *token)
@@ -501,6 +564,9 @@ static HRESULT WINAPI source_async_commands_Invoke(IMFAsyncCallback *iface, IMFA case SOURCE_ASYNC_START: start_pipeline(source, command); break;
case SOURCE_ASYNC_PAUSE:
pause_pipeline(source);
break; case SOURCE_ASYNC_STOP: stop_pipeline(source); break;
@@ -597,6 +663,7 @@ static ULONG WINAPI media_stream_Release(IMFMediaStream *iface) { if (stream->event_queue) IMFMediaEventQueue_Release(stream->event_queue);
flush_pending_queue(stream, FALSE); free(stream); }
@@ -699,7 +766,6 @@ static HRESULT WINAPI media_stream_RequestSample(IMFMediaStream *iface, IUnknown IUnknown_AddRef(token); command->u.request_sample.token = token;
/* Once pause support is added, this will need to put into a stream queue, and synchronization will need to be added*/ hr = MFPutWorkItem(stream->parent_source->async_commands_queue, &stream->parent_source->async_commands_callback, &command->IUnknown_iface); }
@@ -738,6 +804,7 @@ static HRESULT new_media_stream(struct media_source *source, object->state = STREAM_INACTIVE; object->eos = FALSE; object->wg_stream = wg_stream;
list_init(&object->pending_samples);
if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) goto fail;
@@ -1123,7 +1190,7 @@ static HRESULT WINAPI media_source_GetCharacteristics(IMFMediaSource *iface, DWO if (source->state == SOURCE_SHUTDOWN) return MF_E_SHUTDOWN;
- *characteristics = MFMEDIASOURCE_CAN_SEEK;
*characteristics = MFMEDIASOURCE_CAN_SEEK | MFMEDIASOURCE_CAN_PAUSE;
return S_OK; }
@@ -1187,13 +1254,21 @@ static HRESULT WINAPI media_source_Stop(IMFMediaSource *iface) static HRESULT WINAPI media_source_Pause(IMFMediaSource *iface) { struct media_source *source = impl_from_IMFMediaSource(iface);
- struct source_async_command *command;
- HRESULT hr;
- FIXME("(%p): stub\n", source);
TRACE("(%p)\n", source);
if (source->state == SOURCE_SHUTDOWN) return MF_E_SHUTDOWN;
- return E_NOTIMPL;
if (source->state != SOURCE_RUNNING)
return MF_E_INVALID_STATE_TRANSITION;
if (SUCCEEDED(hr = source_create_async_op(SOURCE_ASYNC_PAUSE, &command)))
hr = MFPutWorkItem(source->async_commands_queue, &source->async_commands_callback, &command->IUnknown_iface);
return hr; }
static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
Hi,
Il 10/06/21 18:12, Derek Lesho ha scritto:
This patch probably warrants a few tests. I would test at least
In line of principle I would agree. The reason I am not submitting any is that Nikolay told me on the chat, as I understand it, that for this kind of patches he prefers tests to be left out-of-tree. Since he is the one who usually reviews my MF patches, I am inclined to meet his requests.
- what happens to ::RequestSample calls when a stream is paused.
As soon as the source is restarted, a corresponding number of MEMediaSample is generated.
I assumed that this means that samples are generated anyway during pause and are later emitted after Start, but as others have noted this is not necessarily true: requests could be queued instead of samples. The two cases differ for example if the Start includes a seek (if requests are stored, then samples are emitted at the freshly seeked location, otherwise they are emitted at the location of pausing). And thinking again about it storing the requests seems more logical than storing the samples. Tomorrow I'll check on Windows to see what's happening there.
- whether MESourcePaused or the MEStreamPaused events come first.
Hmm, does this really make sense? They are emitted on two different queues. Does MF guarantees order of delivery across different queues? Even if one is submitted before the other, the system scheduler might decide to execute in the opposite order, if it wakes up the other thread first. Or is there a stronger guarantee?
- IMFMediaEventQueue_QueueEventParamUnk(stream->event_queue, MEMediaSample, - &GUID_NULL, S_OK, (IUnknown *)sample); + if (stream->parent_source->state == SOURCE_PAUSED) + { + struct pending_sample *pending = malloc(sizeof(*pending)); + if (!pending) + { + ERR("Cannot allocate pending sample\n"); + goto out; + } + pending->sample = sample; + sample = NULL; + list_add_tail(&stream->pending_samples, &pending->entry);
Since our media source outputs uncompressed samples, if it's common behavior for an application to request a significant amount of samples while the source is paused, we might instead want to add a separate sample request queue. (Lest we use too much memory) I think this is what I was referring to in my old comment.
More than the memory usage (which I doubt will be significant: after all until now apparently even the usage of Pause was not that pressing), I find concerning the observable behavior, as I said above. I will check better.
Thanks for the review, Giovanni.
On 6/10/21 3:17 PM, Giovanni Mascellani wrote:
Hi,
Il 10/06/21 18:12, Derek Lesho ha scritto:
This patch probably warrants a few tests. I would test at least
In line of principle I would agree. The reason I am not submitting any is that Nikolay told me on the chat, as I understand it, that for this kind of patches he prefers tests to be left out-of-tree. Since he is the one who usually reviews my MF patches, I am inclined to meet his requests.
- what happens to ::RequestSample calls when a stream is paused.
As soon as the source is restarted, a corresponding number of MEMediaSample is generated.
I assumed that this means that samples are generated anyway during pause and are later emitted after Start, but as others have noted this is not necessarily true: requests could be queued instead of samples. The two cases differ for example if the Start includes a seek (if requests are stored, then samples are emitted at the freshly seeked location, otherwise they are emitted at the location of pausing). And thinking again about it storing the requests seems more logical than storing the samples. Tomorrow I'll check on Windows to see what's happening there.
Yeah that would be really useful to see. My impression is that, since on Windows the stream has a separate request queue, the simplest implementation would just stop processing requests on the stream queue while the stream is paused. This wouldn't cause problems with the source receiving the start request, as it has its own queue and then upon starting again the stream would run through its saved requests.
- whether MESourcePaused or the MEStreamPaused events come first.
Hmm, does this really make sense? They are emitted on two different queues. Does MF guarantees order of delivery across different queues? Even if one is submitted before the other, the system scheduler might decide to execute in the opposite order, if it wakes up the other thread first. Or is there a stronger guarantee?
You're right, never mind.
- IMFMediaEventQueue_QueueEventParamUnk(stream->event_queue,
MEMediaSample, - &GUID_NULL, S_OK, (IUnknown *)sample); + if (stream->parent_source->state == SOURCE_PAUSED) + { + struct pending_sample *pending = malloc(sizeof(*pending)); + if (!pending) + { + ERR("Cannot allocate pending sample\n"); + goto out; + } + pending->sample = sample; + sample = NULL; + list_add_tail(&stream->pending_samples, &pending->entry);
Since our media source outputs uncompressed samples, if it's common behavior for an application to request a significant amount of samples while the source is paused, we might instead want to add a separate sample request queue. (Lest we use too much memory) I think this is what I was referring to in my old comment.
More than the memory usage (which I doubt will be significant: after all until now apparently even the usage of Pause was not that pressing), I find concerning the observable behavior, as I said above. I will check better.
Thanks for the review, Giovanni.
Hi,
Il 10/06/21 21:25, Derek Lesho ha scritto:
Yeah that would be really useful to see. My impression is that, since on Windows the stream has a separate request queue, the simplest implementation would just stop processing requests on the stream queue while the stream is paused. This wouldn't cause problems with the source receiving the start request, as it has its own queue and then upon starting again the stream would run through its saved requests.
In the end, I didn't end up implementing this strategy. I agree that it is conceptually simpler and probably it is what happens on Windows, but I believe my implementation (v2) should be essentially equivalent and it requires less locking and coordination between different queues (because there is just one).
Giovanni.
On 6/11/21 6:53 AM, Giovanni Mascellani wrote:
Hi,
Il 10/06/21 21:25, Derek Lesho ha scritto:
Yeah that would be really useful to see. My impression is that, since on Windows the stream has a separate request queue, the simplest implementation would just stop processing requests on the stream queue while the stream is paused. This wouldn't cause problems with the source receiving the start request, as it has its own queue and then upon starting again the stream would run through its saved requests.
In the end, I didn't end up implementing this strategy. I agree that it is conceptually simpler and probably it is what happens on Windows, but I believe my implementation (v2) should be essentially equivalent and it requires less locking and coordination between different queues (because there is just one).
Yep, fully agreed. I believe my original message was unclear, I more meant "given that Windows decided to use the (admittedly more complex) multi-queue approach, the simplest implementation on their end would probably just stop processing requests on pause, and therefore it would probably be more accurate to queue requests than samples".
Giovanni.
On 6/10/21 10:17 PM, Giovanni Mascellani wrote:
Hi,
Il 10/06/21 18:12, Derek Lesho ha scritto:
This patch probably warrants a few tests. I would test at least
In line of principle I would agree. The reason I am not submitting any is that Nikolay told me on the chat, as I understand it, that for this kind of patches he prefers tests to be left out-of-tree. Since he is the one who usually reviews my MF patches, I am inclined to meet his requests.
That was about event tests mostly, that proved to be a problem. Test programs are still useful obviously.
- what happens to ::RequestSample calls when a stream is paused.
As soon as the source is restarted, a corresponding number of MEMediaSample is generated.
I assumed that this means that samples are generated anyway during pause and are later emitted after Start, but as others have noted this is not necessarily true: requests could be queued instead of samples. The two cases differ for example if the Start includes a seek (if requests are stored, then samples are emitted at the freshly seeked location, otherwise they are emitted at the location of pausing). And thinking again about it storing the requests seems more logical than storing the samples. Tomorrow I'll check on Windows to see what's happening there.
Counter sounds certainly easier. How are you going to check that on Windows? By looking at stream data access? Event if it does read when paused, I don't know if we care, because that shouldn't happen in session context, and in source reader context source is never paused.
- whether MESourcePaused or the MEStreamPaused events come first.
Hmm, does this really make sense? They are emitted on two different queues. Does MF guarantees order of delivery across different queues? Even if one is submitted before the other, the system scheduler might decide to execute in the opposite order, if it wakes up the other thread first. Or is there a stronger guarantee?
They don't explicitly guarantee that, but I think it's more reliable than it appears to be.
Hi,
Il 11/06/21 07:55, Nikolay Sivov ha scritto:
Counter sounds certainly easier. How are you going to check that on Windows? By looking at stream data access? Event if it does read when paused, I don't know if we care, because that shouldn't happen in session context, and in source reader context source is never paused.
No, I am not looking at the stream access. I don't think it is really meaningful to do it, it can depends on a lot of implementation details, including local caching, and I don't want to unravel that.
What I noticed is the following observable behavior on Windows: if you request samples while the stream is paused and then restart without seeking, then you immediately receive a corresponding number of samples, of course starting from the position you left your stream at. I don't know whether they have been processed while the stream was paused or just after it was restarted, but it shouldn't matter much, because I need to replicate the behavior of the media source, not its internal status.
When I pause the media source, request samples and then restart with a seek, then the sample requests I had done while paused are dropped.
In order to implement this behavior, it is enough to queue sample requests while paused, together with their tokens. After yesterday's discussion, this seems more sensible than queuing samples, because it saves on CPU time and memory.
I will submit an updated version of my patch soon.
Giovanni.
I agree with Derek's comment here. Assuming that buffering samples like this is the right thing to do, I also wonder:
* should we buffer samples, or requests?
* should the samples/requests be stored in a flat array instead of a linked list? Based on usage patterns it seems that'd be easier and more efficient.
On 6/10/21 7:25 PM, Zebediah Figura (she/her) wrote:
I agree with Derek's comment here. Assuming that buffering samples like this is the right thing to do, I also wonder:
should we buffer samples, or requests?
should the samples/requests be stored in a flat array instead of a
linked list? Based on usage patterns it seems that'd be easier and more efficient.
Agreed, list vs array is not really important one. Why do we need to buffer samples, is that to cover for discrepancy for already issued requests before Pause() was called? And what should really happen there, if we queue async command on RequestSample() (which is already questionable) any number of times, and then call Pause(), or Stop(), we should have a way to drop pending requests. And that's not possible with MFPutWorkItem().
With this patch any Start() when paused results in draining all the samples to the receiver, and that's not obviously correct, unless that was somehow tested. I imagine if you seek to a new position in either direction, you're not really interested in old samples. If session currently does not handle this correctly (by resetting expected samples counters, etc), we should figure out how to test it separately.
- unix_funcs->wg_parser_stream_seek(source->streams[0]->wg_stream, 1.0,
position->hVal.QuadPart, 0, AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning);
- if (position->vt != VT_EMPTY)
unix_funcs->wg_parser_stream_seek(source->streams[0]->wg_stream, 1.0,
unix_funcs->wg_parser_end_flush(source->wg_parser);position->hVal.QuadPart, 0, AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning);
Is this really relevant to Pause() ?
@@ -318,6 +345,8 @@ static void start_pipeline(struct media_source *source, struct source_async_comm
IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, seek_message ? MEStreamSeeked : MEStreamStarted, &GUID_NULL, S_OK, position);
}flush_pending_queue(stream, TRUE); }
This will send old samples before MESourceStarted/MESourceSeeked, which is questionable.
Hi,
Il 10/06/21 18:59, Nikolay Sivov ha scritto:
On 6/10/21 7:25 PM, Zebediah Figura (she/her) wrote:
I agree with Derek's comment here. Assuming that buffering samples like this is the right thing to do, I also wonder:
should we buffer samples, or requests?
should the samples/requests be stored in a flat array instead of a
linked list? Based on usage patterns it seems that'd be easier and more efficient.
Agreed, list vs array is not really important one. Why do we need to buffer samples, is that to cover for discrepancy for already issued requests before Pause() was called? And what should really happen there, if we queue async command on RequestSample() (which is already questionable) any number of times, and then call Pause(), or Stop(), we should have a way to drop pending requests. And that's not possible with MFPutWorkItem().
With this patch any Start() when paused results in draining all the samples to the receiver, and that's not obviously correct, unless that was somehow tested. I imagine if you seek to a new position in either direction, you're not really interested in old samples. If session currently does not handle this correctly (by resetting expected samples counters, etc), we should figure out how to test it separately.
We need to somehow remember how many times RequestSample was called during pause, because when we start again we have to emit a corresponding number of samples. Windows does that.
As for whether we need to buffer requests or samples, I will check tomorrow, as I already discussed replying to other reviews.
- unix_funcs->wg_parser_stream_seek(source->streams[0]->wg_stream, 1.0,
position->hVal.QuadPart, 0, AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning);
- if (position->vt != VT_EMPTY)
unix_funcs->wg_parser_stream_seek(source->streams[0]->wg_stream, 1.0,
position->hVal.QuadPart, 0, AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning); unix_funcs->wg_parser_end_flush(source->wg_parser);
Is this really relevant to Pause() ?
No, you're right. I will move it to another patch.
@@ -318,6 +345,8 @@ static void start_pipeline(struct media_source *source, struct source_async_comm
IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, seek_message ? MEStreamSeeked : MEStreamStarted, &GUID_NULL, S_OK, position);
flush_pending_queue(stream, TRUE); } }
This will send old samples before MESourceStarted/MESourceSeeked, which is questionable.
As I asked to Derek, is it relevant to compare which event comes first from two different queues? I see no indication that two different queues should be somehow synchronized. Even if queued MESourceStarted before, the application could receive it later.
Thanks, Giovanni.
On 6/10/21 10:36 PM, Giovanni Mascellani wrote:
Hi,
Il 10/06/21 18:59, Nikolay Sivov ha scritto:
On 6/10/21 7:25 PM, Zebediah Figura (she/her) wrote:
I agree with Derek's comment here. Assuming that buffering samples like this is the right thing to do, I also wonder:
should we buffer samples, or requests?
should the samples/requests be stored in a flat array instead of a
linked list? Based on usage patterns it seems that'd be easier and more efficient.
Agreed, list vs array is not really important one. Why do we need to buffer samples, is that to cover for discrepancy for already issued requests before Pause() was called? And what should really happen there, if we queue async command on RequestSample() (which is already questionable) any number of times, and then call Pause(), or Stop(), we should have a way to drop pending requests. And that's not possible with MFPutWorkItem().
With this patch any Start() when paused results in draining all the samples to the receiver, and that's not obviously correct, unless that was somehow tested. I imagine if you seek to a new position in either direction, you're not really interested in old samples. If session currently does not handle this correctly (by resetting expected samples counters, etc), we should figure out how to test it separately.
We need to somehow remember how many times RequestSample was called during pause, because when we start again we have to emit a corresponding number of samples. Windows does that.
As for whether we need to buffer requests or samples, I will check tomorrow, as I already discussed replying to other reviews.
- unix_funcs->wg_parser_stream_seek(source->streams[0]->wg_stream, 1.0, - position->hVal.QuadPart, 0, AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning); + if (position->vt != VT_EMPTY) + unix_funcs->wg_parser_stream_seek(source->streams[0]->wg_stream, 1.0, + position->hVal.QuadPart, 0, AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning); unix_funcs->wg_parser_end_flush(source->wg_parser);
Is this really relevant to Pause() ?
No, you're right. I will move it to another patch.
@@ -318,6 +345,8 @@ static void start_pipeline(struct media_source *source, struct source_async_comm IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, seek_message ? MEStreamSeeked : MEStreamStarted, &GUID_NULL, S_OK, position);
+ flush_pending_queue(stream, TRUE); } }
This will send old samples before MESourceStarted/MESourceSeeked, which is questionable.
As I asked to Derek, is it relevant to compare which event comes first from two different queues? I see no indication that two different queues should be somehow synchronized. Even if queued MESourceStarted before, the application could receive it later.
You can easily check that. Default event queue implementation uses standard system queue, which is single-threaded. Since you queue events always from the same thread, I think they might get in order. My main concern is to get session working correctly in this case, when source state has not fully transitioned, but samples are coming already.
Thanks, Giovanni.
Hi,
Il 11/06/21 07:45, Nikolay Sivov ha scritto:
You can easily check that. Default event queue implementation uses standard system queue, which is single-threaded. Since you queue events always from the same thread, I think they might get in order. My main concern is to get session working correctly in this case, when source state has not fully transitioned, but samples are coming already.
I wrote a test program on Windows that spawns two threads, each of them getting events from a queue; a third thread posts events on the two queues, first the first queue and then the second queue. The results confirm my hypothesis: in a significant amount of the tests, the second thread processes the event before the first queue, even if the even on the first queue was posted before.
This is expected: even if the event queues are globally single-threaded, the two threads waiting on them are not by definition; thus even if the first event is guaranteed to be processed before the second one (which in itself is not a guarantee I'd give for granted), the operating system can very well decide to unschedule the waiting threads while they're just about to process the event, so in practice any order can happen. As a matter of facts, I don't think it is even sensible to speak of "order" between two threads, unless there are synchronization points.
If our session depends on stuff happening before other without this being regulated by a synchronization point, then our session has bugs, and we should fix those instead of hoping that the operating system will always cooperate.
Giovanni.
Hi,
the only point that I have not discussed replying to Derek is:
Il 10/06/21 18:25, Zebediah Figura (she/her) ha scritto:
- should the samples/requests be stored in a flat array instead of a
linked list? Based on usage patterns it seems that'd be easier and more efficient.
I used a list because I saw that they're very used in Wine and got the feeling they're somehow the "standard way" for ordered containers where you don't need random access.
Personally I have no strong feelings in this specific case, neither ease nor efficiency seem to me to be clearly in favor of either of the two alternatives, especially given that the two approach have the same asymptotic complexity and that, in practice, I expect this feature to be hardly ever used, let alone for many samples/requests.
I'll do whatever gives me better probability to have a valuable Signed-off-by. :-P
Thanks for the review, Giovanni.
On 6/10/21 2:25 PM, Giovanni Mascellani wrote:
Hi,
the only point that I have not discussed replying to Derek is:
Il 10/06/21 18:25, Zebediah Figura (she/her) ha scritto:
- should the samples/requests be stored in a flat array instead of a
linked list? Based on usage patterns it seems that'd be easier and more efficient.
I used a list because I saw that they're very used in Wine and got the feeling they're somehow the "standard way" for ordered containers where you don't need random access.
Personally I have no strong feelings in this specific case, neither ease nor efficiency seem to me to be clearly in favor of either of the two alternatives, especially given that the two approach have the same asymptotic complexity and that, in practice, I expect this feature to be hardly ever used, let alone for many samples/requests.
I'll do whatever gives me better probability to have a valuable Signed-off-by. :-P
I mean, just from a drive-by it looks a bit easier, since all you're doing is storing them in order and then deleting them all at once—no random-access insertion or deletion. Granted, there is a reason my question was phrased like a question ;-)
I think in general Wine code uses lists or arrays, depending on which is actually easier, or if (as rarely) there's a performance concern. Very often that's lists, but not always.