It is possible that a stream is destroyed while another thread is waiting on event_empty_cond or event_cond. The waiting thread should have the opportunity to be rescheduled and exit the critical section before the condition variables and the mutex are destroyed.
-- v4: winegstreamer: Synchronize media source async commands and shutdown. winegstreamer: Free the GStreamer buffer when freeing a WG parser stream. winegstreamer: Synchronize access to the media source from callbacks. winegstreamer: Synchronize concurrent access to the media stream. winegstreamer: Synchronize concurrent access to the media source.
From: Giovanni Mascellani gmascellani@codeweavers.com
--- dlls/winegstreamer/media_source.c | 83 ++++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 23 deletions(-)
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 9bb7a441a8f..5afe92d036b 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -92,6 +92,8 @@ struct media_source IMFMediaEventQueue *event_queue; IMFByteStream *byte_stream;
+ CRITICAL_SECTION cs; + struct wg_parser *wg_parser;
struct media_stream **streams; @@ -1114,7 +1116,9 @@ static HRESULT WINAPI media_source_rate_control_SetRate(IMFRateControl *iface, B if (FAILED(hr = IMFRateSupport_IsRateSupported(&source->IMFRateSupport_iface, thin, rate, NULL))) return hr;
+ EnterCriticalSection(&source->cs); source->rate = rate; + LeaveCriticalSection(&source->cs);
return IMFMediaEventQueue_QueueEventParamVar(source->event_queue, MESourceRateChanged, &GUID_NULL, S_OK, NULL); } @@ -1128,7 +1132,9 @@ static HRESULT WINAPI media_source_rate_control_GetRate(IMFRateControl *iface, B if (thin) *thin = FALSE;
+ EnterCriticalSection(&source->cs); *rate = source->rate; + LeaveCriticalSection(&source->cs);
return S_OK; } @@ -1190,6 +1196,8 @@ static ULONG WINAPI media_source_Release(IMFMediaSource *iface) { IMFMediaSource_Shutdown(&source->IMFMediaSource_iface); IMFMediaEventQueue_Release(source->event_queue); + source->cs.DebugInfo->Spare[0] = 0; + DeleteCriticalSection(&source->cs); free(source); }
@@ -1236,10 +1244,15 @@ static HRESULT WINAPI media_source_QueueEvent(IMFMediaSource *iface, MediaEventT static HRESULT WINAPI media_source_GetCharacteristics(IMFMediaSource *iface, DWORD *characteristics) { struct media_source *source = impl_from_IMFMediaSource(iface); + BOOL is_shutdown;
TRACE("%p, %p.\n", iface, characteristics);
- if (source->state == SOURCE_SHUTDOWN) + EnterCriticalSection(&source->cs); + is_shutdown = source->state == SOURCE_SHUTDOWN; + LeaveCriticalSection(&source->cs); + + if (is_shutdown) return MF_E_SHUTDOWN;
*characteristics = MFMEDIASOURCE_CAN_SEEK | MFMEDIASOURCE_CAN_PAUSE; @@ -1250,13 +1263,20 @@ static HRESULT WINAPI media_source_GetCharacteristics(IMFMediaSource *iface, DWO static HRESULT WINAPI media_source_CreatePresentationDescriptor(IMFMediaSource *iface, IMFPresentationDescriptor **descriptor) { struct media_source *source = impl_from_IMFMediaSource(iface); + HRESULT hr;
TRACE("%p, %p.\n", iface, descriptor);
+ EnterCriticalSection(&source->cs); + if (source->state == SOURCE_SHUTDOWN) - return MF_E_SHUTDOWN; + hr = MF_E_SHUTDOWN; + else + hr = IMFPresentationDescriptor_Clone(source->pres_desc, descriptor); + + LeaveCriticalSection(&source->cs);
- return IMFPresentationDescriptor_Clone(source->pres_desc, descriptor); + return hr; }
static HRESULT WINAPI media_source_Start(IMFMediaSource *iface, IMFPresentationDescriptor *descriptor, @@ -1268,13 +1288,17 @@ static HRESULT WINAPI media_source_Start(IMFMediaSource *iface, IMFPresentationD
TRACE("%p, %p, %p, %p.\n", iface, descriptor, time_format, position);
- if (source->state == SOURCE_SHUTDOWN) - return MF_E_SHUTDOWN; + EnterCriticalSection(&source->cs);
- if (!(IsEqualIID(time_format, &GUID_NULL))) - return MF_E_UNSUPPORTED_TIME_FORMAT; - - if (SUCCEEDED(hr = source_create_async_op(SOURCE_ASYNC_START, &command))) + if (source->state == SOURCE_SHUTDOWN) + { + hr = MF_E_SHUTDOWN; + } + else if (!(IsEqualIID(time_format, &GUID_NULL))) + { + hr = MF_E_UNSUPPORTED_TIME_FORMAT; + } + else if (SUCCEEDED(hr = source_create_async_op(SOURCE_ASYNC_START, &command))) { command->u.start.descriptor = descriptor; command->u.start.format = *time_format; @@ -1283,6 +1307,8 @@ static HRESULT WINAPI media_source_Start(IMFMediaSource *iface, IMFPresentationD hr = MFPutWorkItem(source->async_commands_queue, &source->async_commands_callback, &command->IUnknown_iface); }
+ LeaveCriticalSection(&source->cs); + return hr; }
@@ -1294,12 +1320,15 @@ static HRESULT WINAPI media_source_Stop(IMFMediaSource *iface)
TRACE("%p.\n", iface);
- if (source->state == SOURCE_SHUTDOWN) - return MF_E_SHUTDOWN; + EnterCriticalSection(&source->cs);
- if (SUCCEEDED(hr = source_create_async_op(SOURCE_ASYNC_STOP, &command))) + if (source->state == SOURCE_SHUTDOWN) + hr = MF_E_SHUTDOWN; + else if (SUCCEEDED(hr = source_create_async_op(SOURCE_ASYNC_STOP, &command))) hr = MFPutWorkItem(source->async_commands_queue, &source->async_commands_callback, &command->IUnknown_iface);
+ LeaveCriticalSection(&source->cs); + return hr; }
@@ -1311,30 +1340,38 @@ static HRESULT WINAPI media_source_Pause(IMFMediaSource *iface)
TRACE("%p.\n", iface);
- if (source->state == SOURCE_SHUTDOWN) - return MF_E_SHUTDOWN; + EnterCriticalSection(&source->cs);
- if (source->state != SOURCE_RUNNING) - return MF_E_INVALID_STATE_TRANSITION; - - if (SUCCEEDED(hr = source_create_async_op(SOURCE_ASYNC_PAUSE, &command))) + if (source->state == SOURCE_SHUTDOWN) + hr = MF_E_SHUTDOWN; + else if (source->state != SOURCE_RUNNING) + hr = MF_E_INVALID_STATE_TRANSITION; + else if (SUCCEEDED(hr = source_create_async_op(SOURCE_ASYNC_PAUSE, &command))) hr = MFPutWorkItem(source->async_commands_queue, &source->async_commands_callback, &command->IUnknown_iface);
+ LeaveCriticalSection(&source->cs); + return S_OK; }
static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface) { struct media_source *source = impl_from_IMFMediaSource(iface); + BOOL is_shutdown; unsigned int i;
TRACE("%p.\n", iface);
- if (source->state == SOURCE_SHUTDOWN) - return MF_E_SHUTDOWN; - + EnterCriticalSection(&source->cs); + is_shutdown = source->state == SOURCE_SHUTDOWN; source->state = SOURCE_SHUTDOWN; + for (i = 0; i < source->stream_count; i++) + source->streams[i]->state = STREAM_SHUTDOWN; + LeaveCriticalSection(&source->cs); + + if (is_shutdown) + return MF_E_SHUTDOWN;
wg_parser_disconnect(source->wg_parser);
@@ -1350,8 +1387,6 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface) { struct media_stream *stream = source->streams[i];
- stream->state = STREAM_SHUTDOWN; - IMFMediaEventQueue_Shutdown(stream->event_queue); IMFStreamDescriptor_Release(stream->descriptor); IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface); @@ -1424,6 +1459,8 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ object->byte_stream = bytestream; IMFByteStream_AddRef(bytestream); object->rate = 1.0f; + InitializeCriticalSection(&object->cs); + object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": cs");
if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) goto fail;
From: Giovanni Mascellani gmascellani@codeweavers.com
--- dlls/winegstreamer/media_source.c | 57 ++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 17 deletions(-)
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 5afe92d036b..fade993ca78 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -746,31 +746,51 @@ static HRESULT WINAPI media_stream_QueueEvent(IMFMediaStream *iface, MediaEventT static HRESULT WINAPI media_stream_GetMediaSource(IMFMediaStream *iface, IMFMediaSource **source) { struct media_stream *stream = impl_from_IMFMediaStream(iface); + HRESULT hr;
TRACE("%p, %p.\n", iface, source);
+ EnterCriticalSection(&stream->parent_source->cs); + if (stream->state == STREAM_SHUTDOWN) - return MF_E_SHUTDOWN; + { + hr = MF_E_SHUTDOWN; + } + else + { + IMFMediaSource_AddRef(&stream->parent_source->IMFMediaSource_iface); + *source = &stream->parent_source->IMFMediaSource_iface; + hr = S_OK; + }
- IMFMediaSource_AddRef(&stream->parent_source->IMFMediaSource_iface); - *source = &stream->parent_source->IMFMediaSource_iface; + LeaveCriticalSection(&stream->parent_source->cs);
- return S_OK; + return hr; }
static HRESULT WINAPI media_stream_GetStreamDescriptor(IMFMediaStream* iface, IMFStreamDescriptor **descriptor) { struct media_stream *stream = impl_from_IMFMediaStream(iface); + HRESULT hr;
TRACE("%p, %p.\n", iface, descriptor);
+ EnterCriticalSection(&stream->parent_source->cs); + if (stream->state == STREAM_SHUTDOWN) - return MF_E_SHUTDOWN; + { + hr = MF_E_SHUTDOWN; + } + else + { + IMFStreamDescriptor_AddRef(stream->descriptor); + *descriptor = stream->descriptor; + hr = S_OK; + }
- IMFStreamDescriptor_AddRef(stream->descriptor); - *descriptor = stream->descriptor; + LeaveCriticalSection(&stream->parent_source->cs);
- return S_OK; + return hr; }
static HRESULT WINAPI media_stream_RequestSample(IMFMediaStream *iface, IUnknown *token) @@ -781,21 +801,22 @@ static HRESULT WINAPI media_stream_RequestSample(IMFMediaStream *iface, IUnknown
TRACE("%p, %p.\n", iface, token);
- if (stream->state == STREAM_SHUTDOWN) - return MF_E_SHUTDOWN; + EnterCriticalSection(&stream->parent_source->cs);
- if (stream->state == STREAM_INACTIVE) + if (stream->state == STREAM_SHUTDOWN) + { + hr = MF_E_SHUTDOWN; + } + else if (stream->state == STREAM_INACTIVE) { WARN("Stream isn't active\n"); - return MF_E_MEDIA_SOURCE_WRONGSTATE; + hr = MF_E_MEDIA_SOURCE_WRONGSTATE; } - - if (stream->eos) + else if (stream->eos) { - return MF_E_END_OF_STREAM; + hr = MF_E_END_OF_STREAM; } - - if (SUCCEEDED(hr = source_create_async_op(SOURCE_ASYNC_REQUEST_SAMPLE, &command))) + else if (SUCCEEDED(hr = source_create_async_op(SOURCE_ASYNC_REQUEST_SAMPLE, &command))) { command->u.request_sample.stream = stream; if (token) @@ -806,6 +827,8 @@ static HRESULT WINAPI media_stream_RequestSample(IMFMediaStream *iface, IUnknown &stream->parent_source->async_commands_callback, &command->IUnknown_iface); }
+ LeaveCriticalSection(&stream->parent_source->cs); + return hr; }
From: Giovanni Mascellani gmascellani@codeweavers.com
--- dlls/winegstreamer/media_source.c | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-)
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index fade993ca78..5fbc09e19ea 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -533,10 +533,20 @@ static void wait_on_sample(struct media_stream *stream, IUnknown *token) struct media_source *source = stream->parent_source; PROPVARIANT empty_var = {.vt = VT_EMPTY}; struct wg_parser_buffer buffer; + bool res;
TRACE("%p, %p\n", stream, token);
- if (wg_parser_stream_get_buffer(source->wg_parser, stream->wg_stream, &buffer)) + LeaveCriticalSection(&source->cs); + res = wg_parser_stream_get_buffer(source->wg_parser, stream->wg_stream, &buffer); + EnterCriticalSection(&source->cs); + + /* If the media source has been shut down during the wait, then + * resources have been released, so don't touch anything. */ + if (source->state == SOURCE_SHUTDOWN) + return; + + if (res) { send_buffer(stream, &buffer, token); } @@ -555,12 +565,18 @@ static HRESULT WINAPI source_async_commands_Invoke(IMFAsyncCallback *iface, IMFA IUnknown *state; HRESULT hr;
- if (source->state == SOURCE_SHUTDOWN) - return S_OK; - if (FAILED(hr = IMFAsyncResult_GetState(result, &state))) return hr;
+ EnterCriticalSection(&source->cs); + + if (source->state == SOURCE_SHUTDOWN) + { + LeaveCriticalSection(&source->cs); + IUnknown_Release(state); + return S_OK; + } + command = impl_from_async_command_IUnknown(state); switch (command->op) { @@ -581,6 +597,8 @@ static HRESULT WINAPI source_async_commands_Invoke(IMFAsyncCallback *iface, IMFA break; }
+ LeaveCriticalSection(&source->cs); + IUnknown_Release(state);
return S_OK;
From: Giovanni Mascellani gmascellani@codeweavers.com
--- dlls/winegstreamer/wg_parser.c | 7 +++++++ 1 file changed, 7 insertions(+)
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 14970d327c8..520388de7cf 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -768,6 +768,13 @@ static void free_stream(struct wg_parser_stream *stream) } gst_object_unref(stream->my_sink);
+ if (stream->buffer) + { + gst_buffer_unmap(stream->buffer, &stream->map_info); + gst_buffer_unref(stream->buffer); + stream->buffer = NULL; + } + pthread_cond_destroy(&stream->event_cond); pthread_cond_destroy(&stream->event_empty_cond);
From: Giovanni Mascellani gmascellani@codeweavers.com
This is required so that Shutdown() won't proceed while a sample request is blocked in wg_parser_stream_get_buffer(). --- dlls/winegstreamer/media_source.c | 10 ++++++++++ 1 file changed, 10 insertions(+)
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 5fbc09e19ea..ab7b2ebdc17 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -92,6 +92,7 @@ struct media_source IMFMediaEventQueue *event_queue; IMFByteStream *byte_stream;
+ CRITICAL_SECTION shutdown_cs; CRITICAL_SECTION cs;
struct wg_parser *wg_parser; @@ -568,11 +569,13 @@ static HRESULT WINAPI source_async_commands_Invoke(IMFAsyncCallback *iface, IMFA if (FAILED(hr = IMFAsyncResult_GetState(result, &state))) return hr;
+ EnterCriticalSection(&source->shutdown_cs); EnterCriticalSection(&source->cs);
if (source->state == SOURCE_SHUTDOWN) { LeaveCriticalSection(&source->cs); + LeaveCriticalSection(&source->shutdown_cs); IUnknown_Release(state); return S_OK; } @@ -598,6 +601,7 @@ static HRESULT WINAPI source_async_commands_Invoke(IMFAsyncCallback *iface, IMFA }
LeaveCriticalSection(&source->cs); + LeaveCriticalSection(&source->shutdown_cs);
IUnknown_Release(state);
@@ -1239,6 +1243,8 @@ static ULONG WINAPI media_source_Release(IMFMediaSource *iface) IMFMediaEventQueue_Release(source->event_queue); source->cs.DebugInfo->Spare[0] = 0; DeleteCriticalSection(&source->cs); + source->shutdown_cs.DebugInfo->Spare[0] = 0; + DeleteCriticalSection(&source->shutdown_cs); free(source); }
@@ -1404,12 +1410,14 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
TRACE("%p.\n", iface);
+ EnterCriticalSection(&source->shutdown_cs); EnterCriticalSection(&source->cs); is_shutdown = source->state == SOURCE_SHUTDOWN; source->state = SOURCE_SHUTDOWN; for (i = 0; i < source->stream_count; i++) source->streams[i]->state = STREAM_SHUTDOWN; LeaveCriticalSection(&source->cs); + LeaveCriticalSection(&source->shutdown_cs);
if (is_shutdown) return MF_E_SHUTDOWN; @@ -1500,6 +1508,8 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ object->byte_stream = bytestream; IMFByteStream_AddRef(bytestream); object->rate = 1.0f; + InitializeCriticalSection(&object->shutdown_cs); + object->shutdown_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": shutdown_cs"); InitializeCriticalSection(&object->cs); object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": cs");
On Fri Nov 18 10:46:02 2022 +0000, Giovanni Mascellani wrote:
I agree it's a bit weird, but `wg_parser_stream_get_buffer()` can block, and if the lock is kept basically no other media source or media stream method can be called. In general the lock shouldn't be kept for long, but in some cases it could. For example, we could be reading from a slow network. I agree at this point it's still unsafe, but that's fixed in a later patch ("winegstreamer: Synchronize media source async commands and shutdown.").
Possibly. On the other hand, we don't support interrupting such waits, and the only function that's plausibly going to be called at the same time is `wg_parser_stream_get_buffer()` from another thread. And because buffering is done asynchronously internally, I don't think it's going to make a difference in terms of timing if the two calls hold a mutex or not. (E.g. if it takes 20 ms to buffer a sample per stream, the first call will hold the mutex for 20 ms, but the second call should block on the first and then return almost immediately.)
Granted, that's an armchair analysis, and there could be timing problems I'm not anticipating.
On Fri Nov 18 20:32:47 2022 +0000, Zebediah Figura wrote:
Possibly. On the other hand, we don't support interrupting such waits, and the only function that's plausibly going to be called at the same time is `wg_parser_stream_get_buffer()` from another thread. And because buffering is done asynchronously internally, I don't think it's going to make a difference in terms of timing if the two calls hold a mutex or not. (E.g. if it takes 20 ms to buffer a sample per stream, the first call will hold the mutex for 20 ms, but the second call should block on the first and then return almost immediately.) Granted, that's an armchair analysis, and there could be timing problems I'm not anticipating.
Yeah, I am not very convinced. For example, I think it is totally plausible that methods like `GetCharacteristics`, `GetRate`, `SetRate` are often called on a time-sensitive thread, on the assumption that on Windows they're nearly instantaneous. Having them wait on a lock that might in turn be held during disk (or even network) access doesn't seem like a good idea.
The general idea of the API seems to be that calls are usually very quick, and all the heavy lifting is done in background threads. I think we should respect this design point and avoid having calls wait on the background thread (except for `Shutdown()`, where this can't be avoided and it's a special call anyway; it could also be avoided, and that was my first solution indeed, but then we have to modify the WG parser so that the wait can be interrupted).
On Mon Nov 21 10:54:05 2022 +0000, Giovanni Mascellani wrote:
Yeah, I am not very convinced. For example, I think it is totally plausible that methods like `GetCharacteristics`, `GetRate`, `SetRate` are often called on a time-sensitive thread, on the assumption that on Windows they're nearly instantaneous. Having them waiting on a lock that might wait for disk (or even network) access doesn't seem a good idea. The API general idea seems to be that calls are usually very quick, and all the heavy lifting is done in background threads. I think we should respect this design point and avoid having calls wait on the background thread (except for `Shutdown()`, where this can't be avoided and it's a special call anyway; it could also be avoided, and that was my first solution indeed, but then we have to modify the WG parser so that the wait can be interrupted).
Having actually looked at 5/5, I guess I see what you're doing, but it seems an awkward way to do it. What you really want is to protect some state (mostly just "rate") with a more restricted mutex, but temporarily dropping that mutex then feels wrong in various ways. The fact that e.g. GetCharacteristics() accesses "state", which is sort of treated like it's protected by shutdown_cs, doesn't seem great either.
I wonder if the right solution here is an rwlock for "state" and associated fields ("wg_parser", "stream_count", etc.), and I guess a separate lock for "rate".
On Wed Nov 23 00:51:16 2022 +0000, Zebediah Figura wrote:
Having actually looked at 5/5, I guess I see what you're doing, but it seems an awkward way to do it. What you really want is to protect some state (mostly just "rate") with a more restricted mutex, but temporarily dropping that mutex then feels wrong in various ways. The fact that e.g. GetCharacteristics() accesses "state", which is sort of treated like it's protected by shutdown_cs, doesn't seem great either. I wonder if the right solution here is an rwlock for "state" and associated fields ("wg_parser", "stream_count", etc.), and I guess a separate lock for "rate".
No, `state` is not protected by `shutdown_cs`, but by `cs`, at least in my intentions. It is illegal to read or write `state` while you're not holding `cs`. What `shutdown_cs` protects is rather the WG parser, and precisely it excludes the simultaneous call of `wg_parser_disconnect()` and `wg_parser_stream_get_buffer()` (which are racy). Then, as a little optimization, `shutdown_cs` is released before `wg_parser_disconnect()` is called, because at that point you already know that no thread will ever be able to call `wg_parser_stream_get_buffer()`, since: * once `state` is set to `SOURCE_SHUTDOWN`, it is never changed any more; * each time `shutdown_cs` is entered, `state` is always checked to be different from `SOURCE_SHUTDOWN`.
In a sense, you can say that the WG parser is protected by the condition "`shutdown_cs` is locked OR `state == SOURCE_SHUTDOWN`". The idea is that in this way you can immediately drop the locks in `Shutdown()` and let all the waiters return immediately, instead of waiting for the cleanup to finish.
However, I realize that this is a significant additional correctness burden for probably rather limited benefit, so I will try to push a variation in which `shutdown_cs` more literally and directly protects access to the WG parser. The alternative would be to avoid `shutdown_cs` altogether and modify the WG parser so that `wg_parser_disconnect()` and `wg_parser_stream_get_buffer()` are not racy. However, that requires putting more locking on that side, and I am not sure it's a good idea (it is what my original patch would do, except that that was an incomplete implementation).
I don't really see how rwlocks would help with this problem.