## Context
This problem was discovered when running "Max: The Curse of Brotherhood" under Proton. The game writes 4 MB at a time into `wg_transform`, each chunk containing about 4 seconds of compressed video. `wg_transform` then calls `gst_pad_push` in `transform_ProcessOutput` with that whole buffer, and the call takes ~300 ms (i.e. 20 frames at 60 fps) because it decodes all of that video at once. The end result is that the game hitches every 4 seconds while playing cutscene videos.
Proton currently special-cases this game by breaking the buffer up into small chunks, so that no single push blocks for long. This MR adopts that approach and applies it generally, not just to this game.
One concern raised by @redmcg is:
The only issue I can think of is if there are any decoders which don't have a parser; then they might not know how to deal with an arbitrary 4096 byte buffer (the parser is generally responsible for taking arbitrary buffers and producing something the decoder can work with, for example: a full frame).
So this MR only enables this strategy when there is a parser element.
-- v10: winegstreamer: Avoid large buffer pushes in wg_transform.
From: Yuxuan Shui yshui@codeweavers.com
This is done by inserting a special "stepper" element after the parser (if there is one). This element keeps incoming buffers in an internal queue instead of pushing them downstream immediately, which gives us control over when, and how much, is pushed. --- dlls/winegstreamer/unix_private.h | 8 ++ dlls/winegstreamer/unixlib.c | 4 + dlls/winegstreamer/wg_transform.c | 184 +++++++++++++++++++++++++++++- 3 files changed, 191 insertions(+), 5 deletions(-)
diff --git a/dlls/winegstreamer/unix_private.h b/dlls/winegstreamer/unix_private.h index b2cc036c914..149e8e50da5 100644 --- a/dlls/winegstreamer/unix_private.h +++ b/dlls/winegstreamer/unix_private.h @@ -41,6 +41,12 @@ extern GList *find_element_factories(GstElementFactoryListType type, GstRank min GstCaps *element_sink_caps, GstCaps *element_src_caps); extern GstElement *find_element(GstElementFactoryListType type, GstCaps *element_sink_caps, GstCaps *element_src_caps); +/* + * Append `element` to `container`, updates the pointer to the first and last elements in the + * pipeline. Returns whether the operation succeeded. + * + * This takes the ownership of `element` whether it succeeded or not. + */ extern bool append_element(GstElement *container, GstElement *element, GstElement **first, GstElement **last); extern bool link_src_to_sink(GstPad *src_pad, GstPad *sink_pad); extern bool link_src_to_element(GstPad *src_pad, GstElement *element); @@ -104,4 +110,6 @@ extern void wg_allocator_provide_sample(GstAllocator *allocator, struct wg_sampl extern void wg_allocator_release_sample(GstAllocator *allocator, struct wg_sample *sample, bool discard_data);
+GST_ELEMENT_REGISTER_DECLARE(winegstreamerstepper); + #endif /* __WINE_WINEGSTREAMER_UNIX_PRIVATE_H */ diff --git a/dlls/winegstreamer/unixlib.c b/dlls/winegstreamer/unixlib.c index 729008e1f46..7a07b2e0a2a 100644 --- a/dlls/winegstreamer/unixlib.c +++ b/dlls/winegstreamer/unixlib.c @@ -300,6 +300,10 @@ NTSTATUS wg_init_gstreamer(void *arg)
GST_INFO("GStreamer library version %s; wine built with %d.%d.%d.", gst_version_string(), GST_VERSION_MAJOR, GST_VERSION_MINOR, GST_VERSION_MICRO); + + if (!GST_ELEMENT_REGISTER(winegstreamerstepper, NULL)) + GST_ERROR("Failed to register the stepper element"); + return STATUS_SUCCESS; }
diff --git a/dlls/winegstreamer/wg_transform.c b/dlls/winegstreamer/wg_transform.c index 33758401a06..352415ad0b9 100644 --- a/dlls/winegstreamer/wg_transform.c +++ b/dlls/winegstreamer/wg_transform.c @@ -42,11 +42,40 @@
#define GST_SAMPLE_FLAG_WG_CAPS_CHANGED (GST_MINI_OBJECT_FLAG_LAST << 0)
+/* This GstElement takes buffers and events from its sink pad, instead of pushing them + * out the src pad, it keeps them in a internal queue until the step function + * is called manually. + */ +typedef struct _WgStepper +{ + GstElement element; + GstPad *src, *sink; + GstAtomicQueue *fifo; +} WgStepper; + +typedef struct _WgStepperClass +{ + GstElementClass parent_class; +} WgStepperClass; + +#define GST_TYPE_WG_STEPPER (wg_stepper_get_type()) +#define WG_STEPPER(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_WG_STEPPER, WgStepper)) +#define GST_WG_STEPPER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_WG_STEPPER,WgStepperClass)) +#define GST_IS_WG_STEPPER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_WG_STEPPER)) +#define GST_IS_WG_STEPPER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_WG_STEPPER)) + +G_DEFINE_TYPE (WgStepper, wg_stepper, GST_TYPE_ELEMENT); +GST_ELEMENT_REGISTER_DEFINE(winegstreamerstepper, "winegstreamerstepper", GST_RANK_NONE, GST_TYPE_WG_STEPPER); + +static bool wg_stepper_step(WgStepper *stepper); +static void wg_stepper_flush(WgStepper *stepper); + struct wg_transform { struct wg_transform_attrs attrs;
GstElement *container; + WgStepper *stepper; GstAllocator *allocator; GstPad *my_src, *my_sink; GstSegment segment; @@ -493,7 +522,18 @@ static bool transform_create_decoder_elements(struct wg_transform *transform, if ((element = find_element(GST_ELEMENT_FACTORY_TYPE_PARSER, transform->input_caps, parsed_caps)) && !append_element(transform->container, element, first, last)) goto done; - else if (!element) + + if (element) + { + /* We try to intercept buffers produced by the parser, so if we push a large buffer into the + * parser, it won't push everything into the decoder all in one go. + */ + if ((element = create_element("winegstreamerstepper", NULL)) && + append_element(transform->container, element, first, last)) + /* element is owned by the container */ + transform->stepper = WG_STEPPER(element); + } + else { gst_caps_unref(parsed_caps); parsed_caps = gst_caps_ref(transform->input_caps); @@ -1008,7 +1048,8 @@ static NTSTATUS read_transform_output(struct wg_sample *sample, GstBuffer *buffe
static NTSTATUS complete_drain(struct wg_transform *transform) { - if (transform->draining && gst_atomic_queue_length(transform->input_queue) == 0) + bool stepper_empty = transform->stepper == NULL || gst_atomic_queue_length(transform->stepper->fifo) == 0; + if (transform->draining && gst_atomic_queue_length(transform->input_queue) == 0 && stepper_empty) { GstEvent *event; transform->draining = false; @@ -1035,14 +1076,23 @@ error:
static bool get_transform_output(struct wg_transform *transform, struct wg_sample *sample) { - GstBuffer *input_buffer; GstFlowReturn ret;
wg_allocator_provide_sample(transform->allocator, sample);
- while (!(transform->output_sample = gst_atomic_queue_pop(transform->output_queue)) - && (input_buffer = gst_atomic_queue_pop(transform->input_queue))) + while (!(transform->output_sample = gst_atomic_queue_pop(transform->output_queue))) { + GstBuffer *input_buffer; + if (transform->stepper && wg_stepper_step(transform->stepper)) + { + /* If we pushed anything from the stepper, we don't need to dequeue more buffers. */ + complete_drain(transform); + continue; + } + + if (!(input_buffer = gst_atomic_queue_pop(transform->input_queue))) + break; + if ((ret = gst_pad_push(transform->my_src, input_buffer))) GST_WARNING("Failed to push transform input, error %d", ret);
@@ -1182,6 +1232,9 @@ NTSTATUS wg_transform_flush(void *args) while ((input_buffer = gst_atomic_queue_pop(transform->input_queue))) gst_buffer_unref(input_buffer);
+ if (transform->stepper) + wg_stepper_flush(transform->stepper); + if ((status = wg_transform_drain(args))) return status;
@@ -1221,3 +1274,157 @@ NTSTATUS wg_transform_notify_qos(void *args)
 
     return S_OK;
 }
+
+/* Move events and at most one buffer from the internal fifo queue to the output src pad.
+ * Returns true if anything is moved, or false if the fifo is empty.
+ */
+static bool wg_stepper_step(WgStepper *stepper)
+{
+    GstFlowReturn ret;
+    bool pushed = false;
+    gpointer ptr;
+
+    while ((ptr = gst_atomic_queue_pop(stepper->fifo)))
+    {
+        if (GST_IS_BUFFER(ptr))
+        {
+            GST_TRACE("Forwarding buffer %"GST_PTR_FORMAT" from fifo", ptr);
+            if ((ret = gst_pad_push(stepper->src, GST_BUFFER(ptr))))
+                GST_WARNING("Failed to push buffer from fifo, error %d", ret);
+            pushed = true;
+            break;
+        }
+
+        if (GST_IS_EVENT(ptr))
+        {
+            GST_TRACE("Processing event %"GST_PTR_FORMAT" from fifo", ptr);
+            gst_pad_event_default(stepper->sink, GST_OBJECT(stepper), GST_EVENT(ptr));
+            pushed = true;
+        }
+    }
+    return pushed;
+}
+
+/* Drop every buffer and event still held in the fifo without forwarding them. */
+static void wg_stepper_flush(WgStepper *stepper)
+{
+    gpointer ptr;
+
+    GST_TRACE("Discarding all objects in the fifo");
+    while ((ptr = gst_atomic_queue_pop(stepper->fifo)))
+    {
+        if (GST_IS_EVENT(ptr))
+            gst_event_unref(ptr);
+        else if (GST_IS_BUFFER(ptr))
+            gst_buffer_unref(ptr);
+    }
+}
+
+/* Release anything still queued and the fifo itself when the element is destroyed. */
+static void wg_stepper_finalize(GObject *object)
+{
+    WgStepper *stepper = WG_STEPPER(object);
+
+    wg_stepper_flush(stepper);
+    gst_atomic_queue_unref(stepper->fifo);
+
+    G_OBJECT_CLASS(wg_stepper_parent_class)->finalize(object);
+}
+
+/* Chain up to the default GstElement handler, which (de)activates the pads, then drop anything
+ * still queued once the element goes back to NULL. */
+static GstStateChangeReturn wg_stepper_change_state(GstElement *element, GstStateChange transition)
+{
+    WgStepper *stepper = WG_STEPPER(element);
+    GstStateChangeReturn ret;
+
+    ret = GST_ELEMENT_CLASS(wg_stepper_parent_class)->change_state(element, transition);
+    if (ret == GST_STATE_CHANGE_FAILURE)
+        return ret;
+
+    if (transition == GST_STATE_CHANGE_READY_TO_NULL)
+        wg_stepper_flush(stepper);
+    return ret;
+}
+
+static void wg_stepper_class_init(WgStepperClass *klass)
+{
+    GObjectClass *object_class = G_OBJECT_CLASS(klass);
+    GstElementClass *element_class = GST_ELEMENT_CLASS(klass);
+
+    gst_element_class_set_metadata(element_class, "winegstreamer buffer stepper", "Connector",
+            "Hold incoming buffer for manual pushing", "Yuxuan Shui yshui@codeweavers.com");
+    object_class->finalize = wg_stepper_finalize;
+    element_class->change_state = wg_stepper_change_state;
+}
+
+/* Sink pad chain function: queue the buffer; it is only forwarded by wg_stepper_step(). */
+static GstFlowReturn wg_stepper_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buf)
+{
+    WgStepper *this = WG_STEPPER(parent);
+
+    GST_TRACE("Pushing buffer %"GST_PTR_FORMAT" into fifo", buf);
+    gst_atomic_queue_push(this->fifo, buf);
+    return GST_FLOW_OK;
+}
+
+/* Sink pad event function. Non-serialized (out-of-band) events such as FLUSH_START must not wait
+ * behind queued data, so they are always forwarded right away. Serialized events are forwarded
+ * immediately only when the fifo is empty; otherwise they are queued to preserve their ordering
+ * relative to buffers.
+ */
+static gboolean wg_stepper_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
+{
+    WgStepper *this = WG_STEPPER(parent);
+
+    if (!GST_EVENT_IS_SERIALIZED(event) || gst_atomic_queue_length(this->fifo) == 0)
+    {
+        GST_TRACE("Processing event %"GST_PTR_FORMAT" immediately", event);
+        return gst_pad_event_default(pad, parent, event);
+    }
+
+    GST_TRACE("Pushing event %"GST_PTR_FORMAT" into fifo", event);
+    gst_atomic_queue_push(this->fifo, event);
+    return true;
+}
+
+/* Src pad query function: answer queries by forwarding them to the peer of our sink pad. */
+static gboolean wg_stepper_src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
+{
+    WgStepper *this = WG_STEPPER(parent);
+    GstPad *peer = gst_pad_get_peer(this->sink);
+    gboolean ret;
+
+    if (!peer)
+        return gst_pad_query_default(pad, parent, query);
+
+    GST_TRACE("Forwarding query %"GST_PTR_FORMAT" to upstream", query);
+    ret = gst_pad_query(peer, query);
+    /* gst_pad_get_peer() returned a new reference. */
+    gst_object_unref(peer);
+    return ret;
+}
+
+static void wg_stepper_init(WgStepper *stepper)
+{
+    static GstStaticPadTemplate sink_factory =
+            GST_STATIC_PAD_TEMPLATE("sink", GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS("ANY"));
+    static GstStaticPadTemplate src_factory =
+            GST_STATIC_PAD_TEMPLATE("src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS("ANY"));
+
+    stepper->sink = gst_pad_new_from_static_template(&sink_factory, "sink");
+    gst_element_add_pad(GST_ELEMENT(stepper), stepper->sink);
+    gst_pad_set_chain_function(stepper->sink, wg_stepper_chain_cb);
+    gst_pad_set_event_function(stepper->sink, wg_stepper_event_cb);
+    GST_PAD_SET_PROXY_CAPS(stepper->sink);
+    gst_pad_set_active(stepper->sink, true);
+
+    stepper->src = gst_pad_new_from_static_template(&src_factory, "src");
+    gst_element_add_pad(GST_ELEMENT(stepper), stepper->src);
+    gst_pad_set_query_function(stepper->src, wg_stepper_src_query_cb);
+    gst_pad_set_active(stepper->src, true);
+
+    stepper->fifo = gst_atomic_queue_new(4);
+
+    GST_DEBUG("Created new stepper element %"GST_PTR_FORMAT, stepper);
+}
On Mon Mar 3 08:51:18 2025 +0000, Rémi Bernon wrote:
/* We try to intercept buffers produced by the parser, so if we push a large buffer into the * parser, it won't push everything into the decoder all in one go. */ if ((element = create_element("winegstreamerstepper", NULL))) { if (!append_element(transform->container, element, first, last)) g_object_unref(element); else /* element is owned by the container */ transform->stepper = WG_STEPPER(element); }
Please keep this consistent with the other invocations, as I suggested above. Ownership is transferred by the `append_element` call, whether it succeeds or not. If there's anything to fix, it might be in `append_element` itself, depending on whether the caller actually keeps ownership when `gst_bin_add` fails. That isn't clear from the documentation, and there are other cases where that call succeeds but `append_element` still fails, in which case ownership has already been transferred.
OK, I changed it. Hopefully this is what you meant?
I also checked the code of `gst_bin_add` and can confirm that it always takes ownership of `element`.
This merge request was approved by Rémi Bernon.