Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/quartz/dsoundrender.c | 7 +------ dlls/strmbase/pin.c | 4 ++++ dlls/strmbase/renderer.c | 5 ----- 3 files changed, 5 insertions(+), 11 deletions(-)
diff --git a/dlls/quartz/dsoundrender.c b/dlls/quartz/dsoundrender.c index f07e7b0d1e0..7e08d90569e 100644 --- a/dlls/quartz/dsoundrender.c +++ b/dlls/quartz/dsoundrender.c @@ -374,18 +374,13 @@ static HRESULT WINAPI dsound_render_sink_Receive(struct strmbase_sink *iface, IM if (FAILED(hr = DSoundRender_PrepareReceive(filter, sample))) return hr;
- EnterCriticalSection(&filter->filter.stream_cs); - if (filter->filter.clock && SUCCEEDED(IMediaSample_GetTime(sample, &start, &stop))) strmbase_passthrough_update_time(&filter->passthrough, start);
if (filter->filter.state == State_Paused) SetEvent(filter->state_event);
- hr = DSoundRender_DoRenderSample(filter, sample); - - LeaveCriticalSection(&filter->filter.stream_cs); - return hr; + return DSoundRender_DoRenderSample(filter, sample); }
static HRESULT dsound_render_sink_query_interface(struct strmbase_pin *iface, REFIID iid, void **out) diff --git a/dlls/strmbase/pin.c b/dlls/strmbase/pin.c index dcfbdcd767e..54f8662e7ff 100644 --- a/dlls/strmbase/pin.c +++ b/dlls/strmbase/pin.c @@ -1140,7 +1140,11 @@ static HRESULT WINAPI MemInputPin_Receive(IMemInputPin *iface, IMediaSample *sam debugstr_w(pin->pin.name), sample);
if (pin->pFuncsTable->pfnReceive) + { + EnterCriticalSection(&pin->pin.filter->stream_cs); hr = pin->pFuncsTable->pfnReceive(pin, sample); + LeaveCriticalSection(&pin->pin.filter->stream_cs); + } return hr; }
diff --git a/dlls/strmbase/renderer.c b/dlls/strmbase/renderer.c index b63129e0e87..b890ab72bf6 100644 --- a/dlls/strmbase/renderer.c +++ b/dlls/strmbase/renderer.c @@ -192,8 +192,6 @@ static HRESULT WINAPI BaseRenderer_Receive(struct strmbase_sink *pin, IMediaSamp DeleteMediaType(mt); }
- EnterCriticalSection(&filter->filter.stream_cs); - if (filter->filter.clock && SUCCEEDED(IMediaSample_GetTime(sample, &start, &stop))) { strmbase_passthrough_update_time(&filter->passthrough, start); @@ -229,7 +227,6 @@ static HRESULT WINAPI BaseRenderer_Receive(struct strmbase_sink *pin, IMediaSamp
if (ret == 1) { - LeaveCriticalSection(&filter->filter.stream_cs); TRACE("Flush signaled; discarding current sample.\n"); return S_OK; } @@ -245,8 +242,6 @@ static HRESULT WINAPI BaseRenderer_Receive(struct strmbase_sink *pin, IMediaSamp
QualityControlRender_DoQOS(&filter->qc);
- LeaveCriticalSection(&filter->filter.stream_cs); - return hr; }
Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/quartz/dsoundrender.c | 3 --- dlls/strmbase/pin.c | 21 +++++++++++++-------- dlls/strmbase/renderer.c | 3 --- 3 files changed, 13 insertions(+), 14 deletions(-)
diff --git a/dlls/quartz/dsoundrender.c b/dlls/quartz/dsoundrender.c index 7e08d90569e..4f1da6eaef3 100644 --- a/dlls/quartz/dsoundrender.c +++ b/dlls/quartz/dsoundrender.c @@ -509,8 +509,6 @@ static HRESULT dsound_render_sink_eos(struct strmbase_sink *iface) void *buffer; DWORD size;
- EnterCriticalSection(&filter->filter.stream_cs); - filter->eos = TRUE;
if (graph && SUCCEEDED(IFilterGraph_QueryInterface(graph, @@ -529,7 +527,6 @@ static HRESULT dsound_render_sink_eos(struct strmbase_sink *iface) memset(buffer, 0, size); IDirectSoundBuffer_Unlock(filter->dsbuffer, buffer, size, NULL, 0);
- LeaveCriticalSection(&filter->filter.stream_cs); return S_OK; }
diff --git a/dlls/strmbase/pin.c b/dlls/strmbase/pin.c index 54f8662e7ff..e815f7b82df 100644 --- a/dlls/strmbase/pin.c +++ b/dlls/strmbase/pin.c @@ -926,21 +926,26 @@ static HRESULT deliver_endofstream(IPin* pin, LPVOID unused)
static HRESULT WINAPI sink_EndOfStream(IPin *iface) { - struct strmbase_sink *This = impl_sink_from_IPin(iface); + struct strmbase_sink *pin = impl_sink_from_IPin(iface); HRESULT hr = S_OK;
- TRACE("pin %p %s:%s.\n", This, debugstr_w(This->pin.filter->name), debugstr_w(This->pin.name)); + TRACE("pin %p %s:%s.\n", pin, debugstr_w(pin->pin.filter->name), debugstr_w(pin->pin.name));
- if (This->pFuncsTable->sink_eos) - return This->pFuncsTable->sink_eos(This); + if (pin->pFuncsTable->sink_eos) + { + EnterCriticalSection(&pin->pin.filter->stream_cs); + hr = pin->pFuncsTable->sink_eos(pin); + LeaveCriticalSection(&pin->pin.filter->stream_cs); + return hr; + }
- EnterCriticalSection(&This->pin.filter->filter_cs); - if (This->flushing) + EnterCriticalSection(&pin->pin.filter->filter_cs); + if (pin->flushing) hr = S_FALSE; - LeaveCriticalSection(&This->pin.filter->filter_cs); + LeaveCriticalSection(&pin->pin.filter->filter_cs);
if (hr == S_OK) - hr = SendFurther(This, deliver_endofstream, NULL); + hr = SendFurther(pin, deliver_endofstream, NULL); return hr; }
diff --git a/dlls/strmbase/renderer.c b/dlls/strmbase/renderer.c index b890ab72bf6..8da3d81de61 100644 --- a/dlls/strmbase/renderer.c +++ b/dlls/strmbase/renderer.c @@ -268,8 +268,6 @@ static HRESULT sink_eos(struct strmbase_sink *iface) IFilterGraph *graph = filter->filter.graph; IMediaEventSink *event_sink;
- EnterCriticalSection(&filter->filter.stream_cs); - filter->eos = TRUE;
if (graph && SUCCEEDED(IFilterGraph_QueryInterface(graph, @@ -282,7 +280,6 @@ static HRESULT sink_eos(struct strmbase_sink *iface) strmbase_passthrough_eos(&filter->passthrough); SetEvent(filter->state_event);
- LeaveCriticalSection(&filter->filter.stream_cs); return S_OK; }
Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/quartz/acmwrapper.c | 11 ----------- 1 file changed, 11 deletions(-)
diff --git a/dlls/quartz/acmwrapper.c b/dlls/quartz/acmwrapper.c index ada8c474640..81e856a00df 100644 --- a/dlls/quartz/acmwrapper.c +++ b/dlls/quartz/acmwrapper.c @@ -38,7 +38,6 @@ WINE_DEFAULT_DEBUG_CHANNEL(quartz); struct acm_wrapper { struct strmbase_filter filter; - CRITICAL_SECTION stream_cs;
struct strmbase_source source; IQualityControl source_IQualityControl_iface; @@ -108,13 +107,10 @@ static HRESULT WINAPI acm_wrapper_sink_Receive(struct strmbase_sink *iface, IMed if (This->sink.flushing) return S_FALSE;
- EnterCriticalSection(&This->stream_cs); - hr = IMediaSample_GetPointer(pSample, &pbSrcStream); if (FAILED(hr)) { ERR("Cannot get pointer to sample data (%x)\n", hr); - LeaveCriticalSection(&This->stream_cs); return hr; }
@@ -150,7 +146,6 @@ static HRESULT WINAPI acm_wrapper_sink_Receive(struct strmbase_sink *iface, IMed if (FAILED(hr)) { ERR("Unable to get delivery buffer (%x)\n", hr); - LeaveCriticalSection(&This->stream_cs); return hr; } IMediaSample_SetPreroll(pOutSample, preroll); @@ -267,7 +262,6 @@ error: This->lasttime_real = tStop; This->lasttime_sent = tMed;
- LeaveCriticalSection(&This->stream_cs); return hr; }
@@ -487,8 +481,6 @@ static void acm_wrapper_destroy(struct strmbase_filter *iface) strmbase_source_cleanup(&filter->source); strmbase_passthrough_cleanup(&filter->passthrough);
- filter->stream_cs.DebugInfo->Spare[0] = 0; - DeleteCriticalSection(&filter->stream_cs); FreeMediaType(&filter->mt); strmbase_filter_cleanup(&filter->filter); free(filter); @@ -532,9 +524,6 @@ HRESULT acm_wrapper_create(IUnknown *outer, IUnknown **out)
strmbase_filter_init(&object->filter, outer, &CLSID_ACMWrapper, &filter_ops);
- InitializeCriticalSection(&object->stream_cs); - object->stream_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__": acm_wrapper.stream_cs"); - strmbase_sink_init(&object->sink, &object->filter, L"In", &sink_ops, NULL);
strmbase_source_init(&object->source, &object->filter, L"Out", &source_ops);
Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/quartz/avidec.c | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-)
diff --git a/dlls/quartz/avidec.c b/dlls/quartz/avidec.c index 77d2b5e2aa5..b1e07fe5879 100644 --- a/dlls/quartz/avidec.c +++ b/dlls/quartz/avidec.c @@ -39,7 +39,6 @@ WINE_DEFAULT_DEBUG_CHANNEL(quartz); struct avi_decompressor { struct strmbase_filter filter; - CRITICAL_SECTION stream_cs;
struct strmbase_source source; IQualityControl source_IQualityControl_iface; @@ -129,13 +128,10 @@ static HRESULT WINAPI avi_decompressor_sink_Receive(struct strmbase_sink *iface, if (This->sink.flushing) return S_FALSE;
- EnterCriticalSection(&This->stream_cs); - hr = IMediaSample_GetPointer(pSample, &pbSrcStream); if (FAILED(hr)) { ERR("Cannot get pointer to sample data (%x)\n", hr); - LeaveCriticalSection(&This->stream_cs); return hr; }
@@ -149,7 +145,6 @@ static HRESULT WINAPI avi_decompressor_sink_Receive(struct strmbase_sink *iface, hr = BaseOutputPinImpl_GetDeliveryBuffer(&This->source, &pOutSample, NULL, NULL, 0); if (FAILED(hr)) { ERR("Unable to get delivery buffer (%x)\n", hr); - LeaveCriticalSection(&This->stream_cs); return hr; }
@@ -160,7 +155,6 @@ static HRESULT WINAPI avi_decompressor_sink_Receive(struct strmbase_sink *iface, if (FAILED(hr)) { ERR("Unable to get pointer to buffer (%x)\n", hr); IMediaSample_Release(pOutSample); - LeaveCriticalSection(&This->stream_cs); return hr; } cbDstStream = IMediaSample_GetSize(pOutSample); @@ -168,7 +162,6 @@ static HRESULT WINAPI avi_decompressor_sink_Receive(struct strmbase_sink *iface, { ERR("Sample size is too small (%u < %u).\n", cbDstStream, source_format->bmiHeader.biSizeImage); IMediaSample_Release(pOutSample); - LeaveCriticalSection(&This->stream_cs); return E_FAIL; }
@@ -187,7 +180,6 @@ static HRESULT WINAPI avi_decompressor_sink_Receive(struct strmbase_sink *iface, /* Drop sample if it's intended to be dropped */ if (flags & ICDECOMPRESS_HURRYUP) { IMediaSample_Release(pOutSample); - LeaveCriticalSection(&This->stream_cs); return S_OK; }
@@ -209,7 +201,6 @@ static HRESULT WINAPI avi_decompressor_sink_Receive(struct strmbase_sink *iface, ERR("Error sending sample (%x)\n", hr);
IMediaSample_Release(pOutSample); - LeaveCriticalSection(&This->stream_cs); return hr; }
@@ -491,12 +482,12 @@ static HRESULT WINAPI avi_decompressor_source_qc_Notify(IQualityControl *iface, TRACE("filter %p, sender %p, type %#x, proportion %u, late %s, timestamp %s.\n", filter, sender, q.Type, q.Proportion, debugstr_time(q.Late), debugstr_time(q.TimeStamp));
- EnterCriticalSection(&filter->stream_cs); + EnterCriticalSection(&filter->filter.stream_cs); if (q.Late > 0) filter->late = q.Late + q.TimeStamp; else filter->late = -1; - LeaveCriticalSection(&filter->stream_cs); + LeaveCriticalSection(&filter->filter.stream_cs); return S_OK; }
@@ -545,8 +536,6 @@ static void avi_decompressor_destroy(struct strmbase_filter *iface) strmbase_source_cleanup(&filter->source); strmbase_passthrough_cleanup(&filter->passthrough);
- filter->stream_cs.DebugInfo->Spare[0] = 0; - DeleteCriticalSection(&filter->stream_cs); strmbase_filter_cleanup(&filter->filter); free(filter);
@@ -614,9 +603,6 @@ HRESULT avi_dec_create(IUnknown *outer, IUnknown **out)
strmbase_filter_init(&object->filter, outer, &CLSID_AVIDec, &filter_ops);
- InitializeCriticalSection(&object->stream_cs); - object->stream_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__": avi_decompressor.stream_cs"); - strmbase_sink_init(&object->sink, &object->filter, L"In", &sink_ops, NULL);
strmbase_source_init(&object->source, &object->filter, L"Out", &source_ops);
This is a rather large and complex change. It comprises several parts:
(1) Instead of directly sending EOS, segment, and sample events to the downstream DirectShow sink, first queue them in a local buffer (i.e. "pin->event").
(2) Spawn a separate thread per source pin (i.e. "stream_thread") which consumes said events and sends them downstream.
(3) When flushing or stopping, explicitly wait for this thread to pause or stop (respectively).
There are a few reasons for this:
(1) It reduces the number of Unix -> PE callbacks we need to make, easing PE conversion. This is not a great advantage in itself, and for most modules it would not justify a dedicated "handler" thread, but winegstreamer is different: we were already marshalling these calls onto another thread, and that marshalling can now (almost) go away.
(2) Because GStreamer can only do pad negotiation (and hence autoplugging) while running (in contrast to DirectShow, which can do it while stopped), we currently have to renegotiate every time the pipeline is started. Most applications don't start the graph more than once, but even starting it once requires two negotiations, and startup time is demonstrably too high. It would be nice to keep the graph in PAUSED state all of the time, but this is difficult to do without more fine-grained control over the streaming thread. [In particular, the only reliable way to wait for all samples to be delivered is to stop the GStreamer pipeline.]
Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/winegstreamer/gst_private.h | 1 + dlls/winegstreamer/gstdemux.c | 341 ++++++++++++++++++++++++++----- 2 files changed, 290 insertions(+), 52 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index e591a95f3ca..9cd3c8adff5 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -22,6 +22,7 @@ #define __GST_PRIVATE_INCLUDED__
#include <stdarg.h> +#include <stdbool.h> #include <stdio.h> #include <gst/gst.h> #include <gst/video/video.h> diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index 4a0a95c68f3..30f6fbca168 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -60,6 +60,13 @@ struct parser
LONGLONG filesize;
+ CRITICAL_SECTION cs; + + /* FIXME: It would be nice to avoid duplicating these with strmbase. + * However, synchronization is tricky; we need access to be protected by a + * separate lock. */ + bool streaming; + BOOL initial, ignore_flush; GstElement *container; GstPad *my_src, *their_sink; @@ -74,6 +81,24 @@ struct parser HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt); };
+enum parser_event_type +{ + PARSER_EVENT_NONE = 0, + PARSER_EVENT_BUFFER, + PARSER_EVENT_EOS, + PARSER_EVENT_SEGMENT, +}; + +struct parser_event +{ + enum parser_event_type type; + union + { + GstBuffer *buffer; + GstEvent *segment; + } u; +}; + struct parser_source { struct strmbase_source pin; @@ -85,6 +110,11 @@ struct parser_source GstSegment *segment; GstCaps *caps; SourceSeeking seek; + + CONDITION_VARIABLE event_cv, event_empty_cv, flushing_cv, flush_stop_cv; + bool flushing, thread_blocked; + struct parser_event event; + HANDLE thread; };
static inline struct parser *impl_from_strmbase_filter(struct strmbase_filter *iface) @@ -657,49 +687,62 @@ static gboolean event_src(GstPad *pad, GstObject *parent, GstEvent *event) return ret; }
+static GstFlowReturn queue_stream_event(struct parser_source *pin, const struct parser_event *event) +{ + struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter); + + EnterCriticalSection(&filter->cs); + while (filter->streaming && !pin->flushing && pin->event.type != PARSER_EVENT_NONE) + SleepConditionVariableCS(&pin->event_empty_cv, &filter->cs, INFINITE); + if (!filter->streaming || pin->flushing) + { + LeaveCriticalSection(&filter->cs); + TRACE("Filter is flushing; discarding event.\n"); + return GST_FLOW_FLUSHING; + } + pin->event = *event; + LeaveCriticalSection(&filter->cs); + WakeConditionVariable(&pin->event_cv); + TRACE("Event queued.\n"); + return GST_FLOW_OK; +} + static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event) { struct parser_source *pin = gst_pad_get_element_private(pad); + struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
TRACE("pin %p, type "%s".\n", pin, GST_EVENT_TYPE_NAME(event));
- switch (event->type) { - case GST_EVENT_SEGMENT: { - gdouble rate, applied_rate; - gint64 stop, pos; - const GstSegment *segment; - - gst_event_parse_segment(event, &segment); - - pos = segment->position; - stop = segment->stop; - rate = segment->rate; - applied_rate = segment->applied_rate; - - if (segment->format != GST_FORMAT_TIME) + switch (event->type) + { + case GST_EVENT_SEGMENT: + if (pin->pin.pin.peer) { - FIXME("Unhandled format "%s".\n", gst_format_get_name(segment->format)); - break; + struct parser_event stream_event; + + stream_event.type = PARSER_EVENT_SEGMENT; + stream_event.u.segment = event; + if (queue_stream_event(pin, &stream_event) == GST_FLOW_OK) + { + /* Transfer our reference to the event to the thread. */ + return TRUE; + } } - - gst_segment_copy_into(segment, pin->segment); - - pos /= 100; - - if (stop > 0) - stop /= 100; - - if (pin->pin.pin.peer) - IPin_NewSegment(pin->pin.pin.peer, pos, stop, rate*applied_rate); - break; - } + case GST_EVENT_EOS: if (pin->pin.pin.peer) - IPin_EndOfStream(pin->pin.pin.peer); + { + struct parser_event stream_event; + + stream_event.type = PARSER_EVENT_EOS; + queue_stream_event(pin, &stream_event); + } else SetEvent(pin->eos_event); break; + case GST_EVENT_FLUSH_START: if (impl_from_strmbase_filter(pin->pin.pin.filter)->ignore_flush) { /* gst-plugins-base prior to 1.7 contains a bug which causes @@ -713,13 +756,53 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event) break; } if (pin->pin.pin.peer) + { IPin_BeginFlush(pin->pin.pin.peer); + + EnterCriticalSection(&filter->cs); + + pin->flushing = true; + WakeConditionVariable(&pin->event_cv); + WakeConditionVariable(&pin->event_empty_cv); + /* Wait for the thread to pause itself, to ensure that no stale + * samples are sent. */ + while (!pin->thread_blocked) + SleepConditionVariableCS(&pin->flushing_cv, &filter->cs, INFINITE); + + /* And flush out any buffered event. 
*/ + switch (pin->event.type) + { + case PARSER_EVENT_NONE: + case PARSER_EVENT_EOS: + break; + + case PARSER_EVENT_BUFFER: + gst_buffer_unref(pin->event.u.buffer); + break; + + case PARSER_EVENT_SEGMENT: + gst_event_unref(pin->event.u.segment); + break; + } + pin->event.type = PARSER_EVENT_NONE; + + LeaveCriticalSection(&filter->cs); + } break; + case GST_EVENT_FLUSH_STOP: gst_segment_init(pin->segment, GST_FORMAT_TIME); if (pin->pin.pin.peer) + { + EnterCriticalSection(&filter->cs); + pin->flushing = false; + LeaveCriticalSection(&filter->cs); + WakeConditionVariable(&pin->flush_stop_cv); + IPin_EndFlush(pin->pin.pin.peer); + } break; + case GST_EVENT_CAPS: { GstCaps *caps; @@ -729,6 +812,7 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event) SetEvent(pin->caps_event); break; } + default: WARN("Ignoring "%s" event.\n", GST_EVENT_TYPE_NAME(event)); } @@ -806,6 +890,36 @@ static DWORD CALLBACK push_data(LPVOID iface) return 0; }
+static GstFlowReturn got_data_sink(GstPad *pad, GstObject *parent, GstBuffer *buffer) +{ + struct parser_source *pin = gst_pad_get_element_private(pad); + struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter); + struct parser_event stream_event; + GstFlowReturn ret; + + TRACE("pad %p, pin %p, buffer %p.\n", pad, pin, buffer); + + if (filter->initial) + { + gst_buffer_unref(buffer); + return GST_FLOW_OK; + } + + if (!pin->pin.pin.peer) + { + gst_buffer_unref(buffer); + return GST_FLOW_NOT_LINKED; + } + + stream_event.type = PARSER_EVENT_BUFFER; + stream_event.u.buffer = buffer; + /* Transfer our reference to the buffer to the thread. */ + if ((ret = queue_stream_event(pin, &stream_event)) != GST_FLOW_OK) + gst_buffer_unref(buffer); + return ret; +} + +/* Fill and send a single IMediaSample. */ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample, GstBuffer *buf, GstMapInfo *info, gsize offset, gsize size, DWORD bytes_per_second) { @@ -869,21 +983,15 @@ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample, return hr; }
-static GstFlowReturn got_data_sink(GstPad *pad, GstObject *parent, GstBuffer *buf) +/* Send a single GStreamer buffer (splitting it into multiple IMediaSamples if + * necessary). */ +static void send_buffer(struct parser_source *pin, GstBuffer *buf) { - struct parser_source *pin = gst_pad_get_element_private(pad); - struct parser *This = impl_from_strmbase_filter(pin->pin.pin.filter); - HRESULT hr = S_OK; + HRESULT hr; + BYTE *ptr = NULL; IMediaSample *sample; GstMapInfo info;
- TRACE("%p %p\n", pad, buf); - - if (This->initial) { - gst_buffer_unref(buf); - return GST_FLOW_OK; - } - gst_buffer_map(buf, &info, GST_MAP_READ);
if (IsEqualGUID(&pin->pin.pin.mt.formattype, &FORMAT_WaveFormatEx) @@ -937,14 +1045,93 @@ static GstFlowReturn got_data_sink(GstPad *pad, GstObject *parent, GstBuffer *bu gst_buffer_unmap(buf, &info);
gst_buffer_unref(buf); +}
- if (hr == VFW_E_NOT_CONNECTED) - return GST_FLOW_NOT_LINKED; +static DWORD CALLBACK stream_thread(void *arg) +{ + struct parser_source *pin = arg; + struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
- if (FAILED(hr)) - return GST_FLOW_FLUSHING; + TRACE("Starting streaming thread for pin %p.\n", pin);
- return GST_FLOW_OK; + for (;;) + { + struct parser_event event; + + EnterCriticalSection(&filter->cs); + + while (filter->streaming && !pin->flushing && pin->event.type == PARSER_EVENT_NONE) + SleepConditionVariableCS(&pin->event_cv, &filter->cs, INFINITE); + + if (pin->flushing) + { + TRACE("Filter is flushing; pausing thread.\n"); + pin->thread_blocked = true; + WakeConditionVariable(&pin->flushing_cv); + do + SleepConditionVariableCS(&pin->flush_stop_cv, &filter->cs, INFINITE); + while (pin->flushing); + pin->thread_blocked = false; + TRACE("Filter is no longer flushing; resuming thread.\n"); + } + + if (!filter->streaming) + { + LeaveCriticalSection(&filter->cs); + break; + } + + if (!pin->event.type) + { + LeaveCriticalSection(&filter->cs); + continue; + } + + event = pin->event; + pin->event.type = PARSER_EVENT_NONE; + WakeConditionVariable(&pin->event_empty_cv); + + LeaveCriticalSection(&filter->cs); + + TRACE("Got event of type %#x.\n", event.type); + + switch (event.type) + { + case PARSER_EVENT_BUFFER: + send_buffer(pin, event.u.buffer); + break; + + case PARSER_EVENT_EOS: + IPin_EndOfStream(pin->pin.pin.peer); + break; + + case PARSER_EVENT_SEGMENT: + { + const GstSegment *segment; + + gst_event_parse_segment(event.u.segment, &segment); + + if (segment->format != GST_FORMAT_TIME) + { + FIXME("Unhandled format "%s".\n", gst_format_get_name(segment->format)); + break; + } + + gst_segment_copy_into(segment, pin->segment); + + IPin_NewSegment(pin->pin.pin.peer, segment->position / 100, + segment->stop / 100, segment->rate * segment->applied_rate); + gst_event_unref(event.u.segment); + break; + } + + case PARSER_EVENT_NONE: + assert(0); + } + } + + TRACE("Streaming stopped; exiting.\n"); + return 0; }
static GstFlowReturn request_buffer_src(GstPad *pad, GstObject *parent, guint64 ofs, guint len, GstBuffer **buffer) @@ -1478,6 +1665,9 @@ static void parser_destroy(struct strmbase_filter *iface) gst_bus_set_sync_handler(filter->bus, NULL, NULL, NULL); gst_object_unref(filter->bus); } + + filter->cs.DebugInfo->Spare[0] = 0; + DeleteCriticalSection(&filter->cs); strmbase_sink_cleanup(&filter->sink); strmbase_filter_cleanup(&filter->filter); heap_free(filter); @@ -1493,12 +1683,21 @@ static HRESULT parser_init_stream(struct strmbase_filter *iface) if (!filter->container) return S_OK;
+ EnterCriticalSection(&filter->cs); + filter->streaming = true; + LeaveCriticalSection(&filter->cs); + for (i = 0; i < filter->source_count; ++i) { HRESULT hr;
- if (filter->sources[i]->pin.pin.peer && FAILED(hr = IMemAllocator_Commit(filter->sources[i]->pin.pAllocator))) + if (!filter->sources[i]->pin.pin.peer) + continue; + + if (FAILED(hr = IMemAllocator_Commit(filter->sources[i]->pin.pAllocator))) ERR("Failed to commit allocator, hr %#x.\n", hr); + + filter->sources[i]->thread = CreateThread(NULL, 0, stream_thread, filter->sources[i], 0, NULL); }
if (filter->no_more_pads_event) @@ -1544,6 +1743,21 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface) if (!filter->container) return S_OK;
+ EnterCriticalSection(&filter->cs); + filter->streaming = false; + LeaveCriticalSection(&filter->cs); + + for (i = 0; i < filter->source_count; ++i) + { + struct parser_source *pin = filter->sources[i]; + + if (!pin->pin.pin.peer) + continue; + + WakeConditionVariable(&pin->event_cv); + WakeConditionVariable(&pin->event_empty_cv); + } + filter->ignore_flush = TRUE; if ((ret = gst_element_set_state(filter->container, GST_STATE_READY)) == GST_STATE_CHANGE_FAILURE) { @@ -1555,8 +1769,16 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface)
for (i = 0; i < filter->source_count; ++i) { - if (filter->sources[i]->pin.pin.peer) - IMemAllocator_Decommit(filter->sources[i]->pin.pAllocator); + struct parser_source *pin = filter->sources[i]; + + if (!pin->pin.pin.peer) + continue; + + IMemAllocator_Decommit(pin->pin.pAllocator); + + WaitForSingleObject(pin->thread, INFINITE); + CloseHandle(pin->thread); + pin->thread = NULL; }
return S_OK; @@ -1797,6 +2019,13 @@ static BOOL parser_init_gstreamer(void) return TRUE; }
+static void parser_init_common(struct parser *object) +{ + object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL); + InitializeCriticalSection(&object->cs); + object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": parser.cs"); +} + HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out) { struct parser *object; @@ -1809,11 +2038,12 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out) if (!(object = heap_alloc_zero(sizeof(*object)))) return E_OUTOFMEMORY;
+ parser_init_common(object); + strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, wcsInputPinName, &sink_ops, NULL);
object->no_more_pads_event = CreateEventW(NULL, FALSE, FALSE, NULL); - object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL); object->init_gst = decodebin_parser_init_gst; object->source_query_accept = decodebin_parser_source_query_accept; object->source_get_media_type = decodebin_parser_source_get_media_type; @@ -2203,6 +2433,10 @@ static struct parser_source *create_pin(struct parser *filter, const WCHAR *name pin->IQualityControl_iface.lpVtbl = &GSTOutPin_QualityControl_Vtbl; strmbase_seeking_init(&pin->seek, &GST_Seeking_Vtbl, GST_ChangeStop, GST_ChangeCurrent, GST_ChangeRate); + InitializeConditionVariable(&pin->event_cv); + InitializeConditionVariable(&pin->event_empty_cv); + InitializeConditionVariable(&pin->flushing_cv); + InitializeConditionVariable(&pin->flush_stop_cv); BaseFilterImpl_IncrementPinVersion(&filter->filter);
sprintf(pad_name, "qz_sink_%u", filter->source_count); @@ -2444,10 +2678,11 @@ HRESULT wave_parser_create(IUnknown *outer, IUnknown **out) if (!(object = heap_alloc_zero(sizeof(*object)))) return E_OUTOFMEMORY;
+ parser_init_common(object); + strmbase_filter_init(&object->filter, outer, &CLSID_WAVEParser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, sink_name, &wave_parser_sink_ops, NULL); object->init_gst = wave_parser_init_gst; - object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL); object->source_query_accept = wave_parser_source_query_accept; object->source_get_media_type = wave_parser_source_get_media_type;
@@ -2568,10 +2803,11 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = heap_alloc_zero(sizeof(*object)))) return E_OUTOFMEMORY;
+ parser_init_common(object); + strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, sink_name, &avi_splitter_sink_ops, NULL); object->no_more_pads_event = CreateEventW(NULL, FALSE, FALSE, NULL); - object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL); object->init_gst = avi_splitter_init_gst; object->source_query_accept = avi_splitter_source_query_accept; object->source_get_media_type = avi_splitter_source_get_media_type; @@ -2725,12 +2961,13 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = heap_alloc_zero(sizeof(*object)))) return E_OUTOFMEMORY;
+ parser_init_common(object); + strmbase_filter_init(&object->filter, outer, &CLSID_MPEG1Splitter, &mpeg_splitter_ops); strmbase_sink_init(&object->sink, &object->filter, sink_name, &mpeg_splitter_sink_ops, NULL); object->IAMStreamSelect_iface.lpVtbl = &stream_select_vtbl;
object->duration_event = CreateEventW(NULL, FALSE, FALSE, NULL); - object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL); object->init_gst = mpeg_splitter_init_gst; object->source_query_accept = mpeg_splitter_source_query_accept; object->source_get_media_type = mpeg_splitter_source_get_media_type;