Signed-off-by: Nikolay Sivov nsivov@codeweavers.com --- dlls/mf/session.c | 260 ++++++++++++++++++++++++++++------------------ 1 file changed, 157 insertions(+), 103 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index bc4cac6ef0..85bf4e9cba 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -127,6 +127,11 @@ struct transform_stream unsigned int requests; };
+enum topo_node_flags +{ + TOPO_NODE_END_OF_STREAM = 0x1, +}; + struct topo_node { struct list entry; @@ -134,6 +139,7 @@ struct topo_node TOPOID node_id; IMFTopologyNode *node; enum object_state state; + unsigned int flags; union { IMFMediaStream *source_stream; @@ -651,17 +657,20 @@ static void session_set_caps(struct media_session *session, DWORD caps) IMFMediaEvent_Release(event); }
+static void transform_release_sample(struct sample *sample) +{ + list_remove(&sample->entry); + if (sample->sample) + IMFSample_Release(sample->sample); + heap_free(sample); +} + static void transform_stream_drop_samples(struct transform_stream *stream) { struct sample *sample, *sample2;
LIST_FOR_EACH_ENTRY_SAFE(sample, sample2, &stream->samples, struct sample, entry) - { - list_remove(&sample->entry); - if (sample->sample) - IMFSample_Release(sample->sample); - heap_free(sample); - } + transform_release_sample(sample); }
static void release_topo_node(struct topo_node *node) @@ -2320,9 +2329,22 @@ static DWORD transform_node_get_stream_id(struct topo_node *node, BOOL output, D return map ? map[index] : index; }
-static HRESULT transform_node_get_sample(struct topo_node *node, DWORD output, IMFSample **sample) +static struct sample *transform_create_sample(IMFSample *sample) +{ + struct sample *sample_entry = heap_alloc_zero(sizeof(*sample_entry)); + + if (sample_entry) + { + sample_entry->sample = sample; + if (sample_entry->sample) + IMFSample_AddRef(sample_entry->sample); + } + + return sample_entry; +} + +static HRESULT transform_node_pull_samples(struct topo_node *node) { - struct list *list = &node->u.transform.outputs[output].samples; MFT_OUTPUT_STREAM_INFO stream_info; MFT_OUTPUT_DATA_BUFFER *buffers; struct sample *queued_sample; @@ -2330,17 +2352,6 @@ static HRESULT transform_node_get_sample(struct topo_node *node, DWORD output, I unsigned int i; HRESULT hr;
- *sample = NULL; - - if (!list_empty(list)) - { - queued_sample = LIST_ENTRY(list_head(list), struct sample, entry); - list_remove(&queued_sample->entry); - *sample = queued_sample->sample; - heap_free(queued_sample); - return S_OK; - } - if (!(buffers = heap_calloc(node->u.transform.output_count, sizeof(*buffers)))) return E_OUTOFMEMORY;
@@ -2350,68 +2361,63 @@ static HRESULT transform_node_get_sample(struct topo_node *node, DWORD output, I buffers[i].pSample = NULL; buffers[i].dwStatus = 0; buffers[i].pEvents = NULL; - }
- memset(&stream_info, 0, sizeof(stream_info)); - if (FAILED(hr = IMFTransform_GetOutputStreamInfo(node->object.transform, buffers[output].dwStreamID, &stream_info))) - goto exit; + memset(&stream_info, 0, sizeof(stream_info)); + if (FAILED(hr = IMFTransform_GetOutputStreamInfo(node->object.transform, buffers[i].dwStreamID, &stream_info))) + break;
- if (!(stream_info.dwFlags & MFT_OUTPUT_STREAM_PROVIDES_SAMPLES)) - { - IMFMediaBuffer *buffer = NULL; + if (!(stream_info.dwFlags & MFT_OUTPUT_STREAM_PROVIDES_SAMPLES)) + { + IMFMediaBuffer *buffer = NULL;
- hr = MFCreateAlignedMemoryBuffer(stream_info.cbSize, stream_info.cbAlignment, &buffer); - if (SUCCEEDED(hr)) - hr = MFCreateSample(&buffers[output].pSample); + hr = MFCreateAlignedMemoryBuffer(stream_info.cbSize, stream_info.cbAlignment, &buffer); + if (SUCCEEDED(hr)) + hr = MFCreateSample(&buffers[i].pSample);
- if (SUCCEEDED(hr)) - hr = IMFSample_AddBuffer(buffers[output].pSample, buffer); + if (SUCCEEDED(hr)) + hr = IMFSample_AddBuffer(buffers[i].pSample, buffer);
- if (buffer) - IMFMediaBuffer_Release(buffer); + if (buffer) + IMFMediaBuffer_Release(buffer);
- if (FAILED(hr)) - goto exit; + if (FAILED(hr)) + break; + } }
- hr = IMFTransform_ProcessOutput(node->object.transform, 0, node->u.transform.output_count, buffers, &status); - if (hr == S_OK) - { - /* Collect returned samples for all streams. */ - for (i = 0; i < node->u.transform.output_count; ++i) - { - if (buffers[i].pEvents) - IMFCollection_Release(buffers[i].pEvents); + if (SUCCEEDED(hr)) + hr = IMFTransform_ProcessOutput(node->object.transform, 0, node->u.transform.output_count, buffers, &status);
- if (!buffers[i].pSample) - continue; + /* Collect returned samples for all streams. */ + for (i = 0; i < node->u.transform.output_count; ++i) + { + if (buffers[i].pEvents) + IMFCollection_Release(buffers[i].pEvents);
- if (i == output) - { - *sample = buffers[i].pSample; - } - else - { - queued_sample = heap_alloc(sizeof(*queued_sample)); - queued_sample->sample = buffers[i].pSample; - list_add_tail(&node->u.transform.outputs[i].samples, &queued_sample->entry); - } + if (SUCCEEDED(hr) && !(buffers[i].dwStatus & MFT_OUTPUT_DATA_BUFFER_NO_SAMPLE)) + { + queued_sample = transform_create_sample(buffers[i].pSample); + list_add_tail(&node->u.transform.outputs[i].samples, &queued_sample->entry); } + + if (buffers[i].pSample) + IMFSample_Release(buffers[i].pSample); }
-exit: heap_free(buffers);
return hr; }
-static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, DWORD input, +static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, unsigned int input, IMFSample *sample) { + struct sample *sample_entry, *sample_entry2; DWORD stream_id, downstream_input; IMFTopologyNode *downstream_node; struct topo_node *topo_node; MF_TOPOLOGY_TYPE node_type; + BOOL drain = FALSE; TOPOID node_id; unsigned int i; HRESULT hr; @@ -2424,48 +2430,92 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop switch (node_type) { case MF_TOPOLOGY_OUTPUT_NODE: - if (topo_node->u.sink.requests) + if (sample) + { + if (topo_node->u.sink.requests) + { + if (FAILED(hr = IMFStreamSink_ProcessSample(topo_node->object.sink_stream, sample))) + WARN("Stream sink failed to process sample, hr %#x.\n", hr); + topo_node->u.sink.requests--; + } + } + else if (FAILED(hr = IMFStreamSink_PlaceMarker(topo_node->object.sink_stream, MFSTREAMSINK_MARKER_ENDOFSEGMENT, + NULL, NULL))) { - if (FAILED(hr = IMFStreamSink_ProcessSample(topo_node->object.sink_stream, sample))) - WARN("Sample delivery failed, hr %#x.\n", hr); - topo_node->u.sink.requests--; + WARN("Failed to place sink marker, hr %#x.\n", hr); } break; case MF_TOPOLOGY_TRANSFORM_NODE:
- stream_id = transform_node_get_stream_id(topo_node, FALSE, input); + transform_node_pull_samples(topo_node); + + sample_entry = transform_create_sample(sample); + list_add_tail(&topo_node->u.transform.inputs[input].samples, &sample_entry->entry);
- hr = IMFTransform_ProcessInput(topo_node->object.transform, stream_id, sample, 0); - if (hr == MF_E_NOTACCEPTING) + for (i = 0; i < topo_node->u.transform.input_count; ++i) { - struct sample *sample_entry = heap_alloc(sizeof(*sample_entry)); - sample_entry->sample = sample; - list_add_tail(&topo_node->u.transform.inputs[input].samples, &sample_entry->entry); + stream_id = transform_node_get_stream_id(topo_node, FALSE, i); + LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &topo_node->u.transform.inputs[i].samples, + struct sample, entry) + { + if (sample_entry->sample) + { + if ((hr = IMFTransform_ProcessInput(topo_node->object.transform, stream_id, + sample_entry->sample, 0)) == MF_E_NOTACCEPTING) + break; + if (FAILED(hr)) + WARN("Failed to process input for stream %u/%u, hr %#x.\n", i, stream_id, hr); + transform_release_sample(sample_entry); + } + else + { + transform_stream_drop_samples(&topo_node->u.transform.inputs[i]); + drain = TRUE; + } + } } - else if (hr == S_OK) + + if (drain) + { + if (FAILED(hr = IMFTransform_ProcessMessage(topo_node->object.transform, MFT_MESSAGE_COMMAND_DRAIN, 0))) + WARN("Drain command failed for transform, hr %#x.\n", hr); + } + + transform_node_pull_samples(topo_node); + + /* Remaining unprocessed input has been discarded, now queue markers for every output. */ + if (drain) { - /* Check if we need new output is available, push it down. */ for (i = 0; i < topo_node->u.transform.output_count; ++i) + { + if ((sample_entry = transform_create_sample(NULL))) + list_add_tail(&topo_node->u.transform.outputs[i].samples, &sample_entry->entry); + } + } + + /* Push down all available output. */ + for (i = 0; i < topo_node->u.transform.output_count; ++i) + { + if (FAILED(IMFTopologyNode_GetOutput(node, i, &downstream_node, &downstream_input))) + { + WARN("Failed to get connected node for output %u.\n", i); + continue; + } + + LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &topo_node->u.transform.outputs[i].samples, + struct sample, entry) { if (!topo_node->u.transform.outputs[i].requests) - continue; + break;
- sample = NULL; - transform_node_get_sample(topo_node, i, &sample); - if (sample) - { - if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, i, &downstream_node, &downstream_input))) - { - session_deliver_sample_to_node(session, downstream_node, downstream_input, sample); - topo_node->u.transform.outputs[i].requests--; - IMFTopologyNode_Release(downstream_node); - } - IMFSample_Release(sample); - } + session_deliver_sample_to_node(session, downstream_node, downstream_input, sample_entry->sample); + topo_node->u.transform.outputs[i].requests--; + + transform_release_sample(sample_entry); } + + IMFTopologyNode_Release(downstream_node); } - else - WARN("Transform failed to process input sample, hr %#x.\n", hr);
break; case MF_TOPOLOGY_TEE_NODE: @@ -2479,10 +2529,10 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop static HRESULT session_request_sample_from_node(struct media_session *session, IMFTopologyNode *node, DWORD output) { IMFTopologyNode *downstream_node, *upstream_node; - MF_TOPOLOGY_TYPE node_type; - IMFSample *sample = NULL; + unsigned int downstream_input, upstream_output; struct topo_node *topo_node; - DWORD downstream_input, upstream_output; + MF_TOPOLOGY_TYPE node_type; + struct sample *sample; TOPOID node_id; HRESULT hr;
@@ -2499,20 +2549,9 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I break; case MF_TOPOLOGY_TRANSFORM_NODE:
- hr = transform_node_get_sample(topo_node, output, &sample); - if (sample) - { - if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, output, &downstream_node, &downstream_input))) - { - session_deliver_sample_to_node(session, downstream_node, downstream_input, sample); - IMFTopologyNode_Release(downstream_node); - } - IMFSample_Release(sample); - } - - /* Forward request to upstream node. */ - if (hr == MF_E_TRANSFORM_NEED_MORE_INPUT || (hr == S_OK && !sample)) + if (list_empty(&topo_node->u.transform.outputs[output].samples)) { + /* Forward request to upstream node. */ if (SUCCEEDED(hr = IMFTopologyNode_GetInput(node, 0 /* FIXME */, &upstream_node, &upstream_output))) { if (SUCCEEDED(hr = session_request_sample_from_node(session, upstream_node, upstream_output))) @@ -2520,6 +2559,17 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I IMFTopologyNode_Release(upstream_node); } } + else + { + if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, output, &downstream_node, &downstream_input))) + { + sample = LIST_ENTRY(list_head(&topo_node->u.transform.outputs[output].samples), struct sample, entry); + session_deliver_sample_to_node(session, downstream_node, downstream_input, sample->sample); + transform_release_sample(sample); + IMFTopologyNode_Release(downstream_node); + } + } + break; case MF_TOPOLOGY_TEE_NODE: FIXME("Unhandled upstream node type %d.\n", node_type); @@ -2567,7 +2617,7 @@ static void session_deliver_sample(struct media_session *session, IMFMediaStream DWORD downstream_input; HRESULT hr;
- if (value->vt != VT_UNKNOWN || !value->punkVal) + if (value && (value->vt != VT_UNKNOWN || !value->punkVal)) { WARN("Unexpected value type %d.\n", value->vt); return; @@ -2585,13 +2635,16 @@ static void session_deliver_sample(struct media_session *session, IMFMediaStream if (!source_node) return;
+ if (!value) + source_node->flags |= TOPO_NODE_END_OF_STREAM; + if (FAILED(hr = IMFTopologyNode_GetOutput(source_node->node, 0, &downstream_node, &downstream_input))) { WARN("Failed to get downstream node connection, hr %#x.\n", hr); return; }
- session_deliver_sample_to_node(session, downstream_node, downstream_input, (IMFSample *)value->punkVal); + session_deliver_sample_to_node(session, downstream_node, downstream_input, value ? (IMFSample *)value->punkVal : NULL); IMFTopologyNode_Release(downstream_node); }
@@ -2712,9 +2765,10 @@ static HRESULT WINAPI session_events_callback_Invoke(IMFAsyncCallback *iface, IM
break; case MEMediaSample: + case MEEndOfStream:
EnterCriticalSection(&session->cs); - session_deliver_sample(session, (IMFMediaStream *)event_source, &value); + session_deliver_sample(session, (IMFMediaStream *)event_source, event_type == MEMediaSample ? &value : NULL); LeaveCriticalSection(&session->cs);
break;