This is similar to https://gitlab.winehq.org/wine/wine/-/merge_requests/2684, https://gitlab.winehq.org/wine/wine/-/merge_requests/3004 or https://gitlab.winehq.org/wine/wine/-/merge_requests/3139 but it validates the session transform node behavior with tests.
The tests are added after the changes because they otherwise don't pass and making them pass would be unnecessarily complicated.
I also have some local tests for MF_TOPONODE_WORKQUEUE_ID attributes and how it is supposed to behave. It basically creates new serialized work queues for every source node and assign them to every node downstream. Any sample request or processing operation is done in the associated queue.
When joining streams, queues are assigned downstream one after another and the last assigned queue is used when requesting samples upstream, but when samples are received and processed downstream it looks like the current queue of the source node is used for every downstream operations.
The request behavior seems to be the same when work queues are used, with round robin input requests, and single ProcessInput call followed by ProcessOutput loop until it fails.
This is yet not optimally efficient, and could be improved, for the following reasons:
1) All session operations are serialized together, even unrelated streams, and ProcessInput / ProcessOutput calls may be costly and stalling the pipeline. I believe that native probably allows parallel processing of unrelated stream requests, this needs to be confirmed.
2) MFT_MESSAGE_COMMAND_DRAIN message use isn't ideal, the message forces the transform to process all queued input synchronously, which can take a long time. I haven't checked exactly what native does but I believe it instead uses MFT_MESSAGE_NOTIFY_END_OF_STREAM messages, which would allow us to notify and drain the GStreamer decoder asynchronously.
3) MFT_MESSAGE_COMMAND_DRAIN also doesn't distinguish between input streams and needs to be sent globally. It's unclear how it should be used when multiple input streams are involved, and when one stream ends its segment then start a new segment while other streams don't have yet reached EOS. MFT_MESSAGE_NOTIFY_END_OF_STREAM messages have a stream ID parameter and would be more appropriate to handle separate input streams independently.
-- v5: mf/session: Increase the request count when requests are already queued. mf/session: Request more samples from upstream when necessary. mf/session: Push transform input samples directly to ProcessInput. mf/session: Use helpers to push and pop samples for transform streams. mf/session: Drain remaining requests in transform_node_deliver_samples. mf/session: Drain transform node input streams individually. mf/session: Use a helper to deliver transform node requested samples. mf/session: Use local variables to access transform node streams.
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/mf/session.c | 48 +++++++++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 16 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index 555a71dcdf7..c629399d10e 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -3128,6 +3128,8 @@ static HRESULT transform_node_pull_samples(const struct media_session *session, /* Collect returned samples for all streams. */ for (i = 0; i < node->u.transform.output_count; ++i) { + struct transform_stream *stream = &node->u.transform.outputs[i]; + if (buffers[i].pEvents) IMFCollection_Release(buffers[i].pEvents);
@@ -3137,7 +3139,7 @@ static HRESULT transform_node_pull_samples(const struct media_session *session, IMFQualityManager_NotifyProcessOutput(session->quality_manager, node->node, i, buffers[i].pSample);
queued_sample = transform_create_sample(buffers[i].pSample); - list_add_tail(&node->u.transform.outputs[i].samples, &queued_sample->entry); + list_add_tail(&stream->samples, &queued_sample->entry); }
if (buffers[i].pSample) @@ -3189,17 +3191,21 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop } break; case MF_TOPOLOGY_TRANSFORM_NODE: + { + struct transform_stream *input_stream = &topo_node->u.transform.inputs[input];
transform_node_pull_samples(session, topo_node);
sample_entry = transform_create_sample(sample); - list_add_tail(&topo_node->u.transform.inputs[input].samples, &sample_entry->entry); + list_add_tail(&input_stream->samples, &sample_entry->entry);
for (i = 0; i < topo_node->u.transform.input_count; ++i) { + struct transform_stream *stream = &topo_node->u.transform.inputs[i]; + stream_id = transform_node_get_stream_id(topo_node, FALSE, i); - LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &topo_node->u.transform.inputs[i].samples, - struct sample, entry) + + LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &stream->samples, struct sample, entry) { if (sample_entry->sample) { @@ -3212,7 +3218,7 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop } else { - transform_stream_drop_samples(&topo_node->u.transform.inputs[i]); + transform_stream_drop_samples(stream); drain = TRUE; } } @@ -3231,28 +3237,32 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop { for (i = 0; i < topo_node->u.transform.output_count; ++i) { - if ((sample_entry = transform_create_sample(NULL))) - list_add_tail(&topo_node->u.transform.outputs[i].samples, &sample_entry->entry); + struct transform_stream *stream = &topo_node->u.transform.outputs[i]; + + if ((sample_entry = transform_create_sample(NULL))) + list_add_tail(&stream->samples, &sample_entry->entry); } }
/* Push down all available output. */ for (i = 0; i < topo_node->u.transform.output_count; ++i) { + struct transform_stream *stream = &topo_node->u.transform.outputs[i]; + if (FAILED(IMFTopologyNode_GetOutput(node, i, &downstream_node, &downstream_input))) { WARN("Failed to get connected node for output %u.\n", i); continue; }
- LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &topo_node->u.transform.outputs[i].samples, + LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &stream->samples, struct sample, entry) { - if (!topo_node->u.transform.outputs[i].requests) + if (!stream->requests) break;
session_deliver_sample_to_node(session, downstream_node, downstream_input, sample_entry->sample); - topo_node->u.transform.outputs[i].requests--; + stream->requests--;
transform_release_sample(sample_entry); } @@ -3261,6 +3271,7 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop }
break; + } case MF_TOPOLOGY_TEE_NODE: FIXME("Unhandled downstream node type %d.\n", node_type); break; @@ -3293,20 +3304,22 @@ static void session_deliver_pending_samples(struct media_session *session, IMFTo /* Push down all available output. */ for (i = 0; i < topo_node->u.transform.output_count; ++i) { + struct transform_stream *stream = &topo_node->u.transform.outputs[i]; + if (FAILED(IMFTopologyNode_GetOutput(node, i, &downstream_node, &downstream_input))) { WARN("Failed to get connected node for output %u.\n", i); continue; }
- LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &topo_node->u.transform.outputs[i].samples, + LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &stream->samples, struct sample, entry) { - if (!topo_node->u.transform.outputs[i].requests) + if (!stream->requests) break;
session_deliver_sample_to_node(session, downstream_node, downstream_input, sample_entry->sample); - topo_node->u.transform.outputs[i].requests--; + stream->requests--;
transform_release_sample(sample_entry); } @@ -3342,14 +3355,16 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I WARN("Sample request failed, hr %#lx.\n", hr); break; case MF_TOPOLOGY_TRANSFORM_NODE: + { + struct transform_stream *stream = &topo_node->u.transform.outputs[output];
- if (list_empty(&topo_node->u.transform.outputs[output].samples)) + if (list_empty(&stream->samples)) { /* Forward request to upstream node. */ if (SUCCEEDED(hr = IMFTopologyNode_GetInput(node, 0 /* FIXME */, &upstream_node, &upstream_output))) { if (SUCCEEDED(hr = session_request_sample_from_node(session, upstream_node, upstream_output))) - topo_node->u.transform.outputs[output].requests++; + stream->requests++; IMFTopologyNode_Release(upstream_node); } } @@ -3357,7 +3372,7 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I { if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, output, &downstream_node, &downstream_input))) { - sample = LIST_ENTRY(list_head(&topo_node->u.transform.outputs[output].samples), struct sample, entry); + sample = LIST_ENTRY(list_head(&stream->samples), struct sample, entry); session_deliver_sample_to_node(session, downstream_node, downstream_input, sample->sample); transform_release_sample(sample); IMFTopologyNode_Release(downstream_node); @@ -3365,6 +3380,7 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I }
break; + } case MF_TOPOLOGY_TEE_NODE: FIXME("Unhandled upstream node type %d.\n", node_type); default:
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/mf/session.c | 98 +++++++++++++++++++---------------------------- 1 file changed, 39 insertions(+), 59 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index c629399d10e..254fea6dfe4 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -3151,12 +3151,47 @@ static HRESULT transform_node_pull_samples(const struct media_session *session, return hr; }
+static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, unsigned int input, + IMFSample *sample); + +static void transform_node_deliver_samples(struct media_session *session, struct topo_node *topo_node) +{ + IMFTopologyNode *up_node = topo_node->node, *down_node; + struct sample *sample, *next; + DWORD output, input; + HRESULT hr; + + /* Push down all available output. */ + for (output = 0; output < topo_node->u.transform.output_count; ++output) + { + struct transform_stream *stream = &topo_node->u.transform.outputs[output]; + + if (FAILED(hr = IMFTopologyNode_GetOutput(up_node, output, &down_node, &input))) + { + WARN("Failed to node %p/%lu output, hr %#lx.\n", up_node, output, hr); + continue; + } + + LIST_FOR_EACH_ENTRY_SAFE(sample, next, &stream->samples, struct sample, entry) + { + if (!stream->requests) + break; + + session_deliver_sample_to_node(session, down_node, input, sample->sample); + stream->requests--; + + transform_release_sample(sample); + } + + IMFTopologyNode_Release(down_node); + } +} + static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, unsigned int input, IMFSample *sample) { struct sample *sample_entry, *sample_entry2; - DWORD stream_id, downstream_input; - IMFTopologyNode *downstream_node; + DWORD stream_id; struct topo_node *topo_node; MF_TOPOLOGY_TYPE node_type; BOOL drain = FALSE; @@ -3244,32 +3279,7 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop } }
- /* Push down all available output. */ - for (i = 0; i < topo_node->u.transform.output_count; ++i) - { - struct transform_stream *stream = &topo_node->u.transform.outputs[i]; - - if (FAILED(IMFTopologyNode_GetOutput(node, i, &downstream_node, &downstream_input))) - { - WARN("Failed to get connected node for output %u.\n", i); - continue; - } - - LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &stream->samples, - struct sample, entry) - { - if (!stream->requests) - break; - - session_deliver_sample_to_node(session, downstream_node, downstream_input, sample_entry->sample); - stream->requests--; - - transform_release_sample(sample_entry); - } - - IMFTopologyNode_Release(downstream_node); - } - + transform_node_deliver_samples(session, topo_node); break; } case MF_TOPOLOGY_TEE_NODE: @@ -3282,13 +3292,9 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop
static void session_deliver_pending_samples(struct media_session *session, IMFTopologyNode *node) { - struct sample *sample_entry, *sample_entry2; - IMFTopologyNode *downstream_node; struct topo_node *topo_node; MF_TOPOLOGY_TYPE node_type; - DWORD downstream_input; TOPOID node_id; - unsigned int i;
IMFTopologyNode_GetNodeType(node, &node_type); IMFTopologyNode_GetTopoNodeID(node, &node_id); @@ -3298,34 +3304,8 @@ static void session_deliver_pending_samples(struct media_session *session, IMFTo switch (node_type) { case MF_TOPOLOGY_TRANSFORM_NODE: - transform_node_pull_samples(session, topo_node); - - /* Push down all available output. */ - for (i = 0; i < topo_node->u.transform.output_count; ++i) - { - struct transform_stream *stream = &topo_node->u.transform.outputs[i]; - - if (FAILED(IMFTopologyNode_GetOutput(node, i, &downstream_node, &downstream_input))) - { - WARN("Failed to get connected node for output %u.\n", i); - continue; - } - - LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &stream->samples, - struct sample, entry) - { - if (!stream->requests) - break; - - session_deliver_sample_to_node(session, downstream_node, downstream_input, sample_entry->sample); - stream->requests--; - - transform_release_sample(sample_entry); - } - - IMFTopologyNode_Release(downstream_node); - } + transform_node_deliver_samples(session, topo_node); break; default: FIXME("Unexpected node type %u.\n", node_type);
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/mf/session.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index 254fea6dfe4..1fbacde318c 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -3255,16 +3255,14 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop { transform_stream_drop_samples(stream); drain = TRUE; + + if (FAILED(hr = IMFTransform_ProcessMessage(topo_node->object.transform, + MFT_MESSAGE_COMMAND_DRAIN, stream_id))) + WARN("Drain command failed for transform, hr %#lx.\n", hr); } } }
- if (drain) - { - if (FAILED(hr = IMFTransform_ProcessMessage(topo_node->object.transform, MFT_MESSAGE_COMMAND_DRAIN, 0))) - WARN("Drain command failed for transform, hr %#lx.\n", hr); - } - transform_node_pull_samples(session, topo_node);
/* Remaining unprocessed input has been discarded, now queue markers for every output. */
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/mf/session.c | 55 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 17 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index 1fbacde318c..c9f911c1777 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -155,6 +155,7 @@ struct transform_stream struct list samples; unsigned int requests; unsigned int min_buffer_size; + BOOL draining; };
enum topo_node_flags @@ -892,7 +893,9 @@ static HRESULT session_subscribe_sources(struct media_session *session) static void session_start(struct media_session *session, const GUID *time_format, const PROPVARIANT *start_position) { struct media_source *source; + struct topo_node *topo_node; HRESULT hr; + UINT i;
switch (session->state) { @@ -928,6 +931,18 @@ static void session_start(struct media_session *session, const GUID *time_format } }
+ LIST_FOR_EACH_ENTRY(topo_node, &session->presentation.nodes, struct topo_node, entry) + { + if (topo_node->type == MF_TOPOLOGY_TRANSFORM_NODE) + { + for (i = 0; i < topo_node->u.transform.input_count; i++) + { + struct transform_stream *stream = &topo_node->u.transform.inputs[i]; + stream->draining = FALSE; + } + } + } + session->state = SESSION_STATE_STARTING_SOURCES; break; case SESSION_STATE_STARTED: @@ -3151,12 +3166,27 @@ static HRESULT transform_node_pull_samples(const struct media_session *session, return hr; }
+static BOOL transform_node_is_drained(struct topo_node *topo_node) +{ + UINT i; + + for (i = 0; i < topo_node->u.transform.input_count; i++) + { + struct transform_stream *stream = &topo_node->u.transform.inputs[i]; + if (!stream->draining) + return FALSE; + } + + return TRUE; +} + static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, unsigned int input, IMFSample *sample);
static void transform_node_deliver_samples(struct media_session *session, struct topo_node *topo_node) { IMFTopologyNode *up_node = topo_node->node, *down_node; + BOOL drained = transform_node_is_drained(topo_node); struct sample *sample, *next; DWORD output, input; HRESULT hr; @@ -3183,6 +3213,12 @@ static void transform_node_deliver_samples(struct media_session *session, struct transform_release_sample(sample); }
+ while (stream->requests && drained) + { + session_deliver_sample_to_node(session, down_node, input, NULL); + stream->requests--; + } + IMFTopologyNode_Release(down_node); } } @@ -3194,7 +3230,6 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop DWORD stream_id; struct topo_node *topo_node; MF_TOPOLOGY_TYPE node_type; - BOOL drain = FALSE; TOPOID node_id; unsigned int i; HRESULT hr; @@ -3253,30 +3288,16 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop } else { - transform_stream_drop_samples(stream); - drain = TRUE; - if (FAILED(hr = IMFTransform_ProcessMessage(topo_node->object.transform, MFT_MESSAGE_COMMAND_DRAIN, stream_id))) WARN("Drain command failed for transform, hr %#lx.\n", hr); + else + stream->draining = TRUE; } } }
transform_node_pull_samples(session, topo_node); - - /* Remaining unprocessed input has been discarded, now queue markers for every output. */ - if (drain) - { - for (i = 0; i < topo_node->u.transform.output_count; ++i) - { - struct transform_stream *stream = &topo_node->u.transform.outputs[i]; - - if ((sample_entry = transform_create_sample(NULL))) - list_add_tail(&stream->samples, &sample_entry->entry); - } - } - transform_node_deliver_samples(session, topo_node); break; }
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/mf/session.c | 80 ++++++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 36 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index c9f911c1777..a089d2b765c 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -681,12 +681,41 @@ static void transform_release_sample(struct sample *sample) free(sample); }
+static HRESULT transform_stream_push_sample(struct transform_stream *stream, IMFSample *sample) +{ + struct sample *entry; + + if (!(entry = calloc(1, sizeof(*entry)))) + return E_OUTOFMEMORY; + + entry->sample = sample; + IMFSample_AddRef(entry->sample); + + list_add_tail(&stream->samples, &entry->entry); + return S_OK; +} + +static HRESULT transform_stream_pop_sample(struct transform_stream *stream, IMFSample **sample) +{ + struct sample *entry; + struct list *ptr; + + if (!(ptr = list_head(&stream->samples))) + return E_FAIL; + + entry = LIST_ENTRY(ptr, struct sample, entry); + list_remove(&entry->entry); + *sample = entry->sample; + free(entry); + return S_OK; +} + static void transform_stream_drop_samples(struct transform_stream *stream) { - struct sample *sample, *sample2; + IMFSample *sample;
- LIST_FOR_EACH_ENTRY_SAFE(sample, sample2, &stream->samples, struct sample, entry) - transform_release_sample(sample); + while (SUCCEEDED(transform_stream_pop_sample(stream, &sample))) + IMFSample_Release(sample); }
static void release_topo_node(struct topo_node *node) @@ -3049,20 +3078,6 @@ static void session_set_sink_stream_state(struct media_session *session, IMFStre } }
-static struct sample *transform_create_sample(IMFSample *sample) -{ - struct sample *sample_entry = calloc(1, sizeof(*sample_entry)); - - if (sample_entry) - { - sample_entry->sample = sample; - if (sample_entry->sample) - IMFSample_AddRef(sample_entry->sample); - } - - return sample_entry; -} - static HRESULT transform_get_external_output_sample(const struct media_session *session, struct topo_node *transform, unsigned int output_index, const MFT_OUTPUT_STREAM_INFO *stream_info, IMFSample **sample) { @@ -3111,7 +3126,6 @@ static HRESULT transform_node_pull_samples(const struct media_session *session, { MFT_OUTPUT_STREAM_INFO stream_info; MFT_OUTPUT_DATA_BUFFER *buffers; - struct sample *queued_sample; HRESULT hr = E_UNEXPECTED; DWORD status = 0; unsigned int i; @@ -3152,9 +3166,8 @@ static HRESULT transform_node_pull_samples(const struct media_session *session, { if (session->quality_manager) IMFQualityManager_NotifyProcessOutput(session->quality_manager, node->node, i, buffers[i].pSample); - - queued_sample = transform_create_sample(buffers[i].pSample); - list_add_tail(&stream->samples, &queued_sample->entry); + if (FAILED(hr = transform_stream_push_sample(stream, buffers[i].pSample))) + WARN("Failed to queue output sample, hr %#lx\n", hr); }
if (buffers[i].pSample) @@ -3187,8 +3200,8 @@ static void transform_node_deliver_samples(struct media_session *session, struct { IMFTopologyNode *up_node = topo_node->node, *down_node; BOOL drained = transform_node_is_drained(topo_node); - struct sample *sample, *next; DWORD output, input; + IMFSample *sample; HRESULT hr;
/* Push down all available output. */ @@ -3202,15 +3215,11 @@ static void transform_node_deliver_samples(struct media_session *session, struct continue; }
- LIST_FOR_EACH_ENTRY_SAFE(sample, next, &stream->samples, struct sample, entry) + while (stream->requests && SUCCEEDED(hr = transform_stream_pop_sample(stream, &sample))) { - if (!stream->requests) - break; - - session_deliver_sample_to_node(session, down_node, input, sample->sample); + session_deliver_sample_to_node(session, down_node, input, sample); stream->requests--; - - transform_release_sample(sample); + IMFSample_Release(sample); }
while (stream->requests && drained) @@ -3266,8 +3275,7 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop
transform_node_pull_samples(session, topo_node);
- sample_entry = transform_create_sample(sample); - list_add_tail(&input_stream->samples, &sample_entry->entry); + transform_stream_push_sample(input_stream, sample);
for (i = 0; i < topo_node->u.transform.input_count; ++i) { @@ -3338,7 +3346,7 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I DWORD downstream_input, upstream_output; struct topo_node *topo_node; MF_TOPOLOGY_TYPE node_type; - struct sample *sample; + IMFSample *sample; TOPOID node_id; HRESULT hr;
@@ -3357,7 +3365,7 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I { struct transform_stream *stream = &topo_node->u.transform.outputs[output];
- if (list_empty(&stream->samples)) + if (FAILED(hr = transform_stream_pop_sample(stream, &sample))) { /* Forward request to upstream node. */ if (SUCCEEDED(hr = IMFTopologyNode_GetInput(node, 0 /* FIXME */, &upstream_node, &upstream_output))) @@ -3371,11 +3379,11 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I { if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, output, &downstream_node, &downstream_input))) { - sample = LIST_ENTRY(list_head(&stream->samples), struct sample, entry); - session_deliver_sample_to_node(session, downstream_node, downstream_input, sample->sample); - transform_release_sample(sample); + session_deliver_sample_to_node(session, downstream_node, downstream_input, sample); IMFTopologyNode_Release(downstream_node); } + + IMFSample_Release(sample); }
break;
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/mf/session.c | 74 +++++++++++++++++------------------------------ 1 file changed, 27 insertions(+), 47 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index a089d2b765c..8a8ae6594c5 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -673,14 +673,6 @@ static void session_set_caps(struct media_session *session, DWORD caps) IMFMediaEvent_Release(event); }
-static void transform_release_sample(struct sample *sample) -{ - list_remove(&sample->entry); - if (sample->sample) - IMFSample_Release(sample->sample); - free(sample); -} - static HRESULT transform_stream_push_sample(struct transform_stream *stream, IMFSample *sample) { struct sample *entry; @@ -3193,6 +3185,31 @@ static BOOL transform_node_is_drained(struct topo_node *topo_node) return TRUE; }
+static HRESULT transform_node_push_sample(const struct media_session *session, struct topo_node *topo_node, + UINT input, IMFSample *sample) +{ + struct transform_stream *stream = &topo_node->u.transform.inputs[input]; + UINT id = transform_node_get_stream_id(topo_node, FALSE, input); + IMFTransform *transform = topo_node->object.transform; + HRESULT hr; + + if (sample) + { + hr = IMFTransform_ProcessInput(transform, id, sample, 0); + if (hr == MF_E_NOTACCEPTING) + hr = transform_stream_push_sample(stream, sample); + } + else + { + if (FAILED(hr = IMFTransform_ProcessMessage(transform, MFT_MESSAGE_COMMAND_DRAIN, id))) + WARN("Drain command failed for transform, hr %#lx.\n", hr); + else + stream->draining = TRUE; + } + + return hr; +} + static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, unsigned int input, IMFSample *sample);
@@ -3235,12 +3252,9 @@ static void transform_node_deliver_samples(struct media_session *session, struct static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, unsigned int input, IMFSample *sample) { - struct sample *sample_entry, *sample_entry2; - DWORD stream_id; struct topo_node *topo_node; MF_TOPOLOGY_TYPE node_type; TOPOID node_id; - unsigned int i; HRESULT hr;
if (session->quality_manager) @@ -3270,45 +3284,11 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop } break; case MF_TOPOLOGY_TRANSFORM_NODE: - { - struct transform_stream *input_stream = &topo_node->u.transform.inputs[input]; - - transform_node_pull_samples(session, topo_node); - - transform_stream_push_sample(input_stream, sample); - - for (i = 0; i < topo_node->u.transform.input_count; ++i) - { - struct transform_stream *stream = &topo_node->u.transform.inputs[i]; - - stream_id = transform_node_get_stream_id(topo_node, FALSE, i); - - LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &stream->samples, struct sample, entry) - { - if (sample_entry->sample) - { - if ((hr = IMFTransform_ProcessInput(topo_node->object.transform, stream_id, - sample_entry->sample, 0)) == MF_E_NOTACCEPTING) - break; - if (FAILED(hr)) - WARN("Failed to process input for stream %u/%lu, hr %#lx.\n", i, stream_id, hr); - transform_release_sample(sample_entry); - } - else - { - if (FAILED(hr = IMFTransform_ProcessMessage(topo_node->object.transform, - MFT_MESSAGE_COMMAND_DRAIN, stream_id))) - WARN("Drain command failed for transform, hr %#lx.\n", hr); - else - stream->draining = TRUE; - } - } - } - + if (FAILED(hr = transform_node_push_sample(session, topo_node, input, sample))) + WARN("Failed to push or queue sample to transform, hr %#lx\n", hr); transform_node_pull_samples(session, topo_node); transform_node_deliver_samples(session, topo_node); break; - } case MF_TOPOLOGY_TEE_NODE: FIXME("Unhandled downstream node type %d.\n", node_type); break;
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/mf/session.c | 48 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 4 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index 8a8ae6594c5..3f5b9326636 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -200,6 +200,7 @@ struct topo_node struct transform_stream *inputs; DWORD *input_map; unsigned int input_count; + unsigned int next_input;
struct transform_stream *outputs; DWORD *output_map; @@ -693,7 +694,7 @@ static HRESULT transform_stream_pop_sample(struct transform_stream *stream, IMFS struct list *ptr;
if (!(ptr = list_head(&stream->samples))) - return E_FAIL; + return MF_E_TRANSFORM_NEED_MORE_INPUT;
entry = LIST_ENTRY(ptr, struct sample, entry); list_remove(&entry->entry); @@ -3185,6 +3186,17 @@ static BOOL transform_node_is_drained(struct topo_node *topo_node) return TRUE; }
+static BOOL transform_node_has_requests(struct topo_node *topo_node) +{ + UINT i; + + for (i = 0; i < topo_node->u.transform.output_count; i++) + if (topo_node->u.transform.outputs[i].requests) + return TRUE; + + return FALSE; +} + static HRESULT transform_node_push_sample(const struct media_session *session, struct topo_node *topo_node, UINT input, IMFSample *sample) { @@ -3219,10 +3231,10 @@ static void transform_node_deliver_samples(struct media_session *session, struct BOOL drained = transform_node_is_drained(topo_node); DWORD output, input; IMFSample *sample; - HRESULT hr; + HRESULT hr = S_OK;
/* Push down all available output. */ - for (output = 0; output < topo_node->u.transform.output_count; ++output) + for (output = 0; SUCCEEDED(hr) && output < topo_node->u.transform.output_count; ++output) { struct transform_stream *stream = &topo_node->u.transform.outputs[output];
@@ -3232,8 +3244,17 @@ static void transform_node_deliver_samples(struct media_session *session, struct continue; }
- while (stream->requests && SUCCEEDED(hr = transform_stream_pop_sample(stream, &sample))) + while (stream->requests) { + if (FAILED(hr = transform_stream_pop_sample(stream, &sample))) + { + /* try getting more samples by calling IMFTransform_ProcessOutput */ + if (FAILED(hr = transform_node_pull_samples(session, topo_node))) + break; + if (FAILED(hr = transform_stream_pop_sample(stream, &sample))) + break; + } + session_deliver_sample_to_node(session, down_node, input, sample); stream->requests--; IMFSample_Release(sample); @@ -3247,6 +3268,25 @@ static void transform_node_deliver_samples(struct media_session *session, struct
IMFTopologyNode_Release(down_node); } + + if (hr == MF_E_TRANSFORM_NEED_MORE_INPUT && transform_node_has_requests(topo_node)) + { + struct transform_stream *stream; + + input = topo_node->u.transform.next_input++ % topo_node->u.transform.input_count; + stream = &topo_node->u.transform.inputs[input]; + + if (SUCCEEDED(transform_stream_pop_sample(stream, &sample))) + session_deliver_sample_to_node(session, topo_node->node, input, sample); + else if (FAILED(hr = IMFTopologyNode_GetInput(topo_node->node, input, &up_node, &output))) + WARN("Failed to get node %p/%lu input, hr %#lx\n", topo_node->node, input, hr); + else + { + if (FAILED(hr = session_request_sample_from_node(session, up_node, output))) + WARN("Failed to request sample from upstream node %p/%lu, hr %#lx\n", up_node, output, hr); + IMFTopologyNode_Release(up_node); + } + } }
static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, unsigned int input,
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/mf/session.c | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index 3f5b9326636..b5b631fb00a 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -3362,13 +3362,13 @@ static void session_deliver_pending_samples(struct media_session *session, IMFTo
static HRESULT session_request_sample_from_node(struct media_session *session, IMFTopologyNode *node, DWORD output) { - IMFTopologyNode *downstream_node, *upstream_node; - DWORD downstream_input, upstream_output; + IMFTopologyNode *down_node; struct topo_node *topo_node; MF_TOPOLOGY_TYPE node_type; + HRESULT hr = S_OK; IMFSample *sample; TOPOID node_id; - HRESULT hr; + DWORD input;
IMFTopologyNode_GetNodeType(node, &node_type); IMFTopologyNode_GetTopoNodeID(node, &node_id); @@ -3385,27 +3385,29 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I { struct transform_stream *stream = &topo_node->u.transform.outputs[output];
- if (FAILED(hr = transform_stream_pop_sample(stream, &sample))) + if (FAILED(hr = IMFTopologyNode_GetOutput(node, output, &down_node, &input))) { - /* Forward request to upstream node. */ - if (SUCCEEDED(hr = IMFTopologyNode_GetInput(node, 0 /* FIXME */, &upstream_node, &upstream_output))) - { - if (SUCCEEDED(hr = session_request_sample_from_node(session, upstream_node, upstream_output))) - stream->requests++; - IMFTopologyNode_Release(upstream_node); - } + WARN("Failed to node %p/%lu output, hr %#lx.\n", node, output, hr); + break; } - else - { - if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, output, &downstream_node, &downstream_input))) - { - session_deliver_sample_to_node(session, downstream_node, downstream_input, sample); - IMFTopologyNode_Release(downstream_node); - }
+ if (SUCCEEDED(transform_stream_pop_sample(stream, &sample))) + { + session_deliver_sample_to_node(session, down_node, input, sample); IMFSample_Release(sample); } + else if (transform_node_has_requests(topo_node)) + { + /* there's already requests pending, just increase the counter */ + stream->requests++; + } + else + { + stream->requests++; + transform_node_deliver_samples(session, topo_node); + }
+ IMFTopologyNode_Release(down_node); break; } case MF_TOPOLOGY_TEE_NODE:
Rebased and dropped the tests. MFT_MESSAGE_COMMAND_DRAIN messages are now sent for each input stream, as some local tests suggest it should. Stream have a draining flag, which is used to decide whether to send EOS downstream after ProcessOutput has failed, the flag is reset when the session (re-)starts.
Anything else I should do here?
This merge request was approved by Nikolay Sivov.