Signed-off-by: Zebediah Figura zfigura@codeweavers.com --- dlls/winegstreamer/wg_parser.c | 78 +++++++++++----------------------- 1 file changed, 24 insertions(+), 54 deletions(-)
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 169f223c5a5..68e453e9944 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -1494,6 +1494,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE("quartz_src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); unsigned int i; + int ret;
parser->file_size = file_size; parser->sink_connected = true; @@ -1516,12 +1517,29 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
parser->start_offset = parser->next_offset = parser->stop_offset = 0; parser->next_pull_offset = 0; + parser->error = false;
if (!parser->init_gst(parser)) return E_FAIL;
+ gst_element_set_state(parser->container, GST_STATE_PAUSED); + ret = gst_element_get_state(parser->container, NULL, NULL, -1); + if (ret == GST_STATE_CHANGE_FAILURE) + { + GST_ERROR("Failed to play stream.\n"); + return E_FAIL; + } + pthread_mutex_lock(&parser->mutex);
+ while (!parser->no_more_pads && !parser->error) + pthread_cond_wait(&parser->init_cond, &parser->mutex); + if (parser->error) + { + pthread_mutex_unlock(&parser->mutex); + return E_FAIL; + } + for (i = 0; i < parser->stream_count; ++i) { struct wg_parser_stream *stream = parser->streams[i]; @@ -1677,7 +1695,7 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser) parser->their_sink = gst_element_get_static_pad(element, "sink");
pthread_mutex_lock(&parser->mutex); - parser->no_more_pads = parser->error = false; + parser->no_more_pads = false; pthread_mutex_unlock(&parser->mutex);
if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) @@ -1686,24 +1704,6 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser) return FALSE; }
- gst_element_set_state(parser->container, GST_STATE_PAUSED); - ret = gst_element_get_state(parser->container, NULL, NULL, -1); - if (ret == GST_STATE_CHANGE_FAILURE) - { - GST_ERROR("Failed to play stream.\n"); - return FALSE; - } - - pthread_mutex_lock(&parser->mutex); - while (!parser->no_more_pads && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); - if (parser->error) - { - pthread_mutex_unlock(&parser->mutex); - return FALSE; - } - pthread_mutex_unlock(&parser->mutex); - return TRUE; }
@@ -1724,7 +1724,7 @@ static BOOL avi_parser_init_gst(struct wg_parser *parser) parser->their_sink = gst_element_get_static_pad(element, "sink");
pthread_mutex_lock(&parser->mutex); - parser->no_more_pads = parser->error = false; + parser->no_more_pads = false; pthread_mutex_unlock(&parser->mutex);
if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) @@ -1733,24 +1733,6 @@ static BOOL avi_parser_init_gst(struct wg_parser *parser) return FALSE; }
- gst_element_set_state(parser->container, GST_STATE_PAUSED); - ret = gst_element_get_state(parser->container, NULL, NULL, -1); - if (ret == GST_STATE_CHANGE_FAILURE) - { - GST_ERROR("Failed to play stream.\n"); - return FALSE; - } - - pthread_mutex_lock(&parser->mutex); - while (!parser->no_more_pads && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); - if (parser->error) - { - pthread_mutex_unlock(&parser->mutex); - return FALSE; - } - pthread_mutex_unlock(&parser->mutex); - return TRUE; }
@@ -1781,15 +1763,9 @@ static BOOL mpeg_audio_parser_init_gst(struct wg_parser *parser) GST_ERROR("Failed to link source pads, error %d.\n", ret); return FALSE; } - gst_pad_set_active(stream->my_sink, 1); - gst_element_set_state(parser->container, GST_STATE_PAUSED); - ret = gst_element_get_state(parser->container, NULL, NULL, -1); - if (ret == GST_STATE_CHANGE_FAILURE) - { - GST_ERROR("Failed to play stream.\n"); - return FALSE; - } + + parser->no_more_pads = true;
return TRUE; } @@ -1821,15 +1797,9 @@ static BOOL wave_parser_init_gst(struct wg_parser *parser) GST_ERROR("Failed to link source pads, error %d.\n", ret); return FALSE; } - gst_pad_set_active(stream->my_sink, 1); - gst_element_set_state(parser->container, GST_STATE_PAUSED); - ret = gst_element_get_state(parser->container, NULL, NULL, -1); - if (ret == GST_STATE_CHANGE_FAILURE) - { - GST_ERROR("Failed to play stream.\n"); - return FALSE; - } + + parser->no_more_pads = true;
return TRUE; }
In particular, unset the sink_connected value, and make sure that subsequent wg_parser_get_read_request calls don't hang.
Signed-off-by: Zebediah Figura zfigura@codeweavers.com --- dlls/winegstreamer/wg_parser.c | 76 ++++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 23 deletions(-)
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 68e453e9944..b1e435cbba3 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -1034,6 +1034,30 @@ static struct wg_parser_stream *create_stream(struct wg_parser *parser) return stream; }
+static void free_stream(struct wg_parser_stream *stream) +{ + if (stream->their_src) + { + if (stream->post_sink) + { + gst_pad_unlink(stream->their_src, stream->post_sink); + gst_pad_unlink(stream->post_src, stream->my_sink); + gst_object_unref(stream->post_src); + gst_object_unref(stream->post_sink); + stream->post_src = stream->post_sink = NULL; + } + else + gst_pad_unlink(stream->their_src, stream->my_sink); + gst_object_unref(stream->their_src); + } + gst_object_unref(stream->my_sink); + + pthread_cond_destroy(&stream->event_cond); + pthread_cond_destroy(&stream->event_empty_cond); + + free(stream); +} + static void pad_added_cb(GstElement *element, GstPad *pad, gpointer user) { struct wg_parser *parser = user; @@ -1520,14 +1544,14 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s parser->error = false;
if (!parser->init_gst(parser)) - return E_FAIL; + goto out;
gst_element_set_state(parser->container, GST_STATE_PAUSED); ret = gst_element_get_state(parser->container, NULL, NULL, -1); if (ret == GST_STATE_CHANGE_FAILURE) { GST_ERROR("Failed to play stream.\n"); - return E_FAIL; + goto out; }
pthread_mutex_lock(&parser->mutex); @@ -1537,7 +1561,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s if (parser->error) { pthread_mutex_unlock(&parser->mutex); - return E_FAIL; + goto out; }
for (i = 0; i < parser->stream_count; ++i) @@ -1577,7 +1601,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s if (parser->error) { pthread_mutex_unlock(&parser->mutex); - return E_FAIL; + goto out; } if (gst_pad_query_duration(stream->their_src, GST_FORMAT_TIME, &duration)) { @@ -1614,30 +1638,36 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
parser->next_offset = 0; return S_OK; -}
-static void free_stream(struct wg_parser_stream *stream) -{ - if (stream->their_src) +out: + if (parser->container) + gst_element_set_state(parser->container, GST_STATE_NULL); + if (parser->their_sink) { - if (stream->post_sink) - { - gst_pad_unlink(stream->their_src, stream->post_sink); - gst_pad_unlink(stream->post_src, stream->my_sink); - gst_object_unref(stream->post_src); - gst_object_unref(stream->post_sink); - stream->post_src = stream->post_sink = NULL; - } - else - gst_pad_unlink(stream->their_src, stream->my_sink); - gst_object_unref(stream->their_src); + gst_pad_unlink(parser->my_src, parser->their_sink); + gst_object_unref(parser->their_sink); + parser->my_src = parser->their_sink = NULL; } - gst_object_unref(stream->my_sink);
- pthread_cond_destroy(&stream->event_cond); - pthread_cond_destroy(&stream->event_empty_cond); + for (i = 0; i < parser->stream_count; ++i) + free_stream(parser->streams[i]); + parser->stream_count = 0; + free(parser->streams); + parser->streams = NULL;
- free(stream); + if (parser->container) + { + gst_element_set_bus(parser->container, NULL); + gst_object_unref(parser->container); + parser->container = NULL; + } + + pthread_mutex_lock(&parser->mutex); + parser->sink_connected = false; + pthread_mutex_unlock(&parser->mutex); + pthread_cond_signal(&parser->read_cond); + + return E_FAIL; }
static void CDECL wg_parser_disconnect(struct wg_parser *parser)
From: Derek Lesho dlesho@codeweavers.com
Instead of mixing together IMFMediaSource::Shutdown() and the constructors' failure paths, which creates confusion about what should be released where, designate ::Shutdown/::Release for shutting down fully initialized objects without checks, and keep the cleanup code for partially-created objects in the constructor.
Signed-off-by: Derek Lesho dlesho@codeweavers.com Signed-off-by: Zebediah Figura zfigura@codeweavers.com --- dlls/winegstreamer/gst_private.h | 1 + dlls/winegstreamer/media_source.c | 102 +++++++++++++++++++----------- 2 files changed, 66 insertions(+), 37 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index c6c99b1dd55..c29fc4a2437 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -22,6 +22,7 @@ #define __GST_PRIVATE_INCLUDED__
#include <assert.h> +#include <limits.h> #include <stdarg.h> #include <stdbool.h> #include <stdint.h> diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 01ab626254a..95bdb9b488e 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -733,6 +733,12 @@ static HRESULT new_media_stream(struct media_source *source, object->IMFMediaStream_iface.lpVtbl = &media_stream_vtbl; object->ref = 1;
+ if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) + { + free(object); + return hr; + } + IMFMediaSource_AddRef(&source->IMFMediaSource_iface); object->parent_source = source; object->stream_id = stream_id; @@ -741,20 +747,11 @@ static HRESULT new_media_stream(struct media_source *source, object->eos = FALSE; object->wg_stream = wg_stream;
- if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) - goto fail; - TRACE("Created stream object %p.\n", object);
*out_stream = object;
return S_OK; - -fail: - WARN("Failed to construct media stream, hr %#x.\n", hr); - - IMFMediaStream_Release(&object->IMFMediaStream_iface); - return hr; }
static HRESULT media_stream_init_desc(struct media_stream *stream) @@ -847,10 +844,16 @@ static HRESULT media_stream_init_desc(struct media_stream *stream) goto done;
if (FAILED(hr = IMFStreamDescriptor_GetMediaTypeHandler(stream->descriptor, &type_handler))) + { + IMFStreamDescriptor_Release(stream->descriptor); goto done; + }
if (FAILED(hr = IMFMediaTypeHandler_SetCurrentMediaType(type_handler, stream_types[0]))) + { + IMFStreamDescriptor_Release(stream->descriptor); goto done; + }
done: if (type_handler) @@ -1213,19 +1216,13 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
unix_funcs->wg_parser_disconnect(source->wg_parser);
- if (source->read_thread) - { - source->read_thread_shutdown = true; - WaitForSingleObject(source->read_thread, INFINITE); - CloseHandle(source->read_thread); - } + source->read_thread_shutdown = true; + WaitForSingleObject(source->read_thread, INFINITE); + CloseHandle(source->read_thread);
- if (source->pres_desc) - IMFPresentationDescriptor_Release(source->pres_desc); - if (source->event_queue) - IMFMediaEventQueue_Shutdown(source->event_queue); - if (source->byte_stream) - IMFByteStream_Release(source->byte_stream); + IMFPresentationDescriptor_Release(source->pres_desc); + IMFMediaEventQueue_Shutdown(source->event_queue); + IMFByteStream_Release(source->byte_stream);
for (i = 0; i < source->stream_count; i++) { @@ -1233,23 +1230,18 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
stream->state = STREAM_SHUTDOWN;
- if (stream->event_queue) - IMFMediaEventQueue_Shutdown(stream->event_queue); - if (stream->descriptor) - IMFStreamDescriptor_Release(stream->descriptor); - if (stream->parent_source) - IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface); + IMFMediaEventQueue_Shutdown(stream->event_queue); + IMFStreamDescriptor_Release(stream->descriptor); + IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface);
IMFMediaStream_Release(&stream->IMFMediaStream_iface); }
unix_funcs->wg_parser_destroy(source->wg_parser);
- if (source->stream_count) - free(source->streams); + free(source->streams);
- if (source->async_commands_queue) - MFUnlockWorkQueue(source->async_commands_queue); + MFUnlockWorkQueue(source->async_commands_queue);
return S_OK; } @@ -1274,6 +1266,7 @@ static const IMFMediaSourceVtbl IMFMediaSource_vtbl = static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_source **out_media_source) { IMFStreamDescriptor **descriptors = NULL; + unsigned int stream_count = UINT_MAX; struct media_source *object; UINT64 total_pres_time = 0; struct wg_parser *parser; @@ -1337,15 +1330,15 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ * leak occurs with native. */ unix_funcs->wg_parser_set_unlimited_buffering(parser);
- object->stream_count = unix_funcs->wg_parser_get_stream_count(parser); + stream_count = unix_funcs->wg_parser_get_stream_count(parser);
- if (!(object->streams = calloc(object->stream_count, sizeof(*object->streams)))) + if (!(object->streams = calloc(stream_count, sizeof(*object->streams)))) { hr = E_OUTOFMEMORY; goto fail; }
- for (i = 0; i < object->stream_count; ++i) + for (i = 0; i < stream_count; ++i) { if (FAILED(hr = new_media_stream(object, unix_funcs->wg_parser_get_stream(parser, i), i, &object->streams[i]))) goto fail; @@ -1353,9 +1346,13 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ if (FAILED(hr = media_stream_init_desc(object->streams[i]))) { ERR("Failed to finish initialization of media stream %p, hr %x.\n", object->streams[i], hr); - IMFMediaStream_Release(&object->streams[i]->IMFMediaStream_iface); + IMFMediaSource_Release(&object->streams[i]->parent_source->IMFMediaSource_iface); + IMFMediaEventQueue_Release(object->streams[i]->event_queue); + free(object->streams[i]); goto fail; } + + object->stream_count++; }
/* init presentation descriptor */ @@ -1392,8 +1389,39 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ fail: WARN("Failed to construct MFMediaSource, hr %#x.\n", hr);
- free(descriptors); - IMFMediaSource_Release(&object->IMFMediaSource_iface); + if (descriptors) + { + for (i = 0; i < object->stream_count; i++) + IMFStreamDescriptor_Release(descriptors[i]); + free(descriptors); + } + for (i = 0; i < object->stream_count; i++) + { + struct media_stream *stream = object->streams[i]; + + IMFMediaEventQueue_Release(stream->event_queue); + IMFStreamDescriptor_Release(stream->descriptor); + IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface); + + free(stream); + } + free(object->streams); + if (stream_count != UINT_MAX) + unix_funcs->wg_parser_disconnect(object->wg_parser); + if (object->read_thread) + { + object->read_thread_shutdown = true; + WaitForSingleObject(object->read_thread, INFINITE); + CloseHandle(object->read_thread); + } + if (object->wg_parser) + unix_funcs->wg_parser_destroy(object->wg_parser); + if (object->async_commands_queue) + MFUnlockWorkQueue(object->async_commands_queue); + if (object->event_queue) + IMFMediaEventQueue_Release(object->event_queue); + IMFByteStream_Release(object->byte_stream); + free(object); return hr; }
From: Derek Lesho dlesho@codeweavers.com
This necessitates an extra blit for the input data, but the change is required both for WoW64 support and for an internal rework of the source path in wg_parser to use GstAppSrc. Since source data is usually compressed and not a bottleneck, we don't expect this to affect performance.
Signed-off-by: Derek Lesho dlesho@codeweavers.com Signed-off-by: Zebediah Figura zfigura@codeweavers.com --- dlls/winegstreamer/gst_private.h | 7 ++++--- dlls/winegstreamer/media_source.c | 28 ++++++++++++++++++++++++---- dlls/winegstreamer/quartz_parser.c | 26 +++++++++++++++++++++++--- dlls/winegstreamer/wg_parser.c | 28 +++++++++++++++------------- 4 files changed, 66 insertions(+), 23 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index c29fc4a2437..49e06b31369 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -170,9 +170,10 @@ struct unix_funcs void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser); void (CDECL *wg_parser_end_flush)(struct wg_parser *parser);
- bool (CDECL *wg_parser_get_read_request)(struct wg_parser *parser, - void **data, uint64_t *offset, uint32_t *size); - void (CDECL *wg_parser_complete_read_request)(struct wg_parser *parser, bool ret); + bool (CDECL *wg_parser_get_next_read_offset)(struct wg_parser *parser, + uint64_t *offset, uint32_t *size); + void (CDECL *wg_parser_push_data)(struct wg_parser *parser, + const void *data, uint32_t size);
void (CDECL *wg_parser_set_unlimited_buffering)(struct wg_parser *parser);
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 95bdb9b488e..825bad8da27 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -530,6 +530,11 @@ static DWORD CALLBACK read_thread(void *arg) { struct media_source *source = arg; IMFByteStream *byte_stream = source->byte_stream; + uint32_t buffer_size = 0; + uint64_t file_size; + void *data = NULL; + + IMFByteStream_GetLength(byte_stream, &file_size);
TRACE("Starting read thread for media source %p.\n", source);
@@ -539,18 +544,33 @@ static DWORD CALLBACK read_thread(void *arg) ULONG ret_size; uint32_t size; HRESULT hr; - void *data;
- if (!unix_funcs->wg_parser_get_read_request(source->wg_parser, &data, &offset, &size)) + if (!unix_funcs->wg_parser_get_next_read_offset(source->wg_parser, &offset, &size)) continue;
+ if (offset >= file_size) + size = 0; + else if (offset + size >= file_size) + size = file_size - offset; + + if (size > buffer_size) + { + buffer_size = size; + data = realloc(data, size); + } + + ret_size = 0; + if (SUCCEEDED(hr = IMFByteStream_SetCurrentPosition(byte_stream, offset))) hr = IMFByteStream_Read(byte_stream, data, size, &ret_size); - if (SUCCEEDED(hr) && ret_size != size) + if (FAILED(hr)) + ERR("Failed to read %u bytes at offset %I64u, hr %#x.\n", size, offset, hr); + else if (ret_size != size) ERR("Unexpected short read: requested %u bytes, got %u.\n", size, ret_size); - unix_funcs->wg_parser_complete_read_request(source->wg_parser, SUCCEEDED(hr)); + unix_funcs->wg_parser_push_data(source->wg_parser, SUCCEEDED(hr) ? data : NULL, ret_size); }
+ free(data); TRACE("Media source is shutting down; exiting.\n"); return 0; } diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index 09a916d7f5c..5299f4dc2ed 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -785,6 +785,11 @@ static DWORD CALLBACK stream_thread(void *arg) static DWORD CALLBACK read_thread(void *arg) { struct parser *filter = arg; + LONGLONG file_size, unused; + uint32_t buffer_size = 0; + void *data = NULL; + + IAsyncReader_Length(filter->reader, &file_size, &unused);
TRACE("Starting read thread for filter %p.\n", filter);
@@ -793,14 +798,29 @@ static DWORD CALLBACK read_thread(void *arg) uint64_t offset; uint32_t size; HRESULT hr; - void *data;
- if (!unix_funcs->wg_parser_get_read_request(filter->wg_parser, &data, &offset, &size)) + if (!unix_funcs->wg_parser_get_next_read_offset(filter->wg_parser, &offset, &size)) continue; + + if (offset >= file_size) + size = 0; + else if (offset + size >= file_size) + size = file_size - offset; + + if (size > buffer_size) + { + buffer_size = size; + data = realloc(data, size); + } + hr = IAsyncReader_SyncRead(filter->reader, offset, size, data); - unix_funcs->wg_parser_complete_read_request(filter->wg_parser, SUCCEEDED(hr)); + if (FAILED(hr)) + ERR("Failed to read %u bytes at offset %I64u, hr %#x.\n", size, offset, hr); + + unix_funcs->wg_parser_push_data(filter->wg_parser, SUCCEEDED(hr) ? data : NULL, size); }
+ free(data); TRACE("Streaming stopped; exiting.\n"); return 0; } diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index b1e435cbba3..7b2a12fd7a9 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -517,8 +517,8 @@ static void CDECL wg_parser_end_flush(struct wg_parser *parser) pthread_mutex_unlock(&parser->mutex); }
-static bool CDECL wg_parser_get_read_request(struct wg_parser *parser, - void **data, uint64_t *offset, uint32_t *size) +static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, + uint64_t *offset, uint32_t *size) { pthread_mutex_lock(&parser->mutex);
@@ -531,7 +531,6 @@ static bool CDECL wg_parser_get_read_request(struct wg_parser *parser, return false; }
- *data = parser->read_request.data; *offset = parser->read_request.offset; *size = parser->read_request.size;
@@ -539,11 +538,15 @@ static bool CDECL wg_parser_get_read_request(struct wg_parser *parser, return true; }
-static void CDECL wg_parser_complete_read_request(struct wg_parser *parser, bool ret) +static void CDECL wg_parser_push_data(struct wg_parser *parser, + const void *data, uint32_t size) { pthread_mutex_lock(&parser->mutex); + parser->read_request.size = size; parser->read_request.done = true; - parser->read_request.ret = ret; + parser->read_request.ret = !!data; + if (data) + memcpy(parser->read_request.data, data, size); parser->read_request.data = NULL; pthread_mutex_unlock(&parser->mutex); pthread_cond_signal(&parser->read_done_cond); @@ -1214,10 +1217,6 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent, if (offset == GST_BUFFER_OFFSET_NONE) offset = parser->next_pull_offset; parser->next_pull_offset = offset + size; - if (offset >= parser->file_size) - return GST_FLOW_EOS; - if (offset + size >= parser->file_size) - size = parser->file_size - offset;
if (!*buffer) *buffer = new_buffer = gst_buffer_new_and_alloc(size); @@ -1241,6 +1240,7 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent, pthread_cond_wait(&parser->read_done_cond, &parser->mutex);
ret = parser->read_request.ret; + gst_buffer_set_size(*buffer, parser->read_request.size);
pthread_mutex_unlock(&parser->mutex);
@@ -1248,10 +1248,12 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent,
GST_LOG("Request returned %d.", ret);
- if (!ret && new_buffer) + if ((!ret || !size) && new_buffer) gst_buffer_unref(new_buffer);
- return ret ? GST_FLOW_OK : GST_FLOW_ERROR; + if (ret) + return size ? GST_FLOW_OK : GST_FLOW_EOS; + return GST_FLOW_ERROR; }
static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) @@ -1917,8 +1919,8 @@ static const struct unix_funcs funcs = wg_parser_begin_flush, wg_parser_end_flush,
- wg_parser_get_read_request, - wg_parser_complete_read_request, + wg_parser_get_next_read_offset, + wg_parser_push_data,
wg_parser_set_unlimited_buffering,
From: Derek Lesho dlesho@codeweavers.com
Signed-off-by: Derek Lesho dlesho@codeweavers.com Signed-off-by: Zebediah Figura zfigura@codeweavers.com --- dlls/winegstreamer/wg_parser.c | 434 +++++++++------------------------ 1 file changed, 122 insertions(+), 312 deletions(-)
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 7b2a12fd7a9..7d4eb135217 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -33,6 +33,7 @@ #include <gst/gst.h> #include <gst/video/video.h> #include <gst/audio/audio.h> +#include <gst/app/gstappsrc.h>
/* GStreamer callbacks may be called on threads not created by Wine, and * therefore cannot access the Wine TEB. This means that we must use GStreamer @@ -49,28 +50,20 @@ struct wg_parser struct wg_parser_stream **streams; unsigned int stream_count;
- GstElement *container, *decodebin; + GstElement *container, *appsrc, *decodebin; GstBus *bus; - GstPad *my_src, *their_sink; - - guint64 file_size, start_offset, next_offset, stop_offset; - guint64 next_pull_offset; - - pthread_t push_thread;
pthread_mutex_t mutex;
pthread_cond_t init_cond; bool no_more_pads, has_duration, error;
- pthread_cond_t read_cond, read_done_cond; + pthread_cond_t read_cond; struct { - void *data; uint64_t offset; uint32_t size; - bool done; - bool ret; + bool pending; } read_request;
bool flushing, sink_connected; @@ -522,7 +515,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex);
- while (parser->sink_connected && !parser->read_request.data) + while (parser->sink_connected && !parser->read_request.pending) pthread_cond_wait(&parser->read_cond, &parser->mutex);
if (!parser->sink_connected) @@ -541,15 +534,69 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, static void CDECL wg_parser_push_data(struct wg_parser *parser, const void *data, uint32_t size) { + GstMessage *message; + GstFlowReturn ret; + GstBuffer *buffer; + GError *error; + + if (!data) + { + pthread_mutex_lock(&parser->mutex); + + if (parser->sink_connected) + { + error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->read_request.offset); + message = gst_message_new_error(NULL, error, ""); + gst_bus_post(parser->bus, message); + parser->read_request.pending = false; + } + + pthread_mutex_unlock(&parser->mutex); + return; + } + + if (!size) + { + pthread_mutex_lock(&parser->mutex); + + if (parser->sink_connected) + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); + parser->read_request.pending = false; + + pthread_mutex_unlock(&parser->mutex); + return; + } + + /* We will always perform an extra blit here. We can avoid this in some + * cases by wrapping a client-allocated buffer using + * gst_buffer_new_wrapped(). However, releasing the memory is non-trivial, + * since GStreamer will hold onto a reference for an arbitrarily long + * period of time. Until there's evidence to suggest that the blit causes a + * performance problem, leave it alone. 
*/ + buffer = gst_buffer_new_and_alloc(size); + gst_buffer_fill(buffer, 0, data, size); + pthread_mutex_lock(&parser->mutex); - parser->read_request.size = size; - parser->read_request.done = true; - parser->read_request.ret = !!data; - if (data) - memcpy(parser->read_request.data, data, size); - parser->read_request.data = NULL; + + if (!parser->sink_connected) + { + pthread_mutex_unlock(&parser->mutex); + gst_buffer_unref(buffer); + return; + } + + assert(parser->read_request.pending); + + GST_BUFFER_OFFSET(buffer) = parser->read_request.offset; + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret); + + /* In random-access mode, GST_FLOW_EOS shouldn't be returned. */ + assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING); + if (ret == GST_FLOW_OK) + parser->read_request.offset += size; + + parser->read_request.pending = false; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_done_cond); }
static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser) @@ -1204,196 +1251,56 @@ static void pad_removed_cb(GstElement *element, GstPad *pad, gpointer user) g_free(name); }
-static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent, - guint64 offset, guint size, GstBuffer **buffer) +static void src_need_data(GstElement *appsrc, guint length, gpointer user) { - struct wg_parser *parser = gst_pad_get_element_private(pad); - GstBuffer *new_buffer = NULL; - GstMapInfo map_info; - bool ret; - - GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, length %u, buffer %p.", pad, offset, size, *buffer); - - if (offset == GST_BUFFER_OFFSET_NONE) - offset = parser->next_pull_offset; - parser->next_pull_offset = offset + size; - - if (!*buffer) - *buffer = new_buffer = gst_buffer_new_and_alloc(size); - - gst_buffer_map(*buffer, &map_info, GST_MAP_WRITE); + struct wg_parser *parser = user; + guint64 queued_bytes;
pthread_mutex_lock(&parser->mutex);
- assert(!parser->read_request.data); - parser->read_request.data = map_info.data; - parser->read_request.offset = offset; - parser->read_request.size = size; - parser->read_request.done = false; + /* As of GStreamer 1.18, appsrc suffers from a race condition. When in + * random access mode (and when underlyingly in pull mode), appsrc may + * spuriously send multiple requests for the same offset and length. If it + * receives the same buffer twice (or consecutive buffers), it will blindly + * queue them and satisfy subsequent getrange requests from downstream + * elements with the wrong buffers. + * + * Internally, this function is called inside of a loop, which also pops + * buffers from the internal queue. Accordingly we can safely treat this + * request as spurious by checking if we have already sent data; since the + * data is consumed by this thread it will not have been consumed yet. + * + * The bug is documented in greater detail here: + * + * https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/937 + */ + g_object_get(G_OBJECT(appsrc), "current-level-bytes", &queued_bytes, NULL); + if (queued_bytes) + { + pthread_mutex_unlock(&parser->mutex); + return; + } + + parser->read_request.pending = true; + parser->read_request.size = length; + pthread_cond_signal(&parser->read_cond);
- /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect - * the upstream pin to flush if necessary. We should never be blocked on - * read_thread() not running. */ + pthread_mutex_unlock(&parser->mutex); +}
- while (!parser->read_request.done) - pthread_cond_wait(&parser->read_done_cond, &parser->mutex); +static gboolean src_seek_data(GstElement *appsrc, guint64 offset, gpointer user) +{ + struct wg_parser *parser = user;
- ret = parser->read_request.ret; - gst_buffer_set_size(*buffer, parser->read_request.size); + pthread_mutex_lock(&parser->mutex); + + assert(!parser->read_request.pending); + parser->read_request.offset = offset;
pthread_mutex_unlock(&parser->mutex);
- gst_buffer_unmap(*buffer, &map_info); - - GST_LOG("Request returned %d.", ret); - - if ((!ret || !size) && new_buffer) - gst_buffer_unref(new_buffer); - - if (ret) - return size ? GST_FLOW_OK : GST_FLOW_EOS; - return GST_FLOW_ERROR; -} - -static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - GstFormat format; - - GST_LOG("parser %p, type %s.", parser, GST_QUERY_TYPE_NAME(query)); - - switch (GST_QUERY_TYPE(query)) - { - case GST_QUERY_DURATION: - gst_query_parse_duration(query, &format, NULL); - if (format == GST_FORMAT_PERCENT) - { - gst_query_set_duration(query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX); - return TRUE; - } - else if (format == GST_FORMAT_BYTES) - { - gst_query_set_duration(query, GST_FORMAT_BYTES, parser->file_size); - return TRUE; - } - return FALSE; - - case GST_QUERY_SEEKING: - gst_query_parse_seeking (query, &format, NULL, NULL, NULL); - if (format != GST_FORMAT_BYTES) - { - GST_WARNING("Cannot seek using format "%s".", gst_format_get_name(format)); - return FALSE; - } - gst_query_set_seeking(query, GST_FORMAT_BYTES, 1, 0, parser->file_size); - return TRUE; - - case GST_QUERY_SCHEDULING: - gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0); - gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH); - gst_query_add_scheduling_mode(query, GST_PAD_MODE_PULL); - return TRUE; - - default: - GST_WARNING("Unhandled query type %s.", GST_QUERY_TYPE_NAME(query)); - return FALSE; - } -} - -static void *push_data(void *arg) -{ - struct wg_parser *parser = arg; - GstBuffer *buffer; - guint max_size; - - GST_DEBUG("Starting push thread."); - - if (!(buffer = gst_buffer_new_allocate(NULL, 16384, NULL))) - { - GST_ERROR("Failed to allocate memory."); - return NULL; - } - - max_size = parser->stop_offset ? 
parser->stop_offset : parser->file_size; - - for (;;) - { - ULONG size; - int ret; - - if (parser->next_offset >= max_size) - break; - size = min(16384, max_size - parser->next_offset); - - if ((ret = src_getrange_cb(parser->my_src, NULL, parser->next_offset, size, &buffer)) < 0) - { - GST_ERROR("Failed to read data, ret %s.", gst_flow_get_name(ret)); - break; - } - - parser->next_offset += size; - - buffer->duration = buffer->pts = -1; - if ((ret = gst_pad_push(parser->my_src, buffer)) < 0) - { - GST_ERROR("Failed to push data, ret %s.", gst_flow_get_name(ret)); - break; - } - } - - gst_buffer_unref(buffer); - - gst_pad_push_event(parser->my_src, gst_event_new_eos()); - - GST_DEBUG("Stopping push thread."); - - return NULL; -} - -static gboolean activate_push(GstPad *pad, gboolean activate) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - - if (!activate) - { - if (parser->push_thread) - { - pthread_join(parser->push_thread, NULL); - parser->push_thread = 0; - } - } - else if (!parser->push_thread) - { - int ret; - - if ((ret = pthread_create(&parser->push_thread, NULL, push_data, parser))) - { - GST_ERROR("Failed to create push thread: %s", strerror(errno)); - parser->push_thread = 0; - return FALSE; - } - } - return TRUE; -} - -static gboolean src_activate_mode_cb(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - - GST_DEBUG("%s source pad for parser %p in %s mode.", - activate ? "Activating" : "Deactivating", parser, gst_pad_mode_get_name(mode)); - - switch (mode) - { - case GST_PAD_MODE_PULL: - return TRUE; - case GST_PAD_MODE_PUSH: - return activate_push(pad, activate); - case GST_PAD_MODE_NONE: - break; - } - return FALSE; + return true; }
static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer user) @@ -1440,89 +1347,11 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use return GST_BUS_DROP; }
-static gboolean src_perform_seek(struct wg_parser *parser, GstEvent *event) -{ - BOOL thread = !!parser->push_thread; - GstSeekType cur_type, stop_type; - GstFormat seek_format; - GstEvent *flush_event; - GstSeekFlags flags; - gint64 cur, stop; - guint32 seqnum; - gdouble rate; - - gst_event_parse_seek(event, &rate, &seek_format, &flags, - &cur_type, &cur, &stop_type, &stop); - - if (seek_format != GST_FORMAT_BYTES) - { - GST_FIXME("Unhandled format "%s".", gst_format_get_name(seek_format)); - return FALSE; - } - - seqnum = gst_event_get_seqnum(event); - - /* send flush start */ - if (flags & GST_SEEK_FLAG_FLUSH) - { - flush_event = gst_event_new_flush_start(); - gst_event_set_seqnum(flush_event, seqnum); - gst_pad_push_event(parser->my_src, flush_event); - if (thread) - gst_pad_set_active(parser->my_src, 1); - } - - parser->next_offset = parser->start_offset = cur; - - /* and prepare to continue streaming */ - if (flags & GST_SEEK_FLAG_FLUSH) - { - flush_event = gst_event_new_flush_stop(TRUE); - gst_event_set_seqnum(flush_event, seqnum); - gst_pad_push_event(parser->my_src, flush_event); - if (thread) - gst_pad_set_active(parser->my_src, 1); - } - - return TRUE; -} - -static gboolean src_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - gboolean ret = TRUE; - - GST_LOG("parser %p, type "%s".", parser, GST_EVENT_TYPE_NAME(event)); - - switch (event->type) - { - case GST_EVENT_SEEK: - ret = src_perform_seek(parser, event); - break; - - case GST_EVENT_FLUSH_START: - case GST_EVENT_FLUSH_STOP: - case GST_EVENT_QOS: - case GST_EVENT_RECONFIGURE: - break; - - default: - GST_WARNING("Ignoring "%s" event.", GST_EVENT_TYPE_NAME(event)); - ret = FALSE; - break; - } - gst_event_unref(event); - return ret; -} - static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_size) { - GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE("quartz_src", - GST_PAD_SRC, 
GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); unsigned int i; int ret;
- parser->file_size = file_size; parser->sink_connected = true;
if (!parser->bus) @@ -1534,15 +1363,16 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s parser->container = gst_bin_new(NULL); gst_element_set_bus(parser->container, parser->bus);
- parser->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src"); - gst_pad_set_getrange_function(parser->my_src, src_getrange_cb); - gst_pad_set_query_function(parser->my_src, src_query_cb); - gst_pad_set_activatemode_function(parser->my_src, src_activate_mode_cb); - gst_pad_set_event_function(parser->my_src, src_event_cb); - gst_pad_set_element_private(parser->my_src, parser); + if (!(parser->appsrc = create_element("appsrc", "base"))) + return E_FAIL; + gst_bin_add(GST_BIN(parser->container), parser->appsrc);
- parser->start_offset = parser->next_offset = parser->stop_offset = 0; - parser->next_pull_offset = 0; + g_object_set(parser->appsrc, "stream-type", GST_APP_STREAM_TYPE_RANDOM_ACCESS, NULL); + g_object_set(parser->appsrc, "size", file_size, NULL); + g_signal_connect(parser->appsrc, "need-data", G_CALLBACK(src_need_data), parser); + g_signal_connect(parser->appsrc, "seek-data", G_CALLBACK(src_seek_data), parser); + + parser->read_request.offset = 0; parser->error = false;
if (!parser->init_gst(parser)) @@ -1638,18 +1468,11 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
pthread_mutex_unlock(&parser->mutex);
- parser->next_offset = 0; return S_OK;
out: if (parser->container) gst_element_set_state(parser->container, GST_STATE_NULL); - if (parser->their_sink) - { - gst_pad_unlink(parser->my_src, parser->their_sink); - gst_object_unref(parser->their_sink); - parser->my_src = parser->their_sink = NULL; - }
for (i = 0; i < parser->stream_count; ++i) free_stream(parser->streams[i]); @@ -1686,10 +1509,6 @@ static void CDECL wg_parser_disconnect(struct wg_parser *parser) pthread_mutex_unlock(&parser->mutex);
gst_element_set_state(parser->container, GST_STATE_NULL); - gst_pad_unlink(parser->my_src, parser->their_sink); - gst_object_unref(parser->my_src); - gst_object_unref(parser->their_sink); - parser->my_src = parser->their_sink = NULL;
pthread_mutex_lock(&parser->mutex); parser->sink_connected = false; @@ -1711,7 +1530,6 @@ static void CDECL wg_parser_disconnect(struct wg_parser *parser) static BOOL decodebin_parser_init_gst(struct wg_parser *parser) { GstElement *element; - int ret;
if (!(element = create_element("decodebin", "base"))) return FALSE; @@ -1724,15 +1542,13 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser) g_signal_connect(element, "autoplug-select", G_CALLBACK(autoplug_select_cb), parser); g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - pthread_mutex_lock(&parser->mutex); parser->no_more_pads = false; pthread_mutex_unlock(&parser->mutex);
- if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, parser->decodebin)) { - GST_ERROR("Failed to link pads, error %d.\n", ret); + GST_ERROR("Failed to link appsrc.\n"); return FALSE; }
@@ -1753,15 +1569,13 @@ static BOOL avi_parser_init_gst(struct wg_parser *parser) g_signal_connect(element, "pad-removed", G_CALLBACK(pad_removed_cb), parser); g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - pthread_mutex_lock(&parser->mutex); parser->no_more_pads = false; pthread_mutex_unlock(&parser->mutex);
- if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, element)) { - GST_ERROR("Failed to link pads, error %d.\n", ret); + GST_ERROR("Failed to link appsrc.\n"); return FALSE; }
@@ -1779,10 +1593,9 @@ static BOOL mpeg_audio_parser_init_gst(struct wg_parser *parser)
gst_bin_add(GST_BIN(parser->container), element);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, element)) { - GST_ERROR("Failed to link sink pads, error %d.\n", ret); + GST_ERROR("Failed to link appsrc.\n"); return FALSE; }
@@ -1813,10 +1626,9 @@ static BOOL wave_parser_init_gst(struct wg_parser *parser)
gst_bin_add(GST_BIN(parser->container), element);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, element)) { - GST_ERROR("Failed to link sink pads, error %d.\n", ret); + GST_ERROR("Failed to link appsrc.\n"); return FALSE; }
@@ -1846,7 +1658,6 @@ static struct wg_parser *wg_parser_create(void) pthread_mutex_init(&parser->mutex, NULL); pthread_cond_init(&parser->init_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); - pthread_cond_init(&parser->read_done_cond, NULL); parser->flushing = true;
GST_DEBUG("Created winegstreamer parser %p.\n", parser); @@ -1900,7 +1711,6 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser) pthread_mutex_destroy(&parser->mutex); pthread_cond_destroy(&parser->init_cond); pthread_cond_destroy(&parser->read_cond); - pthread_cond_destroy(&parser->read_done_cond);
free(parser); }
On Wed, 15 Sep 2021, Zebediah Figura wrote: [...]
+static gboolean src_seek_data(GstElement *appsrc, guint64 offset, gpointer user) +{
- struct wg_parser *parser = user;
- ret = parser->read_request.ret;
- gst_buffer_set_size(*buffer, parser->read_request.size);
- pthread_mutex_lock(&parser->mutex);
- assert(!parser->read_request.pending);
quartz:mpegsplit randomly triggers this assertion.
https://bugs.winehq.org/show_bug.cgi?id=51774
https://test.winehq.org/data/patterns.html#quartz:mpegsplit
On 9/21/21 9:36 AM, Francois Gouget wrote:
On Wed, 15 Sep 2021, Zebediah Figura wrote: [...]
+static gboolean src_seek_data(GstElement *appsrc, guint64 offset, gpointer user) +{
- struct wg_parser *parser = user;
- ret = parser->read_request.ret;
- gst_buffer_set_size(*buffer, parser->read_request.size);
- pthread_mutex_lock(&parser->mutex);
- assert(!parser->read_request.pending);
quartz:mpegsplit randomly triggers this assertion.
I had this brought to my attention by Gijs and ended up debugging it myself. The short version is that appsrc has even more problems than we thought. Unfortunately I did not carefully check the flushing code when reviewing.
The problem is that a flushing seek can both start and finish between wg_parser_get_next_read_offset and wg_parser_push_data. This will cause seek-data and need-data to be sent, then the flush will interrupt appsrc, after which it will send the same signals again. This isn't a problem in itself—we can just remove the assertion as it is in fact bogus—except that it means that, once again, we will be sending the wrong data.
Note that we can't validate the buffers sent ourselves (i.e. do appsrc's job for it), because these buffers can be validated and sent before we even get seek-data, and yet can still be wrong.
There are two ways I see to fix this while still using appsrc, and both of them seem very janky:
1. Rely on the fact that seek-data is sent during a seek, and use it to effectively invalidate any buffers sent before the seek. Unlike the aforementioned problem with validation, this will actually work: at the time that seek-data is sent appsrc should be flushing, so any buffers we send before it will be discarded by appsrc, and any buffers we send afterward can be discarded by us if they aren't valid. This is very fragile, though. There's really no reason for appsrc to send seek-data for random-access streams in the first place, and this kind of synchronization is easy to get wrong if I haven't already.
2. Send buffers synchronously from within need-data. Honestly I have to assume that the developers of appsrc only ever tried this case, even though they have documented that you can send data "from a separate thread". In this case we'd have to start flushing the read thread before and after seeking.
The other options we have all involve removing appsrc. I had originally wanted to use appsrc for three reasons: first, it would abstract away most of the difference between push and pull mode; second, it would deal with pad activation and state change logic for us; and third, it would take care of all the nonsense surrounding flushing and synchronization so that we didn't have to do that. Unfortunately it is quickly turning out that the third part doesn't hold, and the first part holds less and less as well.
Based on that I see two more options:
3. Build our own element using basesrc. For seekable parsers this does still abstract away the difference between push and pull mode. For PE-level push mode (e.g. MFTs) it doesn't, and I'm not sure what the best move is there. We'd have to do about the same amount of work as solution #2, but at least we'd be working within clearly documented parameters, plus we actually have the ability to properly deal with flushes using the unlock/unlock_stop callbacks. The main downside here is that we have to deal with GLib OOP nonsense.
4. Go back to using a custom pad.
On 9/21/21 13:02, Zebediah Figura wrote:
- Rely on the fact that seek-data is sent during a seek, and use it
to effectively invalidate any buffers sent before the seek. Unlike the aforementioned problem with validation, this will actually work: at the time that seek-data is sent appsrc should be flushing, so any buffers we send before it will be discarded by appsrc, and any buffers we send afterward can be discarded by us if they aren't valid. This is very fragile, though. There's really no reason for appsrc to send seek-data for random-access streams in the first place, and this kind of synchronization is easy to get wrong if I haven't already.
This sounds like the best way forward in my opinion, how is it fragile? It seems that if we just ensure that a pushed buffer takes into account the latest seek/need data pair, not much can go wrong.
On 9/24/21 3:05 AM, Derek Lesho wrote:
On 9/21/21 13:02, Zebediah Figura wrote:
- Rely on the fact that seek-data is sent during a seek, and use it
to effectively invalidate any buffers sent before the seek. Unlike the aforementioned problem with validation, this will actually work: at the time that seek-data is sent appsrc should be flushing, so any buffers we send before it will be discarded by appsrc, and any buffers we send afterward can be discarded by us if they aren't valid. This is very fragile, though. There's really no reason for appsrc to send seek-data for random-access streams in the first place, and this kind of synchronization is easy to get wrong if I haven't already.
This sounds like the best way forward in my opinion, how is it fragile?
It's fragile in general because we're making a lot of assumptions, that aren't documented, about when and from which thread gstappsrc will send these signals, and what synchronization guarantees it applies when doing so.
It seems that if we just ensure that a pushed buffer takes into account the latest seek/need data pair, not much can go wrong.
That's not enough by itself. We can, generally speaking, send a buffer after a seek-data/need-data is triggered but before it is actually sent. This happens in practice with flushes.
In order to get around that, you need the hack I mentioned: assume that a flush will be accompanied by a seek-data signal. This is especially fragile because there's no reason for appsrc to even be sending this signal in the first place (it only really makes sense in "seekable" mode), and they could easily decide to not do that.
On 9/24/21 12:38, Zebediah Figura wrote:
On 9/24/21 3:05 AM, Derek Lesho wrote:
On 9/21/21 13:02, Zebediah Figura wrote:
- Rely on the fact that seek-data is sent during a seek, and use it
to effectively invalidate any buffers sent before the seek. Unlike the aforementioned problem with validation, this will actually work: at the time that seek-data is sent appsrc should be flushing, so any buffers we send before it will be discarded by appsrc, and any buffers we send afterward can be discarded by us if they aren't valid. This is very fragile, though. There's really no reason for appsrc to send seek-data for random-access streams in the first place, and this kind of synchronization is easy to get wrong if I haven't already.
This sounds like the best way forward in my opinion, how is it fragile?
It's fragile in general because we're making a lot of assumptions, that aren't documented, about when and from which thread gstappsrc will send these signals, and what synchronization guarantees it applies when doing so.
My perspective is that if we can get it working, documenting the two required quirks to the GStreamer project, eventually the problem will be cleared up and/or fixed, and we can remove the quirks then.
It seems that if we just ensure that a pushed buffer takes into account the latest seek/need data pair, not much can go wrong.
That's not enough by itself. We can, generally speaking, send a buffer after a seek-data/need-data is triggered but before it is actually sent.
Before what is sent?
This happens in practice with flushes.
In order to get around that, you need the hack I mentioned: assume that a flush will be accompanied by a seek-data signal. This is especially fragile because there's no reason for appsrc to even be sending this signal in the first place (it only really makes sense in "seekable" mode), and they could easily decide to not do that.
I'm not sure I understand, whenever the next buffer app source wants is not consecutively after the last, seek-data is required (and sent). What does "assume the flush will be accompanied by a seek-data signal" mean? The app source client doesn't have any conception of a flush, just seek-data and need-data, and to fix this problem all we need to do is add a quite rational check that push_data is responding to the latest seek/need data pair.
On 9/24/21 14:23, Derek Lesho wrote:
On 9/24/21 12:38, Zebediah Figura wrote:
On 9/24/21 3:05 AM, Derek Lesho wrote:
On 9/21/21 13:02, Zebediah Figura wrote:
- Rely on the fact that seek-data is sent during a seek, and use it
to effectively invalidate any buffers sent before the seek. Unlike the aforementioned problem with validation, this will actually work: at the time that seek-data is sent appsrc should be flushing, so any buffers we send before it will be discarded by appsrc, and any buffers we send afterward can be discarded by us if they aren't valid. This is very fragile, though. There's really no reason for appsrc to send seek-data for random-access streams in the first place, and this kind of synchronization is easy to get wrong if I haven't already.
This sounds like the best way forward in my opinion, how is it fragile?
It's fragile in general because we're making a lot of assumptions, that aren't documented, about when and from which thread gstappsrc will send these signals, and what synchronization guarantees it applies when doing so.
My perspective is that if we can get it working, documenting the two required quirks to the GStreamer project, eventually the problem will be cleared up and/or fixed, and we can remove the quirks then.
The problem is that we're still going to have to keep those workarounds for a long time. Not to mention that clearly filing a bug is not enough to get the attention of the GStreamer developers.
As the maintainer of this code, I don't think I feel very comfortable relying this much on the internals of appsrc, whether as workarounds or not.
It seems that if we just ensure that a pushed buffer takes into account the latest seek/need data pair, not much can go wrong.
That's not enough by itself. We can, generally speaking, send a buffer after a seek-data/need-data is triggered but before it is actually sent.
Before what is sent?
Before the seek-data or need-data signal is sent. There is a window there, however short.
This happens in practice with flushes.
In order to get around that, you need the hack I mentioned: assume that a flush will be accompanied by a seek-data signal. This is especially fragile because there's no reason for appsrc to even be sending this signal in the first place (it only really makes sense in "seekable" mode), and they could easily decide to not do that.
I'm not sure I understand, whenever the next buffer app source wants is not consecutively after the last, seek-data is required (and sent). What does "assume the flush will be accompanied by a seek-data signal" mean? The app source client doesn't have any conception of a flush, just seek-data and need-data, and to fix this problem all we need to do is add a quite rational check that push_data is responding to the latest seek/need data pair.
Let me try to explain more clearly. At least one race, the one I've been trying to describe, looks like this:
read thread          main thread
-------------------------------------
                     push seek event
retrieve data
validate offset
                     send flush-start
                     emit seek-data
                     send flush-stop
push-buffer
At which point appsrc will get the wrong buffer.
Solution 1 has us rely on the seek-data signal to effectively wait for the read thread (most likely using the parser mutex) and, in a sense, put a barrier between reads occurring before and after the flush.
On 9/28/21 07:23, Zebediah Figura (she/her) wrote:
On 9/24/21 14:23, Derek Lesho wrote:
On 9/24/21 12:38, Zebediah Figura wrote:
On 9/24/21 3:05 AM, Derek Lesho wrote:
On 9/21/21 13:02, Zebediah Figura wrote:
- Rely on the fact that seek-data is sent during a seek, and use it
to effectively invalidate any buffers sent before the seek. Unlike the aforementioned problem with validation, this will actually work: at the time that seek-data is sent appsrc should be flushing, so any buffers we send before it will be discarded by appsrc, and any buffers we send afterward can be discarded by us if they aren't valid. This is very fragile, though. There's really no reason for appsrc to send seek-data for random-access streams in the first place, and this kind of synchronization is easy to get wrong if I haven't already.
This sounds like the best way forward in my opinion, how is it fragile?
It's fragile in general because we're making a lot of assumptions, that aren't documented, about when and from which thread gstappsrc will send these signals, and what synchronization guarantees it applies when doing so.
My perspective is that if we can get it working, documenting the two required quirks to the GStreamer project, eventually the problem will be cleared up and/or fixed, and we can remove the quirks then.
The problem is that we're still going to have to keep those workarounds for a long time. Not to mention that clearly filing a bug is not enough to get the attention of the GStreamer developers.
As the maintainer of this code, I don't think I feel very comfortable relying this much on the internals of appsrc, whether as workarounds or not.
It seems that if we just ensure that a pushed buffer takes into account the latest seek/need data pair, not much can go wrong.
That's not enough by itself. We can, generally speaking, send a buffer after a seek-data/need-data is triggered but before it is actually sent.
Before what is sent?
Before the seek-data or need-data signal is sent. There is a window there, however short.
I'm not sure exactly what the difference between triggering a signal and sending a signal is, but assuming you mean that if seek-data doesn't acquire the mutex, it would be possible for push-buffer to not see the seek, then send the buffer after seek-data, then yeah, we do need to put seek-data and push-buffer in the same mutex to make sure that push-buffer is responding to the latest seek/need data callbacks.
This happens in practice with flushes.
In order to get around that, you need the hack I mentioned: assume that a flush will be accompanied by a seek-data signal. This is especially fragile because there's no reason for appsrc to even be sending this signal in the first place (it only really makes sense in "seekable" mode), and they could easily decide to not do that.
I'm not sure I understand, whenever the next buffer app source wants is not consecutively after the last, seek-data is required (and sent). What does "assume the flush will be accompanied by a seek-data signal" mean? The app source client doesn't have any conception of a flush, just seek-data and need-data, and to fix this problem all we need to do is add a quite rational check that push_data is responding to the latest seek/need data pair.
Let me try to explain more clearly. At least one race, the one I've been trying to describe, looks like this:
read thread          main thread
-------------------------------------
                     push seek event
retrieve data
validate offset
                     send flush-start
                     emit seek-data
                     send flush-stop
push-buffer
At which point appsrc will get the wrong buffer.
Solution 1 has us rely on the seek-data signal to effectively wait for the read thread (most likely using the parser mutex) and, in a sense, put a barrier between reads occurring before and after the flush.
I mean, yeah, but from the perspective of the interface, all we are doing is making sure that buffers that are responding to requests from before a seek-data don't get sent, which I really don't think should be too controversial. That the seek happens to be caused by a flush shouldn't matter, we're putting a barrier between buffers sent for a different offset, and the current offset we've gotten from seek-data. Honestly, I'm not even sure this is a GStreamer bug, should they really have to handle a case where the client responds with an incorrect buffer after a seek?
On 9/28/21 03:08, Derek Lesho wrote:
On 9/28/21 07:23, Zebediah Figura (she/her) wrote:
On 9/24/21 14:23, Derek Lesho wrote:
On 9/24/21 12:38, Zebediah Figura wrote:
On 9/24/21 3:05 AM, Derek Lesho wrote:
On 9/21/21 13:02, Zebediah Figura wrote:
- Rely on the fact that seek-data is sent during a seek, and use it
to effectively invalidate any buffers sent before the seek. Unlike the aforementioned problem with validation, this will actually work: at the time that seek-data is sent appsrc should be flushing, so any buffers we send before it will be discarded by appsrc, and any buffers we send afterward can be discarded by us if they aren't valid. This is very fragile, though. There's really no reason for appsrc to send seek-data for random-access streams in the first place, and this kind of synchronization is easy to get wrong if I haven't already.
This sounds like the best way forward in my opinion, how is it fragile?
It's fragile in general because we're making a lot of assumptions, that aren't documented, about when and from which thread gstappsrc will send these signals, and what synchronization guarantees it applies when doing so.
My perspective is that if we can get it working, documenting the two required quirks to the GStreamer project, eventually the problem will be cleared up and/or fixed, and we can remove the quirks then.
The problem is that we're still going to have to keep those workarounds for a long time. Not to mention that clearly filing a bug is not enough to get the attention of the GStreamer developers.
As the maintainer of this code, I don't think I feel very comfortable relying this much on the internals of appsrc, whether as workarounds or not.
It seems that if we just ensure that a pushed buffer takes into account the latest seek/need data pair, not much can go wrong.
That's not enough by itself. We can, generally speaking, send a buffer after a seek-data/need-data is triggered but before it is actually sent.
Before what is sent?
Before the seek-data or need-data signal is sent. There is a window there, however short.
I'm not sure exactly what the difference between triggering a signal and sending a signal is, but assuming you mean that if seek-data doesn't acquire the mutex, it would be possible for push-buffer to not see the seek, then send the buffer after seek-data, then yeah, we do need to put seek-data and push-buffer in the same mutex to make sure that push-buffer is responding to the latest seek/need data callbacks.
This happens in practice with flushes.
In order to get around that, you need the hack I mentioned: assume that a flush will be accompanied by a seek-data signal. This is especially fragile because there's no reason for appsrc to even be sending this signal in the first place (it only really makes sense in "seekable" mode), and they could easily decide to not do that.
I'm not sure I understand, whenever the next buffer app source wants is not consecutively after the last, seek-data is required (and sent). What does "assume the flush will be accompanied by a seek-data signal" mean? The app source client doesn't have any conception of a flush, just seek-data and need-data, and to fix this problem all we need to do is add a quite rational check that push_data is responding to the latest seek/need data pair.
Let me try to explain more clearly. At least one race, the one I've been trying to describe, looks like this:
read thread          main thread
-------------------------------------
                     push seek event
retrieve data
validate offset
                     send flush-start
                     emit seek-data
                     send flush-stop
push-buffer
At which point appsrc will get the wrong buffer.
Solution 1 has us rely on the seek-data signal to effectively wait for the read thread (most likely using the parser mutex) and, in a sense, put a barrier between reads occurring before and after the flush.
I mean, yeah, but from the perspective of the interface, all we are doing is making sure that buffers that are responding to requests from before a seek-data don't get sent, which I really don't think should be too controversial. That the seek happens to be caused by a flush shouldn't matter, we're putting a barrier between buffers sent for a different offset, and the current offset we've gotten from seek-data. Honestly, I'm not even sure this is a GStreamer bug, should they really have to handle a case where the client responds with an incorrect buffer after a seek?
I don't think this is obvious at all. It's certainly not called out in the documentation. And, moreover, it's a lot of work, and it doesn't even look like a remotely idiomatic solution. Not to mention that, as I've said, appsrc really has no reason to send seek-data at all here.
I'm pretty sure that you're correct that putting the whole of wg_parser_push_data inside a mutex, and the seek-data callback inside the same mutex, is actually sufficient to fix this race. But that stops being true if appsrc stops sending seek-data here.
Fundamentally the API contract here is that you don't send old buffers after a flush stops. DirectShow and GStreamer are both built on that idea. I believe that Media Foundation does away with it, which is one of the good things about Media Foundation. appsrc doesn't expose flushes to us, and it doesn't seem to do a good enough job of taking care of them itself. Trying to infer when they happen is too fragile for my taste.
On 9/29/21 00:18, Zebediah Figura (she/her) wrote:
On 9/28/21 03:08, Derek Lesho wrote:
On 9/28/21 07:23, Zebediah Figura (she/her) wrote:
On 9/24/21 14:23, Derek Lesho wrote:
On 9/24/21 12:38, Zebediah Figura wrote:
On 9/24/21 3:05 AM, Derek Lesho wrote:
On 9/21/21 13:02, Zebediah Figura wrote:
1. Rely on the fact that seek-data is sent during a seek, and use it to effectively invalidate any buffers sent before the seek. Unlike the aforementioned problem with validation, this will actually work: at the time that seek-data is sent appsrc should be flushing, so any buffers we send before it will be discarded by appsrc, and any buffers we send afterward can be discarded by us if they aren't valid. This is very fragile, though. There's really no reason for appsrc to send seek-data for random-access streams in the first place, and this kind of synchronization is easy to get wrong if I haven't already.
This sounds like the best way forward in my opinion, how is it fragile?
It's fragile in general because we're making a lot of assumptions, that aren't documented, about when and from which thread gstappsrc will send these signals, and what synchronization guarantees it applies when doing so.
My perspective is that if we can get it working, documenting the two required quirks to the GStreamer project, eventually the problem will be cleared up and/or fixed, and we can remove the quirks then.
The problem is that we're still going to have to keep those workarounds for a long time. Not to mention that clearly filing a bug is not enough to get the attention of the GStreamer developers.
As the maintainer of this code, I don't think I feel very comfortable relying this much on the internals of appsrc, whether as workarounds or not.
It seems that if we just ensure that a pushed buffer takes into account the latest seek/need data pair, not much can go wrong.
That's not enough by itself. We can, generally speaking, send a buffer after a seek-data/need-data is triggered but before it is actually sent.
Before what is sent?
Before the seek-data or need-data signal is sent. There is a window there, however short.
I'm not sure exactly what the difference between triggering a signal and sending a signal is, but assuming you mean that if seek-data doesn't acquire the mutex, it would be possible for push-buffer to not see the seek, then send the buffer after seek-data, then yeah, we do need to put seek-data and push-buffer in the same mutex to make sure that push-buffer is responding to the latest seek/need data callbacks.
This happens in practice with flushes.
In order to get around that, you need the hack I mentioned: assume that a flush will be accompanied by a seek-data signal. This is especially fragile because there's no reason for appsrc to even be sending this signal in the first place (it only really makes sense in "seekable" mode), and they could easily decide to not do that.
I'm not sure I understand, whenever the next buffer app source wants is not consecutively after the last, seek-data is required (and sent). What does "assume the flush will be accompanied by a seek-data signal" mean? The app source client doesn't have any conception of a flush, just seek-data and need-data, and to fix this problem all we need to do is add a quite rational check that push_data is responding to the latest seek/need data pair.
Let me try to explain more clearly. At least one race, the one I've been trying to describe, looks like this:
read thread            main thread
                       push seek event
retrieve data
validate offset
                       send flush-start
                       emit seek-data
                       send flush-stop
push-buffer
At which point appsrc will get the wrong buffer.
Solution 1 has us rely on the seek-data signal to effectively wait for the read thread (most likely using the parser mutex) and, in a sense, put a barrier between reads occurring before and after the flush.
I mean, yeah, but from the perspective of the interface, all we are doing is making sure that buffers that are responding to requests from before a seek-data don't get sent, which I really don't think should be too controversial. That the seek happens to be caused by a flush shouldn't matter, we're putting a barrier between buffers sent for a different offset, and the current offset we've gotten from seek-data. Honestly, I'm not even sure this is a GStreamer bug, should they really have to handle a case where the client responds with an incorrect buffer after a seek?
I don't think this is obvious at all. It's certainly not called out in the documentation.
The documentation does say "After receiving the seek-data signal, the application should push-buffers from the new position."
And, moreover, it's a lot of work, and it doesn't even look like a remotely idiomatic solution.
As I've already said, making sure we don't push buffers to an old offset after receiving a seek seems quite idiomatic to me.
Not to mention that, as I've said, appsrc really has no reason to send seek-data at all here.
It sends it here to ensure that pre-flush buffers are discarded. After it calls seek-data, it clears the queue, because of the aforementioned expectation, namely that the app will respond to seek-data, this works.
I'm pretty sure that you're correct that putting the whole of wg_parser_push_data inside a mutex, and the seek-data callback inside the same mutex, is actually sufficient to fix this race. But that stops being true if appsrc stops sending seek-data here.
What reason would appsrc have to stop sending seek-data here, it has to as far as I can see.
Fundamentally the API contract here is that you don't send old buffers after a flush stops. DirectShow and GStreamer are both built on that idea. I believe that Media Foundation does away with it, which is one of the good things about Media Foundation. appsrc doesn't expose flushes to us, and it doesn't seem to do a good enough job of taking care of them itself.
I think that requiring us not to send old buffers after seek-data and sending seek-data when they flush, then discarding the rest of the queue is a pretty good way of taking care of flushes.
Trying to infer when they happen is too fragile for my taste.
We don't need to infer when they happen, all we need to do is follow the guideline not to send buffers to a previous offset after seek-data.
On 9/29/21 02:59, Derek Lesho wrote:
On 9/29/21 00:18, Zebediah Figura (she/her) wrote:
On 9/28/21 03:08, Derek Lesho wrote:
On 9/28/21 07:23, Zebediah Figura (she/her) wrote:
On 9/24/21 14:23, Derek Lesho wrote:
On 9/24/21 12:38, Zebediah Figura wrote:
On 9/24/21 3:05 AM, Derek Lesho wrote: > On 9/21/21 13:02, Zebediah Figura wrote: > >> >> >> 1. Rely on the fact that seek-data is sent during a seek, and >> use it >> to effectively invalidate any buffers sent before the seek. >> Unlike the >> aforementioned problem with validation, this will actually work: at >> the time that seek-data is sent appsrc should be flushing, so any >> buffers we send before it will be discarded by appsrc, and any >> buffers >> we send afterward can be discarded by us if they aren't valid. >> This is >> very fragile, though. There's really no reason for appsrc to send >> seek-data for random-access streams in the first place, and this >> kind >> of synchronization is easy to get wrong if I haven't already. > > This sounds like the best way forward in my opinion, how is it > fragile?
It's fragile in general because we're making a lot of assumptions, that aren't documented, about when and from which thread gstappsrc will send these signals, and what synchronization guarantees it applies when doing so.
My perspective is that if we can get it working, documenting the two required quirks to the GStreamer project, eventually the problem will be cleared up and/or fixed, and we can remove the quirks then.
The problem is that we're still going to have to keep those workarounds for a long time. Not to mention that clearly filing a bug is not enough to get the attention of the GStreamer developers.
As the maintainer of this code, I don't think I feel very comfortable relying this much on the internals of appsrc, whether as workarounds or not.
> It seems that if we just ensure that a pushed buffer takes into > account > the latest seek/need data pair, not much can go wrong. >
That's not enough by itself. We can, generally speaking, send a buffer after a seek-data/need-data is triggered but before it is actually sent.
Before what is sent?
Before the seek-data or need-data signal is sent. There is a window there, however short.
I'm not sure exactly what the difference between triggering a signal and sending a signal is, but assuming you mean that if seek-data doesn't acquire the mutex, it would be possible for push-buffer to not see the seek, then send the buffer after seek-data, then yeah, we do need to put seek-data and push-buffer in the same mutex to make sure that push-buffer is responding to the latest seek/need data callbacks.
This happens in practice with flushes.
In order to get around that, you need the hack I mentioned: assume that a flush will be accompanied by a seek-data signal. This is especially fragile because there's no reason for appsrc to even be sending this signal in the first place (it only really makes sense in "seekable" mode), and they could easily decide to not do that.
I'm not sure I understand, whenever the next buffer app source wants is not consecutively after the last, seek-data is required (and sent). What does "assume the flush will be accompanied by a seek-data signal" mean? The app source client doesn't have any conception of a flush, just seek-data and need-data, and to fix this problem all we need to do is add a quite rational check that push_data is responding to the latest seek/need data pair.
Let me try to explain more clearly. At least one race, the one I've been trying to describe, looks like this:
read thread            main thread
                       push seek event
retrieve data
validate offset
                       send flush-start
                       emit seek-data
                       send flush-stop
push-buffer
At which point appsrc will get the wrong buffer.
Solution 1 has us rely on the seek-data signal to effectively wait for the read thread (most likely using the parser mutex) and, in a sense, put a barrier between reads occurring before and after the flush.
I mean, yeah, but from the perspective of the interface, all we are doing is making sure that buffers that are responding to requests from before a seek-data don't get sent, which I really don't think should be too controversial. That the seek happens to be caused by a flush shouldn't matter, we're putting a barrier between buffers sent for a different offset, and the current offset we've gotten from seek-data. Honestly, I'm not even sure this is a GStreamer bug, should they really have to handle a case where the client responds with an incorrect buffer after a seek?
I don't think this is obvious at all. It's certainly not called out in the documentation.
The documentation does say "After receiving the seek-data signal, the application should push-buffers from the new position."
There's quite a difference between that and "make sure that any old buffers are flushed out, and don't send any new buffers from the old position".
And, moreover, it's a lot of work, and it doesn't even look like a remotely idiomatic solution.
As I've already said, making sure we don't push buffers to an old offset after receiving a seek seems quite idiomatic to me.
Not to mention that, as I've said, appsrc really has no reason to send seek-data at all here.
It sends it here to ensure that pre-flush buffers are discarded. After it calls seek-data, it clears the queue, because of the aforementioned expectation, namely that the app will respond to seek-data, this works.
Well, no, I don't think it does. I think it sends seek-data here for "seekable" mode, because that actually makes sense, and ends up also sending it for "random-access" mode unnecessarily.
I'm pretty sure that you're correct that putting the whole of wg_parser_push_data inside a mutex, and the seek-data callback inside the same mutex, is actually sufficient to fix this race. But that stops being true if appsrc stops sending seek-data here.
What reason would appsrc have to stop sending seek-data here, it has to as far as I can see.
Fundamentally the API contract here is that you don't send old buffers after a flush stops. DirectShow and GStreamer are both built on that idea. I believe that Media Foundation does away with it, which is one of the good things about Media Foundation. appsrc doesn't expose flushes to us, and it doesn't seem to do a good enough job of taking care of them itself.
I think that requiring us not to send old buffers after seek-data and sending seek-data when they flush, then discarding the rest of the queue is a pretty good way of taking care of flushes.
Trying to infer when they happen is too fragile for my taste.
We don't need to infer when they happen, all we need to do is follow the guideline not to send buffers to a previous offset after seek-data.
On 9/29/21 17:30, Zebediah Figura (she/her) wrote:
On 9/29/21 02:59, Derek Lesho wrote:
On 9/29/21 00:18, Zebediah Figura (she/her) wrote:
On 9/28/21 03:08, Derek Lesho wrote:
On 9/28/21 07:23, Zebediah Figura (she/her) wrote:
On 9/24/21 14:23, Derek Lesho wrote:
On 9/24/21 12:38, Zebediah Figura wrote: > On 9/24/21 3:05 AM, Derek Lesho wrote: >> On 9/21/21 13:02, Zebediah Figura wrote: >> >>> >>> >>> 1. Rely on the fact that seek-data is sent during a seek, and >>> use it >>> to effectively invalidate any buffers sent before the seek. >>> Unlike the >>> aforementioned problem with validation, this will actually >>> work: at >>> the time that seek-data is sent appsrc should be flushing, so any >>> buffers we send before it will be discarded by appsrc, and any >>> buffers >>> we send afterward can be discarded by us if they aren't valid. >>> This is >>> very fragile, though. There's really no reason for appsrc to send >>> seek-data for random-access streams in the first place, and >>> this kind >>> of synchronization is easy to get wrong if I haven't already. >> >> This sounds like the best way forward in my opinion, how is it >> fragile? > > It's fragile in general because we're making a lot of > assumptions, that aren't documented, about when and from which > thread gstappsrc will send these signals, and what > synchronization guarantees it applies when doing so.
My perspective is that if we can get it working, documenting the two required quirks to the GStreamer project, eventually the problem will be cleared up and/or fixed, and we can remove the quirks then.
The problem is that we're still going to have to keep those workarounds for a long time. Not to mention that clearly filing a bug is not enough to get the attention of the GStreamer developers.
As the maintainer of this code, I don't think I feel very comfortable relying this much on the internals of appsrc, whether as workarounds or not.
> >> It seems that if we just ensure that a pushed buffer takes into >> account >> the latest seek/need data pair, not much can go wrong. >> > > That's not enough by itself. We can, generally speaking, send a > buffer after a seek-data/need-data is triggered but before it is > actually sent. Before what is sent?
Before the seek-data or need-data signal is sent. There is a window there, however short.
I'm not sure exactly what the difference between triggering a signal and sending a signal is, but assuming you mean that if seek-data doesn't acquire the mutex, it would be possible for push-buffer to not see the seek, then send the buffer after seek-data, then yeah, we do need to put seek-data and push-buffer in the same mutex to make sure that push-buffer is responding to the latest seek/need data callbacks.
> This happens in practice with flushes. > > In order to get around that, you need the hack I mentioned: > assume that a flush will be accompanied by a seek-data signal. > This is especially fragile because there's no reason for appsrc > to even be sending this signal in the first place (it only > really makes sense in "seekable" mode), and they could easily > decide to not do that.
I'm not sure I understand, whenever the next buffer app source wants is not consecutively after the last, seek-data is required (and sent). What does "assume the flush will be accompanied by a seek-data signal" mean? The app source client doesn't have any conception of a flush, just seek-data and need-data, and to fix this problem all we need to do is add a quite rational check that push_data is responding to the latest seek/need data pair.
Let me try to explain more clearly. At least one race, the one I've been trying to describe, looks like this:
read thread            main thread
                       push seek event
retrieve data
validate offset
                       send flush-start
                       emit seek-data
                       send flush-stop
push-buffer
At which point appsrc will get the wrong buffer.
Solution 1 has us rely on the seek-data signal to effectively wait for the read thread (most likely using the parser mutex) and, in a sense, put a barrier between reads occurring before and after the flush.
I mean, yeah, but from the perspective of the interface, all we are doing is making sure that buffers that are responding to requests from before a seek-data don't get sent, which I really don't think should be too controversial. That the seek happens to be caused by a flush shouldn't matter, we're putting a barrier between buffers sent for a different offset, and the current offset we've gotten from seek-data. Honestly, I'm not even sure this is a GStreamer bug, should they really have to handle a case where the client responds with an incorrect buffer after a seek?
I don't think this is obvious at all. It's certainly not called out in the documentation.
The documentation does say "After receiving the seek-data signal, the application should push-buffers from the new position."
There's quite a difference between that and "make sure that any old buffers are flushed out,
Old buffers don't need to be flushed out, app source does that.
and don't send any new buffers from the old position".
"don't send any new buffers from the old position" logically has the same meaning as "should push-buffers from the new position".
And, moreover, it's a lot of work, and it doesn't even look like a remotely idiomatic solution.
As I've already said, making sure we don't push buffers to an old offset after receiving a seek seems quite idiomatic to me.
Not to mention that, as I've said, appsrc really has no reason to send seek-data at all here.
It sends it here to ensure that pre-flush buffers are discarded. After it calls seek-data, it clears the queue, because of the aforementioned expectation, namely that the app will respond to seek-data, this works.
Well, no, I don't think it does. I think it sends seek-data here for "seekable" mode, because that actually makes sense, and ends up also sending it for "random-access" mode unnecessarily.
Sending it for "random-access" mode does make sense, because downstream wants all future (post-flush) buffers to be from a new offset, so it needs to tell the app to stop sending buffers from the old location, and start from the new location, then discard the queued buffer/s for the old location if needed. As mentioned above, according to the documentation, when a seek-data is sent, buffers sent afterwards must be responsive to the new offset.
I'm pretty sure that you're correct that putting the whole of wg_parser_push_data inside a mutex, and the seek-data callback inside the same mutex, is actually sufficient to fix this race. But that stops being true if appsrc stops sending seek-data here.
What reason would appsrc have to stop sending seek-data here, it has to as far as I can see.
Fundamentally the API contract here is that you don't send old buffers after a flush stops. DirectShow and GStreamer are both built on that idea. I believe that Media Foundation does away with it, which is one of the good things about Media Foundation. appsrc doesn't expose flushes to us, and it doesn't seem to do a good enough job of taking care of them itself.
I think that requiring us not to send old buffers after seek-data and sending seek-data when they flush, then discarding the rest of the queue is a pretty good way of taking care of flushes.
Trying to infer when they happen is too fragile for my taste.
We don't need to infer when they happen, all we need to do is follow the guideline not to send buffers to a previous offset after seek-data.
On 9/29/21 2:37 PM, Derek Lesho wrote:
On 9/29/21 17:30, Zebediah Figura (she/her) wrote:
On 9/29/21 02:59, Derek Lesho wrote:
On 9/29/21 00:18, Zebediah Figura (she/her) wrote:
On 9/28/21 03:08, Derek Lesho wrote:
On 9/28/21 07:23, Zebediah Figura (she/her) wrote:
On 9/24/21 14:23, Derek Lesho wrote: > > On 9/24/21 12:38, Zebediah Figura wrote: >> On 9/24/21 3:05 AM, Derek Lesho wrote: >>> On 9/21/21 13:02, Zebediah Figura wrote: >>> >>>> >>>> >>>> 1. Rely on the fact that seek-data is sent during a seek, and >>>> use it >>>> to effectively invalidate any buffers sent before the seek. >>>> Unlike the >>>> aforementioned problem with validation, this will actually >>>> work: at >>>> the time that seek-data is sent appsrc should be flushing, so any >>>> buffers we send before it will be discarded by appsrc, and any >>>> buffers >>>> we send afterward can be discarded by us if they aren't valid. >>>> This is >>>> very fragile, though. There's really no reason for appsrc to send >>>> seek-data for random-access streams in the first place, and >>>> this kind >>>> of synchronization is easy to get wrong if I haven't already. >>> >>> This sounds like the best way forward in my opinion, how is it >>> fragile? >> >> It's fragile in general because we're making a lot of >> assumptions, that aren't documented, about when and from which >> thread gstappsrc will send these signals, and what >> synchronization guarantees it applies when doing so. > > My perspective is that if we can get it working, documenting the > two required quirks to the GStreamer project, eventually the > problem will be be cleared up and/or fixed, and we can remove the > quirks then.
The problem is that we're still going to have to keep those workarounds for a long time. Not to mention that clearly filing a bug is not enough to get the attention of the GStreamer developers.
As the maintainer of this code, I don't think I feel very comfortable relying this much on the internals of appsrc, whether as workarounds or not.
> >> >>> It seems that if we just ensure that a pushed buffer takes into >>> account >>> the latest seek/need data pair, not much can go wrong. >>> >> >> That's not enough by itself. We can, generally speaking, send a >> buffer after a seek-data/need-data is triggered but before it is >> actually sent. > Before what is sent?
Before the seek-data or need-data signal is sent. There is a window there, however short.
I'm not sure exactly what the difference between triggering a signal and sending a signal is, but assuming you mean that if seek-data doesn't acquire the mutex, it would be possible for push-buffer to not see the seek, then send the buffer after seek-data, then yeah, we do need to put seek-data and push-buffer in the same mutex to make sure that push-buffer is responding to the latest seek/need data callbacks.
>> This happens in practice with flushes. >> >> In order to get around that, you need the hack I mentioned: >> assume that a flush will be accompanied by a seek-data signal. >> This is especially fragile because there's no reason for appsrc >> to even be sending this signal in the first place (it only >> really makes sense in "seekable" mode), and they could easily >> decide to not do that. > > I'm not sure I understand, whenever the next buffer app source > wants is not consecutively after the last, seek-data is required > (and sent). What does "assume the flush will be accompanied by a > seek-data signal" mean? The app source client doesn't have any > conception of a flush, just seek-data and need-data, and to fix > this problem all we need to do is add a quite rational check that > push_data is responding to the latest seek/need data pair.
Let me try to explain more clearly. At least one race, the one I've been trying to describe, looks like this:
read thread            main thread
                       push seek event
retrieve data
validate offset
                       send flush-start
                       emit seek-data
                       send flush-stop
push-buffer
At which point appsrc will get the wrong buffer.
Solution 1 has us rely on the seek-data signal to effectively wait for the read thread (most likely using the parser mutex) and, in a sense, put a barrier between reads occurring before and after the flush.
I mean, yeah, but from the perspective of the interface, all we are doing is making sure that buffers that are responding to requests from before a seek-data don't get sent, which I really don't think should be too controversial. That the seek happens to be caused by a flush shouldn't matter, we're putting a barrier between buffers sent for a different offset, and the current offset we've gotten from seek-data. Honestly, I'm not even sure this is a GStreamer bug, should they really have to handle a case where the client responds with an incorrect buffer after a seek?
I don't think this is obvious at all. It's certainly not called out in the documentation.
The documentation does say "After receiving the seek-data signal, the application should push-buffers from the new position."
There's quite a difference between that and "make sure that any old buffers are flushed out,
Old buffers don't need to be flushed out, app source does that.
and don't send any new buffers from the old position".
"don't send any new buffers from the old position" logically has the same meaning as "should push-buffers from the new position".
And, moreover, it's a lot of work, and it doesn't even look like a remotely idiomatic solution.
As I've already said, making sure we don't push buffers to an old offset after receiving a seek seems quite idiomatic to me.
Not to mention that, as I've said, appsrc really has no reason to send seek-data at all here.
It sends it here to ensure that pre-flush buffers are discarded. After it calls seek-data, it clears the queue, because of the aforementioned expectation, namely that the app will respond to seek-data, this works.
Well, no, I don't think it does. I think it sends seek-data here for "seekable" mode, because that actually makes sense, and ends up also sending it for "random-access" mode unnecessarily.
Sending it for "random-access" mode does make sense, because downstream wants all future (post-flush) buffers to be from a new offset, so it needs to tell the app to stop sending buffers from the old location, and start from the new location, then discard the queued buffer/s for the old location if needed. As mentioned above, according to the documentation, when a seek-data is sent, buffers sent afterwards must be responsive to the new offset.
Sorry, I remain unconvinced that this approach is a good one.
On 9/29/21 22:23, Zebediah Figura wrote:
Sorry, I remain unconvinced that this approach is a good one.
Can't argue with that, I think I'll move on from working on this, at least for now.