## Context
This is a problem discovered when running "Max: The Curse of Brotherhood" under proton. This game writes 4MB a piece into `wg_transform`, which contains about 4 seconds of compressed video. `wg_transform` calls `gst_pad_push` in `transform_ProcessOutput`, which takes ~300ms (i.e. 20 frames @ 60fps) to decode this buffer of video. So the end result is the game hitches every 4 seconds while playing cut scene videos.
Proton currently has a special case for this particular game, for which it breaks up the buffer into small chunks to avoid long blocks. This MR adopts that and applies it generally.
One concern raised by @redmcg is:
The only issue I can think of is if there are any decoders which don't have a parser; then they might not know how to deal with an arbitrary 4096 byte buffer (the parser is generally responsible for taking arbitrary buffers and producing something the decoder can work with, for example: a full frame).
So this MR only enables this strategy when there is a parser element.
-- v5: winegstreamer: Avoid large buffer pushes in wg_transform.
From: Yuxuan Shui yshui@codeweavers.com
This is done by inserting a special "hold" element behind the parser (if there is one). This element will hold onto the incoming buffers and not immediately push them downstream. Instead, it gives us control over when we push and how much is pushed. --- dlls/winegstreamer/unix_private.h | 2 + dlls/winegstreamer/unixlib.c | 4 + dlls/winegstreamer/wg_transform.c | 169 +++++++++++++++++++++++++++++- 3 files changed, 171 insertions(+), 4 deletions(-)
diff --git a/dlls/winegstreamer/unix_private.h b/dlls/winegstreamer/unix_private.h index b2cc036c914..caab137fcbc 100644 --- a/dlls/winegstreamer/unix_private.h +++ b/dlls/winegstreamer/unix_private.h @@ -104,4 +104,6 @@ extern void wg_allocator_provide_sample(GstAllocator *allocator, struct wg_sampl extern void wg_allocator_release_sample(GstAllocator *allocator, struct wg_sample *sample, bool discard_data);
+GST_ELEMENT_REGISTER_DECLARE(hold); + #endif /* __WINE_WINEGSTREAMER_UNIX_PRIVATE_H */ diff --git a/dlls/winegstreamer/unixlib.c b/dlls/winegstreamer/unixlib.c index 729008e1f46..08eebf52f43 100644 --- a/dlls/winegstreamer/unixlib.c +++ b/dlls/winegstreamer/unixlib.c @@ -300,6 +300,10 @@ NTSTATUS wg_init_gstreamer(void *arg)
GST_INFO("GStreamer library version %s; wine built with %d.%d.%d.", gst_version_string(), GST_VERSION_MAJOR, GST_VERSION_MINOR, GST_VERSION_MICRO); + + if (!GST_ELEMENT_REGISTER(hold, NULL)) + GST_ERROR("Failed to register the hold element"); + return STATUS_SUCCESS; }
diff --git a/dlls/winegstreamer/wg_transform.c b/dlls/winegstreamer/wg_transform.c index 33758401a06..13b81636305 100644 --- a/dlls/winegstreamer/wg_transform.c +++ b/dlls/winegstreamer/wg_transform.c @@ -42,11 +42,39 @@
#define GST_SAMPLE_FLAG_WG_CAPS_CHANGED (GST_MINI_OBJECT_FLAG_LAST << 0)
+/* This GstElement takes buffers from its sink pad, instead of pushing them + * out the src pad, it keeps them in a internal queue until the push function + * is called manually. + */ +typedef struct _GstHold +{ + GstElement element; + GstPad *src, *sink; + GstAtomicQueue *fifo; +} GstHold; + +typedef struct _GstHoldClass +{ + GstElementClass parent_class; +} GstHoldClass; + +#define GST_TYPE_HOLD (gst_hold_get_type()) +#define GST_HOLD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_HOLD, GstHold)) +#define GST_MY_FILTER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_HOLD,GstHoldClass)) +#define GST_IS_MY_FILTER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_HOLD)) +#define GST_IS_MY_FILTER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_HOLD)) + +G_DEFINE_TYPE (GstHold, gst_hold, GST_TYPE_ELEMENT); +GST_ELEMENT_REGISTER_DEFINE(hold, "hold", GST_RANK_NONE, GST_TYPE_HOLD); + +static bool gst_hold_push(GstHold *hold); + struct wg_transform { struct wg_transform_attrs attrs;
GstElement *container; + GstHold *hold; GstAllocator *allocator; GstPad *my_src, *my_sink; GstSegment segment; @@ -493,7 +521,27 @@ static bool transform_create_decoder_elements(struct wg_transform *transform, if ((element = find_element(GST_ELEMENT_FACTORY_TYPE_PARSER, transform->input_caps, parsed_caps)) && !append_element(transform->container, element, first, last)) goto done; - else if (!element) + + if (element) + { + /* We try to intercept buffers produced by the parser, so if we push a large buffer into the + * parser, it won't push everything into the decoder all in one go. + */ + if ((element = create_element("hold", NULL))) + { + if (!append_element(transform->container, element, first, last)) + { + GST_ERROR("Failed to append a hold element to the parser"); + g_object_unref(element); + } + else + /* element is owned by the container */ + transform->hold = GST_HOLD(element); + } + else + GST_ERROR("Failed to create a hold element"); + } + else { gst_caps_unref(parsed_caps); parsed_caps = gst_caps_ref(transform->input_caps); @@ -1035,14 +1083,20 @@ error:
static bool get_transform_output(struct wg_transform *transform, struct wg_sample *sample) { - GstBuffer *input_buffer; GstFlowReturn ret;
wg_allocator_provide_sample(transform->allocator, sample);
- while (!(transform->output_sample = gst_atomic_queue_pop(transform->output_queue)) - && (input_buffer = gst_atomic_queue_pop(transform->input_queue))) + while (!(transform->output_sample = gst_atomic_queue_pop(transform->output_queue))) { + GstBuffer *input_buffer; + if (transform->hold && gst_hold_push(transform->hold)) + /* If we pushed anything from the hold, we don't need to dequeue more buffers. */ + continue; + + if (!(input_buffer = gst_atomic_queue_pop(transform->input_queue))) + break; + if ((ret = gst_pad_push(transform->my_src, input_buffer))) GST_WARNING("Failed to push transform input, error %d", ret);
@@ -1221,3 +1275,110 @@ NTSTATUS wg_transform_notify_qos(void *args)
return S_OK; } + +/* Move one buffer from the internal fifo queue to the output src pad. + * Returns true if a buffer is moved, or false if the fifo is empty. + */ +static bool gst_hold_push(GstHold *hold) +{ + bool buffer_pushed = false; + gpointer ptr; + while (!buffer_pushed && (ptr = gst_atomic_queue_pop(hold->fifo))) + { + if (GST_IS_BUFFER(ptr)) + { + GST_TRACE("Forwarding buffer %"GST_PTR_FORMAT" from fifo", ptr); + gst_pad_push(hold->src, GST_BUFFER(ptr)); + buffer_pushed = true; + } + else if (GST_IS_EVENT(ptr)) + { + GST_TRACE("Processing event %"GST_PTR_FORMAT" from fifo", ptr); + gst_pad_event_default(hold->sink, GST_OBJECT(hold), GST_EVENT(ptr)); + } + } + return buffer_pushed; +} + +static GstStateChangeReturn gst_hold_change_state(GstElement *element, GstStateChange transition) +{ + GstHold *this = GST_HOLD(element); + if (transition == GST_STATE_CHANGE_READY_TO_NULL) + { + gpointer ptr; + GST_TRACE("Discarding all objects in the fifo"); + while ((ptr = gst_atomic_queue_pop(this->fifo))) + { + if (GST_IS_EVENT(ptr)) + gst_event_unref(ptr); + else if (GST_IS_BUFFER(ptr)) + gst_buffer_unref(ptr); + } + } + return GST_STATE_CHANGE_SUCCESS; +} + +static void gst_hold_class_init(GstHoldClass *klass) +{ + gst_element_class_set_metadata(GST_ELEMENT_CLASS(klass), "winegstreamer buffer hold", "Connector", + "Hold incoming buffer for manual pushing", "Yuxuan Shui yshui@codeweavers.com"); + klass->parent_class.change_state = gst_hold_change_state; +} + +static GstFlowReturn gst_hold_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buf) +{ + GstHold *this = GST_HOLD(parent); + GST_TRACE("Pushing buffer %"GST_PTR_FORMAT" into fifo", buf); + gst_atomic_queue_push(this->fifo, buf); + return GST_FLOW_OK; +} + +static gboolean gst_hold_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) +{ + GstHold *this = GST_HOLD(parent); + GST_TRACE("Pushing event %"GST_PTR_FORMAT" into fifo", event); + gst_atomic_queue_push(this->fifo, event); + return true; +} + +static gboolean gst_hold_src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) +{ + GstHold *this = GST_HOLD(parent); + GstPad *peer = gst_pad_get_peer(this->sink); + if (!peer) return gst_pad_query_default(pad, parent, query); + GST_TRACE("Forwarding query %"GST_PTR_FORMAT" to upstream", query); + return gst_pad_query(peer, query); +} + +static void gst_hold_init(GstHold *hold) +{ + static GstStaticPadTemplate sink_factory = + GST_STATIC_PAD_TEMPLATE ( + "sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("ANY") + ); + static GstStaticPadTemplate src_factory = + GST_STATIC_PAD_TEMPLATE( + "src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS("ANY") + ); + hold->sink = gst_pad_new_from_static_template(&sink_factory, "sink"); + gst_element_add_pad(GST_ELEMENT(hold), hold->sink); + gst_pad_set_chain_function(hold->sink, gst_hold_chain_cb); + gst_pad_set_event_function(hold->sink, gst_hold_event_cb); + GST_PAD_SET_PROXY_CAPS(hold->sink); + gst_pad_set_active(hold->sink, true); + + hold->src = gst_pad_new_from_static_template(&src_factory, "src"); + gst_element_add_pad(GST_ELEMENT(hold), hold->src); + gst_pad_set_query_function(hold->src, gst_hold_src_query_cb); + gst_pad_set_active(hold->src, true); + + hold->fifo = gst_atomic_queue_new(4); + + GST_DEBUG("Created new hold element %"GST_PTR_FORMAT, hold); +}
(sorry for the multiple updates)
removed no longer needed changes to `struct wg_transform`, and some style changes.
Rémi Bernon (@rbernon) commented about dlls/winegstreamer/wg_transform.c:
/* We try to intercept buffers produced by the parser, so if we push a large buffer into the
* parser, it won't push everything into the decoder all in one go.
*/
if ((element = create_element("hold", NULL)))
{
if (!append_element(transform->container, element, first, last))
{
GST_ERROR("Failed to append a hold element to the parser");
g_object_unref(element);
}
else
/* element is owned by the container */
transform->hold = GST_HOLD(element);
}
else
GST_ERROR("Failed to create a hold element");
```suggestion:-12+0 if (!(element = create_element("hold", NULL)) || !append_element(transform->container, element, first, last)) goto done; /* element is owned by the container */ transform->hold = GST_HOLD(element); ```
Error messages are already printed by create_element/append_element.
Rémi Bernon (@rbernon) commented about dlls/winegstreamer/wg_transform.c:
#define GST_SAMPLE_FLAG_WG_CAPS_CHANGED (GST_MINI_OBJECT_FLAG_LAST << 0)
+/* This GstElement takes buffers from its sink pad, instead of pushing them
- out the src pad, it keeps them in a internal queue until the push function
- is called manually.
- */
+typedef struct _GstHold +{
- GstElement element;
- GstPad *src, *sink;
- GstAtomicQueue *fifo;
+} GstHold;
I don't think we should prefix our custom elements with Gst/gst_. Other custom objects use Wg/wg_ prefixes. Also I think "hold" is a verb and it would probably be more idiomatic to use a noun here like "holder" or "queue" as it's kind of the same purpose as the builtin queue but with custom semantics.
Looks quite nice otherwise! Note that there's some possibly related mf:transform test failure that would need to be investigated.
On Wed Feb 26 15:14:21 2025 +0000, Rémi Bernon wrote:
Looks quite nice otherwise! Note that there's some possibly related mf:transform test failure that would need to be investigated.
hmm, i think it might be related to draining