## Context
This problem was discovered while running "Max: The Curse of Brotherhood" under Proton. The game writes data into `wg_transform` in 4MB pieces, each containing about 4 seconds of compressed video. `wg_transform` calls `gst_pad_push` in `transform_ProcessOutput`, which takes ~300ms (i.e. roughly 20 frames @ 60fps) to decode such a buffer. The end result is that the game hitches every 4 seconds while playing cutscene videos.
Proton currently has a special case for this particular game, in which it breaks the buffer up into small chunks to avoid blocking for long periods. This MR adopts that approach and applies it generally.
One concern raised by @redmcg is:
The only issue I can think of is if there are any decoders which don't have a parser; then they might not know how to deal with an arbitrary 4096 byte buffer (the parser is generally responsible for taking arbitrary buffers and producing something the decoder can work with, for example: a full frame).
So this MR only enables this strategy when a parser element is present.
-- v6: winegstreamer: Avoid large buffer pushes in wg_transform.
From: Yuxuan Shui yshui@codeweavers.com
This is done by inserting a special "stepper" element after the parser (if there is one). This element keeps incoming buffers in an internal queue instead of pushing them downstream immediately, which gives us control over when buffers are pushed and how much is pushed at a time. --- dlls/winegstreamer/unix_private.h | 2 + dlls/winegstreamer/unixlib.c | 4 + dlls/winegstreamer/wg_transform.c | 174 ++++++++++++++++++++++++++++-- 3 files changed, 173 insertions(+), 7 deletions(-)
diff --git a/dlls/winegstreamer/unix_private.h b/dlls/winegstreamer/unix_private.h index b2cc036c914..7ac82a72fe5 100644 --- a/dlls/winegstreamer/unix_private.h +++ b/dlls/winegstreamer/unix_private.h @@ -104,4 +104,6 @@ extern void wg_allocator_provide_sample(GstAllocator *allocator, struct wg_sampl extern void wg_allocator_release_sample(GstAllocator *allocator, struct wg_sample *sample, bool discard_data);
+GST_ELEMENT_REGISTER_DECLARE(winegstreamerstepper); + #endif /* __WINE_WINEGSTREAMER_UNIX_PRIVATE_H */ diff --git a/dlls/winegstreamer/unixlib.c b/dlls/winegstreamer/unixlib.c index 729008e1f46..7a07b2e0a2a 100644 --- a/dlls/winegstreamer/unixlib.c +++ b/dlls/winegstreamer/unixlib.c @@ -300,6 +300,10 @@ NTSTATUS wg_init_gstreamer(void *arg)
GST_INFO("GStreamer library version %s; wine built with %d.%d.%d.", gst_version_string(), GST_VERSION_MAJOR, GST_VERSION_MINOR, GST_VERSION_MICRO); + + if (!GST_ELEMENT_REGISTER(winegstreamerstepper, NULL)) + GST_ERROR("Failed to register the stepper element"); + return STATUS_SUCCESS; }
diff --git a/dlls/winegstreamer/wg_transform.c b/dlls/winegstreamer/wg_transform.c index 33758401a06..19bca80f7d8 100644 --- a/dlls/winegstreamer/wg_transform.c +++ b/dlls/winegstreamer/wg_transform.c @@ -42,11 +42,39 @@
#define GST_SAMPLE_FLAG_WG_CAPS_CHANGED (GST_MINI_OBJECT_FLAG_LAST << 0)
+/* This GstElement takes buffers from its sink pad, instead of pushing them + * out the src pad, it keeps them in a internal queue until the push function + * is called manually. + */ +typedef struct _WgStepper +{ + GstElement element; + GstPad *src, *sink; + GstAtomicQueue *fifo; +} WgStepper; + +typedef struct _WgStepperClass +{ + GstElementClass parent_class; +} WgStepperClass; + +#define GST_TYPE_WG_STEPPER (wg_stepper_get_type()) +#define WG_STEPPER(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_WG_STEPPER, WgStepper)) +#define GST_WG_STEPPER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_WG_STEPPER,WgStepperClass)) +#define GST_IS_WG_STEPPER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_WG_STEPPER)) +#define GST_IS_WG_STEPPER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_WG_STEPPER)) + +G_DEFINE_TYPE (WgStepper, wg_stepper, GST_TYPE_ELEMENT); +GST_ELEMENT_REGISTER_DEFINE(winegstreamerstepper, "winegstreamerstepper", GST_RANK_NONE, GST_TYPE_WG_STEPPER); + +static bool wg_stepper_step(WgStepper *stepper); + struct wg_transform { struct wg_transform_attrs attrs;
GstElement *container; + WgStepper *stepper; GstAllocator *allocator; GstPad *my_src, *my_sink; GstSegment segment; @@ -493,7 +521,25 @@ static bool transform_create_decoder_elements(struct wg_transform *transform, if ((element = find_element(GST_ELEMENT_FACTORY_TYPE_PARSER, transform->input_caps, parsed_caps)) && !append_element(transform->container, element, first, last)) goto done; - else if (!element) + + if (element) + { + /* We try to intercept buffers produced by the parser, so if we push a large buffer into the + * parser, it won't push everything into the decoder all in one go. + */ + if ((element = create_element("winegstreamerstepper", NULL))) + { + if (!append_element(transform->container, element, first, last)) + { + GST_ERROR("Failed to append a stepper element to the parser"); + g_object_unref(element); + } + else + /* element is owned by the container */ + transform->stepper = WG_STEPPER(element); + } + } + else { gst_caps_unref(parsed_caps); parsed_caps = gst_caps_ref(transform->input_caps); @@ -1008,7 +1054,8 @@ static NTSTATUS read_transform_output(struct wg_sample *sample, GstBuffer *buffe
static NTSTATUS complete_drain(struct wg_transform *transform) { - if (transform->draining && gst_atomic_queue_length(transform->input_queue) == 0) + bool stepper_empty = transform->stepper == NULL || gst_atomic_queue_length(transform->stepper->fifo) == 0; + if (transform->draining && gst_atomic_queue_length(transform->input_queue) == 0 && stepper_empty) { GstEvent *event; transform->draining = false; @@ -1035,20 +1082,26 @@ error:
static bool get_transform_output(struct wg_transform *transform, struct wg_sample *sample) { - GstBuffer *input_buffer; GstFlowReturn ret;
wg_allocator_provide_sample(transform->allocator, sample);
- while (!(transform->output_sample = gst_atomic_queue_pop(transform->output_queue)) - && (input_buffer = gst_atomic_queue_pop(transform->input_queue))) + while (!(transform->output_sample = gst_atomic_queue_pop(transform->output_queue))) { + GstBuffer *input_buffer; + if (transform->stepper && wg_stepper_step(transform->stepper)) + /* If we pushed anything from the stepper, we don't need to dequeue more buffers. */ + continue; + + if (!(input_buffer = gst_atomic_queue_pop(transform->input_queue))) + break; + if ((ret = gst_pad_push(transform->my_src, input_buffer))) GST_WARNING("Failed to push transform input, error %d", ret); - - complete_drain(transform); }
+ complete_drain(transform); + /* Remove the sample so the allocator cannot use it */ wg_allocator_provide_sample(transform->allocator, NULL);
@@ -1221,3 +1274,110 @@ NTSTATUS wg_transform_notify_qos(void *args)
return S_OK; } + +/* Move one buffer from the internal fifo queue to the output src pad, forwarding any + * events queued ahead of it first. Returns true if a buffer is moved, or false if the fifo is empty. + */ +static bool wg_stepper_step(WgStepper *stepper) +{ + bool buffer_pushed = false; + GstFlowReturn ret; + gpointer ptr; + while (!buffer_pushed && (ptr = gst_atomic_queue_pop(stepper->fifo))) + { + if (GST_IS_BUFFER(ptr)) + { + GST_TRACE("Forwarding buffer %"GST_PTR_FORMAT" from fifo", ptr); + if ((ret = gst_pad_push(stepper->src, GST_BUFFER(ptr)))) + GST_WARNING("Failed to push buffer from fifo, error %d", ret); + buffer_pushed = true; + } + else if (GST_IS_EVENT(ptr)) + { + GST_TRACE("Processing event %"GST_PTR_FORMAT" from fifo", ptr); + gst_pad_event_default(stepper->sink, GST_OBJECT(stepper), GST_EVENT(ptr)); + } + } + return buffer_pushed; +} + +/* Release every buffer and event still held in the fifo. */ +static void wg_stepper_discard_all(WgStepper *stepper) +{ + gpointer ptr; + GST_TRACE("Discarding all objects in the fifo"); + while ((ptr = gst_atomic_queue_pop(stepper->fifo))) + gst_mini_object_unref(ptr); +} + +static GstStateChangeReturn wg_stepper_change_state(GstElement *element, GstStateChange transition) +{ + if (transition == GST_STATE_CHANGE_READY_TO_NULL) + wg_stepper_discard_all(WG_STEPPER(element)); + /* Chain up so that GstElement still performs its default handling (e.g. pad (de)activation). */ + return GST_ELEMENT_CLASS(wg_stepper_parent_class)->change_state(element, transition); +} + +static void wg_stepper_finalize(GObject *object) +{ + WgStepper *stepper = WG_STEPPER(object); + wg_stepper_discard_all(stepper); + gst_atomic_queue_unref(stepper->fifo); + G_OBJECT_CLASS(wg_stepper_parent_class)->finalize(object); +} + +static void wg_stepper_class_init(WgStepperClass *klass) +{ + gst_element_class_set_metadata(GST_ELEMENT_CLASS(klass), "winegstreamer buffer stepper", "Connector", + "Hold incoming buffer for manual pushing", "Yuxuan Shui yshui@codeweavers.com"); + G_OBJECT_CLASS(klass)->finalize = wg_stepper_finalize; + klass->parent_class.change_state = wg_stepper_change_state; +} + +static GstFlowReturn wg_stepper_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buf) +{ + WgStepper *this = WG_STEPPER(parent); + GST_TRACE("Pushing buffer %"GST_PTR_FORMAT" into fifo", buf); + gst_atomic_queue_push(this->fifo, buf); + return GST_FLOW_OK; +} + +static gboolean wg_stepper_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) +{ + WgStepper *this = WG_STEPPER(parent); + GST_TRACE("Pushing event %"GST_PTR_FORMAT" into fifo", event); + 
gst_atomic_queue_push(this->fifo, event); + return true; +} + +static gboolean wg_stepper_src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) +{ + WgStepper *this = WG_STEPPER(parent); + if (!gst_pad_is_linked(this->sink)) + return gst_pad_query_default(pad, parent, query); + GST_TRACE("Forwarding query %"GST_PTR_FORMAT" to upstream", query); + return gst_pad_peer_query(this->sink, query); +} + +static void wg_stepper_init(WgStepper *stepper) +{ + static GstStaticPadTemplate sink_factory = + GST_STATIC_PAD_TEMPLATE("sink", GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS("ANY")); + static GstStaticPadTemplate src_factory = + GST_STATIC_PAD_TEMPLATE("src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS("ANY")); + stepper->sink = gst_pad_new_from_static_template(&sink_factory, "sink"); + gst_element_add_pad(GST_ELEMENT(stepper), stepper->sink); + gst_pad_set_chain_function(stepper->sink, wg_stepper_chain_cb); + gst_pad_set_event_function(stepper->sink, wg_stepper_event_cb); + GST_PAD_SET_PROXY_CAPS(stepper->sink); + gst_pad_set_active(stepper->sink, true); + + stepper->src = gst_pad_new_from_static_template(&src_factory, "src"); + gst_element_add_pad(GST_ELEMENT(stepper), stepper->src); + gst_pad_set_query_function(stepper->src, wg_stepper_src_query_cb); + gst_pad_set_active(stepper->src, true); + + stepper->fifo = gst_atomic_queue_new(4); + + GST_DEBUG("Created new stepper element %"GST_PTR_FORMAT, stepper); +}
On Wed Feb 26 15:53:58 2025 +0000, Yuxuan Shui wrote:
changed this line in [version 6 of the diff](/wine/wine/-/merge_requests/7288/diffs?diff_id=160527&start_sha=ada67ae75708b8aca18d53f174beb54eb47f58fa#1db08ec81aafecfe3bd575cfcd57f9db6a4bfb1b_54_54)
I tried to come up with a good name; I don't think "holder" or "queue" quite captures it. I changed it to "stepper", which is the best I can think of.
(By the way, technically "hold" can be a noun.)