This is a rather large and complex change. It comprises several parts:
(1) Instead of directly sending EOS, segment, and sample events to the downstream DirectShow sink, first queue them in a local buffer (i.e. "pin->event").
(2) Spawn a separate thread per source pin (i.e. "stream_thread") which consumes said events and sends them downstream.
(3) When flushing or stopping, explicitly wait for this thread to pause or stop (respectively).
There are two reasons for this:
(1) It reduces the number of Unix -> PE callbacks we need to make, easing PE conversion. This is not a great advantage *a priori*, and may not be worth a similar dedicated "handler" thread for most modules, but winegstreamer is different—we were already marshalling these calls onto another thread, and now that marshalling can go away (almost).
(2) Because GStreamer can only do pad negotiation (and hence autoplugging) while running (in contrast to DirectShow, which can do it while stopped), we currently have to renegotiate every time the pipeline is started. Most applications don't start the graph more than once, but even that requires two negotiations, and startup time is demonstrably too high. It would be nice to keep the graph in PAUSED state all of the time, but this is difficult to do without more fine-grained control over the streaming thread. [In particular, we cannot reliably wait for all samples to be delivered except by stopping the GStreamer pipeline.]
Signed-off-by: Zebediah Figura <z.figura12@gmail.com>
---
 dlls/winegstreamer/gst_private.h |   1 +
 dlls/winegstreamer/gstdemux.c    | 341 ++++++++++++++++++++++++++-----
 2 files changed, 290 insertions(+), 52 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index e591a95f3ca..9cd3c8adff5 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -22,6 +22,7 @@ #define __GST_PRIVATE_INCLUDED__
#include <stdarg.h> +#include <stdbool.h> #include <stdio.h> #include <gst/gst.h> #include <gst/video/video.h> diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index 4a0a95c68f3..30f6fbca168 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -60,6 +60,13 @@ struct parser
LONGLONG filesize;
+ CRITICAL_SECTION cs; + + /* FIXME: It would be nice to avoid duplicating these with strmbase. + * However, synchronization is tricky; we need access to be protected by a + * separate lock. */ + bool streaming; + BOOL initial, ignore_flush; GstElement *container; GstPad *my_src, *their_sink; @@ -74,6 +81,24 @@ struct parser HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt); };
+enum parser_event_type +{ + PARSER_EVENT_NONE = 0, + PARSER_EVENT_BUFFER, + PARSER_EVENT_EOS, + PARSER_EVENT_SEGMENT, +}; + +struct parser_event +{ + enum parser_event_type type; + union + { + GstBuffer *buffer; + GstEvent *segment; + } u; +}; + struct parser_source { struct strmbase_source pin; @@ -85,6 +110,11 @@ struct parser_source GstSegment *segment; GstCaps *caps; SourceSeeking seek; + + CONDITION_VARIABLE event_cv, event_empty_cv, flushing_cv, flush_stop_cv; + bool flushing, thread_blocked; + struct parser_event event; + HANDLE thread; };
static inline struct parser *impl_from_strmbase_filter(struct strmbase_filter *iface) @@ -657,49 +687,62 @@ static gboolean event_src(GstPad *pad, GstObject *parent, GstEvent *event) return ret; }
+static GstFlowReturn queue_stream_event(struct parser_source *pin, const struct parser_event *event) +{ + struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter); + + EnterCriticalSection(&filter->cs); + while (filter->streaming && !pin->flushing && pin->event.type != PARSER_EVENT_NONE) + SleepConditionVariableCS(&pin->event_empty_cv, &filter->cs, INFINITE); + if (!filter->streaming || pin->flushing) + { + LeaveCriticalSection(&filter->cs); + TRACE("Filter is flushing; discarding event.\n"); + return GST_FLOW_FLUSHING; + } + pin->event = *event; + LeaveCriticalSection(&filter->cs); + WakeConditionVariable(&pin->event_cv); + TRACE("Event queued.\n"); + return GST_FLOW_OK; +} + static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event) { struct parser_source *pin = gst_pad_get_element_private(pad); + struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
TRACE("pin %p, type "%s".\n", pin, GST_EVENT_TYPE_NAME(event));
- switch (event->type) { - case GST_EVENT_SEGMENT: { - gdouble rate, applied_rate; - gint64 stop, pos; - const GstSegment *segment; - - gst_event_parse_segment(event, &segment); - - pos = segment->position; - stop = segment->stop; - rate = segment->rate; - applied_rate = segment->applied_rate; - - if (segment->format != GST_FORMAT_TIME) + switch (event->type) + { + case GST_EVENT_SEGMENT: + if (pin->pin.pin.peer) { - FIXME("Unhandled format "%s".\n", gst_format_get_name(segment->format)); - break; + struct parser_event stream_event; + + stream_event.type = PARSER_EVENT_SEGMENT; + stream_event.u.segment = event; + if (queue_stream_event(pin, &stream_event) == GST_FLOW_OK) + { + /* Transfer our reference to the event to the thread. */ + return TRUE; + } } - - gst_segment_copy_into(segment, pin->segment); - - pos /= 100; - - if (stop > 0) - stop /= 100; - - if (pin->pin.pin.peer) - IPin_NewSegment(pin->pin.pin.peer, pos, stop, rate*applied_rate); - break; - } + case GST_EVENT_EOS: if (pin->pin.pin.peer) - IPin_EndOfStream(pin->pin.pin.peer); + { + struct parser_event stream_event; + + stream_event.type = PARSER_EVENT_EOS; + queue_stream_event(pin, &stream_event); + } else SetEvent(pin->eos_event); break; + case GST_EVENT_FLUSH_START: if (impl_from_strmbase_filter(pin->pin.pin.filter)->ignore_flush) { /* gst-plugins-base prior to 1.7 contains a bug which causes @@ -713,13 +756,53 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event) break; } if (pin->pin.pin.peer) + { IPin_BeginFlush(pin->pin.pin.peer); + + EnterCriticalSection(&filter->cs); + + pin->flushing = true; + WakeConditionVariable(&pin->event_cv); + WakeConditionVariable(&pin->event_empty_cv); + /* Wait for the thread to pause itself, to ensure that no stale + * samples are sent. */ + while (!pin->thread_blocked) + SleepConditionVariableCS(&pin->flushing_cv, &filter->cs, INFINITE); + + /* And flush out any buffered event. 
*/ + switch (pin->event.type) + { + case PARSER_EVENT_NONE: + case PARSER_EVENT_EOS: + break; + + case PARSER_EVENT_BUFFER: + gst_buffer_unref(pin->event.u.buffer); + break; + + case PARSER_EVENT_SEGMENT: + gst_event_unref(pin->event.u.segment); + break; + } + pin->event.type = PARSER_EVENT_NONE; + + LeaveCriticalSection(&filter->cs); + } break; + case GST_EVENT_FLUSH_STOP: gst_segment_init(pin->segment, GST_FORMAT_TIME); if (pin->pin.pin.peer) + { + EnterCriticalSection(&filter->cs); + pin->flushing = false; + LeaveCriticalSection(&filter->cs); + WakeConditionVariable(&pin->flush_stop_cv); + IPin_EndFlush(pin->pin.pin.peer); + } break; + case GST_EVENT_CAPS: { GstCaps *caps; @@ -729,6 +812,7 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event) SetEvent(pin->caps_event); break; } + default: WARN("Ignoring "%s" event.\n", GST_EVENT_TYPE_NAME(event)); } @@ -806,6 +890,36 @@ static DWORD CALLBACK push_data(LPVOID iface) return 0; }
+static GstFlowReturn got_data_sink(GstPad *pad, GstObject *parent, GstBuffer *buffer) +{ + struct parser_source *pin = gst_pad_get_element_private(pad); + struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter); + struct parser_event stream_event; + GstFlowReturn ret; + + TRACE("pad %p, pin %p, buffer %p.\n", pad, pin, buffer); + + if (filter->initial) + { + gst_buffer_unref(buffer); + return GST_FLOW_OK; + } + + if (!pin->pin.pin.peer) + { + gst_buffer_unref(buffer); + return GST_FLOW_NOT_LINKED; + } + + stream_event.type = PARSER_EVENT_BUFFER; + stream_event.u.buffer = buffer; + /* Transfer our reference to the buffer to the thread. */ + if ((ret = queue_stream_event(pin, &stream_event)) != GST_FLOW_OK) + gst_buffer_unref(buffer); + return ret; +} + +/* Fill and send a single IMediaSample. */ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample, GstBuffer *buf, GstMapInfo *info, gsize offset, gsize size, DWORD bytes_per_second) { @@ -869,21 +983,15 @@ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample, return hr; }
-static GstFlowReturn got_data_sink(GstPad *pad, GstObject *parent, GstBuffer *buf) +/* Send a single GStreamer buffer (splitting it into multiple IMediaSamples if + * necessary). */ +static void send_buffer(struct parser_source *pin, GstBuffer *buf) { - struct parser_source *pin = gst_pad_get_element_private(pad); - struct parser *This = impl_from_strmbase_filter(pin->pin.pin.filter); - HRESULT hr = S_OK; + HRESULT hr; + BYTE *ptr = NULL; IMediaSample *sample; GstMapInfo info;
- TRACE("%p %p\n", pad, buf); - - if (This->initial) { - gst_buffer_unref(buf); - return GST_FLOW_OK; - } - gst_buffer_map(buf, &info, GST_MAP_READ);
if (IsEqualGUID(&pin->pin.pin.mt.formattype, &FORMAT_WaveFormatEx) @@ -937,14 +1045,93 @@ static GstFlowReturn got_data_sink(GstPad *pad, GstObject *parent, GstBuffer *bu gst_buffer_unmap(buf, &info);
gst_buffer_unref(buf); +}
- if (hr == VFW_E_NOT_CONNECTED) - return GST_FLOW_NOT_LINKED; +static DWORD CALLBACK stream_thread(void *arg) +{ + struct parser_source *pin = arg; + struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
- if (FAILED(hr)) - return GST_FLOW_FLUSHING; + TRACE("Starting streaming thread for pin %p.\n", pin);
- return GST_FLOW_OK; + for (;;) + { + struct parser_event event; + + EnterCriticalSection(&filter->cs); + + while (filter->streaming && !pin->flushing && pin->event.type == PARSER_EVENT_NONE) + SleepConditionVariableCS(&pin->event_cv, &filter->cs, INFINITE); + + if (pin->flushing) + { + TRACE("Filter is flushing; pausing thread.\n"); + pin->thread_blocked = true; + WakeConditionVariable(&pin->flushing_cv); + do + SleepConditionVariableCS(&pin->flush_stop_cv, &filter->cs, INFINITE); + while (pin->flushing); + pin->thread_blocked = false; + TRACE("Filter is no longer flushing; resuming thread.\n"); + } + + if (!filter->streaming) + { + LeaveCriticalSection(&filter->cs); + break; + } + + if (!pin->event.type) + { + LeaveCriticalSection(&filter->cs); + continue; + } + + event = pin->event; + pin->event.type = PARSER_EVENT_NONE; + WakeConditionVariable(&pin->event_empty_cv); + + LeaveCriticalSection(&filter->cs); + + TRACE("Got event of type %#x.\n", event.type); + + switch (event.type) + { + case PARSER_EVENT_BUFFER: + send_buffer(pin, event.u.buffer); + break; + + case PARSER_EVENT_EOS: + IPin_EndOfStream(pin->pin.pin.peer); + break; + + case PARSER_EVENT_SEGMENT: + { + const GstSegment *segment; + + gst_event_parse_segment(event.u.segment, &segment); + + if (segment->format != GST_FORMAT_TIME) + { + FIXME("Unhandled format "%s".\n", gst_format_get_name(segment->format)); + break; + } + + gst_segment_copy_into(segment, pin->segment); + + IPin_NewSegment(pin->pin.pin.peer, segment->position / 100, + segment->stop / 100, segment->rate * segment->applied_rate); + gst_event_unref(event.u.segment); + break; + } + + case PARSER_EVENT_NONE: + assert(0); + } + } + + TRACE("Streaming stopped; exiting.\n"); + return 0; }
static GstFlowReturn request_buffer_src(GstPad *pad, GstObject *parent, guint64 ofs, guint len, GstBuffer **buffer) @@ -1478,6 +1665,9 @@ static void parser_destroy(struct strmbase_filter *iface) gst_bus_set_sync_handler(filter->bus, NULL, NULL, NULL); gst_object_unref(filter->bus); } + + filter->cs.DebugInfo->Spare[0] = 0; + DeleteCriticalSection(&filter->cs); strmbase_sink_cleanup(&filter->sink); strmbase_filter_cleanup(&filter->filter); heap_free(filter); @@ -1493,12 +1683,21 @@ static HRESULT parser_init_stream(struct strmbase_filter *iface) if (!filter->container) return S_OK;
+ EnterCriticalSection(&filter->cs); + filter->streaming = true; + LeaveCriticalSection(&filter->cs); + for (i = 0; i < filter->source_count; ++i) { HRESULT hr;
- if (filter->sources[i]->pin.pin.peer && FAILED(hr = IMemAllocator_Commit(filter->sources[i]->pin.pAllocator))) + if (!filter->sources[i]->pin.pin.peer) + continue; + + if (FAILED(hr = IMemAllocator_Commit(filter->sources[i]->pin.pAllocator))) ERR("Failed to commit allocator, hr %#x.\n", hr); + + filter->sources[i]->thread = CreateThread(NULL, 0, stream_thread, filter->sources[i], 0, NULL); }
if (filter->no_more_pads_event) @@ -1544,6 +1743,21 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface) if (!filter->container) return S_OK;
+ EnterCriticalSection(&filter->cs); + filter->streaming = false; + LeaveCriticalSection(&filter->cs); + + for (i = 0; i < filter->source_count; ++i) + { + struct parser_source *pin = filter->sources[i]; + + if (!pin->pin.pin.peer) + continue; + + WakeConditionVariable(&pin->event_cv); + WakeConditionVariable(&pin->event_empty_cv); + } + filter->ignore_flush = TRUE; if ((ret = gst_element_set_state(filter->container, GST_STATE_READY)) == GST_STATE_CHANGE_FAILURE) { @@ -1555,8 +1769,16 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface)
for (i = 0; i < filter->source_count; ++i) { - if (filter->sources[i]->pin.pin.peer) - IMemAllocator_Decommit(filter->sources[i]->pin.pAllocator); + struct parser_source *pin = filter->sources[i]; + + if (!pin->pin.pin.peer) + continue; + + IMemAllocator_Decommit(pin->pin.pAllocator); + + WaitForSingleObject(pin->thread, INFINITE); + CloseHandle(pin->thread); + pin->thread = NULL; }
return S_OK; @@ -1797,6 +2019,13 @@ static BOOL parser_init_gstreamer(void) return TRUE; }
+static void parser_init_common(struct parser *object) +{ + object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL); + InitializeCriticalSection(&object->cs); + object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": parser.cs"); +} + HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out) { struct parser *object; @@ -1809,11 +2038,12 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out) if (!(object = heap_alloc_zero(sizeof(*object)))) return E_OUTOFMEMORY;
+ parser_init_common(object); + strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, wcsInputPinName, &sink_ops, NULL);
object->no_more_pads_event = CreateEventW(NULL, FALSE, FALSE, NULL); - object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL); object->init_gst = decodebin_parser_init_gst; object->source_query_accept = decodebin_parser_source_query_accept; object->source_get_media_type = decodebin_parser_source_get_media_type; @@ -2203,6 +2433,10 @@ static struct parser_source *create_pin(struct parser *filter, const WCHAR *name pin->IQualityControl_iface.lpVtbl = &GSTOutPin_QualityControl_Vtbl; strmbase_seeking_init(&pin->seek, &GST_Seeking_Vtbl, GST_ChangeStop, GST_ChangeCurrent, GST_ChangeRate); + InitializeConditionVariable(&pin->event_cv); + InitializeConditionVariable(&pin->event_empty_cv); + InitializeConditionVariable(&pin->flushing_cv); + InitializeConditionVariable(&pin->flush_stop_cv); BaseFilterImpl_IncrementPinVersion(&filter->filter);
sprintf(pad_name, "qz_sink_%u", filter->source_count); @@ -2444,10 +2678,11 @@ HRESULT wave_parser_create(IUnknown *outer, IUnknown **out) if (!(object = heap_alloc_zero(sizeof(*object)))) return E_OUTOFMEMORY;
+ parser_init_common(object); + strmbase_filter_init(&object->filter, outer, &CLSID_WAVEParser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, sink_name, &wave_parser_sink_ops, NULL); object->init_gst = wave_parser_init_gst; - object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL); object->source_query_accept = wave_parser_source_query_accept; object->source_get_media_type = wave_parser_source_get_media_type;
@@ -2568,10 +2803,11 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = heap_alloc_zero(sizeof(*object)))) return E_OUTOFMEMORY;
+ parser_init_common(object); + strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, sink_name, &avi_splitter_sink_ops, NULL); object->no_more_pads_event = CreateEventW(NULL, FALSE, FALSE, NULL); - object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL); object->init_gst = avi_splitter_init_gst; object->source_query_accept = avi_splitter_source_query_accept; object->source_get_media_type = avi_splitter_source_get_media_type; @@ -2725,12 +2961,13 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = heap_alloc_zero(sizeof(*object)))) return E_OUTOFMEMORY;
+ parser_init_common(object); + strmbase_filter_init(&object->filter, outer, &CLSID_MPEG1Splitter, &mpeg_splitter_ops); strmbase_sink_init(&object->sink, &object->filter, sink_name, &mpeg_splitter_sink_ops, NULL); object->IAMStreamSelect_iface.lpVtbl = &stream_select_vtbl;
object->duration_event = CreateEventW(NULL, FALSE, FALSE, NULL); - object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL); object->init_gst = mpeg_splitter_init_gst; object->source_query_accept = mpeg_splitter_source_query_accept; object->source_get_media_type = mpeg_splitter_source_get_media_type;