Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com>
---
 dlls/mf/session.c | 118 ++++++++++++++++++++++++++++++++++++----------
 1 file changed, 93 insertions(+), 25 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index 5b9f720815..bc4cac6ef0 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -76,6 +76,7 @@ enum session_state { SESSION_STATE_STOPPED = 0, SESSION_STATE_STARTING_SOURCES, + SESSION_STATE_PREROLLING_SINKS, SESSION_STATE_STARTING_SINKS, SESSION_STATE_STARTED, SESSION_STATE_PAUSING_SINKS, @@ -93,6 +94,7 @@ enum object_state OBJ_STATE_STOPPED = 0, OBJ_STATE_STARTED, OBJ_STATE_PAUSED, + OBJ_STATE_PREROLLED, OBJ_STATE_INVALID, };
@@ -108,6 +110,7 @@ struct media_sink { struct list entry; IMFMediaSink *sink; + IMFMediaSinkPreroll *preroll; IMFMediaEventGenerator *event_generator; BOOL finalized; }; @@ -168,6 +171,7 @@ enum presentation_flags SESSION_FLAG_SOURCES_SUBSCRIBED = 0x1, SESSION_FLAG_PRESENTATION_CLOCK_SET = 0x2, SESSION_FLAG_FINALIZE_SINKS = 0x4, + SESSION_FLAG_NEEDS_PREROLL = 0x8, };
struct media_session @@ -722,6 +726,8 @@ static void session_clear_presentation(struct media_session *session)
if (sink->sink) IMFMediaSink_Release(sink->sink); + if (sink->preroll) + IMFMediaSinkPreroll_Release(sink->preroll); if (sink->event_generator) IMFMediaEventGenerator_Release(sink->event_generator); heap_free(sink); @@ -982,6 +988,7 @@ static DWORD session_get_object_rate_caps(IUnknown *object) static HRESULT session_add_media_sink(struct media_session *session, IMFTopologyNode *node, IMFMediaSink *sink) { struct media_sink *media_sink; + DWORD flags;
LIST_FOR_EACH_ENTRY(media_sink, &session->presentation.sinks, struct media_sink, entry) { @@ -997,6 +1004,12 @@ static HRESULT session_add_media_sink(struct media_session *session, IMFTopology
IMFMediaSink_QueryInterface(media_sink->sink, &IID_IMFMediaEventGenerator, (void **)&media_sink->event_generator);
+ if (SUCCEEDED(IMFMediaSink_GetCharacteristics(sink, &flags)) && flags & MEDIASINK_CAN_PREROLL) + { + if (SUCCEEDED(IMFMediaSink_QueryInterface(media_sink->sink, &IID_IMFMediaSinkPreroll, (void **)&media_sink->preroll))) + session->presentation.flags |= SESSION_FLAG_NEEDS_PREROLL; + } + list_add_tail(&session->presentation.sinks, &media_sink->entry);
return S_OK; @@ -1915,6 +1928,8 @@ static enum object_state session_get_object_state_for_event(MediaEventType event case MEStreamStopped: case MEStreamSinkStopped: return OBJ_STATE_STOPPED; + case MEStreamSinkPrerolled: + return OBJ_STATE_PREROLLED; default: return OBJ_STATE_INVALID; } @@ -1931,11 +1946,10 @@ static void session_set_consumed_clock(IUnknown *object, IMFPresentationClock *c } }
-static HRESULT session_start_clock(struct media_session *session) +static void session_set_presentation_clock(struct media_session *session) { IMFPresentationTimeSource *time_source = NULL; struct media_source *source; - LONGLONG start_offset = 0; struct media_sink *sink; struct topo_node *node; HRESULT hr; @@ -2007,6 +2021,12 @@ static HRESULT session_start_clock(struct media_session *session)
session->presentation.flags |= SESSION_FLAG_PRESENTATION_CLOCK_SET; } +} + +static HRESULT session_start_clock(struct media_session *session) +{ + LONGLONG start_offset = 0; + HRESULT hr;
if (IsEqualGUID(&session->presentation.time_format, &GUID_NULL)) { @@ -2026,14 +2046,36 @@ static HRESULT session_start_clock(struct media_session *session) return hr; }
+static BOOL session_set_node_object_state(struct media_session *session, IUnknown *object, + MF_TOPOLOGY_TYPE node_type, enum object_state state) +{ + struct topo_node *node; + BOOL changed = FALSE; + + LIST_FOR_EACH_ENTRY(node, &session->presentation.nodes, struct topo_node, entry) + { + if (node->type == node_type && object == node->object.object) + { + changed = node->state != state; + node->state = state; + break; + } + } + + return changed; +}
static void session_set_source_object_state(struct media_session *session, IUnknown *object, MediaEventType event_type) { + IMFStreamSink *stream_sink; struct media_source *src; + struct media_sink *sink; enum object_state state; struct topo_node *node; + unsigned int i, count; BOOL changed = FALSE; + HRESULT hr;
if ((state = session_get_object_state_for_event(event_type)) == OBJ_STATE_INVALID) return; @@ -2058,15 +2100,7 @@ static void session_set_source_object_state(struct media_session *session, IUnkn case MEStreamPaused: case MEStreamStopped:
- LIST_FOR_EACH_ENTRY(node, &session->presentation.nodes, struct topo_node, entry) - { - if (node->type == MF_TOPOLOGY_SOURCESTREAM_NODE && object == node->object.object) - { - changed = node->state != state; - node->state = state; - break; - } - } + changed = session_set_node_object_state(session, object, MF_TOPOLOGY_SOURCESTREAM_NODE, state); default: ; } @@ -2081,7 +2115,44 @@ static void session_set_source_object_state(struct media_session *session, IUnkn break;
session_set_topo_status(session, S_OK, MF_TOPOSTATUS_STARTED_SOURCE); - if (SUCCEEDED(session_start_clock(session))) + + session_set_presentation_clock(session); + + if (session->presentation.flags & SESSION_FLAG_NEEDS_PREROLL) + { + MFTIME preroll_time = 0; + + if (session->presentation.start_position.vt == VT_I8) + preroll_time = session->presentation.start_position.hVal.QuadPart; + + /* Mark stream sinks without prerolling support as PREROLLED to keep state test logic generic. */ + LIST_FOR_EACH_ENTRY(sink, &session->presentation.sinks, struct media_sink, entry) + { + if (sink->preroll) + { + /* FIXME: abort and enter error state on failure. */ + if (FAILED(hr = IMFMediaSinkPreroll_NotifyPreroll(sink->preroll, preroll_time))) + WARN("Preroll notification failed, hr %#x.\n", hr); + } + else + { + if (SUCCEEDED(IMFMediaSink_GetStreamSinkCount(sink->sink, &count))) + { + for (i = 0; i < count; ++i) + { + if (SUCCEEDED(IMFMediaSink_GetStreamSinkByIndex(sink->sink, i, &stream_sink))) + { + session_set_node_object_state(session, (IUnknown *)stream_sink, MF_TOPOLOGY_OUTPUT_NODE, + OBJ_STATE_PREROLLED); + IMFStreamSink_Release(stream_sink); + } + } + } + } + } + session->state = SESSION_STATE_PREROLLING_SINKS; + } + else if (SUCCEEDED(session_start_clock(session))) session->state = SESSION_STATE_STARTING_SINKS;
break; @@ -2137,31 +2208,27 @@ static void session_set_sink_stream_state(struct media_session *session, IMFStre MediaEventType event_type) { struct media_source *source; - struct topo_node *node; enum object_state state; - BOOL changed = FALSE; IMFMediaEvent *event; DWORD caps, flags; + BOOL changed; HRESULT hr;
if ((state = session_get_object_state_for_event(event_type)) == OBJ_STATE_INVALID) return;
- LIST_FOR_EACH_ENTRY(node, &session->presentation.nodes, struct topo_node, entry) - { - if (node->type == MF_TOPOLOGY_OUTPUT_NODE && stream == node->object.sink_stream) - { - changed = node->state != state; - node->state = state; - break; - } - } - - if (!changed) + if (!(changed = session_set_node_object_state(session, (IUnknown *)stream, MF_TOPOLOGY_OUTPUT_NODE, state))) return;
switch (session->state) { + case SESSION_STATE_PREROLLING_SINKS: + if (!session_is_output_nodes_state(session, OBJ_STATE_PREROLLED)) + break; + + if (SUCCEEDED(session_start_clock(session))) + session->state = SESSION_STATE_STARTING_SINKS; + break; case SESSION_STATE_STARTING_SINKS: if (!session_is_output_nodes_state(session, OBJ_STATE_STARTED)) break; @@ -2630,6 +2697,7 @@ static HRESULT WINAPI session_events_callback_Invoke(IMFAsyncCallback *iface, IM case MEStreamSinkStarted: case MEStreamSinkPaused: case MEStreamSinkStopped: + case MEStreamSinkPrerolled:
EnterCriticalSection(&session->cs); session_set_sink_stream_state(session, (IMFStreamSink *)event_source, event_type);
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com>
---
 dlls/mf/tests/mf.c | 62 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 62 insertions(+)
diff --git a/dlls/mf/tests/mf.c b/dlls/mf/tests/mf.c index 6c44e4728e..dd6c935b8c 100644 --- a/dlls/mf/tests/mf.c +++ b/dlls/mf/tests/mf.c @@ -735,6 +735,68 @@ static void test_topology(void) IMFTopologyNode_Release(node2); IMFTopologyNode_Release(node);
+ /* Add one node, connect to another that hasn't been added. */ + hr = IMFTopology_Clear(topology); + ok(hr == S_OK, "Failed to clear topology, hr %#x.\n", hr); + + hr = MFCreateTopologyNode(MF_TOPOLOGY_SOURCESTREAM_NODE, &node); + ok(hr == S_OK, "Failed to create topology node, hr %#x.\n", hr); + + hr = MFCreateTopologyNode(MF_TOPOLOGY_OUTPUT_NODE, &node2); + ok(hr == S_OK, "Failed to create topology node, hr %#x.\n", hr); + + hr = IMFTopology_AddNode(topology, node); + ok(hr == S_OK, "Failed to add a node, hr %#x.\n", hr); + + hr = IMFTopology_GetNodeCount(topology, &node_count); + ok(hr == S_OK, "Failed to get node count, hr %#x.\n", hr); + ok(node_count == 1, "Unexpected node count.\n"); + + hr = IMFTopologyNode_ConnectOutput(node, 0, node2, 0); + ok(hr == S_OK, "Failed to connect nodes, hr %#x.\n", hr); + + hr = IMFTopology_GetNodeCount(topology, &node_count); + ok(hr == S_OK, "Failed to get node count, hr %#x.\n", hr); + ok(node_count == 1, "Unexpected node count.\n"); + + IMFTopologyNode_Release(node); + IMFTopologyNode_Release(node2); + + /* Add same node to different topologies. 
*/ + hr = IMFTopology_Clear(topology); + ok(hr == S_OK, "Failed to clear topology, hr %#x.\n", hr); + + hr = MFCreateTopology(&topology2); + ok(hr == S_OK, "Failed to create topology, hr %#x.\n", hr); + + hr = MFCreateTopologyNode(MF_TOPOLOGY_SOURCESTREAM_NODE, &node); + ok(hr == S_OK, "Failed to create topology node, hr %#x.\n", hr); + + hr = IMFTopology_AddNode(topology, node); + ok(hr == S_OK, "Failed to add a node, hr %#x.\n", hr); + EXPECT_REF(node, 2); + + hr = IMFTopology_GetNodeCount(topology, &node_count); + ok(hr == S_OK, "Failed to get node count, hr %#x.\n", hr); + ok(node_count == 1, "Unexpected node count.\n"); + + hr = IMFTopology_GetNodeCount(topology2, &node_count); + ok(hr == S_OK, "Failed to get node count, hr %#x.\n", hr); + ok(node_count == 0, "Unexpected node count.\n"); + + hr = IMFTopology_AddNode(topology2, node); + ok(hr == S_OK, "Failed to add a node, hr %#x.\n", hr); + EXPECT_REF(node, 3); + + hr = IMFTopology_GetNodeCount(topology, &node_count); + ok(hr == S_OK, "Failed to get node count, hr %#x.\n", hr); + ok(node_count == 1, "Unexpected node count.\n"); + + hr = IMFTopology_GetNodeCount(topology2, &node_count); + ok(hr == S_OK, "Failed to get node count, hr %#x.\n", hr); + ok(node_count == 1, "Unexpected node count.\n"); + + IMFTopology_Release(topology2); IMFTopology_Release(topology); }
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com>
---
 dlls/mf/session.c | 260 ++++++++++++++++++++++++++++------------------
 1 file changed, 157 insertions(+), 103 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index bc4cac6ef0..85bf4e9cba 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -127,6 +127,11 @@ struct transform_stream unsigned int requests; };
+enum topo_node_flags +{ + TOPO_NODE_END_OF_STREAM = 0x1, +}; + struct topo_node { struct list entry; @@ -134,6 +139,7 @@ struct topo_node TOPOID node_id; IMFTopologyNode *node; enum object_state state; + unsigned int flags; union { IMFMediaStream *source_stream; @@ -651,17 +657,20 @@ static void session_set_caps(struct media_session *session, DWORD caps) IMFMediaEvent_Release(event); }
+static void transform_release_sample(struct sample *sample) +{ + list_remove(&sample->entry); + if (sample->sample) + IMFSample_Release(sample->sample); + heap_free(sample); +} + static void transform_stream_drop_samples(struct transform_stream *stream) { struct sample *sample, *sample2;
LIST_FOR_EACH_ENTRY_SAFE(sample, sample2, &stream->samples, struct sample, entry) - { - list_remove(&sample->entry); - if (sample->sample) - IMFSample_Release(sample->sample); - heap_free(sample); - } + transform_release_sample(sample); }
static void release_topo_node(struct topo_node *node) @@ -2320,9 +2329,22 @@ static DWORD transform_node_get_stream_id(struct topo_node *node, BOOL output, D return map ? map[index] : index; }
-static HRESULT transform_node_get_sample(struct topo_node *node, DWORD output, IMFSample **sample) +static struct sample *transform_create_sample(IMFSample *sample) +{ + struct sample *sample_entry = heap_alloc_zero(sizeof(*sample_entry)); + + if (sample_entry) + { + sample_entry->sample = sample; + if (sample_entry->sample) + IMFSample_AddRef(sample_entry->sample); + } + + return sample_entry; +} + +static HRESULT transform_node_pull_samples(struct topo_node *node) { - struct list *list = &node->u.transform.outputs[output].samples; MFT_OUTPUT_STREAM_INFO stream_info; MFT_OUTPUT_DATA_BUFFER *buffers; struct sample *queued_sample; @@ -2330,17 +2352,6 @@ static HRESULT transform_node_get_sample(struct topo_node *node, DWORD output, I unsigned int i; HRESULT hr;
- *sample = NULL; - - if (!list_empty(list)) - { - queued_sample = LIST_ENTRY(list_head(list), struct sample, entry); - list_remove(&queued_sample->entry); - *sample = queued_sample->sample; - heap_free(queued_sample); - return S_OK; - } - if (!(buffers = heap_calloc(node->u.transform.output_count, sizeof(*buffers)))) return E_OUTOFMEMORY;
@@ -2350,68 +2361,63 @@ static HRESULT transform_node_get_sample(struct topo_node *node, DWORD output, I buffers[i].pSample = NULL; buffers[i].dwStatus = 0; buffers[i].pEvents = NULL; - }
- memset(&stream_info, 0, sizeof(stream_info)); - if (FAILED(hr = IMFTransform_GetOutputStreamInfo(node->object.transform, buffers[output].dwStreamID, &stream_info))) - goto exit; + memset(&stream_info, 0, sizeof(stream_info)); + if (FAILED(hr = IMFTransform_GetOutputStreamInfo(node->object.transform, buffers[i].dwStreamID, &stream_info))) + break;
- if (!(stream_info.dwFlags & MFT_OUTPUT_STREAM_PROVIDES_SAMPLES)) - { - IMFMediaBuffer *buffer = NULL; + if (!(stream_info.dwFlags & MFT_OUTPUT_STREAM_PROVIDES_SAMPLES)) + { + IMFMediaBuffer *buffer = NULL;
- hr = MFCreateAlignedMemoryBuffer(stream_info.cbSize, stream_info.cbAlignment, &buffer); - if (SUCCEEDED(hr)) - hr = MFCreateSample(&buffers[output].pSample); + hr = MFCreateAlignedMemoryBuffer(stream_info.cbSize, stream_info.cbAlignment, &buffer); + if (SUCCEEDED(hr)) + hr = MFCreateSample(&buffers[i].pSample);
- if (SUCCEEDED(hr)) - hr = IMFSample_AddBuffer(buffers[output].pSample, buffer); + if (SUCCEEDED(hr)) + hr = IMFSample_AddBuffer(buffers[i].pSample, buffer);
- if (buffer) - IMFMediaBuffer_Release(buffer); + if (buffer) + IMFMediaBuffer_Release(buffer);
- if (FAILED(hr)) - goto exit; + if (FAILED(hr)) + break; + } }
- hr = IMFTransform_ProcessOutput(node->object.transform, 0, node->u.transform.output_count, buffers, &status); - if (hr == S_OK) - { - /* Collect returned samples for all streams. */ - for (i = 0; i < node->u.transform.output_count; ++i) - { - if (buffers[i].pEvents) - IMFCollection_Release(buffers[i].pEvents); + if (SUCCEEDED(hr)) + hr = IMFTransform_ProcessOutput(node->object.transform, 0, node->u.transform.output_count, buffers, &status);
- if (!buffers[i].pSample) - continue; + /* Collect returned samples for all streams. */ + for (i = 0; i < node->u.transform.output_count; ++i) + { + if (buffers[i].pEvents) + IMFCollection_Release(buffers[i].pEvents);
- if (i == output) - { - *sample = buffers[i].pSample; - } - else - { - queued_sample = heap_alloc(sizeof(*queued_sample)); - queued_sample->sample = buffers[i].pSample; - list_add_tail(&node->u.transform.outputs[i].samples, &queued_sample->entry); - } + if (SUCCEEDED(hr) && !(buffers[i].dwStatus & MFT_OUTPUT_DATA_BUFFER_NO_SAMPLE)) + { + queued_sample = transform_create_sample(buffers[i].pSample); + list_add_tail(&node->u.transform.outputs[i].samples, &queued_sample->entry); } + + if (buffers[i].pSample) + IMFSample_Release(buffers[i].pSample); }
-exit: heap_free(buffers);
return hr; }
-static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, DWORD input, +static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, unsigned int input, IMFSample *sample) { + struct sample *sample_entry, *sample_entry2; DWORD stream_id, downstream_input; IMFTopologyNode *downstream_node; struct topo_node *topo_node; MF_TOPOLOGY_TYPE node_type; + BOOL drain = FALSE; TOPOID node_id; unsigned int i; HRESULT hr; @@ -2424,48 +2430,92 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop switch (node_type) { case MF_TOPOLOGY_OUTPUT_NODE: - if (topo_node->u.sink.requests) + if (sample) + { + if (topo_node->u.sink.requests) + { + if (FAILED(hr = IMFStreamSink_ProcessSample(topo_node->object.sink_stream, sample))) + WARN("Stream sink failed to process sample, hr %#x.\n", hr); + topo_node->u.sink.requests--; + } + } + else if (FAILED(hr = IMFStreamSink_PlaceMarker(topo_node->object.sink_stream, MFSTREAMSINK_MARKER_ENDOFSEGMENT, + NULL, NULL))) { - if (FAILED(hr = IMFStreamSink_ProcessSample(topo_node->object.sink_stream, sample))) - WARN("Sample delivery failed, hr %#x.\n", hr); - topo_node->u.sink.requests--; + WARN("Failed to place sink marker, hr %#x.\n", hr); } break; case MF_TOPOLOGY_TRANSFORM_NODE:
- stream_id = transform_node_get_stream_id(topo_node, FALSE, input); + transform_node_pull_samples(topo_node); + + sample_entry = transform_create_sample(sample); + list_add_tail(&topo_node->u.transform.inputs[input].samples, &sample_entry->entry);
- hr = IMFTransform_ProcessInput(topo_node->object.transform, stream_id, sample, 0); - if (hr == MF_E_NOTACCEPTING) + for (i = 0; i < topo_node->u.transform.input_count; ++i) { - struct sample *sample_entry = heap_alloc(sizeof(*sample_entry)); - sample_entry->sample = sample; - list_add_tail(&topo_node->u.transform.inputs[input].samples, &sample_entry->entry); + stream_id = transform_node_get_stream_id(topo_node, FALSE, i); + LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &topo_node->u.transform.inputs[i].samples, + struct sample, entry) + { + if (sample_entry->sample) + { + if ((hr = IMFTransform_ProcessInput(topo_node->object.transform, stream_id, + sample_entry->sample, 0)) == MF_E_NOTACCEPTING) + break; + if (FAILED(hr)) + WARN("Failed to process input for stream %u/%u, hr %#x.\n", i, stream_id, hr); + transform_release_sample(sample_entry); + } + else + { + transform_stream_drop_samples(&topo_node->u.transform.inputs[i]); + drain = TRUE; + } + } } - else if (hr == S_OK) + + if (drain) + { + if (FAILED(hr = IMFTransform_ProcessMessage(topo_node->object.transform, MFT_MESSAGE_COMMAND_DRAIN, 0))) + WARN("Drain command failed for transform, hr %#x.\n", hr); + } + + transform_node_pull_samples(topo_node); + + /* Remaining unprocessed input has been discarded, now queue markers for every output. */ + if (drain) { - /* Check if we need new output is available, push it down. */ for (i = 0; i < topo_node->u.transform.output_count; ++i) + { + if ((sample_entry = transform_create_sample(NULL))) + list_add_tail(&topo_node->u.transform.outputs[i].samples, &sample_entry->entry); + } + } + + /* Push down all available output. 
*/ + for (i = 0; i < topo_node->u.transform.output_count; ++i) + { + if (FAILED(IMFTopologyNode_GetOutput(node, i, &downstream_node, &downstream_input))) + { + WARN("Failed to get connected node for output %u.\n", i); + continue; + } + + LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &topo_node->u.transform.outputs[i].samples, + struct sample, entry) { if (!topo_node->u.transform.outputs[i].requests) - continue; + break;
- sample = NULL; - transform_node_get_sample(topo_node, i, &sample); - if (sample) - { - if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, i, &downstream_node, &downstream_input))) - { - session_deliver_sample_to_node(session, downstream_node, downstream_input, sample); - topo_node->u.transform.outputs[i].requests--; - IMFTopologyNode_Release(downstream_node); - } - IMFSample_Release(sample); - } + session_deliver_sample_to_node(session, downstream_node, downstream_input, sample_entry->sample); + topo_node->u.transform.outputs[i].requests--; + + transform_release_sample(sample_entry); } + + IMFTopologyNode_Release(downstream_node); } - else - WARN("Transform failed to process input sample, hr %#x.\n", hr);
break; case MF_TOPOLOGY_TEE_NODE: @@ -2479,10 +2529,10 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop static HRESULT session_request_sample_from_node(struct media_session *session, IMFTopologyNode *node, DWORD output) { IMFTopologyNode *downstream_node, *upstream_node; - MF_TOPOLOGY_TYPE node_type; - IMFSample *sample = NULL; + unsigned int downstream_input, upstream_output; struct topo_node *topo_node; - DWORD downstream_input, upstream_output; + MF_TOPOLOGY_TYPE node_type; + struct sample *sample; TOPOID node_id; HRESULT hr;
@@ -2499,20 +2549,9 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I break; case MF_TOPOLOGY_TRANSFORM_NODE:
- hr = transform_node_get_sample(topo_node, output, &sample); - if (sample) - { - if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, output, &downstream_node, &downstream_input))) - { - session_deliver_sample_to_node(session, downstream_node, downstream_input, sample); - IMFTopologyNode_Release(downstream_node); - } - IMFSample_Release(sample); - } - - /* Forward request to upstream node. */ - if (hr == MF_E_TRANSFORM_NEED_MORE_INPUT || (hr == S_OK && !sample)) + if (list_empty(&topo_node->u.transform.outputs[output].samples)) { + /* Forward request to upstream node. */ if (SUCCEEDED(hr = IMFTopologyNode_GetInput(node, 0 /* FIXME */, &upstream_node, &upstream_output))) { if (SUCCEEDED(hr = session_request_sample_from_node(session, upstream_node, upstream_output))) @@ -2520,6 +2559,17 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I IMFTopologyNode_Release(upstream_node); } } + else + { + if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, output, &downstream_node, &downstream_input))) + { + sample = LIST_ENTRY(list_head(&topo_node->u.transform.outputs[output].samples), struct sample, entry); + session_deliver_sample_to_node(session, downstream_node, downstream_input, sample->sample); + transform_release_sample(sample); + IMFTopologyNode_Release(downstream_node); + } + } + break; case MF_TOPOLOGY_TEE_NODE: FIXME("Unhandled upstream node type %d.\n", node_type); @@ -2567,7 +2617,7 @@ static void session_deliver_sample(struct media_session *session, IMFMediaStream DWORD downstream_input; HRESULT hr;
- if (value->vt != VT_UNKNOWN || !value->punkVal) + if (value && (value->vt != VT_UNKNOWN || !value->punkVal)) { WARN("Unexpected value type %d.\n", value->vt); return; @@ -2585,13 +2635,16 @@ static void session_deliver_sample(struct media_session *session, IMFMediaStream if (!source_node) return;
+ if (!value) + source_node->flags |= TOPO_NODE_END_OF_STREAM; + if (FAILED(hr = IMFTopologyNode_GetOutput(source_node->node, 0, &downstream_node, &downstream_input))) { WARN("Failed to get downstream node connection, hr %#x.\n", hr); return; }
- session_deliver_sample_to_node(session, downstream_node, downstream_input, (IMFSample *)value->punkVal); + session_deliver_sample_to_node(session, downstream_node, downstream_input, value ? (IMFSample *)value->punkVal : NULL); IMFTopologyNode_Release(downstream_node); }
@@ -2712,9 +2765,10 @@ static HRESULT WINAPI session_events_callback_Invoke(IMFAsyncCallback *iface, IM
break; case MEMediaSample: + case MEEndOfStream:
EnterCriticalSection(&session->cs); - session_deliver_sample(session, (IMFMediaStream *)event_source, &value); + session_deliver_sample(session, (IMFMediaStream *)event_source, event_type == MEMediaSample ? &value : NULL); LeaveCriticalSection(&session->cs);
break;
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com>
---
 dlls/mf/session.c | 121 +++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 113 insertions(+), 8 deletions(-)
diff --git a/dlls/mf/session.c b/dlls/mf/session.c index 85bf4e9cba..3474b242a4 100644 --- a/dlls/mf/session.c +++ b/dlls/mf/session.c @@ -98,12 +98,18 @@ enum object_state OBJ_STATE_INVALID, };
+enum media_source_flags +{ + SOURCE_FLAG_END_OF_PRESENTATION = 0x1, +}; + struct media_source { struct list entry; IMFMediaSource *source; IMFPresentationDescriptor *pd; enum object_state state; + unsigned int flags; };
struct media_sink @@ -178,6 +184,7 @@ enum presentation_flags SESSION_FLAG_PRESENTATION_CLOCK_SET = 0x2, SESSION_FLAG_FINALIZE_SINKS = 0x4, SESSION_FLAG_NEEDS_PREROLL = 0x8, + SESSION_FLAG_END_OF_PRESENTATION = 0x10, };
struct media_session @@ -2055,20 +2062,30 @@ static HRESULT session_start_clock(struct media_session *session) return hr; }
+static struct topo_node *session_get_node_object(struct media_session *session, IUnknown *object, + MF_TOPOLOGY_TYPE node_type) +{ + struct topo_node *node = NULL; + + LIST_FOR_EACH_ENTRY(node, &session->presentation.nodes, struct topo_node, entry) + { + if (node->type == node_type && object == node->object.object) + break; + } + + return node; +} + static BOOL session_set_node_object_state(struct media_session *session, IUnknown *object, MF_TOPOLOGY_TYPE node_type, enum object_state state) { struct topo_node *node; BOOL changed = FALSE;
- LIST_FOR_EACH_ENTRY(node, &session->presentation.nodes, struct topo_node, entry) + if ((node = session_get_node_object(session, object, node_type))) { - if (node->type == node_type && object == node->object.object) - { - changed = node->state != state; - node->state = state; - break; - } + changed = node->state != state; + node->state = state; }
return changed; @@ -2680,6 +2697,80 @@ static void session_sink_invalidated(struct media_session *session, IMFMediaEven session_set_topo_status(session, S_OK, MF_TOPOSTATUS_ENDED); }
+static BOOL session_nodes_is_mask_set(struct media_session *session, MF_TOPOLOGY_TYPE node_type, unsigned int flags) +{ + struct media_source *source; + struct topo_node *node; + + if (node_type == MF_TOPOLOGY_MAX) + { + LIST_FOR_EACH_ENTRY(source, &session->presentation.sources, struct media_source, entry) + { + if ((source->flags & flags) != flags) + return FALSE; + } + } + else + { + LIST_FOR_EACH_ENTRY(node, &session->presentation.nodes, struct topo_node, entry) + { + if (node->type == node_type && (node->flags & flags) != flags) + return FALSE; + } + } + + return TRUE; +} + +static void session_raise_end_of_presentation(struct media_session *session) +{ + if (!(session_nodes_is_mask_set(session, MF_TOPOLOGY_SOURCESTREAM_NODE, TOPO_NODE_END_OF_STREAM))) + return; + + if (!(session->presentation.flags & SESSION_FLAG_END_OF_PRESENTATION)) + { + if (session_nodes_is_mask_set(session, MF_TOPOLOGY_MAX, SOURCE_FLAG_END_OF_PRESENTATION)) + { + IMFMediaEventQueue_QueueEventParamVar(session->event_queue, MEEndOfPresentation, &GUID_NULL, S_OK, NULL); + session->presentation.flags |= SESSION_FLAG_END_OF_PRESENTATION; + } + } +} + +static void session_handle_end_of_stream(struct media_session *session, IMFMediaStream *stream) +{ + struct topo_node *node; + + if (!(node = session_get_node_object(session, (IUnknown *)stream, MF_TOPOLOGY_SOURCESTREAM_NODE)) + || node->flags & TOPO_NODE_END_OF_STREAM) + { + return; + } + + session_deliver_sample(session, stream, NULL); + + session_raise_end_of_presentation(session); +} + +static void session_handle_end_of_presentation(struct media_session *session, IMFMediaSource *object) +{ + struct media_source *source; + + LIST_FOR_EACH_ENTRY(source, &session->presentation.sources, struct media_source, entry) + { + if (source->source == object) + { + if (!(source->flags & SOURCE_FLAG_END_OF_PRESENTATION)) + { + source->flags |= SOURCE_FLAG_END_OF_PRESENTATION; + session_raise_end_of_presentation(session); + } + + break; + } + } +} + static 
HRESULT WINAPI session_events_callback_Invoke(IMFAsyncCallback *iface, IMFAsyncResult *result) { struct media_session *session = impl_from_events_callback_IMFAsyncCallback(iface); @@ -2765,10 +2856,24 @@ static HRESULT WINAPI session_events_callback_Invoke(IMFAsyncCallback *iface, IM
break; case MEMediaSample: + + EnterCriticalSection(&session->cs); + session_deliver_sample(session, (IMFMediaStream *)event_source, &value); + LeaveCriticalSection(&session->cs); + + break; case MEEndOfStream:
EnterCriticalSection(&session->cs); - session_deliver_sample(session, (IMFMediaStream *)event_source, event_type == MEMediaSample ? &value : NULL); + session_handle_end_of_stream(session, (IMFMediaStream *)event_source); + LeaveCriticalSection(&session->cs); + + break; + + case MEEndOfPresentation: + + EnterCriticalSection(&session->cs); + session_handle_end_of_presentation(session, (IMFMediaSource *)event_source); LeaveCriticalSection(&session->cs);
break;