This will be required for MFT decoder transforms to work. They don't usually produce an output sample for every pushed input sample, so the session may need to request more samples from upstream and push them into the transform; otherwise it will not get any output and will get stuck.
-- v2: mf/session: Request more data from upstream when requests are still pending. mf/session: Process pending transform nodes samples in a loop. mf/session: Move transform node ProcessInput calls to a separate helper. mf/session: Use helpers to push and pop samples for transform streams. mf/session: Use local variables to access transform node streams.
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/mf/session.c | 52 +++++++++++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 17 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index 8b29c8487fb..3c8fd792cce 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -3074,7 +3074,9 @@ static HRESULT transform_get_external_output_sample(const struct media_session * } else { - buffer_size = max(stream_info->cbSize, transform->u.transform.outputs[output_index].min_buffer_size); + struct transform_stream *stream = &transform->u.transform.outputs[output_index]; + + buffer_size = max(stream_info->cbSize, stream->min_buffer_size);
hr = MFCreateAlignedMemoryBuffer(buffer_size, stream_info->cbAlignment, &buffer); if (SUCCEEDED(hr)) @@ -3126,6 +3128,8 @@ static HRESULT transform_node_pull_samples(const struct media_session *session, /* Collect returned samples for all streams. */ for (i = 0; i < node->u.transform.output_count; ++i) { + struct transform_stream *stream = &node->u.transform.outputs[i]; + if (buffers[i].pEvents) IMFCollection_Release(buffers[i].pEvents);
@@ -3135,7 +3139,7 @@ static HRESULT transform_node_pull_samples(const struct media_session *session, IMFQualityManager_NotifyProcessOutput(session->quality_manager, node->node, i, buffers[i].pSample);
queued_sample = transform_create_sample(buffers[i].pSample); - list_add_tail(&node->u.transform.outputs[i].samples, &queued_sample->entry); + list_add_tail(&stream->samples, &queued_sample->entry); }
if (buffers[i].pSample) @@ -3187,17 +3191,21 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop } break; case MF_TOPOLOGY_TRANSFORM_NODE: + { + struct transform_stream *input_stream = &topo_node->u.transform.inputs[input];
transform_node_pull_samples(session, topo_node);
sample_entry = transform_create_sample(sample); - list_add_tail(&topo_node->u.transform.inputs[input].samples, &sample_entry->entry); + list_add_tail(&input_stream->samples, &sample_entry->entry);
for (i = 0; i < topo_node->u.transform.input_count; ++i) { + struct transform_stream *stream = &topo_node->u.transform.inputs[i]; + stream_id = transform_node_get_stream_id(topo_node, FALSE, i); - LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &topo_node->u.transform.inputs[i].samples, - struct sample, entry) + + LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &stream->samples, struct sample, entry) { if (sample_entry->sample) { @@ -3210,7 +3218,7 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop } else { - transform_stream_drop_samples(&topo_node->u.transform.inputs[i]); + transform_stream_drop_samples(stream); drain = TRUE; } } @@ -3229,28 +3237,32 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop { for (i = 0; i < topo_node->u.transform.output_count; ++i) { - if ((sample_entry = transform_create_sample(NULL))) - list_add_tail(&topo_node->u.transform.outputs[i].samples, &sample_entry->entry); + struct transform_stream *stream = &topo_node->u.transform.outputs[i]; + + if ((sample_entry = transform_create_sample(NULL))) + list_add_tail(&stream->samples, &sample_entry->entry); } }
/* Push down all available output. */ for (i = 0; i < topo_node->u.transform.output_count; ++i) { + struct transform_stream *stream = &topo_node->u.transform.outputs[i]; + if (FAILED(IMFTopologyNode_GetOutput(node, i, &downstream_node, &downstream_input))) { WARN("Failed to get connected node for output %u.\n", i); continue; }
- LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &topo_node->u.transform.outputs[i].samples, + LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &stream->samples, struct sample, entry) { - if (!topo_node->u.transform.outputs[i].requests) + if (!stream->requests) break;
session_deliver_sample_to_node(session, downstream_node, downstream_input, sample_entry->sample); - topo_node->u.transform.outputs[i].requests--; + stream->requests--;
transform_release_sample(sample_entry); } @@ -3259,6 +3271,7 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop }
break; + } case MF_TOPOLOGY_TEE_NODE: FIXME("Unhandled downstream node type %d.\n", node_type); break; @@ -3291,20 +3304,22 @@ static void session_deliver_pending_samples(struct media_session *session, IMFTo /* Push down all available output. */ for (i = 0; i < topo_node->u.transform.output_count; ++i) { + struct transform_stream *stream = &topo_node->u.transform.outputs[i]; + if (FAILED(IMFTopologyNode_GetOutput(node, i, &downstream_node, &downstream_input))) { WARN("Failed to get connected node for output %u.\n", i); continue; }
- LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &topo_node->u.transform.outputs[i].samples, + LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &stream->samples, struct sample, entry) { - if (!topo_node->u.transform.outputs[i].requests) + if (!stream->requests) break;
session_deliver_sample_to_node(session, downstream_node, downstream_input, sample_entry->sample); - topo_node->u.transform.outputs[i].requests--; + stream->requests--;
transform_release_sample(sample_entry); } @@ -3340,14 +3355,16 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I WARN("Sample request failed, hr %#lx.\n", hr); break; case MF_TOPOLOGY_TRANSFORM_NODE: + { + struct transform_stream *stream = &topo_node->u.transform.outputs[output];
- if (list_empty(&topo_node->u.transform.outputs[output].samples)) + if (list_empty(&stream->samples)) { /* Forward request to upstream node. */ if (SUCCEEDED(hr = IMFTopologyNode_GetInput(node, 0 /* FIXME */, &upstream_node, &upstream_output))) { if (SUCCEEDED(hr = session_request_sample_from_node(session, upstream_node, upstream_output))) - topo_node->u.transform.outputs[output].requests++; + stream->requests++; IMFTopologyNode_Release(upstream_node); } } @@ -3355,7 +3372,7 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I { if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, output, &downstream_node, &downstream_input))) { - sample = LIST_ENTRY(list_head(&topo_node->u.transform.outputs[output].samples), struct sample, entry); + sample = LIST_ENTRY(list_head(&stream->samples), struct sample, entry); session_deliver_sample_to_node(session, downstream_node, downstream_input, sample->sample); transform_release_sample(sample); IMFTopologyNode_Release(downstream_node); @@ -3363,6 +3380,7 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I }
break; + } case MF_TOPOLOGY_TEE_NODE: FIXME("Unhandled upstream node type %d.\n", node_type); default:
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/mf/session.c | 137 +++++++++++++++++++++++----------------------- 1 file changed, 70 insertions(+), 67 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index 3c8fd792cce..9062b799318 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -672,20 +672,50 @@ static void session_set_caps(struct media_session *session, DWORD caps) IMFMediaEvent_Release(event); }
-static void transform_release_sample(struct sample *sample) +static void transform_stream_push_sample(struct transform_stream *stream, IMFSample *sample, BOOL head) { - list_remove(&sample->entry); - if (sample->sample) - IMFSample_Release(sample->sample); - free(sample); + struct sample *entry; + + if (!(entry = calloc(1, sizeof(*entry)))) + { + ERR("Failed to allocate sample entry.\n"); + return; + } + + entry->sample = sample; + if (entry->sample) + IMFSample_AddRef(entry->sample); + + if (head) + list_add_head(&stream->samples, &entry->entry); + else + list_add_tail(&stream->samples, &entry->entry); +} + +static HRESULT transform_stream_pop_sample(struct transform_stream *stream, IMFSample **sample) +{ + struct sample *entry; + struct list *ptr; + + if (!(ptr = list_head(&stream->samples))) + return E_PENDING; + + entry = LIST_ENTRY(ptr, struct sample, entry); + list_remove(&entry->entry); + *sample = entry->sample; + free(entry); + return S_OK; }
static void transform_stream_drop_samples(struct transform_stream *stream) { - struct sample *sample, *sample2; + IMFSample *sample;
- LIST_FOR_EACH_ENTRY_SAFE(sample, sample2, &stream->samples, struct sample, entry) - transform_release_sample(sample); + while (SUCCEEDED(transform_stream_pop_sample(stream, &sample))) + { + if (sample) + IMFSample_Release(sample); + } }
static void release_topo_node(struct topo_node *node) @@ -3032,20 +3062,6 @@ static void session_set_sink_stream_state(struct media_session *session, IMFStre } }
-static struct sample *transform_create_sample(IMFSample *sample) -{ - struct sample *sample_entry = calloc(1, sizeof(*sample_entry)); - - if (sample_entry) - { - sample_entry->sample = sample; - if (sample_entry->sample) - IMFSample_AddRef(sample_entry->sample); - } - - return sample_entry; -} - static HRESULT transform_get_external_output_sample(const struct media_session *session, struct topo_node *transform, unsigned int output_index, const MFT_OUTPUT_STREAM_INFO *stream_info, IMFSample **sample) { @@ -3096,7 +3112,6 @@ static HRESULT transform_node_pull_samples(const struct media_session *session, { MFT_OUTPUT_STREAM_INFO stream_info; MFT_OUTPUT_DATA_BUFFER *buffers; - struct sample *queued_sample; HRESULT hr = E_UNEXPECTED; DWORD status = 0; unsigned int i; @@ -3137,9 +3152,7 @@ static HRESULT transform_node_pull_samples(const struct media_session *session, { if (session->quality_manager) IMFQualityManager_NotifyProcessOutput(session->quality_manager, node->node, i, buffers[i].pSample); - - queued_sample = transform_create_sample(buffers[i].pSample); - list_add_tail(&stream->samples, &queued_sample->entry); + transform_stream_push_sample(stream, buffers[i].pSample, FALSE); }
if (buffers[i].pSample) @@ -3154,7 +3167,6 @@ static HRESULT transform_node_pull_samples(const struct media_session *session, static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, unsigned int input, IMFSample *sample) { - struct sample *sample_entry, *sample_entry2; DWORD stream_id, downstream_input; IMFTopologyNode *downstream_node; struct topo_node *topo_node; @@ -3193,34 +3205,35 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop case MF_TOPOLOGY_TRANSFORM_NODE: { struct transform_stream *input_stream = &topo_node->u.transform.inputs[input]; + transform_stream_push_sample(input_stream, sample, FALSE);
transform_node_pull_samples(session, topo_node);
- sample_entry = transform_create_sample(sample); - list_add_tail(&input_stream->samples, &sample_entry->entry); - for (i = 0; i < topo_node->u.transform.input_count; ++i) { struct transform_stream *stream = &topo_node->u.transform.inputs[i];
stream_id = transform_node_get_stream_id(topo_node, FALSE, i); - - LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &stream->samples, struct sample, entry) + while (SUCCEEDED(hr = transform_stream_pop_sample(stream, &sample))) { - if (sample_entry->sample) - { - if ((hr = IMFTransform_ProcessInput(topo_node->object.transform, stream_id, - sample_entry->sample, 0)) == MF_E_NOTACCEPTING) - break; - if (FAILED(hr)) - WARN("Failed to process input for stream %u/%lu, hr %#lx.\n", i, stream_id, hr); - transform_release_sample(sample_entry); - } - else + if (!sample) { transform_stream_drop_samples(stream); drain = TRUE; + break; + } + + hr = IMFTransform_ProcessInput(topo_node->object.transform, stream_id, sample, 0); + if (hr == MF_E_NOTACCEPTING) + { + transform_stream_push_sample(stream, sample, TRUE); + IMFSample_Release(sample); + break; } + + if (FAILED(hr)) + WARN("Failed to process input for stream %u/%lu, hr %#lx.\n", i, stream_id, hr); + IMFSample_Release(sample); } }
@@ -3238,9 +3251,7 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop for (i = 0; i < topo_node->u.transform.output_count; ++i) { struct transform_stream *stream = &topo_node->u.transform.outputs[i]; - - if ((sample_entry = transform_create_sample(NULL))) - list_add_tail(&stream->samples, &sample_entry->entry); + transform_stream_push_sample(stream, NULL, FALSE); } }
@@ -3255,16 +3266,12 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop continue; }
- LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &stream->samples, - struct sample, entry) + while (stream->requests && SUCCEEDED(transform_stream_pop_sample(stream, &sample))) { - if (!stream->requests) - break; - - session_deliver_sample_to_node(session, downstream_node, downstream_input, sample_entry->sample); + session_deliver_sample_to_node(session, downstream_node, downstream_input, sample); stream->requests--; - - transform_release_sample(sample_entry); + if (sample) + IMFSample_Release(sample); }
IMFTopologyNode_Release(downstream_node); @@ -3282,11 +3289,11 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop
static void session_deliver_pending_samples(struct media_session *session, IMFTopologyNode *node) { - struct sample *sample_entry, *sample_entry2; IMFTopologyNode *downstream_node; struct topo_node *topo_node; MF_TOPOLOGY_TYPE node_type; DWORD downstream_input; + IMFSample *sample; TOPOID node_id; unsigned int i;
@@ -3312,16 +3319,12 @@ static void session_deliver_pending_samples(struct media_session *session, IMFTo continue; }
- LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &stream->samples, - struct sample, entry) + while (stream->requests && SUCCEEDED(transform_stream_pop_sample(stream, &sample))) { - if (!stream->requests) - break; - - session_deliver_sample_to_node(session, downstream_node, downstream_input, sample_entry->sample); + session_deliver_sample_to_node(session, downstream_node, downstream_input, sample); stream->requests--; - - transform_release_sample(sample_entry); + if (sample) + IMFSample_Release(sample); }
IMFTopologyNode_Release(downstream_node); @@ -3339,7 +3342,7 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I DWORD downstream_input, upstream_output; struct topo_node *topo_node; MF_TOPOLOGY_TYPE node_type; - struct sample *sample; + IMFSample *sample; TOPOID node_id; HRESULT hr;
@@ -3358,7 +3361,7 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I { struct transform_stream *stream = &topo_node->u.transform.outputs[output];
- if (list_empty(&stream->samples)) + if (FAILED(transform_stream_pop_sample(stream, &sample))) { /* Forward request to upstream node. */ if (SUCCEEDED(hr = IMFTopologyNode_GetInput(node, 0 /* FIXME */, &upstream_node, &upstream_output))) @@ -3372,9 +3375,9 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I { if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, output, &downstream_node, &downstream_input))) { - sample = LIST_ENTRY(list_head(&stream->samples), struct sample, entry); - session_deliver_sample_to_node(session, downstream_node, downstream_input, sample->sample); - transform_release_sample(sample); + session_deliver_sample_to_node(session, downstream_node, downstream_input, sample); + if (sample) + IMFSample_Release(sample); IMFTopologyNode_Release(downstream_node); } }
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/mf/session.c | 85 ++++++++++++++++++++++++++--------------------- 1 file changed, 48 insertions(+), 37 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index 9062b799318..e6e3792db7e 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -3164,14 +3164,58 @@ static HRESULT transform_node_pull_samples(const struct media_session *session, return hr; }
+static HRESULT transform_node_push_samples(const struct media_session *session, struct topo_node *node) +{ + BOOL drain = FALSE; + HRESULT hr = S_OK; + IMFSample *sample; + UINT i, id; + + for (i = 0; i < node->u.transform.input_count; ++i) + { + struct transform_stream *stream = &node->u.transform.inputs[i]; + + id = transform_node_get_stream_id(node, FALSE, i); + while (SUCCEEDED(hr = transform_stream_pop_sample(stream, &sample))) + { + if (!sample) + { + transform_stream_drop_samples(stream); + drain = TRUE; + break; + } + + hr = IMFTransform_ProcessInput(node->object.transform, id, sample, 0); + if (hr == MF_E_NOTACCEPTING) + { + transform_stream_push_sample(stream, sample, TRUE); + IMFSample_Release(sample); + break; + } + + if (FAILED(hr)) + WARN("Failed to process input for stream %u/%u, hr %#lx.\n", i, id, hr); + IMFSample_Release(sample); + } + } + + if (drain) + { + if (FAILED(hr = IMFTransform_ProcessMessage(node->object.transform, MFT_MESSAGE_COMMAND_DRAIN, 0))) + WARN("Drain command failed for transform, hr %#lx.\n", hr); + return MF_E_END_OF_STREAM; + } + + return hr; +} + static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, unsigned int input, IMFSample *sample) { - DWORD stream_id, downstream_input; + DWORD downstream_input; IMFTopologyNode *downstream_node; struct topo_node *topo_node; MF_TOPOLOGY_TYPE node_type; - BOOL drain = FALSE; TOPOID node_id; unsigned int i; HRESULT hr; @@ -3208,45 +3252,12 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop transform_stream_push_sample(input_stream, sample, FALSE);
transform_node_pull_samples(session, topo_node); - - for (i = 0; i < topo_node->u.transform.input_count; ++i) - { - struct transform_stream *stream = &topo_node->u.transform.inputs[i]; - - stream_id = transform_node_get_stream_id(topo_node, FALSE, i); - while (SUCCEEDED(hr = transform_stream_pop_sample(stream, &sample))) - { - if (!sample) - { - transform_stream_drop_samples(stream); - drain = TRUE; - break; - } - - hr = IMFTransform_ProcessInput(topo_node->object.transform, stream_id, sample, 0); - if (hr == MF_E_NOTACCEPTING) - { - transform_stream_push_sample(stream, sample, TRUE); - IMFSample_Release(sample); - break; - } - - if (FAILED(hr)) - WARN("Failed to process input for stream %u/%lu, hr %#lx.\n", i, stream_id, hr); - IMFSample_Release(sample); - } - } - - if (drain) - { - if (FAILED(hr = IMFTransform_ProcessMessage(topo_node->object.transform, MFT_MESSAGE_COMMAND_DRAIN, 0))) - WARN("Drain command failed for transform, hr %#lx.\n", hr); - } + hr = transform_node_push_samples(session, topo_node);
transform_node_pull_samples(session, topo_node);
/* Remaining unprocessed input has been discarded, now queue markers for every output. */ - if (drain) + if (hr == MF_E_END_OF_STREAM) { for (i = 0; i < topo_node->u.transform.output_count; ++i) {
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/mf/session.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index e6e3792db7e..586be71a95c 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -3251,10 +3251,12 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop struct transform_stream *input_stream = &topo_node->u.transform.inputs[input]; transform_stream_push_sample(input_stream, sample, FALSE);
- transform_node_pull_samples(session, topo_node); - hr = transform_node_push_samples(session, topo_node); - - transform_node_pull_samples(session, topo_node); + do + { + hr = transform_node_pull_samples(session, topo_node); + if (hr == MF_E_TRANSFORM_NEED_MORE_INPUT) + hr = transform_node_push_samples(session, topo_node); + } while (SUCCEEDED(hr));
/* Remaining unprocessed input has been discarded, now queue markers for every output. */ if (hr == MF_E_END_OF_STREAM)
From: Rémi Bernon rbernon@codeweavers.com
--- dlls/mf/session.c | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index 586be71a95c..cc1dc7da987 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -3212,13 +3212,14 @@ static HRESULT transform_node_push_samples(const struct media_session *session, static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, unsigned int input, IMFSample *sample) { - DWORD downstream_input; - IMFTopologyNode *downstream_node; + DWORD downstream_input, upstream_output; + IMFTopologyNode *downstream_node, *upstream_node; struct topo_node *topo_node; MF_TOPOLOGY_TYPE node_type; TOPOID node_id; unsigned int i; HRESULT hr; + BOOL need_more;
if (session->quality_manager) IMFQualityManager_NotifyProcessInput(session->quality_manager, node, input, sample); @@ -3269,7 +3270,7 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop }
/* Push down all available output. */ - for (i = 0; i < topo_node->u.transform.output_count; ++i) + for (need_more = FALSE, i = 0; i < topo_node->u.transform.output_count; ++i) { struct transform_stream *stream = &topo_node->u.transform.outputs[i];
@@ -3287,9 +3288,18 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop IMFSample_Release(sample); }
+ if (stream->requests) + need_more = TRUE; + IMFTopologyNode_Release(downstream_node); }
+ if (need_more && SUCCEEDED(hr = IMFTopologyNode_GetInput(node, 0 /* FIXME */, &upstream_node, &upstream_output))) + { + hr = session_request_sample_from_node(session, upstream_node, upstream_output); + IMFTopologyNode_Release(upstream_node); + } + break; } case MF_TOPOLOGY_TEE_NODE:
v2: Avoid uninitialized variables in `transform_node_push_samples`.
This merge request was closed by Rémi Bernon.