From: Yuxuan Shui <yshui@codeweavers.com>
This is done by inserting a special "hold" element behind the parser (if there is one). This element will hold onto the incoming buffers and not immediately push them downstream. Instead, it gives us control over when we push and how much is pushed. --- dlls/winegstreamer/unix_private.h | 2 + dlls/winegstreamer/unixlib.c | 4 + dlls/winegstreamer/wg_transform.c | 169 +++++++++++++++++++++++++++++- 3 files changed, 171 insertions(+), 4 deletions(-)
diff --git a/dlls/winegstreamer/unix_private.h b/dlls/winegstreamer/unix_private.h index b2cc036c914..caab137fcbc 100644 --- a/dlls/winegstreamer/unix_private.h +++ b/dlls/winegstreamer/unix_private.h @@ -104,4 +104,6 @@ extern void wg_allocator_provide_sample(GstAllocator *allocator, struct wg_sampl extern void wg_allocator_release_sample(GstAllocator *allocator, struct wg_sample *sample, bool discard_data);
+GST_ELEMENT_REGISTER_DECLARE(hold); + #endif /* __WINE_WINEGSTREAMER_UNIX_PRIVATE_H */ diff --git a/dlls/winegstreamer/unixlib.c b/dlls/winegstreamer/unixlib.c index 729008e1f46..08eebf52f43 100644 --- a/dlls/winegstreamer/unixlib.c +++ b/dlls/winegstreamer/unixlib.c @@ -300,6 +300,10 @@ NTSTATUS wg_init_gstreamer(void *arg)
GST_INFO("GStreamer library version %s; wine built with %d.%d.%d.", gst_version_string(), GST_VERSION_MAJOR, GST_VERSION_MINOR, GST_VERSION_MICRO); + + if (!GST_ELEMENT_REGISTER(hold, NULL)) + GST_ERROR("Failed to register the hold element"); + return STATUS_SUCCESS; }
diff --git a/dlls/winegstreamer/wg_transform.c b/dlls/winegstreamer/wg_transform.c index 33758401a06..13b81636305 100644 --- a/dlls/winegstreamer/wg_transform.c +++ b/dlls/winegstreamer/wg_transform.c @@ -42,11 +42,39 @@
#define GST_SAMPLE_FLAG_WG_CAPS_CHANGED (GST_MINI_OBJECT_FLAG_LAST << 0)
+/* This GstElement takes buffers from its sink pad; instead of pushing them + * out the src pad, it keeps them in an internal queue until the push function + * is called manually. + */ +typedef struct _GstHold +{ + GstElement element; + GstPad *src, *sink; + GstAtomicQueue *fifo; +} GstHold; + +typedef struct _GstHoldClass +{ + GstElementClass parent_class; +} GstHoldClass; + +#define GST_TYPE_HOLD (gst_hold_get_type()) +#define GST_HOLD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_HOLD, GstHold)) +#define GST_MY_FILTER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_HOLD,GstHoldClass)) +#define GST_IS_MY_FILTER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_HOLD)) +#define GST_IS_MY_FILTER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_HOLD)) + +G_DEFINE_TYPE (GstHold, gst_hold, GST_TYPE_ELEMENT); +GST_ELEMENT_REGISTER_DEFINE(hold, "hold", GST_RANK_NONE, GST_TYPE_HOLD); + +static bool gst_hold_push(GstHold *hold); + struct wg_transform { struct wg_transform_attrs attrs;
GstElement *container; + GstHold *hold; GstAllocator *allocator; GstPad *my_src, *my_sink; GstSegment segment; @@ -493,7 +521,27 @@ static bool transform_create_decoder_elements(struct wg_transform *transform, if ((element = find_element(GST_ELEMENT_FACTORY_TYPE_PARSER, transform->input_caps, parsed_caps)) && !append_element(transform->container, element, first, last)) goto done; - else if (!element) + + if (element) + { + /* We try to intercept buffers produced by the parser, so if we push a large buffer into the + * parser, it won't push everything into the decoder all in one go. + */ + if ((element = create_element("hold", NULL))) + { + if (!append_element(transform->container, element, first, last)) + { + GST_ERROR("Failed to append a hold element to the parser"); + g_object_unref(element); + } + else + /* element is owned by the container */ + transform->hold = GST_HOLD(element); + } + else + GST_ERROR("Failed to create a hold element"); + } + else { gst_caps_unref(parsed_caps); parsed_caps = gst_caps_ref(transform->input_caps); @@ -1035,14 +1083,20 @@ error:
static bool get_transform_output(struct wg_transform *transform, struct wg_sample *sample) { - GstBuffer *input_buffer; GstFlowReturn ret;
wg_allocator_provide_sample(transform->allocator, sample);
- while (!(transform->output_sample = gst_atomic_queue_pop(transform->output_queue)) - && (input_buffer = gst_atomic_queue_pop(transform->input_queue))) + while (!(transform->output_sample = gst_atomic_queue_pop(transform->output_queue))) { + GstBuffer *input_buffer; + if (transform->hold && gst_hold_push(transform->hold)) + /* If we pushed anything from the hold, we don't need to dequeue more buffers. */ + continue; + + if (!(input_buffer = gst_atomic_queue_pop(transform->input_queue))) + break; + if ((ret = gst_pad_push(transform->my_src, input_buffer))) GST_WARNING("Failed to push transform input, error %d", ret);
@@ -1221,3 +1275,165 @@ NTSTATUS wg_transform_notify_qos(void *args)
 
     return S_OK;
 }
+
+/* Unref every buffer and event still waiting in the fifo. */
+static void gst_hold_discard_all(GstHold *hold)
+{
+    gpointer ptr;
+
+    while ((ptr = gst_atomic_queue_pop(hold->fifo)))
+    {
+        if (GST_IS_EVENT(ptr))
+            gst_event_unref(ptr);
+        else if (GST_IS_BUFFER(ptr))
+            gst_buffer_unref(ptr);
+    }
+}
+
+/* Move one buffer from the internal fifo queue to the output src pad.
+ * Events queued ahead of that buffer are handed to the default event handler
+ * first, in queue order.
+ * Returns true if a buffer is moved, or false if the fifo is empty.
+ */
+static bool gst_hold_push(GstHold *hold)
+{
+    gpointer ptr;
+
+    while ((ptr = gst_atomic_queue_pop(hold->fifo)))
+    {
+        if (GST_IS_BUFFER(ptr))
+        {
+            GstFlowReturn ret;
+
+            GST_TRACE("Forwarding buffer %"GST_PTR_FORMAT" from fifo", ptr);
+            /* The pad takes ownership of the buffer, so there is nothing to release on failure. */
+            if ((ret = gst_pad_push(hold->src, GST_BUFFER(ptr))))
+                GST_WARNING("Failed to push held buffer, error %d", ret);
+            return true;
+        }
+        if (GST_IS_EVENT(ptr))
+        {
+            GST_TRACE("Processing event %"GST_PTR_FORMAT" from fifo", ptr);
+            if (!gst_pad_event_default(hold->sink, GST_OBJECT(hold), GST_EVENT(ptr)))
+                GST_DEBUG("Held event was not handled downstream");
+        }
+    }
+    return false;
+}
+
+/* GstElement::change_state implementation. The parent class performs the
+ * transition itself; on READY -> NULL everything still held in the fifo is
+ * dropped afterwards.
+ */
+static GstStateChangeReturn gst_hold_change_state(GstElement *element, GstStateChange transition)
+{
+    GstHold *this = GST_HOLD(element);
+    GstStateChangeReturn ret;
+
+    /* GstElement subclasses are expected to chain up for every transition. */
+    ret = GST_ELEMENT_CLASS(gst_hold_parent_class)->change_state(element, transition);
+    if (ret == GST_STATE_CHANGE_FAILURE)
+        return ret;
+
+    if (transition == GST_STATE_CHANGE_READY_TO_NULL)
+    {
+        GST_TRACE("Discarding all objects in the fifo");
+        gst_hold_discard_all(this);
+    }
+    return ret;
+}
+
+/* GObject::finalize implementation: frees the fifo along with anything left in it. */
+static void gst_hold_finalize(GObject *object)
+{
+    GstHold *this = GST_HOLD(object);
+
+    if (this->fifo)
+    {
+        gst_hold_discard_all(this);
+        gst_atomic_queue_unref(this->fifo);
+        this->fifo = NULL;
+    }
+    G_OBJECT_CLASS(gst_hold_parent_class)->finalize(object);
+}
+
+/* Class initializer: sets the element metadata and installs the vfunc overrides. */
+static void gst_hold_class_init(GstHoldClass *klass)
+{
+    GstElementClass *element_class = GST_ELEMENT_CLASS(klass);
+
+    gst_element_class_set_metadata(element_class, "winegstreamer buffer hold", "Connector",
+            "Hold incoming buffer for manual pushing", "Yuxuan Shui yshui@codeweavers.com");
+    element_class->change_state = gst_hold_change_state;
+    G_OBJECT_CLASS(klass)->finalize = gst_hold_finalize;
+}
+
+/* Sink pad chain function: queues the buffer instead of pushing it downstream. */
+static GstFlowReturn gst_hold_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buf)
+{
+    GstHold *this = GST_HOLD(parent);
+    GST_TRACE("Pushing buffer %"GST_PTR_FORMAT" into fifo", buf);
+    gst_atomic_queue_push(this->fifo, buf);
+    return GST_FLOW_OK;
+}
+
+/* Sink pad event function: queues the event in the same fifo as the buffers. */
+static gboolean gst_hold_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
+{
+    GstHold *this = GST_HOLD(parent);
+    GST_TRACE("Pushing event %"GST_PTR_FORMAT" into fifo", event);
+    gst_atomic_queue_push(this->fifo, event);
+    return true;
+}
+
+/* Src pad query function: forwards the query to the peer of our sink pad, or
+ * to the default handler when the sink pad is not linked.
+ */
+static gboolean gst_hold_src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
+{
+    GstHold *this = GST_HOLD(parent);
+    GstPad *peer = gst_pad_get_peer(this->sink);
+    gboolean ret;
+
+    if (!peer) return gst_pad_query_default(pad, parent, query);
+    GST_TRACE("Forwarding query %"GST_PTR_FORMAT" to upstream", query);
+    ret = gst_pad_query(peer, query);
+    /* gst_pad_get_peer() handed us a reference that we have to drop. */
+    gst_object_unref(peer);
+    return ret;
+}
+
+/* Instance initializer: creates both always-pads and the fifo. */
+static void gst_hold_init(GstHold *hold)
+{
+    static GstStaticPadTemplate sink_factory =
+        GST_STATIC_PAD_TEMPLATE (
+            "sink",
+            GST_PAD_SINK,
+            GST_PAD_ALWAYS,
+            GST_STATIC_CAPS ("ANY")
+        );
+    static GstStaticPadTemplate src_factory =
+        GST_STATIC_PAD_TEMPLATE(
+            "src",
+            GST_PAD_SRC,
+            GST_PAD_ALWAYS,
+            GST_STATIC_CAPS("ANY")
+        );
+    hold->sink = gst_pad_new_from_static_template(&sink_factory, "sink");
+    gst_element_add_pad(GST_ELEMENT(hold), hold->sink);
+    gst_pad_set_chain_function(hold->sink, gst_hold_chain_cb);
+    gst_pad_set_event_function(hold->sink, gst_hold_event_cb);
+    GST_PAD_SET_PROXY_CAPS(hold->sink);
+    gst_pad_set_active(hold->sink, true);
+
+    hold->src = gst_pad_new_from_static_template(&src_factory, "src");
+    gst_element_add_pad(GST_ELEMENT(hold), hold->src);
+    gst_pad_set_query_function(hold->src, gst_hold_src_query_cb);
+    gst_pad_set_active(hold->src, true);
+
+    /* 4 is only the initial size. Released in gst_hold_finalize(). */
+    hold->fifo = gst_atomic_queue_new(4);
+
+    GST_DEBUG("Created new hold element %"GST_PTR_FORMAT, hold);
+}