Signed-off-by: Derek Lesho dlesho@codeweavers.com --- dlls/winegstreamer/media_source.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 01ab626254a..383afcfce75 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -733,6 +733,9 @@ static HRESULT new_media_stream(struct media_source *source, object->IMFMediaStream_iface.lpVtbl = &media_stream_vtbl; object->ref = 1;
+ if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) + goto fail; + IMFMediaSource_AddRef(&source->IMFMediaSource_iface); object->parent_source = source; object->stream_id = stream_id; @@ -741,9 +744,6 @@ static HRESULT new_media_stream(struct media_source *source, object->eos = FALSE; object->wg_stream = wg_stream;
- if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) - goto fail; - TRACE("Created stream object %p.\n", object);
*out_stream = object; @@ -1211,7 +1211,8 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
source->state = SOURCE_SHUTDOWN;
- unix_funcs->wg_parser_disconnect(source->wg_parser); + if (source->stream_count) + unix_funcs->wg_parser_disconnect(source->wg_parser);
if (source->read_thread) { @@ -1231,6 +1232,9 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface) { struct media_stream *stream = source->streams[i];
+ if (!stream) + continue; + stream->state = STREAM_SHUTDOWN;
if (stream->event_queue) @@ -1245,7 +1249,7 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
unix_funcs->wg_parser_destroy(source->wg_parser);
- if (source->stream_count) + if (source->streams) free(source->streams);
if (source->async_commands_queue) @@ -1353,7 +1357,6 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ if (FAILED(hr = media_stream_init_desc(object->streams[i]))) { ERR("Failed to finish initialization of media stream %p, hr %x.\n", object->streams[i], hr); - IMFMediaStream_Release(&object->streams[i]->IMFMediaStream_iface); goto fail; } } @@ -1392,6 +1395,8 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ fail: WARN("Failed to construct MFMediaSource, hr %#x.\n", hr);
+ if (object->wg_parser) + IMFMediaSource_Shutdown(&object->IMFMediaSource_iface); free(descriptors); IMFMediaSource_Release(&object->IMFMediaSource_iface); return hr;
This adds a allocate/blit for the input data, but this is necessary for both WOW64 support and an internal rework of the source path in wg_parser to use GstAppSrc. Source data is usually compressed anyway, so it shouldn't affect performance too badly.
Signed-off-by: Derek Lesho dlesho@codeweavers.com --- dlls/winegstreamer/gst_private.h | 7 ++++--- dlls/winegstreamer/media_source.c | 11 +++++++++-- dlls/winegstreamer/quartz_parser.c | 8 ++++++-- dlls/winegstreamer/wg_parser.c | 18 +++++++++++------- 4 files changed, 30 insertions(+), 14 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index c6c99b1dd55..9943682facb 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -169,9 +169,10 @@ struct unix_funcs void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser); void (CDECL *wg_parser_end_flush)(struct wg_parser *parser);
- bool (CDECL *wg_parser_get_read_request)(struct wg_parser *parser, - void **data, uint64_t *offset, uint32_t *size); - void (CDECL *wg_parser_complete_read_request)(struct wg_parser *parser, bool ret); + bool (CDECL *wg_parser_get_next_read_offset)(struct wg_parser *parser, + uint64_t *offset, uint32_t *size); + void (CDECL *wg_parser_push_data)(struct wg_parser *parser, + const void *data, uint32_t size);
void (CDECL *wg_parser_set_unlimited_buffering)(struct wg_parser *parser);
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 383afcfce75..abd9a220a7f 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -541,14 +541,21 @@ static DWORD CALLBACK read_thread(void *arg) HRESULT hr; void *data;
- if (!unix_funcs->wg_parser_get_read_request(source->wg_parser, &data, &offset, &size)) + if (!unix_funcs->wg_parser_get_next_read_offset(source->wg_parser, &offset, &size)) continue;
+ data = malloc(size); + ret_size = 0; + if (SUCCEEDED(hr = IMFByteStream_SetCurrentPosition(byte_stream, offset))) hr = IMFByteStream_Read(byte_stream, data, size, &ret_size); + if (FAILED(hr)) + ERR("Failed to read source stream bytes %p+%u. hr=%#x\n", data, size, hr); if (SUCCEEDED(hr) && ret_size != size) ERR("Unexpected short read: requested %u bytes, got %u.\n", size, ret_size); - unix_funcs->wg_parser_complete_read_request(source->wg_parser, SUCCEEDED(hr)); + unix_funcs->wg_parser_push_data(source->wg_parser, SUCCEEDED(hr) ? data : NULL, ret_size); + + free(data); }
TRACE("Media source is shutting down; exiting.\n"); diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index 09a916d7f5c..c85cbe4cf44 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -795,10 +795,14 @@ static DWORD CALLBACK read_thread(void *arg) HRESULT hr; void *data;
- if (!unix_funcs->wg_parser_get_read_request(filter->wg_parser, &data, &offset, &size)) + if (!unix_funcs->wg_parser_get_next_read_offset(filter->wg_parser, &offset, &size)) continue; + data = malloc(size); hr = IAsyncReader_SyncRead(filter->reader, offset, size, data); - unix_funcs->wg_parser_complete_read_request(filter->wg_parser, SUCCEEDED(hr)); + if (FAILED(hr)) + ERR("Async Reader failed to failed to read %p+%u. hr=%#x\n", data, size, hr); + unix_funcs->wg_parser_push_data(filter->wg_parser, SUCCEEDED(hr) ? data : NULL, size); + free(data); }
TRACE("Streaming stopped; exiting.\n"); diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index cd12a23d0c8..5de8ba84ed3 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -517,8 +517,8 @@ static void CDECL wg_parser_end_flush(struct wg_parser *parser) pthread_mutex_unlock(&parser->mutex); }
-static bool CDECL wg_parser_get_read_request(struct wg_parser *parser, - void **data, uint64_t *offset, uint32_t *size) +static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, + uint64_t *offset, uint32_t *size) { pthread_mutex_lock(&parser->mutex);
@@ -531,7 +531,6 @@ static bool CDECL wg_parser_get_read_request(struct wg_parser *parser, return false; }
- *data = parser->read_request.data; *offset = parser->read_request.offset; *size = parser->read_request.size;
@@ -539,11 +538,15 @@ static bool CDECL wg_parser_get_read_request(struct wg_parser *parser, return true; }
-static void CDECL wg_parser_complete_read_request(struct wg_parser *parser, bool ret) +static void CDECL wg_parser_push_data(struct wg_parser *parser, + const void *data, uint32_t size) { pthread_mutex_lock(&parser->mutex); + parser->read_request.size = size; parser->read_request.done = true; - parser->read_request.ret = ret; + parser->read_request.ret = !!data; + if (data) + memcpy(parser->read_request.data, data, size); parser->read_request.data = NULL; pthread_mutex_unlock(&parser->mutex); pthread_cond_signal(&parser->read_done_cond); @@ -1217,6 +1220,7 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent, pthread_cond_wait(&parser->read_done_cond, &parser->mutex);
ret = parser->read_request.ret; + gst_buffer_set_size(*buffer, parser->read_request.size);
pthread_mutex_unlock(&parser->mutex);
@@ -1918,8 +1922,8 @@ static const struct unix_funcs funcs = wg_parser_begin_flush, wg_parser_end_flush,
- wg_parser_get_read_request, - wg_parser_complete_read_request, + wg_parser_get_next_read_offset, + wg_parser_push_data,
wg_parser_set_unlimited_buffering,
Signed-off-by: Derek Lesho dlesho@codeweavers.com --- dlls/winegstreamer/wg_parser.c | 379 +++++++-------------------------- 1 file changed, 81 insertions(+), 298 deletions(-)
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 5de8ba84ed3..7f2627235f1 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -33,6 +33,7 @@ #include <gst/gst.h> #include <gst/video/video.h> #include <gst/audio/audio.h> +#include <gst/app/gstappsrc.h>
/* GStreamer callbacks may be called on threads not created by Wine, and * therefore cannot access the Wine TEB. This means that we must use GStreamer @@ -49,28 +50,23 @@ struct wg_parser struct wg_parser_stream **streams; unsigned int stream_count;
- GstElement *container, *decodebin; + GstElement *container, *appsrc, *decodebin; GstBus *bus; - GstPad *my_src, *their_sink;
- guint64 file_size, start_offset, next_offset, stop_offset; + guint64 file_size; guint64 next_pull_offset;
- pthread_t push_thread; - pthread_mutex_t mutex;
pthread_cond_t init_cond; bool no_more_pads, has_duration, error;
- pthread_cond_t read_cond, read_done_cond; + pthread_cond_t read_cond; struct { void *data; uint64_t offset; uint32_t size; - bool done; - bool ret; } read_request;
bool flushing, sink_connected; @@ -522,7 +518,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex);
- while (parser->sink_connected && !parser->read_request.data) + while (parser->sink_connected && parser->read_request.offset == -1) pthread_cond_wait(&parser->read_cond, &parser->mutex);
if (!parser->sink_connected) @@ -541,15 +537,47 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, static void CDECL wg_parser_push_data(struct wg_parser *parser, const void *data, uint32_t size) { + GstBuffer *buffer; + GstFlowReturn ret; + + if (!data) + { + /* premature EOS should trigger an error path */ + pthread_mutex_lock(&parser->mutex); + parser->read_request.offset = -1; + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); + pthread_mutex_unlock(&parser->mutex); + return; + } + + /* We could avoid this extra copy using gst_buffer_new_wrapped. + However, PE wouldn't know when to release the buffer allocations as the buffer + objects are queued, so we'd have to create a ring buffer the size of the gstappsrc + queue on the PE side and validate that we don't overrun on the unix side. I'm not + yet convinced that trying to reduce copies of compressed data is worth the + complexity. */ + buffer = gst_buffer_new_and_alloc(size); + gst_buffer_fill(buffer, 0, data, size); + pthread_mutex_lock(&parser->mutex); - parser->read_request.size = size; - parser->read_request.done = true; - parser->read_request.ret = !!data; - if (data) - memcpy(parser->read_request.data, data, size); - parser->read_request.data = NULL; + + assert(parser->read_request.offset != -1); + GST_BUFFER_OFFSET(buffer) = parser->read_request.offset; + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret); + parser->read_request.offset = -1; + + /* In random-access mode, GST_FLOW_EOS shouldn't be returned */ + assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING); + if (ret == GST_FLOW_OK) + parser->next_pull_offset += size; + else + gst_buffer_unref(buffer); + + assert(parser->next_pull_offset <= parser->file_size); + if (parser->next_pull_offset == parser->file_size) + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); + pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_done_cond); }
static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser) @@ -1180,198 +1208,41 @@ static void pad_removed_cb(GstElement *element, GstPad *pad, gpointer user) g_free(name); }
-static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent, - guint64 offset, guint size, GstBuffer **buffer) +static void src_need_data(GstElement *appsrc, guint length, gpointer user) { - struct wg_parser *parser = gst_pad_get_element_private(pad); - GstBuffer *new_buffer = NULL; - GstMapInfo map_info; - bool ret; - - GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, length %u, buffer %p.", pad, offset, size, *buffer); - - if (offset == GST_BUFFER_OFFSET_NONE) - offset = parser->next_pull_offset; - parser->next_pull_offset = offset + size; - if (offset >= parser->file_size) - return GST_FLOW_EOS; - if (offset + size >= parser->file_size) - size = parser->file_size - offset; - - if (!*buffer) - *buffer = new_buffer = gst_buffer_new_and_alloc(size); - - gst_buffer_map(*buffer, &map_info, GST_MAP_WRITE); + struct wg_parser *parser = user;
pthread_mutex_lock(&parser->mutex);
- assert(!parser->read_request.data); - parser->read_request.data = map_info.data; - parser->read_request.offset = offset; - parser->read_request.size = size; - parser->read_request.done = false; - pthread_cond_signal(&parser->read_cond); - - /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect - * the upstream pin to flush if necessary. We should never be blocked on - * read_thread() not running. */ - - while (!parser->read_request.done) - pthread_cond_wait(&parser->read_done_cond, &parser->mutex); - - ret = parser->read_request.ret; - gst_buffer_set_size(*buffer, parser->read_request.size); - - pthread_mutex_unlock(&parser->mutex); - - gst_buffer_unmap(*buffer, &map_info); - - GST_LOG("Request returned %d.", ret); - - if (!ret && new_buffer) - gst_buffer_unref(new_buffer); - - return ret ? GST_FLOW_OK : GST_FLOW_ERROR; -} - -static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - GstFormat format; - - GST_LOG("parser %p, type %s.", parser, GST_QUERY_TYPE_NAME(query)); - - switch (GST_QUERY_TYPE(query)) - { - case GST_QUERY_DURATION: - gst_query_parse_duration(query, &format, NULL); - if (format == GST_FORMAT_PERCENT) - { - gst_query_set_duration(query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX); - return TRUE; - } - else if (format == GST_FORMAT_BYTES) - { - gst_query_set_duration(query, GST_FORMAT_BYTES, parser->file_size); - return TRUE; - } - return FALSE; - - case GST_QUERY_SEEKING: - gst_query_parse_seeking (query, &format, NULL, NULL, NULL); - if (format != GST_FORMAT_BYTES) - { - GST_WARNING("Cannot seek using format "%s".", gst_format_get_name(format)); - return FALSE; - } - gst_query_set_seeking(query, GST_FORMAT_BYTES, 1, 0, parser->file_size); - return TRUE; - - case GST_QUERY_SCHEDULING: - gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0); - gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH); - gst_query_add_scheduling_mode(query, GST_PAD_MODE_PULL); - return TRUE; - - default: - GST_WARNING("Unhandled query type %s.", GST_QUERY_TYPE_NAME(query)); - return FALSE; - } -} - -static void *push_data(void *arg) -{ - struct wg_parser *parser = arg; - GstBuffer *buffer; - guint max_size; - - GST_DEBUG("Starting push thread."); - - if (!(buffer = gst_buffer_new_allocate(NULL, 16384, NULL))) - { - GST_ERROR("Failed to allocate memory."); - return NULL; - } - - max_size = parser->stop_offset ? parser->stop_offset : parser->file_size; - - for (;;) + /* Sometimes GstAppSrc sends identical need-data requests when it is woken up, + we can rely on push-buffer not having completed (and hence offset being -1) + because they are blocked on the internal mutex held by the pulling thread + calling this callback. */ + if (parser->read_request.offset != -1) { - ULONG size; - int ret; - - if (parser->next_offset >= max_size) - break; - size = min(16384, max_size - parser->next_offset); - - if ((ret = src_getrange_cb(parser->my_src, NULL, parser->next_offset, size, &buffer)) < 0) - { - GST_ERROR("Failed to read data, ret %s.", gst_flow_get_name(ret)); - break; - } - - parser->next_offset += size; - - buffer->duration = buffer->pts = -1; - if ((ret = gst_pad_push(parser->my_src, buffer)) < 0) - { - GST_ERROR("Failed to push data, ret %s.", gst_flow_get_name(ret)); - break; - } + pthread_mutex_unlock(&parser->mutex); + return; } + parser->read_request.offset = parser->next_pull_offset; + parser->read_request.size = length;
- gst_buffer_unref(buffer); - - gst_pad_push_event(parser->my_src, gst_event_new_eos()); - - GST_DEBUG("Stopping push thread."); + pthread_cond_signal(&parser->read_cond);
- return NULL; + pthread_mutex_unlock(&parser->mutex); }
-static gboolean activate_push(GstPad *pad, gboolean activate) +static gboolean src_seek_data(GstElement *appsrc, guint64 offset, gpointer user) { - struct wg_parser *parser = gst_pad_get_element_private(pad); - - if (!activate) - { - if (parser->push_thread) - { - pthread_join(parser->push_thread, NULL); - parser->push_thread = 0; - } - } - else if (!parser->push_thread) - { - int ret; + struct wg_parser *parser = user;
- if ((ret = pthread_create(&parser->push_thread, NULL, push_data, parser))) - { - GST_ERROR("Failed to create push thread: %s", strerror(errno)); - parser->push_thread = 0; - return FALSE; - } - } - return TRUE; -} + pthread_mutex_lock(&parser->mutex);
-static gboolean src_activate_mode_cb(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); + assert(parser->read_request.offset == -1); + parser->next_pull_offset = offset;
- GST_DEBUG("%s source pad for parser %p in %s mode.", - activate ? "Activating" : "Deactivating", parser, gst_pad_mode_get_name(mode)); + pthread_mutex_unlock(&parser->mutex);
- switch (mode) - { - case GST_PAD_MODE_PULL: - return TRUE; - case GST_PAD_MODE_PUSH: - return activate_push(pad, activate); - case GST_PAD_MODE_NONE: - break; - } - return FALSE; + return true; }
static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer user) @@ -1418,85 +1289,8 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use return GST_BUS_DROP; }
-static gboolean src_perform_seek(struct wg_parser *parser, GstEvent *event) -{ - BOOL thread = !!parser->push_thread; - GstSeekType cur_type, stop_type; - GstFormat seek_format; - GstEvent *flush_event; - GstSeekFlags flags; - gint64 cur, stop; - guint32 seqnum; - gdouble rate; - - gst_event_parse_seek(event, &rate, &seek_format, &flags, - &cur_type, &cur, &stop_type, &stop); - - if (seek_format != GST_FORMAT_BYTES) - { - GST_FIXME("Unhandled format "%s".", gst_format_get_name(seek_format)); - return FALSE; - } - - seqnum = gst_event_get_seqnum(event); - - /* send flush start */ - if (flags & GST_SEEK_FLAG_FLUSH) - { - flush_event = gst_event_new_flush_start(); - gst_event_set_seqnum(flush_event, seqnum); - gst_pad_push_event(parser->my_src, flush_event); - if (thread) - gst_pad_set_active(parser->my_src, 1); - } - - parser->next_offset = parser->start_offset = cur; - - /* and prepare to continue streaming */ - if (flags & GST_SEEK_FLAG_FLUSH) - { - flush_event = gst_event_new_flush_stop(TRUE); - gst_event_set_seqnum(flush_event, seqnum); - gst_pad_push_event(parser->my_src, flush_event); - if (thread) - gst_pad_set_active(parser->my_src, 1); - } - - return TRUE; -} - -static gboolean src_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - gboolean ret = TRUE; - - GST_LOG("parser %p, type "%s".", parser, GST_EVENT_TYPE_NAME(event)); - - switch (event->type) - { - case GST_EVENT_SEEK: - ret = src_perform_seek(parser, event); - break; - - case GST_EVENT_FLUSH_START: - case GST_EVENT_FLUSH_STOP: - case GST_EVENT_QOS: - case GST_EVENT_RECONFIGURE: - break; - - default: - GST_WARNING("Ignoring "%s" event.", GST_EVENT_TYPE_NAME(event)); - ret = FALSE; - break; - } - gst_event_unref(event); - return ret; -} - static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_size) { - GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE("quartz_src", - GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); unsigned int i;
parser->file_size = file_size; @@ -1511,14 +1305,15 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s parser->container = gst_bin_new(NULL); gst_element_set_bus(parser->container, parser->bus);
- parser->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src"); - gst_pad_set_getrange_function(parser->my_src, src_getrange_cb); - gst_pad_set_query_function(parser->my_src, src_query_cb); - gst_pad_set_activatemode_function(parser->my_src, src_activate_mode_cb); - gst_pad_set_event_function(parser->my_src, src_event_cb); - gst_pad_set_element_private(parser->my_src, parser); + if (!(parser->appsrc = create_element("appsrc", "base"))) + return E_FAIL; + gst_bin_add(GST_BIN(parser->container), parser->appsrc); + + g_object_set(parser->appsrc, "stream-type", GST_APP_STREAM_TYPE_RANDOM_ACCESS, NULL); + g_object_set(parser->appsrc, "size", parser->file_size, NULL); + g_signal_connect(parser->appsrc, "need-data", G_CALLBACK(src_need_data), parser); + g_signal_connect(parser->appsrc, "seek-data", G_CALLBACK(src_seek_data), parser);
- parser->start_offset = parser->next_offset = parser->stop_offset = 0; parser->next_pull_offset = 0;
if (!parser->init_gst(parser)) @@ -1598,7 +1393,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
pthread_mutex_unlock(&parser->mutex);
- parser->next_offset = 0; return S_OK; }
@@ -1640,10 +1434,6 @@ static void CDECL wg_parser_disconnect(struct wg_parser *parser) pthread_mutex_unlock(&parser->mutex);
gst_element_set_state(parser->container, GST_STATE_NULL); - gst_pad_unlink(parser->my_src, parser->their_sink); - gst_object_unref(parser->my_src); - gst_object_unref(parser->their_sink); - parser->my_src = parser->their_sink = NULL;
pthread_mutex_lock(&parser->mutex); parser->sink_connected = false; @@ -1678,15 +1468,13 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser) g_signal_connect(element, "autoplug-select", G_CALLBACK(autoplug_select_cb), parser); g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - pthread_mutex_lock(&parser->mutex); parser->no_more_pads = parser->error = false; pthread_mutex_unlock(&parser->mutex);
- if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, parser->decodebin)) { - GST_ERROR("Failed to link pads, error %d.\n", ret); + GST_ERROR("Failed to link app source.\n"); return FALSE; }
@@ -1725,15 +1513,13 @@ static BOOL avi_parser_init_gst(struct wg_parser *parser) g_signal_connect(element, "pad-removed", G_CALLBACK(pad_removed_cb), parser); g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - pthread_mutex_lock(&parser->mutex); parser->no_more_pads = parser->error = false; pthread_mutex_unlock(&parser->mutex);
- if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, element)) { - GST_ERROR("Failed to link pads, error %d.\n", ret); + GST_ERROR("Failed to link app source.\n"); return FALSE; }
@@ -1769,10 +1555,9 @@ static BOOL mpeg_audio_parser_init_gst(struct wg_parser *parser)
gst_bin_add(GST_BIN(parser->container), element);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, element)) { - GST_ERROR("Failed to link sink pads, error %d.\n", ret); + GST_ERROR("Failed to link app source.\n"); return FALSE; }
@@ -1809,10 +1594,9 @@ static BOOL wave_parser_init_gst(struct wg_parser *parser)
gst_bin_add(GST_BIN(parser->container), element);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, element)) { - GST_ERROR("Failed to link sink pads, error %d.\n", ret); + GST_ERROR("Failed to link app source.\n"); return FALSE; }
@@ -1849,8 +1633,8 @@ static struct wg_parser *wg_parser_create(void) pthread_mutex_init(&parser->mutex, NULL); pthread_cond_init(&parser->init_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); - pthread_cond_init(&parser->read_done_cond, NULL); parser->flushing = true; + parser->read_request.offset = -1;
GST_DEBUG("Created winegstreamer parser %p.\n", parser); return parser; @@ -1903,7 +1687,6 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser) pthread_mutex_destroy(&parser->mutex); pthread_cond_destroy(&parser->init_cond); pthread_cond_destroy(&parser->read_cond); - pthread_cond_destroy(&parser->read_done_cond);
free(parser); }
On 9/1/21 4:05 PM, Derek Lesho wrote:
This field should be unused now, and as far as I can tell it is.
"offset" is unsigned. I don't necessarily object to using a sentinel value, but it should probably be UINT64_MAX in that case.
That said... now we rather awkwardly duplicate the offset into next_pull_offset and read_request.offset. As far as I can tell, either these are identical, or read_request.offset is -1, which makes me think that one of them should just be a boolean flag.
(Actually, this isn't quite true, because I think you don't update read_request.offset unless we get need-data, but you still return it from wg_parser_get_next_read_offset(). That means that if we don't get need-data we'll end up sending the same buffer offset twice, which, while not apparently invalid, is probably not what we wanted to do. So there's another reason we probably want one offset and a boolean flag.)
Hmm, this is as much a commentary on 2/5, but, I feel like EOS should be signaled in a consistent way, probably by passing a short (or even zero) size with a valid buffer.
The other way we can get here is error condition, and I'm not even sure we want to call wg_parser_push_data() in that case. The GStreamer side can't exactly do anything useful with that information.
In any case we shouldn't have to call g_signal_emit_by_name("end-of-stream") more than once in this function.
The awkward thing about this is that either we might pass the wrong size to wg_parser_get_next_read_offset(), or appsrc guarantees that we don't but we're not validating it (i.e. we're only validating the offset). For what it's worth, I'm not sure the documentation guarantees either one :-/
Either way, this should probably live in wg_parser_get_next_read_offset() and not here.
Well, sure, but do we really need this if block at all?
Is this guaranteed, though?
On 9/6/21 4:14 PM, Zebediah Figura wrote:
Well, we need some way to not block forever upon an error. Traditionally, we used GStreamer error propagation for this; if we returned an error in getrange_cb, any of the blocking initialization functions would unblock and we could cleanup, and after initialization, we'd get an error in get_event. Absent this, we need another mechanism to invoke a cleanup. wg_parser_disconnect(/wg_parser_destroy later) in read_thread could work if we made sure that no other threads use the objects after destruction, and for patch 2 we unblock getrange_cb upon disconnection. Is this what you were going for?
My impression from "EOS should be signaled in a consistent way, probably by passing a short (or even zero) size with a valid buffer. " was that we should be determining EOS on the client side of the interface, and push_data seems a more natural fit for something determined on the client side, not to mention that in a future push mode, we will have to be determining EOS on the client side.
Yes, we do. Without it, if there a spurious wakeup of appsrc's wait loop in gst_app_src_create (get_range) [1], and the "push-buffer" signal is signaled just after the loop's g_cond_wait reacquires the mutex on return [1], we could have "push-buffer" blocked on acquisition of the mutex [2] held by gst_app_src_create. The function would block before pushing the buffer to the internal queue. Then, gst_app_src_create continues and, seeing as there are no buffers in the queue, unlocks the mutex and calls need_data. Here we have our race:
- if the need_data's code is called first, read_request's offset and size of overwritten with the same values. Then, the code waiting on the mutex pushes the buffer responding to our request, and everything continues as normal (as if another need_data never occurred).
(Prepare for the text wall 😁)
- If the push-buffer code waiting on the mutex continues first, the internal buffer queue has the buffer written to it, and the push-buffer signal returns. Afterwards, read_request.offset is set to the sentinel value indicating that the request has been responded to, and that a new need-data is awaited, push_data returns. Then, need_data is run, acquiring the mutex too late to catch the previous buffer send, and it requests a read of the same size directly following the prior read request, since the offset had been updated by push_data. gst_app_src_create then sees the pushed buffer and returns it back to the getrange-related function, only for another getrange to come in with an offset not directly following the last request's. While the client code is reading the invalid request's data, src_seek_data is called. Right now we have an assert to make sure that there is no active request when seek_data is called, but if we didn't, we'd just set next_pull_offset to the seek's value. Then, need_data would be called again, but all it would do is update the size to the new request's size. After this, push_data (in response to the previous request) would be called and, for simplicity's sake, if the size of the two requests were identical, a push-buffer would be called, src_seek_data would pick up and have no way of knowing that the data is coming from after previous request's location in the file, which could cause any number of errors. If we keep around both next_pull_offset and read_request.offset, we could compare them in push_data and discard the buffer if they don't match, but this seems a lot more complicated than catching this case earlier on and preventing all this confusion.
1: https://github.com/GStreamer/gst-plugins-base/blob/master/gst-libs/gst/app/g...
2: https://github.com/GStreamer/gst-plugins-base/blob/master/gst-libs/gst/app/g...
Do you mean by the publicly documented interface or the actual code? Looking at the code, in random access mode, it seems that the answer is yes. getrange turns into a seek and need-data combo and getrange would only be called after the previous buffer is sent. And for push mode, I don't think we'll ever get a seek request (and if we did there'd be no way to service it, as MFTs aren't seekable). Either way though, the assert probably isn't too necessary and is only there to increase understanding, if can easily be left out if you'd like.
On 9/8/21 2:25 PM, Derek Lesho wrote:
Ah, right, we'd temporarily break things if we didn't signal read errors somehow. So that's probably a good idea to keep around, at least temporarily.
I could be satisfied with, say, passing a NULL buffer to signal error, and a valid buffer but short or zero size to signal short read.
Hmm, actually, you're probably right, we should signal EOS on the client side, especially if we want to support sources that don't have a fixed size (in theory these exist, although I've never seen one...)
Note though that quartz kind of awkwardly depends on the Unix library to do the right thing. So that would have to be fixed first.
Okay, so I was operating under the assumption that appsrc would drop buffers if the offset didn't match what it wanted, since that seemed like the only safe way to implement the behaviour the documentation describes. Apparently it doesn't. That means that I guess we can't send more than one buffer at a time, i.e. we really need to give it only what it asks for.
I'll have to submit a patch to GStreamer to clarify the documentation.
In effect I guess that means we should keep the need-data/enough-data introduced by this patch only for stream mode, i.e. push mode. For random access mode we should treat need-data as license to send only a single buffer. I.e. upon receiving wg_parser_push_data() I guess we should reset whatever flag or sentinel to nonsignaled, so that a subsequent wg_parser_get_next_read_offset() blocks.
I guess this is essentially addressed by my previous comment.
On 9/8/21 6:21 PM, Zebediah Figura (she/her) wrote:
Right, to be honest I completely forgot that we were sending in the offset through the buffer's field, but it seems that neither app source nor GstPad validate it (maybe something downstream would though?). Rethinking this after some time away from the keyboard, I now realize that my conditional only triggers in the first unproblematic case, and that only the assert in seek_data would catch the problem of the 2nd case. I also now realize that, moving to using only next_pull_offset and a boolean flag, push_data would set on the buffer the updated/seeked offset, not the offset of the data that it actually retrieved. But even if we keep both of them to verify the offset in push_data ourselves, we would still have the possibility of going out of sync in continuous pulls unless GStreamer itself verified the offset, although I haven't though about the consequences of that.
I'm not sure what you're referring to here, I didn't add enough-data handling to this patch.
That's already being done in this patch, and doesn't actually fix the issue. I'm not actually sure what the correct way to fix this would be, tomorrow I'll try consulting with the GStreamer devs to make sure I'm not making up a problem and see what they suggest.
On 9/8/21 8:36 PM, Derek Lesho wrote:
Sorry, I think I initially failed to read your description carefully enough, and assumed it was something different.
I see the problem now, but in fact this conditional (the one in your patch) isn't enough to fix it. We can get a race this bad:
thread A -------------------------------------------------- queue is empty emit need_data queue is empty wg_parser_get_next_read_offset wg_parser_push_data emit need_data wg_parser_get_next_read_offset ... wg_parser_push_data
and push the wrong buffer. It won't be the same buffer (we won't get a seek, so we're supposed to push the next one consecutively), but it will be the wrong one.
Unfortunately I think this means that appsrc is unusably broken. It can and should be fixed upstream, one way or another, but we can't use it in our code, at least not for a while.
I could also be missing something, though.
On 9/9/21 12:00 AM, Zebediah Figura wrote:
Yeah, this definitely seems to be something that should be fixed upstream, I'm going to submit an issue. In the meantime though, we can workaround it by looking at the current-level-buffers property after we acquire the mutex in need_data, this will tell us if any buffers have been queued, and in such case the need-data would obviously be spurious.
I could also be missing something, though.
Signed-off-by: Derek Lesho dlesho@codeweavers.com --- dlls/winegstreamer/gst_private.h | 1 - dlls/winegstreamer/media_source.c | 13 ++-- dlls/winegstreamer/quartz_parser.c | 39 ++++------- dlls/winegstreamer/wg_parser.c | 100 +++++++++++++++-------------- 4 files changed, 69 insertions(+), 84 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 9943682facb..36d88f7b723 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -164,7 +164,6 @@ struct unix_funcs void (CDECL *wg_parser_destroy)(struct wg_parser *parser);
HRESULT (CDECL *wg_parser_connect)(struct wg_parser *parser, uint64_t file_size); - void (CDECL *wg_parser_disconnect)(struct wg_parser *parser);
void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser); void (CDECL *wg_parser_end_flush)(struct wg_parser *parser); diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index abd9a220a7f..75b3b399f4b 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -103,7 +103,6 @@ struct media_source LONGLONG start_time;
HANDLE read_thread; - bool read_thread_shutdown; };
static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface) @@ -533,7 +532,7 @@ static DWORD CALLBACK read_thread(void *arg)
TRACE("Starting read thread for media source %p.\n", source);
- while (!source->read_thread_shutdown) + for(;;) { uint64_t offset; ULONG ret_size; @@ -542,7 +541,7 @@ static DWORD CALLBACK read_thread(void *arg) void *data;
if (!unix_funcs->wg_parser_get_next_read_offset(source->wg_parser, &offset, &size)) - continue; + break;
data = malloc(size); ret_size = 0; @@ -1218,12 +1217,10 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
source->state = SOURCE_SHUTDOWN;
- if (source->stream_count) - unix_funcs->wg_parser_disconnect(source->wg_parser); + unix_funcs->wg_parser_destroy(source->wg_parser);
if (source->read_thread) { - source->read_thread_shutdown = true; WaitForSingleObject(source->read_thread, INFINITE); CloseHandle(source->read_thread); } @@ -1254,8 +1251,6 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface) IMFMediaStream_Release(&stream->IMFMediaStream_iface); }
- unix_funcs->wg_parser_destroy(source->wg_parser); - if (source->streams) free(source->streams);
@@ -1402,7 +1397,7 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ fail: WARN("Failed to construct MFMediaSource, hr %#x.\n", hr);
- if (object->wg_parser) + if (parser) IMFMediaSource_Shutdown(&object->IMFMediaSource_iface); free(descriptors); IMFMediaSource_Release(&object->IMFMediaSource_iface); diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index c85cbe4cf44..3e35e7d1503 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -59,6 +59,7 @@ struct parser
HANDLE read_thread;
+ struct wg_parser * (*parser_create)(void); BOOL (*init_gst)(struct parser *filter); HRESULT (*source_query_accept)(struct parser_source *pin, const AM_MEDIA_TYPE *mt); HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt); @@ -788,7 +789,7 @@ static DWORD CALLBACK read_thread(void *arg)
TRACE("Starting read thread for filter %p.\n", filter);
- while (filter->sink_connected) + for(;;) { uint64_t offset; uint32_t size; @@ -796,7 +797,7 @@ static DWORD CALLBACK read_thread(void *arg) void *data;
if (!unix_funcs->wg_parser_get_next_read_offset(filter->wg_parser, &offset, &size)) - continue; + break; data = malloc(size); hr = IAsyncReader_SyncRead(filter->reader, offset, size, data); if (FAILED(hr)) @@ -853,8 +854,6 @@ static void parser_destroy(struct strmbase_filter *iface) IAsyncReader_Release(filter->reader); filter->reader = NULL;
- unix_funcs->wg_parser_destroy(filter->wg_parser); - strmbase_sink_cleanup(&filter->sink); strmbase_filter_cleanup(&filter->filter); free(filter); @@ -959,6 +958,12 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons
IAsyncReader_Length(filter->reader, &file_size, &unused);
+ if (!(filter->wg_parser = filter->parser_create())) + { + hr = E_OUTOFMEMORY; + goto err; + } + filter->sink_connected = true; filter->read_thread = CreateThread(NULL, 0, read_thread, filter, 0, NULL);
@@ -1096,11 +1101,7 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- if (!(object->wg_parser = unix_funcs->wg_decodebin_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_decodebin_parser_create;
strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &sink_ops, NULL); @@ -1531,7 +1532,7 @@ static HRESULT GST_RemoveOutputPins(struct parser *This) if (!This->sink_connected) return S_OK;
- unix_funcs->wg_parser_disconnect(This->wg_parser); + unix_funcs->wg_parser_destroy(This->wg_parser);
/* read_thread() needs to stay alive to service any read requests GStreamer * sends, so we can only shut it down after GStreamer stops. */ @@ -1627,11 +1628,7 @@ HRESULT wave_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- if (!(object->wg_parser = unix_funcs->wg_wave_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_wave_parser_create;
strmbase_filter_init(&object->filter, outer, &CLSID_WAVEParser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &wave_parser_sink_ops, NULL); @@ -1713,11 +1710,7 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- if (!(object->wg_parser = unix_funcs->wg_avi_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_avi_parser_create;
strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &avi_splitter_sink_ops, NULL); @@ -1820,11 +1813,7 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- if (!(object->wg_parser = unix_funcs->wg_mpeg_audio_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_mpeg_audio_parser_create;
strmbase_filter_init(&object->filter, outer, &CLSID_MPEG1Splitter, &mpeg_splitter_ops); strmbase_sink_init(&object->sink, &object->filter, L"Input", &mpeg_splitter_sink_ops, NULL); diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 7f2627235f1..d29298632a4 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -58,8 +58,8 @@ struct wg_parser
pthread_mutex_t mutex;
- pthread_cond_t init_cond; - bool no_more_pads, has_duration, error; + pthread_cond_t state_cond; + bool no_more_pads, has_duration, error, shutdown;
pthread_cond_t read_cond; struct @@ -518,12 +518,13 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex);
- while (parser->sink_connected && parser->read_request.offset == -1) + while (!parser->shutdown && parser->read_request.offset == -1) pthread_cond_wait(&parser->read_cond, &parser->mutex);
- if (!parser->sink_connected) + if (parser->shutdown) { pthread_mutex_unlock(&parser->mutex); + pthread_cond_signal(&parser->state_cond); return false; }
@@ -779,7 +780,7 @@ static void no_more_pads_cb(GstElement *element, gpointer user) pthread_mutex_lock(&parser->mutex); parser->no_more_pads = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); }
static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream, @@ -866,7 +867,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) pthread_mutex_lock(&parser->mutex); stream->eos = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); } break;
@@ -917,7 +918,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) wg_format_from_caps(&stream->preferred_format, caps); stream->has_caps = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break; }
@@ -1264,7 +1265,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use pthread_mutex_lock(&parser->mutex); parser->error = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break;
case GST_MESSAGE_WARNING: @@ -1279,7 +1280,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use pthread_mutex_lock(&parser->mutex); parser->has_duration = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break;
default: @@ -1294,7 +1295,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s unsigned int i;
parser->file_size = file_size; - parser->sink_connected = true;
if (!parser->bus) { @@ -1327,7 +1327,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s gint64 duration;
while (!stream->has_caps && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex);
/* GStreamer doesn't actually provide any guarantees about when duration * is available, even for seekable streams. It's basically built for @@ -1386,13 +1386,14 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s } else { - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex); } } }
pthread_mutex_unlock(&parser->mutex);
+ parser->sink_connected = true; return S_OK; }
@@ -1420,38 +1421,6 @@ static void free_stream(struct wg_parser_stream *stream) free(stream); }
-static void CDECL wg_parser_disconnect(struct wg_parser *parser) -{ - unsigned int i; - - /* Unblock all of our streams. */ - pthread_mutex_lock(&parser->mutex); - for (i = 0; i < parser->stream_count; ++i) - { - parser->streams[i]->flushing = true; - pthread_cond_signal(&parser->streams[i]->event_empty_cond); - } - pthread_mutex_unlock(&parser->mutex); - - gst_element_set_state(parser->container, GST_STATE_NULL); - - pthread_mutex_lock(&parser->mutex); - parser->sink_connected = false; - pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_cond); - - for (i = 0; i < parser->stream_count; ++i) - free_stream(parser->streams[i]); - - parser->stream_count = 0; - free(parser->streams); - parser->streams = NULL; - - gst_element_set_bus(parser->container, NULL); - gst_object_unref(parser->container); - parser->container = NULL; -} - static BOOL decodebin_parser_init_gst(struct wg_parser *parser) { GstElement *element; @@ -1488,7 +1457,7 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser)
pthread_mutex_lock(&parser->mutex); while (!parser->no_more_pads && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex); if (parser->error) { pthread_mutex_unlock(&parser->mutex); @@ -1533,7 +1502,7 @@ static BOOL avi_parser_init_gst(struct wg_parser *parser)
pthread_mutex_lock(&parser->mutex); while (!parser->no_more_pads && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex); if (parser->error) { pthread_mutex_unlock(&parser->mutex); @@ -1631,7 +1600,7 @@ static struct wg_parser *wg_parser_create(void) return NULL;
pthread_mutex_init(&parser->mutex, NULL); - pthread_cond_init(&parser->init_cond, NULL); + pthread_cond_init(&parser->state_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); parser->flushing = true; parser->read_request.offset = -1; @@ -1678,6 +1647,40 @@ static struct wg_parser * CDECL wg_wave_parser_create(void)
static void CDECL wg_parser_destroy(struct wg_parser *parser) { + unsigned int i; + + /* shut down read thread first to post-shutdown push_data */ + pthread_mutex_lock(&parser->mutex); + parser->shutdown = true; + pthread_cond_signal(&parser->read_cond); + pthread_cond_wait(&parser->state_cond, &parser->mutex); + pthread_mutex_unlock(&parser->mutex); + + if (parser->sink_connected) + { + /* Unblock all of our streams. */ + pthread_mutex_lock(&parser->mutex); + for (i = 0; i < parser->stream_count; ++i) + { + parser->streams[i]->flushing = true; + pthread_cond_signal(&parser->streams[i]->event_empty_cond); + } + pthread_mutex_unlock(&parser->mutex); + + gst_element_set_state(parser->container, GST_STATE_NULL); + + for (i = 0; i < parser->stream_count; ++i) + free_stream(parser->streams[i]); + + parser->stream_count = 0; + free(parser->streams); + parser->streams = NULL; + + gst_element_set_bus(parser->container, NULL); + gst_object_unref(parser->container); + parser->container = NULL; + } + if (parser->bus) { gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL); @@ -1685,7 +1688,7 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser) }
pthread_mutex_destroy(&parser->mutex); - pthread_cond_destroy(&parser->init_cond); + pthread_cond_destroy(&parser->state_cond); pthread_cond_destroy(&parser->read_cond);
free(parser); @@ -1700,7 +1703,6 @@ static const struct unix_funcs funcs = wg_parser_destroy,
wg_parser_connect, - wg_parser_disconnect,
wg_parser_begin_flush, wg_parser_end_flush,
--- dlls/winegstreamer/gst_private.h | 17 +- dlls/winegstreamer/media_source.c | 24 ++- dlls/winegstreamer/quartz_parser.c | 29 +-- dlls/winegstreamer/wg_parser.c | 308 +++++++++++++---------------- 4 files changed, 183 insertions(+), 195 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 36d88f7b723..f6c6eff1e0c 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -155,16 +155,21 @@ struct wg_parser_event }; C_ASSERT(sizeof(struct wg_parser_event) == 40);
+enum wg_parser_type +{ + WG_DECODEBIN_PARSER, + WG_AVI_PARSER, + WG_MPEG_AUDIO_PARSER, + WG_WAVE_PARSER, +}; + +struct wg_parser; + struct unix_funcs { - struct wg_parser *(CDECL *wg_decodebin_parser_create)(void); - struct wg_parser *(CDECL *wg_avi_parser_create)(void); - struct wg_parser *(CDECL *wg_mpeg_audio_parser_create)(void); - struct wg_parser *(CDECL *wg_wave_parser_create)(void); + HRESULT (CDECL *wg_parser_create)(enum wg_parser_type parser_type, uint64_t file_size, struct wg_parser **parser); void (CDECL *wg_parser_destroy)(struct wg_parser *parser);
- HRESULT (CDECL *wg_parser_connect)(struct wg_parser *parser, uint64_t file_size); - void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser); void (CDECL *wg_parser_end_flush)(struct wg_parser *parser);
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 75b3b399f4b..928ee63b649 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -530,6 +530,12 @@ static DWORD CALLBACK read_thread(void *arg) struct media_source *source = arg; IMFByteStream *byte_stream = source->byte_stream;
+ while (!source->wg_parser && source->state != SOURCE_SHUTDOWN) + continue; + + if (source->state == SOURCE_SHUTDOWN) + return 0; + TRACE("Starting read thread for media source %p.\n", source);
for(;;) @@ -1321,19 +1327,17 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ if (FAILED(hr = MFAllocateWorkQueue(&object->async_commands_queue))) goto fail;
- if (!(parser = unix_funcs->wg_decodebin_parser_create())) - { - hr = E_OUTOFMEMORY; - goto fail; - } - object->wg_parser = parser; - - object->read_thread = CreateThread(NULL, 0, read_thread, object, 0, NULL); - object->state = SOURCE_OPENING; + object->read_thread = CreateThread(NULL, 0, read_thread, object, 0, NULL);
- if (FAILED(hr = unix_funcs->wg_parser_connect(parser, file_size))) + if (FAILED(hr = unix_funcs->wg_parser_create(WG_DECODEBIN_PARSER, file_size, &object->wg_parser))) + { + object->state = SOURCE_SHUTDOWN; + WaitForSingleObject(object->read_thread, INFINITE); + CloseHandle(object->read_thread); goto fail; + } + parser = object->wg_parser;
/* In Media Foundation, sources may read from any media source stream * without fear of blocking due to buffering limits on another. Trailmakers, diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index 3e35e7d1503..a5293394202 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -59,7 +59,7 @@ struct parser
HANDLE read_thread;
- struct wg_parser * (*parser_create)(void); + enum wg_parser_type parser_type; BOOL (*init_gst)(struct parser *filter); HRESULT (*source_query_accept)(struct parser_source *pin, const AM_MEDIA_TYPE *mt); HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt); @@ -787,6 +787,12 @@ static DWORD CALLBACK read_thread(void *arg) { struct parser *filter = arg;
+ while (!filter->wg_parser && filter->sink_connected) + continue; + + if (!filter->sink_connected) + return 0; + TRACE("Starting read thread for filter %p.\n", filter);
for(;;) @@ -958,17 +964,16 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons
IAsyncReader_Length(filter->reader, &file_size, &unused);
- if (!(filter->wg_parser = filter->parser_create())) - { - hr = E_OUTOFMEMORY; - goto err; - } - filter->sink_connected = true; filter->read_thread = CreateThread(NULL, 0, read_thread, filter, 0, NULL);
- if (FAILED(hr = unix_funcs->wg_parser_connect(filter->wg_parser, file_size))) + if (FAILED(hr = unix_funcs->wg_parser_create(filter->parser_type, file_size, &filter->wg_parser))) + { + filter->sink_connected = false; + WaitForSingleObject(filter->read_thread, INFINITE); + CloseHandle(filter->read_thread); goto err; + }
if (!filter->init_gst(filter)) goto err; @@ -1101,7 +1106,7 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- object->parser_create = unix_funcs->wg_decodebin_parser_create; + object->parser_type = WG_DECODEBIN_PARSER;
strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &sink_ops, NULL); @@ -1628,7 +1633,7 @@ HRESULT wave_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- object->parser_create = unix_funcs->wg_wave_parser_create; + object->parser_type = WG_WAVE_PARSER;
strmbase_filter_init(&object->filter, outer, &CLSID_WAVEParser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &wave_parser_sink_ops, NULL); @@ -1710,7 +1715,7 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- object->parser_create = unix_funcs->wg_avi_parser_create; + object->parser_type = WG_AVI_PARSER;
strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &avi_splitter_sink_ops, NULL); @@ -1813,7 +1818,7 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- object->parser_create = unix_funcs->wg_mpeg_audio_parser_create; + object->parser_type = WG_MPEG_AUDIO_PARSER;
strmbase_filter_init(&object->filter, outer, &CLSID_MPEG1Splitter, &mpeg_splitter_ops); strmbase_sink_init(&object->sink, &object->filter, L"Input", &mpeg_splitter_sink_ops, NULL); diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index d29298632a4..7726e6aca30 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -45,8 +45,6 @@ GST_DEBUG_CATEGORY_STATIC(wine);
struct wg_parser { - BOOL (*init_gst)(struct wg_parser *parser); - struct wg_parser_stream **streams; unsigned int stream_count;
@@ -69,7 +67,7 @@ struct wg_parser uint32_t size; } read_request;
- bool flushing, sink_connected; + bool flushing; };
struct wg_parser_stream @@ -1290,113 +1288,6 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use return GST_BUS_DROP; }
-static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_size) -{ - unsigned int i; - - parser->file_size = file_size; - - if (!parser->bus) - { - parser->bus = gst_bus_new(); - gst_bus_set_sync_handler(parser->bus, bus_handler_cb, parser, NULL); - } - - parser->container = gst_bin_new(NULL); - gst_element_set_bus(parser->container, parser->bus); - - if (!(parser->appsrc = create_element("appsrc", "base"))) - return E_FAIL; - gst_bin_add(GST_BIN(parser->container), parser->appsrc); - - g_object_set(parser->appsrc, "stream-type", GST_APP_STREAM_TYPE_RANDOM_ACCESS, NULL); - g_object_set(parser->appsrc, "size", parser->file_size, NULL); - g_signal_connect(parser->appsrc, "need-data", G_CALLBACK(src_need_data), parser); - g_signal_connect(parser->appsrc, "seek-data", G_CALLBACK(src_seek_data), parser); - - parser->next_pull_offset = 0; - - if (!parser->init_gst(parser)) - return E_FAIL; - - pthread_mutex_lock(&parser->mutex); - - for (i = 0; i < parser->stream_count; ++i) - { - struct wg_parser_stream *stream = parser->streams[i]; - gint64 duration; - - while (!stream->has_caps && !parser->error) - pthread_cond_wait(&parser->state_cond, &parser->mutex); - - /* GStreamer doesn't actually provide any guarantees about when duration - * is available, even for seekable streams. It's basically built for - * applications that don't care, e.g. movie players that can display - * a duration once it's available, and update it visually if a better - * estimate is found. This doesn't really match well with DirectShow or - * Media Foundation, which both expect duration to be available - * immediately on connecting, so we have to use some complex heuristics - * to try to actually get a usable duration. - * - * Some elements (avidemux, wavparse, qtdemux) record duration almost - * immediately, before fixing caps. Such elements don't send - * duration-changed messages. Therefore always try querying duration - * after caps have been found. - * - * Some elements (mpegaudioparse) send duration-changed. In the case of - * a mp3 stream without seek tables it will not be sent immediately, but - * only after enough frames have been parsed to form an estimate. They - * may send it multiple times with increasingly accurate estimates, but - * unfortunately we have no way of knowing whether another estimate will - * be sent, so we always take the first one. We assume that if the - * duration is not immediately available then the element will always - * send duration-changed. - */ - - for (;;) - { - if (parser->error) - { - pthread_mutex_unlock(&parser->mutex); - return E_FAIL; - } - if (gst_pad_query_duration(stream->their_src, GST_FORMAT_TIME, &duration)) - { - stream->duration = duration / 100; - break; - } - - if (stream->eos) - { - stream->duration = 0; - GST_WARNING("Failed to query duration.\n"); - break; - } - - /* Elements based on GstBaseParse send duration-changed before - * actually updating the duration in GStreamer versions prior - * to 1.17.1. See gstreamer.git:d28e0b4147fe7073b2. So after - * receiving duration-changed we have to continue polling until - * the query succeeds. */ - if (parser->has_duration) - { - pthread_mutex_unlock(&parser->mutex); - g_usleep(10000); - pthread_mutex_lock(&parser->mutex); - } - else - { - pthread_cond_wait(&parser->state_cond, &parser->mutex); - } - } - } - - pthread_mutex_unlock(&parser->mutex); - - parser->sink_connected = true; - return S_OK; -} - static void free_stream(struct wg_parser_stream *stream) { if (stream->their_src) @@ -1592,57 +1483,148 @@ static BOOL wave_parser_init_gst(struct wg_parser *parser) return TRUE; }
-static struct wg_parser *wg_parser_create(void) +static void CDECL wg_parser_destroy(struct wg_parser *parser); + +static HRESULT CDECL wg_parser_create(enum wg_parser_type parser_type, uint64_t file_size, struct wg_parser **parser_out) { struct wg_parser *parser; + unsigned int i;
if (!(parser = calloc(1, sizeof(*parser)))) - return NULL; + return E_OUTOFMEMORY;
pthread_mutex_init(&parser->mutex, NULL); pthread_cond_init(&parser->state_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); parser->flushing = true; parser->read_request.offset = -1; + parser->file_size = file_size;
- GST_DEBUG("Created winegstreamer parser %p.\n", parser); - return parser; -} + *parser_out = parser;
-static struct wg_parser * CDECL wg_decodebin_parser_create(void) -{ - struct wg_parser *parser; + if (!parser->bus) + { + parser->bus = gst_bus_new(); + gst_bus_set_sync_handler(parser->bus, bus_handler_cb, parser, NULL); + }
- if ((parser = wg_parser_create())) - parser->init_gst = decodebin_parser_init_gst; - return parser; -} + parser->container = gst_bin_new(NULL); + gst_element_set_bus(parser->container, parser->bus);
-static struct wg_parser * CDECL wg_avi_parser_create(void) -{ - struct wg_parser *parser; + if (!(parser->appsrc = create_element("appsrc", "base"))) + goto fail; + gst_bin_add(GST_BIN(parser->container), parser->appsrc);
- if ((parser = wg_parser_create())) - parser->init_gst = avi_parser_init_gst; - return parser; -} + g_object_set(parser->appsrc, "stream-type", GST_APP_STREAM_TYPE_RANDOM_ACCESS, NULL); + g_object_set(parser->appsrc, "size", parser->file_size, NULL); + g_signal_connect(parser->appsrc, "need-data", G_CALLBACK(src_need_data), parser); + g_signal_connect(parser->appsrc, "seek-data", G_CALLBACK(src_seek_data), parser);
-static struct wg_parser * CDECL wg_mpeg_audio_parser_create(void) -{ - struct wg_parser *parser; + parser->next_pull_offset = 0;
- if ((parser = wg_parser_create())) - parser->init_gst = mpeg_audio_parser_init_gst; - return parser; -} + switch (parser_type) + { + case WG_DECODEBIN_PARSER: + if (!decodebin_parser_init_gst(parser)) + goto fail; + break; + case WG_AVI_PARSER: + if (!avi_parser_init_gst(parser)) + goto fail; + break; + case WG_MPEG_AUDIO_PARSER: + if (!mpeg_audio_parser_init_gst(parser)) + goto fail; + break; + case WG_WAVE_PARSER: + if (!wave_parser_init_gst(parser)) + goto fail; + break; + default: + assert(0); + }
-static struct wg_parser * CDECL wg_wave_parser_create(void) -{ - struct wg_parser *parser; + pthread_mutex_lock(&parser->mutex);
- if ((parser = wg_parser_create())) - parser->init_gst = wave_parser_init_gst; - return parser; + for (i = 0; i < parser->stream_count; ++i) + { + struct wg_parser_stream *stream = parser->streams[i]; + gint64 duration; + + while (!stream->has_caps && !parser->error) + pthread_cond_wait(&parser->state_cond, &parser->mutex); + + /* GStreamer doesn't actually provide any guarantees about when duration + * is available, even for seekable streams. It's basically built for + * applications that don't care, e.g. movie players that can display + * a duration once it's available, and update it visually if a better + * estimate is found. This doesn't really match well with DirectShow or + * Media Foundation, which both expect duration to be available + * immediately on connecting, so we have to use some complex heuristics + * to try to actually get a usable duration. + * + * Some elements (avidemux, wavparse, qtdemux) record duration almost + * immediately, before fixing caps. Such elements don't send + * duration-changed messages. Therefore always try querying duration + * after caps have been found. + * + * Some elements (mpegaudioparse) send duration-changed. In the case of + * a mp3 stream without seek tables it will not be sent immediately, but + * only after enough frames have been parsed to form an estimate. They + * may send it multiple times with increasingly accurate estimates, but + * unfortunately we have no way of knowing whether another estimate will + * be sent, so we always take the first one. We assume that if the + * duration is not immediately available then the element will always + * send duration-changed. + */ + + for (;;) + { + if (parser->error) + { + pthread_mutex_unlock(&parser->mutex); + goto fail; + } + if (gst_pad_query_duration(stream->their_src, GST_FORMAT_TIME, &duration)) + { + stream->duration = duration / 100; + break; + } + + if (stream->eos) + { + stream->duration = 0; + GST_WARNING("Failed to query duration.\n"); + break; + } + + /* Elements based on GstBaseParse send duration-changed before + * actually updating the duration in GStreamer versions prior + * to 1.17.1. See gstreamer.git:d28e0b4147fe7073b2. So after + * receiving duration-changed we have to continue polling until + * the query succeeds. */ + if (parser->has_duration) + { + pthread_mutex_unlock(&parser->mutex); + g_usleep(10000); + pthread_mutex_lock(&parser->mutex); + } + else + { + pthread_cond_wait(&parser->state_cond, &parser->mutex); + } + } + } + + pthread_mutex_unlock(&parser->mutex); + + GST_DEBUG("Created winegstreamer parser %p.\n", parser); + return S_OK; + + fail: + wg_parser_destroy(parser); + *parser_out = NULL; + return E_FAIL; }
static void CDECL wg_parser_destroy(struct wg_parser *parser) @@ -1656,30 +1638,27 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser) pthread_cond_wait(&parser->state_cond, &parser->mutex); pthread_mutex_unlock(&parser->mutex);
- if (parser->sink_connected) + /* Unblock all of our streams. */ + pthread_mutex_lock(&parser->mutex); + for (i = 0; i < parser->stream_count; ++i) { - /* Unblock all of our streams. */ - pthread_mutex_lock(&parser->mutex); - for (i = 0; i < parser->stream_count; ++i) - { - parser->streams[i]->flushing = true; - pthread_cond_signal(&parser->streams[i]->event_empty_cond); - } - pthread_mutex_unlock(&parser->mutex); + parser->streams[i]->flushing = true; + pthread_cond_signal(&parser->streams[i]->event_empty_cond); + } + pthread_mutex_unlock(&parser->mutex);
- gst_element_set_state(parser->container, GST_STATE_NULL); + gst_element_set_state(parser->container, GST_STATE_NULL);
- for (i = 0; i < parser->stream_count; ++i) - free_stream(parser->streams[i]); + for (i = 0; i < parser->stream_count; ++i) + free_stream(parser->streams[i]);
- parser->stream_count = 0; - free(parser->streams); - parser->streams = NULL; + parser->stream_count = 0; + free(parser->streams); + parser->streams = NULL;
- gst_element_set_bus(parser->container, NULL); - gst_object_unref(parser->container); - parser->container = NULL; - } + gst_element_set_bus(parser->container, NULL); + gst_object_unref(parser->container); + parser->container = NULL;
if (parser->bus) { @@ -1696,14 +1675,9 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser)
static const struct unix_funcs funcs = { - wg_decodebin_parser_create, - wg_avi_parser_create, - wg_mpeg_audio_parser_create, - wg_wave_parser_create, + wg_parser_create, wg_parser_destroy,
- wg_parser_connect, - wg_parser_begin_flush, wg_parser_end_flush,
So I've thought about this patch some more, and since the goal here is to make adding push-mode support more streamlined, I'm thinking that a bigger rework than this would be preferred. In the current patch, we simply merge _create and _connect, having the read thread spin until the parser is ready (just like previously it spinned until sink_connect was true). However, in push mode, we need to return from the creation function and move on to ::ProcessInput and ::ProcessOutput, so maybe it would be a better idea to have _create() handle everything to gst_element_set_state(pipeline, PAUSED), and restructure other functions (i.e. get_stream_count, get_stream, get_stream_duration) to block on the corresponding step of initialization.
This would keep the pull mode functions fundamentally the same, since they query for all this data right after the parser is initialized, but allow push-mode transforms to yield control to the consumer of the transform when gstreamer wants data.
I'm going to start work on this now, but if there are any better ideas, I'd love to hear them.
On 9/2/21 11:06 AM, Derek Lesho wrote:
Hmm...
So I originally advocated for merging create/connect and disconnect/destroy. Unfortunately I then realized that you can't just do this.
On the create side, we need to block until all the GStreamer elements are done initializing, which means we of course need data. That's been pretty well described.
On the destroy side, we really need to terminate the read thread before we destroy the wg_parser object. There are multiple ways to do this, but ultimately I suspect keeping connect/disconnect around is the best one. I haven't looked in detail at patch 4/5 but I suspect it's either racy, or ends up being too complex. Sorry about that.
With regard to push mode, we don't have the race at destruction, which is nice. We do have weirdness at creation, though, as you describe.
I think what you've come up with is probably the only sensible solution, although it's also not clear to me what mfplat's actual requirements are.
I also think we want it to end up being *only* get_stream_count that suffers from this (which is honestly a bit weird given its name, but meh, whatever). We can't even construct stream objects until we're fully initialized.
On 9/1/21 4:05 PM, Derek Lesho wrote:
Typo in the title...
I don't particularly want to nitpick mfplat code, but,
(1) you're using a goto label only once, and
(2) it devolves into a free(), which seems easier and clearer.
Personally I tend to prefer making cleanup explicit in initialization functions. It's less fragile; you can be sure that you won't break anything by changing your destroy method. It also tends to require less conditionals in the destroy method.
This would also end up making most of the rest of the hunks in this patch go away.
A moot point if you take my advice, but, media_source_Release() already does this. Not to mention that conditional looks really janky.
On 9/6/21 8:23 PM, Zebediah Figura wrote:
In addition to points Zebediah made.
This is awkward because it implies some relationship between wg_parser being initialized and able to disconnect when stream_count is set. I think it's cleaner to let wg_parser_disconnect() handle null, or change condition to if (wg_parser).
This should be redundant, no?
I think expected way would be to have steam_count consistent with a number of non-null streams, so you don't have to check for this.
On 9/6/21 1:37 PM, Nikolay Sivov wrote:
The point here is that it's possible connection fails, in which case wg_parser is initialized but is not connected. There is no case right now where stream_count is unset but the source is connected, so I opted to use it. In the end, I will move this check into the constructor, as Zebediah suggested.
No, it is possible for calloc to fail, in which case stream_count is set but not the array.
So we should be modifying stream_count again in the failure path? I don't agree that this is more clear but I will do so.