-- v6: winegstreamer: Use an atomic queue for wg_transform input buffers. winegstreamer: Release requested samples if they are too small. mf/tests: Add todo_wine for newer FFmpeg versions.
From: Rémi Bernon rbernon@codeweavers.com
Signed-off-by: Rémi Bernon rbernon@codeweavers.com --- dlls/mf/tests/mf.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/dlls/mf/tests/mf.c b/dlls/mf/tests/mf.c index 2f3ee3151da..5f26b1a6238 100644 --- a/dlls/mf/tests/mf.c +++ b/dlls/mf/tests/mf.c @@ -5911,7 +5911,7 @@ static void check_sample_pcm16_(int line, IMFSample *sample, const BYTE *expect_ if (expect - value + 512 > 1024) break; }
- todo_wine_if(todo) + todo_wine_if(todo && i < length / 2) ok_(__FILE__, line)(i == length, "unexpected buffer data\n");
if (output_file) WriteFile(output_file, buffer, length, &length, NULL); @@ -6544,6 +6544,9 @@ static void test_wma_decoder(void) hr = IMFTransform_ProcessOutput(transform, 0, 1, &output, &status);
winetest_pop_context(); + + /* some FFmpeg version request more input to complete decoding */ + if (hr == MF_E_TRANSFORM_NEED_MORE_INPUT && i == 2) break; } todo_wine ok(wmadec_data_len == 0, "missing %#lx bytes\n", wmadec_data_len);
From: Rémi Bernon rbernon@codeweavers.com
Signed-off-by: Rémi Bernon rbernon@codeweavers.com --- dlls/winegstreamer/wg_allocator.c | 8 +++++++- dlls/winegstreamer/wg_transform.c | 7 +------ 2 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/dlls/winegstreamer/wg_allocator.c b/dlls/winegstreamer/wg_allocator.c index c31751ce83f..5b2c48dcf77 100644 --- a/dlls/winegstreamer/wg_allocator.c +++ b/dlls/winegstreamer/wg_allocator.c @@ -150,6 +150,7 @@ static GstMemory *wg_allocator_alloc(GstAllocator *gst_allocator, gsize size, GstAllocationParams *params) { WgAllocator *allocator = (WgAllocator *)gst_allocator; + struct wg_sample *sample; WgMemory *memory;
GST_LOG("allocator %p, size %#zx, params %p", allocator, size, params); @@ -162,7 +163,12 @@ static GstMemory *wg_allocator_alloc(GstAllocator *gst_allocator, gsize size,
pthread_mutex_lock(&allocator->mutex);
- memory->sample = allocator->request_sample(size, allocator->request_sample_context); + sample = allocator->request_sample(size, allocator->request_sample_context); + if (sample->max_size < size) + InterlockedDecrement(&sample->refcount); + else + memory->sample = sample; + list_add_tail(&allocator->memory_list, &memory->entry);
pthread_mutex_unlock(&allocator->mutex); diff --git a/dlls/winegstreamer/wg_transform.c b/dlls/winegstreamer/wg_transform.c index e05432f6ac7..adefd16c787 100644 --- a/dlls/winegstreamer/wg_transform.c +++ b/dlls/winegstreamer/wg_transform.c @@ -310,15 +310,10 @@ static bool transform_append_element(struct wg_transform *transform, GstElement static struct wg_sample *transform_request_sample(gsize size, void *context) { struct wg_transform *transform = context; - struct wg_sample *sample;
GST_LOG("size %#zx, context %p", size, transform);
- sample = InterlockedExchangePointer((void **)&transform->output_wg_sample, NULL); - if (!sample || sample->max_size < size) - return NULL; - - return sample; + return InterlockedExchangePointer((void **)&transform->output_wg_sample, NULL); }
NTSTATUS wg_transform_create(void *args)
From: Rémi Bernon rbernon@codeweavers.com
And push them one by one until an output buffer is generated, to avoid generating multiple output buffers without a backing wg_sample.
This makes zero-copy more efficient for games which queue multiple input buffers before checking output, such as Yakuza 4.
Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=45988 Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=47084 Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=49715 Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=52183 Signed-off-by: Rémi Bernon rbernon@codeweavers.com --- dlls/winegstreamer/wg_transform.c | 70 +++++++++++++++++-------------- 1 file changed, 39 insertions(+), 31 deletions(-)
diff --git a/dlls/winegstreamer/wg_transform.c b/dlls/winegstreamer/wg_transform.c index adefd16c787..070263698fb 100644 --- a/dlls/winegstreamer/wg_transform.c +++ b/dlls/winegstreamer/wg_transform.c @@ -51,8 +51,10 @@ struct wg_transform GstPad *my_src, *my_sink; GstPad *their_sink, *their_src; GstSegment segment; - GstBufferList *input; + guint input_max_length; + GstAtomicQueue *input_queue; + guint output_plane_align; struct wg_sample *output_wg_sample; GstAtomicQueue *output_queue; @@ -215,9 +217,11 @@ NTSTATUS wg_transform_destroy(void *args) { struct wg_transform *transform = args; GstSample *sample; + GstBuffer *buffer;
- if (transform->input) - gst_buffer_list_unref(transform->input); + while ((buffer = gst_atomic_queue_pop(transform->input_queue))) + gst_buffer_unref(buffer); + gst_atomic_queue_unref(transform->input_queue);
gst_element_set_state(transform->container, GST_STATE_NULL);
@@ -336,7 +340,7 @@ NTSTATUS wg_transform_create(void *args) return STATUS_NO_MEMORY; if (!(transform->container = gst_bin_new("wg_transform"))) goto out; - if (!(transform->input = gst_buffer_list_new())) + if (!(transform->input_queue = gst_atomic_queue_new(8))) goto out; if (!(transform->output_queue = gst_atomic_queue_new(8))) goto out; @@ -503,8 +507,8 @@ out: wg_allocator_destroy(transform->allocator); if (transform->output_queue) gst_atomic_queue_unref(transform->output_queue); - if (transform->input) - gst_buffer_list_unref(transform->input); + if (transform->input_queue) + gst_atomic_queue_unref(transform->input_queue); if (transform->container) { gst_element_set_state(transform->container, GST_STATE_NULL); @@ -530,7 +534,7 @@ NTSTATUS wg_transform_push_data(void *args) GstBuffer *buffer; guint length;
- length = gst_buffer_list_length(transform->input); + length = gst_atomic_queue_length(transform->input_queue); if (length >= transform->input_max_length) { GST_INFO("Refusing %u bytes, %u buffers already queued", sample->size, length); @@ -556,7 +560,7 @@ NTSTATUS wg_transform_push_data(void *args) GST_BUFFER_DURATION(buffer) = sample->duration * 100; if (!(sample->flags & WG_SAMPLE_FLAG_SYNC_POINT)) GST_BUFFER_FLAG_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT); - gst_buffer_list_insert(transform->input, -1, buffer); + gst_atomic_queue_push(transform->input_queue, buffer);
params->result = S_OK; return STATUS_SUCCESS; @@ -705,48 +709,52 @@ static NTSTATUS read_transform_output_data(GstBuffer *buffer, GstCaps *caps, gsi return STATUS_SUCCESS; }
-NTSTATUS wg_transform_read_data(void *args) +static bool get_transform_output(struct wg_transform *transform, struct wg_sample *sample) { - struct wg_transform_read_data_params *params = args; - struct wg_transform *transform = params->transform; - struct wg_sample *sample = params->sample; - struct wg_format *format = params->format; GstFlowReturn ret = GST_FLOW_OK; - GstBuffer *output_buffer; - GstBufferList *input; - GstCaps *output_caps; - bool discard_data; - NTSTATUS status; + GstBuffer *input_buffer;
/* Provide the sample for transform_request_sample to pick it up */ InterlockedIncrement(&sample->refcount); InterlockedExchangePointer((void **)&transform->output_wg_sample, sample);
- if (!gst_buffer_list_length(transform->input)) - GST_DEBUG("Not input buffer queued"); - else if ((input = gst_buffer_list_new())) + while (!(transform->output_sample = gst_atomic_queue_pop(transform->output_queue))) { - ret = gst_pad_push_list(transform->my_src, transform->input); - transform->input = input; - } - else - { - GST_ERROR("Failed to allocate new input queue"); - ret = GST_FLOW_ERROR; + if (!(input_buffer = gst_atomic_queue_pop(transform->input_queue))) + break; + + if ((ret = gst_pad_push(transform->my_src, input_buffer))) + { + GST_ERROR("Failed to push transform input, error %d", ret); + break; + } }
/* Remove the sample so transform_request_sample cannot use it */ if (InterlockedExchangePointer((void **)&transform->output_wg_sample, NULL)) InterlockedDecrement(&sample->refcount);
- if (ret) + return ret == GST_FLOW_OK; +} + +NTSTATUS wg_transform_read_data(void *args) +{ + struct wg_transform_read_data_params *params = args; + struct wg_transform *transform = params->transform; + struct wg_sample *sample = params->sample; + struct wg_format *format = params->format; + GstBuffer *output_buffer; + GstCaps *output_caps; + bool discard_data; + NTSTATUS status; + + if (!transform->output_sample && !get_transform_output(transform, sample)) { - GST_ERROR("Failed to push transform input, error %d", ret); wg_allocator_release_sample(transform->allocator, sample, false); return STATUS_UNSUCCESSFUL; }
- if (!transform->output_sample && !(transform->output_sample = gst_atomic_queue_pop(transform->output_queue))) + if (!transform->output_sample) { sample->size = 0; params->result = MF_E_TRANSFORM_NEED_MORE_INPUT;
I understand that adding more locks increases the mental burden, but I think this is worse—sure, we're "reusing" a lock that was going to be created anyway, but it's still effectively introducing another lock, plus now this requires us to understand when GStreamer is going to implicitly take that lock.
Regarding the lock, it's not really about the mental burden but about actual deadlock situations. Deadlocks already happen in many cases in winegstreamer, and I don't want to make that worse.
Adding a new lock to the wg_transform is just asking for trouble. It may be fine for this particular case — guarding the output_wg_sample pointer updates — as long as it is only ever used for that, but having a lock in the wg_transform makes it tempting to use it to guard more things in the future. I don't want to introduce that opportunity.
The interlocked pointer exchange guarantees the same thing as an explicit lock, except that it cannot cause any deadlock. I've updated the MR, keeping the interlocked exchange because I think it's the best solution, but moving the sample size check into the allocator, which also simplifies the wg_transform callback.
In retrospect, I agree that using the GStreamer object lock for the allocator locking is a bit risky as well, so I'll leave that aside too.
This merge request was approved by Zebediah Figura.
[...] I'd still be surprised if a well-behaved application would never release samples. That would result in a significant memory leak.
Sure, but we should prepare for the worst, because that's what happens.
Badly behaved applications are the norm. Or rather, since we don't know what the correct behavior really is, applications that do not behave the way we expect them to are the norm.
I don't think a fixed pool can work. We cannot expect the output buffers passed downstream to the application to ever be released, and even if they probably will be, I don't think we can easily and safely put them back in the pool when they are.
Wait, we can't?
I assume you mean for Media Foundation specifically, in particular because DirectShow does have pooling built into the API (and with a maximum buffer count as well). As far as I understand neither Media Foundation nor the ASF reader explicitly pool, but [...]
- We'd need a custom buffer implementation for each API.
Wait, why?
- The buffers may be released after the pipeline has been shutdown, we'd need to keep it alive until all the buffers are released, which is likely going to cause problems.
Why? We shouldn't need to provide anything to the Unix side except for the buffer pointer and size. I would expect that the PE buffers should outlive the unix device under normal operation.
I don't see any notion of a buffer pool in MF, though I have only limited knowledge of the API. But even with quartz, I understand the buffers are pooled by an IMemAllocator object. The allocator gets notified when buffers are released, but we (the wg_parser user) don't.
As I understand it the allocator may also be application provided and the buffer implementation too.
So I don't really understand how you expect the buffers to be released to the unix buffer pool if not by periodically requesting new buffers from the IMemAllocator, or, for the MF API, by implementing our own IMFSample interface to hook the final buffer release.
In any case it's starting to look very similar to allocations on demand through the wg_allocator callback.
Some APIs provide allocation callbacks, which conflicts with the needs of 1).
The allocation callbacks may block.
They may block waiting for what?
I don't know — anything?
The callbacks can be app provided, I'm not making any assumption on what they decide to wait for.
I don't know what pattern native is using, though I expect it to be more or less like GStreamer, with allocations as needed.
On 7/5/22 14:31, Rémi Bernon (@rbernon) wrote:
- The buffers may be released after the pipeline has been shutdown, we'd need to keep it alive until all the buffers are released, which is likely going to cause problems.
Why? We shouldn't need to provide anything to the Unix side except for the buffer pointer and size. I would expect that the PE buffers should outlive the unix device under normal operation.
I don't see any notion of a buffer pool in MF, though I have only a limited knowledge of the API. But even with quartz, I understand the buffers are pooled by a IMemAllocator object. The allocator gets notified when buffers are released, but we (the wg_parser used) don't.
As I understand it the allocator may also be application provided and the buffer implementation too.
So I don't really understand how you expect the buffers to be released to the unix buffer pool if not by periodically requesting new buffers from the IMemAllocator, or, for the MF API, by implementing our own IMFSample interface to hook the final buffer release.
In any case it's starting to look very similar to allocations on demand through the wg_allocator callback.
Right. I failed to consider that, sorry.
This merge request was approved by Nikolay Sivov.