Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/winegstreamer/gstdemux.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index 5181d0951d5..5c13d5e06f3 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -1136,7 +1136,7 @@ static void removed_decoded_pad(GstElement *bin, GstPad *pad, gpointer user) unsigned int i; char *name;
- TRACE("filter %p, bin %p, pad %p.\n", filter, bin, pad); + GST_LOG("filter %p, bin %p, pad %p.", filter, bin, pad);
for (i = 0; i < filter->source_count; ++i) { @@ -1155,7 +1155,7 @@ static void removed_decoded_pad(GstElement *bin, GstPad *pad, gpointer user) }
name = gst_pad_get_name(pad); - WARN("No pin matching pad %s found.\n", debugstr_a(name)); + GST_LOG("No pin matching pad "%s" found.", name); g_free(name); }
Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/winegstreamer/gst_cbs.c | 11 ----------- dlls/winegstreamer/gst_cbs.h | 2 -- dlls/winegstreamer/gstdemux.c | 10 ++-------- 3 files changed, 2 insertions(+), 21 deletions(-)
diff --git a/dlls/winegstreamer/gst_cbs.c b/dlls/winegstreamer/gst_cbs.c index 6c5877ce785..ee2452987d9 100644 --- a/dlls/winegstreamer/gst_cbs.c +++ b/dlls/winegstreamer/gst_cbs.c @@ -177,17 +177,6 @@ GstFlowReturn request_buffer_src_wrapper(GstPad *pad, GstObject *parent, guint64 return cbdata.u.getrange_data.ret; }
-void removed_decoded_pad_wrapper(GstElement *bin, GstPad *pad, gpointer user) -{ - struct cb_data cbdata = { REMOVED_DECODED_PAD }; - - cbdata.u.pad_removed_data.element = bin; - cbdata.u.pad_removed_data.pad = pad; - cbdata.u.pad_removed_data.user = user; - - call_cb(&cbdata); -} - gboolean query_sink_wrapper(GstPad *pad, GstObject *parent, GstQuery *query) { struct cb_data cbdata = { QUERY_SINK }; diff --git a/dlls/winegstreamer/gst_cbs.h b/dlls/winegstreamer/gst_cbs.h index f063a0a2a7b..e80fcd41e4a 100644 --- a/dlls/winegstreamer/gst_cbs.h +++ b/dlls/winegstreamer/gst_cbs.h @@ -33,7 +33,6 @@ enum CB_TYPE { EXISTING_NEW_PAD, ACTIVATE_MODE, REQUEST_BUFFER_SRC, - REMOVED_DECODED_PAD, QUERY_SINK, GSTDEMUX_MAX, BYTESTREAM_WRAPPER_PULL, @@ -119,7 +118,6 @@ void existing_new_pad_wrapper(GstElement *bin, GstPad *pad, gpointer user) DECLS gboolean activate_mode_wrapper(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) DECLSPEC_HIDDEN; GstFlowReturn request_buffer_src_wrapper(GstPad *pad, GstObject *parent, guint64 ofs, guint len, GstBuffer **buf) DECLSPEC_HIDDEN; GstFlowReturn got_data_wrapper(GstPad *pad, GstObject *parent, GstBuffer *buf) DECLSPEC_HIDDEN; -void removed_decoded_pad_wrapper(GstElement *bin, GstPad *pad, gpointer user) DECLSPEC_HIDDEN; void Gstreamer_transform_pad_added_wrapper(GstElement *filter, GstPad *pad, gpointer user) DECLSPEC_HIDDEN; gboolean query_sink_wrapper(GstPad *pad, GstObject *parent, GstQuery *query) DECLSPEC_HIDDEN; GstFlowReturn bytestream_wrapper_pull_wrapper(GstPad *pad, GstObject *parent, guint64 ofs, guint len, GstBuffer **buf) DECLSPEC_HIDDEN; diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index 5c13d5e06f3..b2d102b528e 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -1765,7 +1765,7 @@ static BOOL decodebin_parser_init_gst(struct parser *filter) gst_bin_add(GST_BIN(filter->container), element);
g_signal_connect(element, "pad-added", G_CALLBACK(existing_new_pad_wrapper), filter); - g_signal_connect(element, "pad-removed", G_CALLBACK(removed_decoded_pad_wrapper), filter); + g_signal_connect(element, "pad-removed", G_CALLBACK(removed_decoded_pad), filter); g_signal_connect(element, "autoplug-select", G_CALLBACK(autoplug_blacklist), filter); g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads), filter);
@@ -2447,12 +2447,6 @@ void perform_cb_gstdemux(struct cb_data *cbdata) data->ofs, data->len, data->buf); break; } - case REMOVED_DECODED_PAD: - { - struct pad_removed_data *data = &cbdata->u.pad_removed_data; - removed_decoded_pad(data->element, data->pad, data->user); - break; - } case QUERY_SINK: { struct query_sink_data *data = &cbdata->u.query_sink_data; @@ -2617,7 +2611,7 @@ static BOOL avi_splitter_init_gst(struct parser *filter) gst_bin_add(GST_BIN(filter->container), element);
g_signal_connect(element, "pad-added", G_CALLBACK(existing_new_pad_wrapper), filter); - g_signal_connect(element, "pad-removed", G_CALLBACK(removed_decoded_pad_wrapper), filter); + g_signal_connect(element, "pad-removed", G_CALLBACK(removed_decoded_pad), filter); g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads), filter);
filter->their_sink = gst_element_get_static_pad(element, "sink");
Synchronization around flushing (on both sides) is tricky here, and the solution used by this patch means that a "kernel" thread can be blocked on a "user" thread. However, since it's not a real kernel thread and can't bring down the rest of the "kernel", I don't think this is a concern.
Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/winegstreamer/gstdemux.c | 116 +++++++++++++++++++++++++++++++--- 1 file changed, 106 insertions(+), 10 deletions(-)
diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index b2d102b528e..fc861735cd7 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -63,7 +63,7 @@ struct parser /* FIXME: It would be nice to avoid duplicating these with strmbase. * However, synchronization is tricky; we need access to be protected by a * separate lock. */ - bool streaming, flushing; + bool streaming, flushing, sink_connected;
GstElement *container; GstPad *my_src, *their_sink; @@ -76,6 +76,17 @@ struct parser
HANDLE push_thread;
+ HANDLE read_thread; + pthread_cond_t read_cond, read_done_cond; + struct + { + GstBuffer *buffer; + uint64_t offset; + uint32_t size; + bool done; + GstFlowReturn ret; + } read_request; + BOOL (*init_gst)(struct parser *filter); HRESULT (*source_query_accept)(struct parser_source *pin, const AM_MEDIA_TYPE *mt); HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt); @@ -684,6 +695,12 @@ static GstFlowReturn queue_stream_event(struct parser_source *pin, const struct { struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
+ /* Unlike request_buffer_src() [q.v.], we need to watch for GStreamer + * flushes here. The difference is that we can be blocked by the streaming + * thread not running (or itself flushing on the DirectShow side). + * request_buffer_src() can only be blocked by the upstream source, and that + * is solved by flushing the upstream source. */ + pthread_mutex_lock(&filter->mutex); while (!pin->flushing && pin->event.type != PARSER_EVENT_NONE) pthread_cond_wait(&pin->event_empty_cond, &filter->mutex); @@ -1095,14 +1112,51 @@ static DWORD CALLBACK stream_thread(void *arg) return 0; }
-static GstFlowReturn request_buffer_src(GstPad *pad, GstObject *parent, guint64 ofs, guint len, GstBuffer **buffer) +static GstFlowReturn request_buffer_src(GstPad *pad, GstObject *parent, guint64 offset, guint size, GstBuffer **buffer) { - struct parser *This = gst_pad_get_element_private(pad); + struct parser *filter = gst_pad_get_element_private(pad); GstBuffer *new_buffer = NULL; + GstFlowReturn ret; + + GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, length %u, buffer %p.", pad, offset, size, *buffer); + + if (!*buffer) + *buffer = new_buffer = gst_buffer_new_and_alloc(size); + + pthread_mutex_lock(&filter->mutex); + + assert(!filter->read_request.buffer); + filter->read_request.buffer = *buffer; + filter->read_request.offset = offset; + filter->read_request.size = size; + filter->read_request.done = false; + pthread_cond_signal(&filter->read_cond); + + /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect + * the upstream pin to flush if necessary. We should never be blocked on + * read_thread() not running. */ + + while (!filter->read_request.done) + pthread_cond_wait(&filter->read_done_cond, &filter->mutex); + + ret = filter->read_request.ret; + + pthread_mutex_unlock(&filter->mutex); + + GST_LOG("Request returned %s.", gst_flow_get_name(ret)); + + if (ret != GST_FLOW_OK && new_buffer) + gst_buffer_unref(new_buffer); + + return ret; +} + +static GstFlowReturn read_buffer(struct parser *This, guint64 ofs, guint len, GstBuffer *buffer) +{ HRESULT hr; GstMapInfo info;
- TRACE("pad %p, offset %s, length %u, buffer %p.\n", pad, wine_dbgstr_longlong(ofs), len, *buffer); + TRACE("filter %p, offset %s, length %u, buffer %p.\n", This, wine_dbgstr_longlong(ofs), len, buffer);
if (ofs == GST_BUFFER_OFFSET_NONE) ofs = This->nextpullofs; @@ -1114,22 +1168,47 @@ static GstFlowReturn request_buffer_src(GstPad *pad, GstObject *parent, guint64 len = This->filesize - ofs; This->nextpullofs = ofs + len;
- if (!*buffer) - *buffer = new_buffer = gst_buffer_new_and_alloc(len); - gst_buffer_map(*buffer, &info, GST_MAP_WRITE); + gst_buffer_map(buffer, &info, GST_MAP_WRITE); hr = IAsyncReader_SyncRead(This->reader, ofs, len, info.data); - gst_buffer_unmap(*buffer, &info); + gst_buffer_unmap(buffer, &info); if (FAILED(hr)) { ERR("Failed to read data, hr %#x.\n", hr); - if (new_buffer) - gst_buffer_unref(new_buffer); return GST_FLOW_ERROR; }
return GST_FLOW_OK; }
+static DWORD CALLBACK read_thread(void *arg) +{ + struct parser *filter = arg; + + TRACE("Starting read thread for filter %p.\n", filter); + + pthread_mutex_lock(&filter->mutex); + + for (;;) + { + while (filter->sink_connected && !filter->read_request.buffer) + pthread_cond_wait(&filter->read_cond, &filter->mutex); + + if (!filter->sink_connected) + break; + + filter->read_request.done = true; + filter->read_request.ret = read_buffer(filter, filter->read_request.offset, + filter->read_request.size, filter->read_request.buffer); + filter->read_request.buffer = NULL; + pthread_cond_signal(&filter->read_done_cond); + } + + pthread_mutex_unlock(&filter->mutex); + + TRACE("Streaming stopped; exiting.\n"); + return 0; +} + static void removed_decoded_pad(GstElement *bin, GstPad *pad, gpointer user) { struct parser *filter = user; @@ -1506,6 +1585,10 @@ static HRESULT GST_Connect(struct parser *This, IPin *pConnectPin)
IAsyncReader_Length(This->reader, &This->filesize, &avail);
+ This->sink_connected = true; + + This->read_thread = CreateThread(NULL, 0, read_thread, This, 0, NULL); + if (!This->bus) { This->bus = gst_bus_new(); gst_bus_set_sync_handler(This->bus, watch_bus, This, NULL); @@ -1599,6 +1682,8 @@ static void parser_destroy(struct strmbase_filter *iface) gst_object_unref(filter->bus); }
+ pthread_cond_destroy(&filter->read_cond); + pthread_cond_destroy(&filter->read_done_cond); pthread_cond_destroy(&filter->init_cond); pthread_mutex_destroy(&filter->mutex);
@@ -1895,6 +1980,8 @@ static void parser_init_common(struct parser *object) { pthread_mutex_init(&object->mutex, NULL); pthread_cond_init(&object->init_cond, NULL); + pthread_cond_init(&object->read_cond, NULL); + pthread_cond_init(&object->read_done_cond, NULL); object->flushing = true; }
@@ -2411,6 +2498,15 @@ static HRESULT GST_RemoveOutputPins(struct parser *This) gst_object_unref(This->their_sink); This->my_src = This->their_sink = NULL;
+ /* read_thread() needs to stay alive to service any read requests GStreamer + * sends, so we can only shut it down after GStreamer stops. */ + pthread_mutex_lock(&This->mutex); + This->sink_connected = false; + pthread_mutex_unlock(&This->mutex); + pthread_cond_signal(&This->read_cond); + WaitForSingleObject(This->read_thread, INFINITE); + CloseHandle(This->read_thread); + for (i = 0; i < This->source_count; ++i) free_source_pin(This->sources[i]);
Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/winegstreamer/gst_cbs.c | 16 ---------------- dlls/winegstreamer/gst_cbs.h | 2 -- dlls/winegstreamer/gstdemux.c | 9 +-------- 3 files changed, 1 insertion(+), 26 deletions(-)
diff --git a/dlls/winegstreamer/gst_cbs.c b/dlls/winegstreamer/gst_cbs.c index ee2452987d9..5267091c5e1 100644 --- a/dlls/winegstreamer/gst_cbs.c +++ b/dlls/winegstreamer/gst_cbs.c @@ -161,22 +161,6 @@ gboolean activate_mode_wrapper(GstPad *pad, GstObject *parent, GstPadMode mode, return cbdata.u.activate_mode_data.ret; }
-GstFlowReturn request_buffer_src_wrapper(GstPad *pad, GstObject *parent, guint64 ofs, guint len, - GstBuffer **buf) -{ - struct cb_data cbdata = { REQUEST_BUFFER_SRC }; - - cbdata.u.getrange_data.pad = pad; - cbdata.u.getrange_data.parent = parent; - cbdata.u.getrange_data.ofs = ofs; - cbdata.u.getrange_data.len = len; - cbdata.u.getrange_data.buf = buf; - - call_cb(&cbdata); - - return cbdata.u.getrange_data.ret; -} - gboolean query_sink_wrapper(GstPad *pad, GstObject *parent, GstQuery *query) { struct cb_data cbdata = { QUERY_SINK }; diff --git a/dlls/winegstreamer/gst_cbs.h b/dlls/winegstreamer/gst_cbs.h index e80fcd41e4a..4afde78ba7d 100644 --- a/dlls/winegstreamer/gst_cbs.h +++ b/dlls/winegstreamer/gst_cbs.h @@ -32,7 +32,6 @@ typedef enum { enum CB_TYPE { EXISTING_NEW_PAD, ACTIVATE_MODE, - REQUEST_BUFFER_SRC, QUERY_SINK, GSTDEMUX_MAX, BYTESTREAM_WRAPPER_PULL, @@ -116,7 +115,6 @@ void perform_cb_media_source(struct cb_data *data) DECLSPEC_HIDDEN;
void existing_new_pad_wrapper(GstElement *bin, GstPad *pad, gpointer user) DECLSPEC_HIDDEN; gboolean activate_mode_wrapper(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) DECLSPEC_HIDDEN; -GstFlowReturn request_buffer_src_wrapper(GstPad *pad, GstObject *parent, guint64 ofs, guint len, GstBuffer **buf) DECLSPEC_HIDDEN; GstFlowReturn got_data_wrapper(GstPad *pad, GstObject *parent, GstBuffer *buf) DECLSPEC_HIDDEN; void Gstreamer_transform_pad_added_wrapper(GstElement *filter, GstPad *pad, gpointer user) DECLSPEC_HIDDEN; gboolean query_sink_wrapper(GstPad *pad, GstObject *parent, GstQuery *query) DECLSPEC_HIDDEN; diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index fc861735cd7..5c6266f4bd0 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -1598,7 +1598,7 @@ static HRESULT GST_Connect(struct parser *This, IPin *pConnectPin) gst_element_set_bus(This->container, This->bus);
This->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src"); - gst_pad_set_getrange_function(This->my_src, request_buffer_src_wrapper); + gst_pad_set_getrange_function(This->my_src, request_buffer_src); gst_pad_set_query_function(This->my_src, query_function); gst_pad_set_activatemode_function(This->my_src, activate_mode_wrapper); gst_pad_set_event_function(This->my_src, event_src); @@ -2536,13 +2536,6 @@ void perform_cb_gstdemux(struct cb_data *cbdata) cbdata->u.activate_mode_data.ret = activate_mode(data->pad, data->parent, data->mode, data->activate); break; } - case REQUEST_BUFFER_SRC: - { - struct getrange_data *data = &cbdata->u.getrange_data; - cbdata->u.getrange_data.ret = request_buffer_src(data->pad, data->parent, - data->ofs, data->len, data->buf); - break; - } case QUERY_SINK: { struct query_sink_data *data = &cbdata->u.query_sink_data;