Signed-off-by: Derek Lesho <dlesho@codeweavers.com> --- dlls/winegstreamer/gst_cbs.c | 44 +++ dlls/winegstreamer/gst_cbs.h | 13 + dlls/winegstreamer/media_source.c | 440 +++++++++++++++++++++++++++++- 3 files changed, 496 insertions(+), 1 deletion(-)
diff --git a/dlls/winegstreamer/gst_cbs.c b/dlls/winegstreamer/gst_cbs.c index 51b017ded6..5038ea3397 100644 --- a/dlls/winegstreamer/gst_cbs.c +++ b/dlls/winegstreamer/gst_cbs.c @@ -372,3 +372,47 @@ GstBusSyncReply watch_source_bus_wrapper(GstBus *bus, GstMessage *message, gpoin
     return cbdata.u.watch_bus_data.ret;
 }
+/* Packs the demuxer "pad-added" arguments into a SOURCE_STREAM_ADDED cb_data and passes it to call_cb(). */
+void source_stream_added_wrapper(GstElement *bin, GstPad *pad, gpointer user)
+{
+    struct cb_data cbdata = { SOURCE_STREAM_ADDED };
+
+    cbdata.u.pad_added_data.element = bin;
+    cbdata.u.pad_added_data.pad = pad;
+    cbdata.u.pad_added_data.user = user;
+
+    call_cb(&cbdata);
+}
+/* Packs the demuxer "pad-removed" arguments into a SOURCE_STREAM_REMOVED cb_data and passes it to call_cb(). */
+void source_stream_removed_wrapper(GstElement *element, GstPad *pad, gpointer user)
+{
+    struct cb_data cbdata = { SOURCE_STREAM_REMOVED };
+
+    cbdata.u.pad_removed_data.element = element;
+    cbdata.u.pad_removed_data.pad = pad;
+    cbdata.u.pad_removed_data.user = user;
+
+    call_cb(&cbdata);
+}
+/* Packs the demuxer "no-more-pads" arguments into a SOURCE_ALL_STREAMS cb_data and passes it to call_cb(). */
+void source_all_streams_wrapper(GstElement *element, gpointer user)
+{
+    struct cb_data cbdata = { SOURCE_ALL_STREAMS };
+
+    cbdata.u.no_more_pads_data.element = element;
+    cbdata.u.no_more_pads_data.user = user;
+
+    call_cb(&cbdata);
+}
+/* Forwards an appsink "new-sample" callback as STREAM_NEW_SAMPLE and returns the ret value found in cbdata afterwards. */
+GstFlowReturn stream_new_sample_wrapper(GstElement *appsink, gpointer user)
+{
+    struct cb_data cbdata = { STREAM_NEW_SAMPLE };
+
+    cbdata.u.new_sample_data.appsink = appsink;
+    cbdata.u.new_sample_data.user = user;
+
+    call_cb(&cbdata);
+
+    return cbdata.u.new_sample_data.ret;
+}
diff --git a/dlls/winegstreamer/gst_cbs.h b/dlls/winegstreamer/gst_cbs.h
index 0d7acaf0b8..106368a064 100644
--- a/dlls/winegstreamer/gst_cbs.h
+++ b/dlls/winegstreamer/gst_cbs.h
@@ -49,6 +49,10 @@ enum CB_TYPE {
     ACTIVATE_BYTESTREAM_PAD_MODE,
     PROCESS_BYTESTREAM_PAD_EVENT,
     WATCH_SOURCE_BUS,
+    SOURCE_STREAM_ADDED,
+    SOURCE_STREAM_REMOVED,
+    SOURCE_ALL_STREAMS,
+    STREAM_NEW_SAMPLE,
     MEDIA_SOURCE_MAX,
 };
@@ -134,6 +138,11 @@ struct cb_data { GstQuery *query; gboolean ret; } query_sink_data; + struct new_sample_data { + GstElement *appsink; + gpointer user; + GstFlowReturn ret; + } new_sample_data; } u;
int finished; @@ -166,5 +175,9 @@ gboolean query_bytestream_wrapper(GstPad *pad, GstObject *parent, GstQuery *quer gboolean activate_bytestream_pad_mode_wrapper(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) DECLSPEC_HIDDEN; gboolean process_bytestream_pad_event_wrapper(GstPad *pad, GstObject *parent, GstEvent *event) DECLSPEC_HIDDEN; GstBusSyncReply watch_source_bus_wrapper(GstBus *bus, GstMessage *message, gpointer user) DECLSPEC_HIDDEN; +void source_stream_added_wrapper(GstElement *bin, GstPad *pad, gpointer user) DECLSPEC_HIDDEN; +void source_stream_removed_wrapper(GstElement *element, GstPad *pad, gpointer user) DECLSPEC_HIDDEN; +void source_all_streams_wrapper(GstElement *element, gpointer user) DECLSPEC_HIDDEN; +GstFlowReturn stream_new_sample_wrapper(GstElement *appsink, gpointer user) DECLSPEC_HIDDEN;
#endif diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index d3ed74876e..1dc3b40b65 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -5,6 +5,7 @@ #include "gst_private.h" #include "gst_cbs.h"
+#include <assert.h> #include <stdarg.h>
#define COBJMACROS @@ -31,6 +32,28 @@ static struct source_desc } };
+struct media_source;
+
+struct media_stream
+{
+    IMFMediaStream IMFMediaStream_iface;
+    LONG ref;
+    struct media_source *parent_source; /* referenced by the constructor, released in media_stream_teardown() */
+    IMFMediaEventQueue *event_queue;
+    IMFStreamDescriptor *descriptor; /* may be NULL: nothing in this patch assigns it yet */
+    GstElement *appsink; /* added to the parent source's container bin */
+    GstPad *their_src, *my_sink; /* demuxer pad (borrowed) and the appsink's "sink" pad (owned) */
+    /* usually reflects state of source */
+    enum
+    {
+        STREAM_INACTIVE,
+        STREAM_ENABLED,
+        STREAM_PAUSED,
+        STREAM_RUNNING,
+        STREAM_SHUTDOWN,
+    } state;
+};
+
 struct media_source
 {
     IMFMediaSource IMFMediaSource_iface;
@@ -52,8 +75,275 @@ struct media_source
         SOURCE_RUNNING,
         SOURCE_SHUTDOWN,
     } state;
+    CRITICAL_SECTION streams_cs; /* held by the pad-added and no-more-pads handlers */
+    HANDLE init_complete_event; /* set by source_all_streams(); media_source_constructor() waits on it */
 };

+/* stream */
+
+/* Maps an IMFMediaStream interface pointer back to the media_stream that contains it. */
+static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface)
+{
+    return CONTAINING_RECORD(iface, struct media_stream, IMFMediaStream_iface);
+}
+
+/* Returns the stream interface for IMFMediaStream, IMFMediaEventGenerator and IUnknown; E_NOINTERFACE otherwise. */
+static HRESULT WINAPI media_stream_QueryInterface(IMFMediaStream *iface, REFIID riid, void **out)
+{
+    struct media_stream *This = impl_from_IMFMediaStream(iface);
+
+    TRACE("(%p)->(%s %p)\n", This, debugstr_guid(riid), out);
+
+    if (IsEqualIID(riid, &IID_IMFMediaStream) ||
+        IsEqualIID(riid, &IID_IMFMediaEventGenerator) ||
+        IsEqualIID(riid, &IID_IUnknown))
+    {
+        *out = &This->IMFMediaStream_iface;
+    }
+    else
+    {
+        FIXME("(%s, %p)\n", debugstr_guid(riid), out);
+        *out = NULL;
+        return E_NOINTERFACE;
+    }
+
+    IUnknown_AddRef((IUnknown*)*out);
+    return S_OK;
+}
+
+/* Increments the reference count and returns the new value. */
+static ULONG WINAPI media_stream_AddRef(IMFMediaStream *iface)
+{
+    struct media_stream *This = impl_from_IMFMediaStream(iface);
+    ULONG ref = InterlockedIncrement(&This->ref);
+
+    TRACE("(%p) ref=%u\n", This, ref);
+
+    return ref;
+}
+
+/* Drops a reference; frees the stream at zero and logs an error if it was never torn down. */
+static ULONG WINAPI media_stream_Release(IMFMediaStream *iface)
+{
+    struct media_stream *This = impl_from_IMFMediaStream(iface);
+
+    ULONG ref = InterlockedDecrement(&This->ref);
+
+    TRACE("(%p) ref=%u\n", This, ref);
+
+    if (!ref)
+    {
+        if (This->state != STREAM_SHUTDOWN)
+            ERR("incomplete cleanup\n");
+        heap_free(This);
+    }
+
+    return ref;
+}
+
+/* Forwards to the stream's event queue; fails with MF_E_SHUTDOWN once the stream is shut down. */
+static HRESULT WINAPI media_stream_GetEvent(IMFMediaStream *iface, DWORD flags, IMFMediaEvent **event)
+{
+    struct media_stream *This = impl_from_IMFMediaStream(iface);
+
+    TRACE("(%p)->(%#x, %p)\n", This, flags, event);
+
+    if (This->state == STREAM_SHUTDOWN)
+        return MF_E_SHUTDOWN;
+
+    return IMFMediaEventQueue_GetEvent(This->event_queue, flags, event);
+}
+
+/* Forwards to the stream's event queue; fails with MF_E_SHUTDOWN once the stream is shut down. */
+static HRESULT WINAPI media_stream_BeginGetEvent(IMFMediaStream *iface, IMFAsyncCallback *callback, IUnknown *state)
+{
+    struct media_stream *This = impl_from_IMFMediaStream(iface);
+
+    TRACE("(%p)->(%p, %p)\n", This, callback, state);
+
+    if (This->state == STREAM_SHUTDOWN)
+        return MF_E_SHUTDOWN;
+
+    return IMFMediaEventQueue_BeginGetEvent(This->event_queue, callback, state);
+}
+
+/* Forwards to the stream's event queue; fails with MF_E_SHUTDOWN once the stream is shut down. */
+static HRESULT WINAPI media_stream_EndGetEvent(IMFMediaStream *iface, IMFAsyncResult *result, IMFMediaEvent **event)
+{
+    struct media_stream *This = impl_from_IMFMediaStream(iface);
+
+    TRACE("(%p)->(%p, %p)\n", This, result, event);
+
+    if (This->state == STREAM_SHUTDOWN)
+        return MF_E_SHUTDOWN;
+
+    return IMFMediaEventQueue_EndGetEvent(This->event_queue, result, event);
+}
+
+/* Queues an event on the stream's event queue; fails with MF_E_SHUTDOWN once the stream is shut down. */
+static HRESULT WINAPI media_stream_QueueEvent(IMFMediaStream *iface, MediaEventType event_type, REFGUID ext_type,
+        HRESULT hr, const PROPVARIANT *value)
+{
+    struct media_stream *This = impl_from_IMFMediaStream(iface);
+
+    TRACE("(%p)->(%d, %s, %#x, %p)\n", This, event_type, debugstr_guid(ext_type), hr, value);
+
+    if (This->state == STREAM_SHUTDOWN)
+        return MF_E_SHUTDOWN;
+
+    return IMFMediaEventQueue_QueueEventParamVar(This->event_queue, event_type, ext_type, hr, value);
+}
+
+/* Stub: returns MF_E_SHUTDOWN after shutdown, E_NOTIMPL otherwise. */
+static HRESULT WINAPI media_stream_GetMediaSource(IMFMediaStream *iface, IMFMediaSource **source)
+{
+    struct media_stream *This = impl_from_IMFMediaStream(iface);
+
+    FIXME("stub (%p)->(%p)\n", This, source);
+
+    if (This->state == STREAM_SHUTDOWN)
+        return MF_E_SHUTDOWN;
+
+    return E_NOTIMPL;
+}
+
+/* Not implemented yet: returns MF_E_SHUTDOWN after shutdown, E_NOTIMPL otherwise. */
+static HRESULT WINAPI media_stream_GetStreamDescriptor(IMFMediaStream* iface, IMFStreamDescriptor **descriptor)
+{
+    struct media_stream *This = impl_from_IMFMediaStream(iface);
+
+    TRACE("(%p)->(%p)\n", This, descriptor);
+
+    if (This->state == STREAM_SHUTDOWN)
+        return MF_E_SHUTDOWN;
+
+    return E_NOTIMPL;
+}
+
+/* Not implemented yet: returns MF_E_SHUTDOWN after shutdown, E_NOTIMPL otherwise. */
+static HRESULT WINAPI media_stream_RequestSample(IMFMediaStream *iface, IUnknown *token)
+{
+    struct media_stream *This = impl_from_IMFMediaStream(iface);
+
+    TRACE("(%p)->(%p)\n", iface, token);
+
+    if (This->state == STREAM_SHUTDOWN)
+        return MF_E_SHUTDOWN;
+
+    return E_NOTIMPL;
+}
+
+static const IMFMediaStreamVtbl media_stream_vtbl =
+{
+    media_stream_QueryInterface,
+    media_stream_AddRef,
+    media_stream_Release,
+    media_stream_GetEvent,
+    media_stream_BeginGetEvent,
+    media_stream_EndGetEvent,
+    media_stream_QueueEvent,
+    media_stream_GetMediaSource,
+    media_stream_GetStreamDescriptor,
+    media_stream_RequestSample
+};
+
+/* appsink "new-sample" handler: pulls the pending sample and drops it (RequestSample is still a stub). */
+static GstFlowReturn stream_new_sample(GstElement *appsink, gpointer user)
+{
+    struct media_stream *This = (struct media_stream *) user;
+    GstSample *discard_sample = NULL;
+
+    TRACE("(%p) got sample\n", This);
+
+    g_signal_emit_by_name(This->appsink, "pull-sample", &discard_sample);
+    /* "pull-sample" yields NULL when the appsink is stopped or at EOS; there is nothing to unref then. */
+    if (discard_sample)
+        gst_sample_unref(discard_sample);
+    return GST_FLOW_OK;
+}
+
+/* Marks the stream as shut down and releases the sink pad, descriptor, event queue and parent
+ * source it holds. The stream object itself is freed by the caller or by the final Release(). */
+static void media_stream_teardown(struct media_stream *This)
+{
+    TRACE("(%p)\n", This);
+
+    This->state = STREAM_SHUTDOWN;
+
+    /* their_src is only borrowed from the demuxer (no reference is ever taken on it), so just forget it. */
+    This->their_src = NULL;
+    if (This->my_sink)
+        gst_object_unref(GST_OBJECT(This->my_sink));
+    if (This->descriptor)
+        IMFStreamDescriptor_Release(This->descriptor);
+    if (This->event_queue)
+        IMFMediaEventQueue_Release(This->event_queue);
+    if (This->parent_source)
+        IMFMediaSource_Release(&This->parent_source->IMFMediaSource_iface);
+}
+
+/* Creates the stream for a demuxer source pad: adds an appsink to the source's container,
+ * links the pad to it and returns the new stream with one reference in *out_stream.
+ * Returns S_OK or a failure HRESULT; stream_id is currently unused. */
+static HRESULT media_stream_constructor(struct media_source *source, GstPad *pad, DWORD stream_id, struct media_stream **out_stream)
+{
+    HRESULT hr;
+    GstPadLinkReturn ret;
+    struct media_stream *This;
+
+    TRACE("(%p %p)->(%p)\n", source, pad, out_stream);
+
+    if (!(This = heap_alloc_zero(sizeof(*This))))
+        return E_OUTOFMEMORY;
+
+    This->state = STREAM_INACTIVE;
+
+    /* AddRef() returns the new reference count, not an HRESULT, so it must not be tested with FAILED(). */
+    IMFMediaSource_AddRef(&source->IMFMediaSource_iface);
+    This->parent_source = source;
+
+    if (FAILED(hr = MFCreateEventQueue(&This->event_queue)))
+    {
+        goto fail;
+    }
+
+    if (!(This->appsink = gst_element_factory_make("appsink", NULL)))
+    {
+        hr = E_OUTOFMEMORY;
+        goto fail;
+    }
+    gst_bin_add(GST_BIN(This->parent_source->container), This->appsink);
+
+    g_object_set(This->appsink, "emit-signals", TRUE, NULL);
+    g_object_set(This->appsink, "sync", FALSE, NULL);
+    g_signal_connect(This->appsink, "new-sample", G_CALLBACK(stream_new_sample_wrapper), This);
+
+    /* Borrowed: the pad belongs to the demuxer; source_stream_removed() clears the pointer again. */
+    This->their_src = pad;
+    This->my_sink = gst_element_get_static_pad(This->appsink, "sink");
+    gst_pad_set_element_private(pad, This);
+
+    if ((ret = gst_pad_link(This->their_src, This->my_sink)) != GST_PAD_LINK_OK)
+        ERR("Error linking demuxer to stream %p, err = %d\n", This, ret);
+
+    gst_element_sync_state_with_parent(This->appsink);
+
+    This->IMFMediaStream_iface.lpVtbl = &media_stream_vtbl;
+    This->ref = 1;
+
+    TRACE("->(%p)\n", This);
+
+    *out_stream = This;
+    return S_OK;
+
+    fail:
+    WARN("Failed to construct media stream, hr %#x.\n", hr);
+
+    media_stream_teardown(This);
+    heap_free(This);
+    return hr;
+}
+
 /* source */
static inline struct media_source *impl_from_IMFMediaSource(IMFMediaSource *iface) @@ -235,6 +502,19 @@ static HRESULT media_source_teardown(struct media_source *This) if (This->byte_stream) IMFByteStream_Release(This->byte_stream);
+ for (unsigned int i = 0; i < This->stream_count; i++) + { + media_stream_teardown(This->streams[i]); + IMFMediaStream_Release(&This->streams[i]->IMFMediaStream_iface); + } + + if (This->stream_count) + heap_free(This->streams); + + if (This->init_complete_event) + CloseHandle(This->init_complete_event); + DeleteCriticalSection(&This->streams_cs); + return S_OK; }
@@ -433,6 +713,139 @@ GstBusSyncReply watch_source_bus(GstBus *bus, GstMessage *message, gpointer user
     return GST_BUS_DROP;
 }

+/* "pad-added" handler for the demuxer. Parses the numeric stream id from the pad's stream id,
+ * re-links a known stream with the same id to the new pad, or otherwise creates a stream and
+ * appends it to source->streams. Runs under streams_cs. */
+static void source_stream_added(GstElement *element, GstPad *pad, gpointer user)
+{
+    struct media_stream *stream;
+    struct media_source *source = (struct media_source *) user;
+    struct media_stream **new_stream_array;
+    gchar *g_stream_id;
+    const char *stream_id_string;
+    DWORD stream_id;
+
+    EnterCriticalSection(&source->streams_cs);
+
+    /* The number is taken from after the first '/'; skip pads with no id or no number rather than parse NULL. */
+    g_stream_id = gst_pad_get_stream_id(pad);
+    stream_id_string = g_stream_id ? strstr(g_stream_id, "/") : NULL;
+    if (!stream_id_string || sscanf(stream_id_string, "/%03u", &stream_id) != 1)
+    {
+        ERR("Failed to parse stream id %s\n", debugstr_a(g_stream_id));
+        g_free(g_stream_id);
+        goto leave;
+    }
+    TRACE("stream-id: %u\n", stream_id);
+    g_free(g_stream_id);
+
+    /* find existing stream */
+    for (unsigned int i = 0; i < source->stream_count; i++)
+    {
+        DWORD existing_stream_id;
+        IMFStreamDescriptor *descriptor = source->streams[i]->descriptor;
+
+        /* A stream without a descriptor has no identifier to compare against. */
+        if (!descriptor)
+            continue;
+
+        if (FAILED(IMFStreamDescriptor_GetStreamIdentifier(descriptor, &existing_stream_id)))
+            goto leave;
+
+        if (existing_stream_id == stream_id)
+        {
+            struct media_stream *existing_stream = source->streams[i];
+            GstPadLinkReturn ret;
+
+            TRACE("Found existing stream %p\n", existing_stream);
+
+            if (!existing_stream->my_sink)
+            {
+                ERR("Couldn't find our sink\n");
+                goto leave;
+            }
+
+            existing_stream->their_src = pad;
+            gst_pad_set_element_private(pad, existing_stream);
+
+            if ((ret = gst_pad_link(existing_stream->their_src, existing_stream->my_sink)) != GST_PAD_LINK_OK)
+            {
+                ERR("Error linking demuxer to stream %p, err = %d\n", existing_stream, ret);
+            }
+            gst_element_sync_state_with_parent(existing_stream->appsink);
+
+            goto leave;
+        }
+    }
+
+    /* Grow the array before creating the stream so that an allocation failure cannot leak a constructed stream. */
+    if (!(new_stream_array = heap_realloc(source->streams, (source->stream_count + 1) * (sizeof(*new_stream_array)))))
+    {
+        ERR("Failed to add stream to source\n");
+        goto leave;
+    }
+    source->streams = new_stream_array;
+
+    if (FAILED(media_stream_constructor(source, pad, stream_id, &stream)))
+    {
+        /* media_source_teardown() frees streams only when stream_count != 0, so drop an array with no entries. */
+        if (!source->stream_count)
+        {
+            heap_free(source->streams);
+            source->streams = NULL;
+        }
+        goto leave;
+    }
+
+    source->streams[source->stream_count++] = stream;
+
+    leave:
+    LeaveCriticalSection(&source->streams_cs);
+    return;
+}
+
+/* "pad-removed" handler for the demuxer. Unlinks a removed source pad from its stream's
+ * appsink and clears the association between the pad and the stream. */
+static void source_stream_removed(GstElement *element, GstPad *pad, gpointer user)
+{
+    struct media_stream *stream;
+
+    if (gst_pad_get_direction(pad) != GST_PAD_SRC)
+    {
+        return;
+    }
+
+    stream = (struct media_stream *) gst_pad_get_element_private(pad);
+
+    if (stream)
+    {
+        TRACE("Stream %p of Source %p removed\n", stream, stream->parent_source);
+
+        assert (stream->their_src == pad);
+
+        gst_pad_unlink(stream->their_src, stream->my_sink);
+
+        stream->their_src = NULL;
+        gst_pad_set_element_private(pad, NULL);
+    }
+}
+
+/* "no-more-pads" handler for the demuxer. While the source is still opening, signals
+ * init_complete_event so that media_source_constructor() can stop waiting. */
+static void source_all_streams(GstElement *element, gpointer user)
+{
+    struct media_source *source = (struct media_source *) user;
+
+    EnterCriticalSection(&source->streams_cs);
+    if (source->state != SOURCE_OPENING)
+        goto leave;
+
+    SetEvent(source->init_complete_event);
+
+    leave:
+    LeaveCriticalSection(&source->streams_cs);
+}
+
 static HRESULT media_source_constructor(IMFByteStream *bytestream, enum source_type type, struct media_source **out_media_source)
 {
     GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE(
@@ -510,11 +899,36 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, enum source_t
         goto fail;
     }
- This->state = SOURCE_STOPPED; + g_signal_connect(This->demuxer, "pad-added", G_CALLBACK(source_stream_added_wrapper), This); + g_signal_connect(This->demuxer, "pad-removed", G_CALLBACK(source_stream_removed_wrapper), This); + g_signal_connect(This->demuxer, "no-more-pads", G_CALLBACK(source_all_streams_wrapper), This); + + InitializeCriticalSection(&This->streams_cs); + This->init_complete_event = CreateEventA(NULL, TRUE, FALSE, NULL);
+ This->state = SOURCE_OPENING; + + /* Setup interface early as the streams interact with us during initialization */ This->IMFMediaSource_iface.lpVtbl = &IMFMediaSource_vtbl; This->ref = 1;
+ gst_element_set_state(This->container, GST_STATE_PAUSED); + ret = gst_element_get_state(This->container, NULL, NULL, -1); + if (ret == GST_STATE_CHANGE_FAILURE) + { + ERR("Failed to play source.\n"); + hr = E_OUTOFMEMORY; + goto fail; + } + + WaitForSingleObject(This->init_complete_event, INFINITE); + CloseHandle(This->init_complete_event); + This->init_complete_event = NULL; + + gst_element_set_state(This->container, GST_STATE_READY); + + This->state = SOURCE_STOPPED; + *out_media_source = This; return S_OK;
@@ -1027,6 +1441,30 @@ void perform_cb_media_source(struct cb_data *cbdata) cbdata->u.watch_bus_data.ret = watch_source_bus(data->bus, data->msg, data->user); break; } + case SOURCE_STREAM_ADDED: + { + struct pad_added_data *data = &cbdata->u.pad_added_data; + source_stream_added(data->element, data->pad, data->user); + break; + } + case SOURCE_STREAM_REMOVED: + { + struct pad_removed_data *data = &cbdata->u.pad_removed_data; + source_stream_removed(data->element, data->pad, data->user); + break; + } + case SOURCE_ALL_STREAMS: + { + struct no_more_pads_data *data = &cbdata->u.no_more_pads_data; + source_all_streams(data->element, data->user); + break; + } + case STREAM_NEW_SAMPLE: + { + struct new_sample_data *data = &cbdata->u.new_sample_data; + cbdata->u.new_sample_data.ret = stream_new_sample(data->appsink, data->user); + break; + } default: { ERR("Wrong callback forwarder called\n");