From: Yuxuan Shui yshui@codeweavers.com
This is done by inserting a special "stepper" element behind the parser (if there is one). This element will keep the incoming buffers in a internal queue and not push them downstream immediately. Instead, it gives us control over when we push and how much is pushed. --- dlls/winegstreamer/unix_private.h | 8 ++ dlls/winegstreamer/unixlib.c | 4 + dlls/winegstreamer/wg_transform.c | 184 +++++++++++++++++++++++++++++- 3 files changed, 191 insertions(+), 5 deletions(-)
diff --git a/dlls/winegstreamer/unix_private.h b/dlls/winegstreamer/unix_private.h index b2cc036c914..149e8e50da5 100644 --- a/dlls/winegstreamer/unix_private.h +++ b/dlls/winegstreamer/unix_private.h @@ -41,6 +41,12 @@ extern GList *find_element_factories(GstElementFactoryListType type, GstRank min GstCaps *element_sink_caps, GstCaps *element_src_caps); extern GstElement *find_element(GstElementFactoryListType type, GstCaps *element_sink_caps, GstCaps *element_src_caps); +/* + * Append `element` to `container`, updates the pointer to the first and last elements in the + * pipeline. Returns whether the operation succeeded. + * + * This takes the ownership of `element` whether it succeeded or not. + */ extern bool append_element(GstElement *container, GstElement *element, GstElement **first, GstElement **last); extern bool link_src_to_sink(GstPad *src_pad, GstPad *sink_pad); extern bool link_src_to_element(GstPad *src_pad, GstElement *element); @@ -104,4 +110,6 @@ extern void wg_allocator_provide_sample(GstAllocator *allocator, struct wg_sampl extern void wg_allocator_release_sample(GstAllocator *allocator, struct wg_sample *sample, bool discard_data);
+GST_ELEMENT_REGISTER_DECLARE(winegstreamerstepper); + #endif /* __WINE_WINEGSTREAMER_UNIX_PRIVATE_H */ diff --git a/dlls/winegstreamer/unixlib.c b/dlls/winegstreamer/unixlib.c index 729008e1f46..7a07b2e0a2a 100644 --- a/dlls/winegstreamer/unixlib.c +++ b/dlls/winegstreamer/unixlib.c @@ -300,6 +300,10 @@ NTSTATUS wg_init_gstreamer(void *arg)
GST_INFO("GStreamer library version %s; wine built with %d.%d.%d.", gst_version_string(), GST_VERSION_MAJOR, GST_VERSION_MINOR, GST_VERSION_MICRO); + + if (!GST_ELEMENT_REGISTER(winegstreamerstepper, NULL)) + GST_ERROR("Failed to register the stepper element"); + return STATUS_SUCCESS; }
diff --git a/dlls/winegstreamer/wg_transform.c b/dlls/winegstreamer/wg_transform.c index 33758401a06..352415ad0b9 100644 --- a/dlls/winegstreamer/wg_transform.c +++ b/dlls/winegstreamer/wg_transform.c @@ -42,11 +42,40 @@
#define GST_SAMPLE_FLAG_WG_CAPS_CHANGED (GST_MINI_OBJECT_FLAG_LAST << 0)
+/* This GstElement takes buffers and events from its sink pad, instead of pushing them + * out the src pad, it keeps them in a internal queue until the step function + * is called manually. + */ +typedef struct _WgStepper +{ + GstElement element; + GstPad *src, *sink; + GstAtomicQueue *fifo; +} WgStepper; + +typedef struct _WgStepperClass +{ + GstElementClass parent_class; +} WgStepperClass; + +#define GST_TYPE_WG_STEPPER (wg_stepper_get_type()) +#define WG_STEPPER(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_WG_STEPPER, WgStepper)) +#define GST_WG_STEPPER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_WG_STEPPER,WgStepperClass)) +#define GST_IS_WG_STEPPER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_WG_STEPPER)) +#define GST_IS_WG_STEPPER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_WG_STEPPER)) + +G_DEFINE_TYPE (WgStepper, wg_stepper, GST_TYPE_ELEMENT); +GST_ELEMENT_REGISTER_DEFINE(winegstreamerstepper, "winegstreamerstepper", GST_RANK_NONE, GST_TYPE_WG_STEPPER); + +static bool wg_stepper_step(WgStepper *stepper); +static void wg_stepper_flush(WgStepper *stepper); + struct wg_transform { struct wg_transform_attrs attrs;
GstElement *container; + WgStepper *stepper; GstAllocator *allocator; GstPad *my_src, *my_sink; GstSegment segment; @@ -493,7 +522,18 @@ static bool transform_create_decoder_elements(struct wg_transform *transform, if ((element = find_element(GST_ELEMENT_FACTORY_TYPE_PARSER, transform->input_caps, parsed_caps)) && !append_element(transform->container, element, first, last)) goto done; - else if (!element) + + if (element) + { + /* We try to intercept buffers produced by the parser, so if we push a large buffer into the + * parser, it won't push everything into the decoder all in one go. + */ + if ((element = create_element("winegstreamerstepper", NULL)) && + append_element(transform->container, element, first, last)) + /* element is owned by the container */ + transform->stepper = WG_STEPPER(element); + } + else { gst_caps_unref(parsed_caps); parsed_caps = gst_caps_ref(transform->input_caps); @@ -1008,7 +1048,8 @@ static NTSTATUS read_transform_output(struct wg_sample *sample, GstBuffer *buffe
static NTSTATUS complete_drain(struct wg_transform *transform) { - if (transform->draining && gst_atomic_queue_length(transform->input_queue) == 0) + bool stepper_empty = transform->stepper == NULL || gst_atomic_queue_length(transform->stepper->fifo) == 0; + if (transform->draining && gst_atomic_queue_length(transform->input_queue) == 0 && stepper_empty) { GstEvent *event; transform->draining = false; @@ -1035,14 +1076,23 @@ error:
static bool get_transform_output(struct wg_transform *transform, struct wg_sample *sample) { - GstBuffer *input_buffer; GstFlowReturn ret;
wg_allocator_provide_sample(transform->allocator, sample);
- while (!(transform->output_sample = gst_atomic_queue_pop(transform->output_queue)) - && (input_buffer = gst_atomic_queue_pop(transform->input_queue))) + while (!(transform->output_sample = gst_atomic_queue_pop(transform->output_queue))) { + GstBuffer *input_buffer; + if (transform->stepper && wg_stepper_step(transform->stepper)) + { + /* If we pushed anything from the stepper, we don't need to dequeue more buffers. */ + complete_drain(transform); + continue; + } + + if (!(input_buffer = gst_atomic_queue_pop(transform->input_queue))) + break; + if ((ret = gst_pad_push(transform->my_src, input_buffer))) GST_WARNING("Failed to push transform input, error %d", ret);
@@ -1182,6 +1232,9 @@ NTSTATUS wg_transform_flush(void *args) while ((input_buffer = gst_atomic_queue_pop(transform->input_queue))) gst_buffer_unref(input_buffer);
+ if (transform->stepper) + wg_stepper_flush(transform->stepper); + if ((status = wg_transform_drain(args))) return status;
@@ -1221,3 +1274,124 @@ NTSTATUS wg_transform_notify_qos(void *args)
return S_OK; } + +/* Move events and at most one buffer from the internal fifo queue to the output src pad. + * Returns true if anything is moved, or false if the fifo is empty. + */ +static bool wg_stepper_step(WgStepper *stepper) +{ + bool pushed = false; + gpointer ptr; + while ((ptr = gst_atomic_queue_pop(stepper->fifo))) + { + if (GST_IS_BUFFER(ptr)) + { + GST_TRACE("Forwarding buffer %"GST_PTR_FORMAT" from fifo", ptr); + gst_pad_push(stepper->src, GST_BUFFER(ptr)); + pushed = true; + break; + } + + if (GST_IS_EVENT(ptr)) + { + GST_TRACE("Processing event %"GST_PTR_FORMAT" from fifo", ptr); + gst_pad_event_default(stepper->sink, GST_OBJECT(stepper), GST_EVENT(ptr)); + pushed = true; + } + } + return pushed; +} + +static void wg_stepper_flush(WgStepper *stepper) +{ + gpointer ptr; + GST_TRACE("Discarding all objects in the fifo"); + while ((ptr = gst_atomic_queue_pop(stepper->fifo))) + { + if (GST_IS_EVENT(ptr)) + gst_event_unref(ptr); + else if (GST_IS_BUFFER(ptr)) + gst_buffer_unref(ptr); + } +} + +static GstStateChangeReturn wg_stepper_change_state(GstElement *element, GstStateChange transition) +{ + WgStepper *this = WG_STEPPER(element); + if (transition == GST_STATE_CHANGE_READY_TO_NULL) + wg_stepper_flush(this); + return GST_STATE_CHANGE_SUCCESS; +} + +static void wg_stepper_class_init(WgStepperClass *klass) +{ + gst_element_class_set_metadata(GST_ELEMENT_CLASS(klass), "winegstreamer buffer stepper", "Connector", + "Hold incoming buffer for manual pushing", "Yuxuan Shui yshui@codeweavers.com"); + klass->parent_class.change_state = wg_stepper_change_state; +} + +static GstFlowReturn wg_stepper_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buf) +{ + WgStepper *this = WG_STEPPER(parent); + GST_TRACE("Pushing buffer %"GST_PTR_FORMAT" into fifo", buf); + gst_atomic_queue_push(this->fifo, buf); + return GST_FLOW_OK; +} + +static gboolean wg_stepper_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) +{ + WgStepper *this = WG_STEPPER(parent); + if (gst_atomic_queue_length(this->fifo) == 0) + { + GST_TRACE("Processing event %"GST_PTR_FORMAT" immediately", event); + gst_pad_event_default(pad, parent, event); + } + else + { + GST_TRACE("Pushing event %"GST_PTR_FORMAT" into fifo", event); + gst_atomic_queue_push(this->fifo, event); + } + return true; +} + +static gboolean wg_stepper_src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) +{ + WgStepper *this = WG_STEPPER(parent); + GstPad *peer = gst_pad_get_peer(this->sink); + if (!peer) return gst_pad_query_default(pad, parent, query); + GST_TRACE("Forwarding query %"GST_PTR_FORMAT" to upstream", query); + return gst_pad_query(peer, query); +} + +static void wg_stepper_init(WgStepper *stepper) +{ + static GstStaticPadTemplate sink_factory = + GST_STATIC_PAD_TEMPLATE ( + "sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("ANY") + ); + static GstStaticPadTemplate src_factory = + GST_STATIC_PAD_TEMPLATE( + "src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS("ANY") + ); + stepper->sink = gst_pad_new_from_static_template(&sink_factory, "sink"); + gst_element_add_pad(GST_ELEMENT(stepper), stepper->sink); + gst_pad_set_chain_function(stepper->sink, wg_stepper_chain_cb); + gst_pad_set_event_function(stepper->sink, wg_stepper_event_cb); + GST_PAD_SET_PROXY_CAPS(stepper->sink); + gst_pad_set_active(stepper->sink, true); + + stepper->src = gst_pad_new_from_static_template(&src_factory, "src"); + gst_element_add_pad(GST_ELEMENT(stepper), stepper->src); + gst_pad_set_query_function(stepper->src, wg_stepper_src_query_cb); + gst_pad_set_active(stepper->src, true); + + stepper->fifo = gst_atomic_queue_new(4); + + GST_DEBUG("Created new stepper element %"GST_PTR_FORMAT, stepper); +}