On 9/8/20 10:47 AM, Derek Lesho wrote:
Signed-off-by: Derek Lesho dlesho@codeweavers.com
dlls/winegstreamer/gst_cbs.c | 45 ++++ dlls/winegstreamer/gst_cbs.h | 8 + dlls/winegstreamer/media_source.c | 416 +++++++++++++++++++++++++++++- 3 files changed, 468 insertions(+), 1 deletion(-)
diff --git a/dlls/winegstreamer/gst_cbs.c b/dlls/winegstreamer/gst_cbs.c index 8f48368c96a..4755f5b42f1 100644 --- a/dlls/winegstreamer/gst_cbs.c +++ b/dlls/winegstreamer/gst_cbs.c @@ -359,3 +359,48 @@ gboolean process_bytestream_pad_event_wrapper(GstPad *pad, GstObject *parent, Gs
return cbdata.u.event_src_data.ret;
}
+GstBusSyncReply watch_source_bus_wrapper(GstBus *bus, GstMessage *message, gpointer user) +{
- struct cb_data cbdata = { WATCH_SOURCE_BUS };
- cbdata.u.watch_bus_data.bus = bus;
- cbdata.u.watch_bus_data.msg = message;
- cbdata.u.watch_bus_data.user = user;
- call_cb(&cbdata);
- return cbdata.u.watch_bus_data.ret;
+}
+void source_stream_added_wrapper(GstElement *bin, GstPad *pad, gpointer user) +{
- struct cb_data cbdata = { SOURCE_STREAM_ADDED };
- cbdata.u.pad_added_data.element = bin;
- cbdata.u.pad_added_data.pad = pad;
- cbdata.u.pad_added_data.user = user;
- call_cb(&cbdata);
+}
The function naming is a bit inconsistent. I'd recommend standardizing on the "source_stream_added" style (i.e. object first, then verb), on the principle of narrowing scope. The same applies to patch 1/3 of this series, really.
(In general I wouldn't be too attached to the gstreamer function names; they're not always the best.)
+void source_stream_removed_wrapper(GstElement *element, GstPad *pad, gpointer user) +{
- struct cb_data cbdata = { SOURCE_STREAM_REMOVED };
- cbdata.u.pad_removed_data.element = element;
- cbdata.u.pad_removed_data.pad = pad;
- cbdata.u.pad_removed_data.user = user;
- call_cb(&cbdata);
+}
+void source_all_streams_wrapper(GstElement *element, gpointer user) +{
- struct cb_data cbdata = { SOURCE_ALL_STREAMS };
- cbdata.u.no_more_pads_data.element = element;
- cbdata.u.no_more_pads_data.user = user;
- call_cb(&cbdata);
+}
"all_streams" is a bit confusing at first glance; any reason not to use the GStreamer name "no_more_pads"?
diff --git a/dlls/winegstreamer/gst_cbs.h b/dlls/winegstreamer/gst_cbs.h index 10e999feea7..d87cc8c21e9 100644 --- a/dlls/winegstreamer/gst_cbs.h +++ b/dlls/winegstreamer/gst_cbs.h @@ -48,6 +48,10 @@ enum CB_TYPE { QUERY_BYTESTREAM, ACTIVATE_BYTESTREAM_PAD_MODE, PROCESS_BYTESTREAM_PAD_EVENT,
- WATCH_SOURCE_BUS,
- SOURCE_STREAM_ADDED,
- SOURCE_STREAM_REMOVED,
- SOURCE_ALL_STREAMS, MEDIA_SOURCE_MAX,
};
@@ -164,5 +168,9 @@ GstFlowReturn pull_from_bytestream_wrapper(GstPad *pad, GstObject *parent, guint gboolean query_bytestream_wrapper(GstPad *pad, GstObject *parent, GstQuery *query) DECLSPEC_HIDDEN; gboolean activate_bytestream_pad_mode_wrapper(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) DECLSPEC_HIDDEN; gboolean process_bytestream_pad_event_wrapper(GstPad *pad, GstObject *parent, GstEvent *event) DECLSPEC_HIDDEN; +GstBusSyncReply watch_source_bus_wrapper(GstBus *bus, GstMessage *message, gpointer user) DECLSPEC_HIDDEN; +void source_stream_added_wrapper(GstElement *bin, GstPad *pad, gpointer user) DECLSPEC_HIDDEN; +void source_stream_removed_wrapper(GstElement *element, GstPad *pad, gpointer user) DECLSPEC_HIDDEN; +void source_all_streams_wrapper(GstElement *element, gpointer user) DECLSPEC_HIDDEN;
#endif diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 6b3bd4a7869..29af2b72def 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -24,6 +24,7 @@ #include "gst_private.h" #include "gst_cbs.h"
+#include <assert.h> #include <stdarg.h>
#define COBJMACROS @@ -40,21 +41,48 @@
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
+struct media_stream +{
- IMFMediaStream IMFMediaStream_iface;
- LONG ref;
- struct media_source *parent_source;
- IMFMediaEventQueue *event_queue;
- GstElement *appsink;
- GstPad *their_src, *my_sink;
- enum
- {
STREAM_STUB,
STREAM_INACTIVE,
These two values are set, but not (really) used in this patch.
STREAM_SHUTDOWN,
- } state;
+};
struct media_source { IMFMediaSource IMFMediaSource_iface; LONG ref; IMFMediaEventQueue *event_queue; IMFByteStream *byte_stream;
- GstPad *my_src;
- struct media_stream **streams;
- ULONG stream_count;
- GstBus *bus;
- GstElement *container;
- GstElement *decodebin;
- GstPad *my_src, *their_sink; enum { SOURCE_OPENING, SOURCE_STOPPED, SOURCE_SHUTDOWN, } state;
- HANDLE all_streams_event;
};
+static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface) +{
- return CONTAINING_RECORD(iface, struct media_stream, IMFMediaStream_iface);
+}
static inline struct media_source *impl_from_IMFMediaSource(IMFMediaSource *iface) { return CONTAINING_RECORD(iface, struct media_source, IMFMediaSource_iface); @@ -208,6 +236,243 @@ static gboolean process_bytestream_pad_event(GstPad *pad, GstObject *parent, Gst return TRUE; }
+GstBusSyncReply watch_source_bus(GstBus *bus, GstMessage *message, gpointer user) +{
- struct media_source *source = (struct media_source *) user;
- gchar *dbg_info = NULL;
- GError *err = NULL;
- TRACE("source %p message type %s\n", source, GST_MESSAGE_TYPE_NAME(message));
- switch (message->type)
- {
case GST_MESSAGE_ERROR:
gst_message_parse_error(message, &err, &dbg_info);
ERR("%s: %s\n", GST_OBJECT_NAME(message->src), err->message);
ERR("%s\n", dbg_info);
g_error_free(err);
g_free(dbg_info);
break;
case GST_MESSAGE_WARNING:
gst_message_parse_warning(message, &err, &dbg_info);
WARN("%s: %s\n", GST_OBJECT_NAME(message->src), err->message);
WARN("%s\n", dbg_info);
g_error_free(err);
g_free(dbg_info);
break;
default:
break;
- }
- return GST_BUS_PASS;
+}
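There's no async queue used in this patch, so returning GST_BUS_PASS will effectively leak messages: they get queued on the bus, but nothing ever pops them off. Either drop them in the handler (unref the message and return GST_BUS_DROP) or add something that actually consumes the bus.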
+static HRESULT WINAPI media_stream_QueryInterface(IMFMediaStream *iface, REFIID riid, void **out) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%s %p)\n", stream, debugstr_guid(riid), out);
- if (IsEqualIID(riid, &IID_IMFMediaStream) ||
IsEqualIID(riid, &IID_IMFMediaEventGenerator) ||
IsEqualIID(riid, &IID_IUnknown))
- {
*out = &stream->IMFMediaStream_iface;
- }
- else
- {
FIXME("(%s, %p)\n", debugstr_guid(riid), out);
*out = NULL;
return E_NOINTERFACE;
- }
- IUnknown_AddRef((IUnknown*)*out);
- return S_OK;
+}
+static ULONG WINAPI media_stream_AddRef(IMFMediaStream *iface) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- ULONG ref = InterlockedIncrement(&stream->ref);
- TRACE("(%p) ref=%u\n", stream, ref);
- return ref;
+}
+static ULONG WINAPI media_stream_Release(IMFMediaStream *iface) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- ULONG ref = InterlockedDecrement(&stream->ref);
- TRACE("(%p) ref=%u\n", stream, ref);
- if (!ref)
- {
if (stream->my_sink)
gst_object_unref(GST_OBJECT(stream->my_sink));
if (stream->event_queue)
IMFMediaEventQueue_Release(stream->event_queue);
if (stream->parent_source)
IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface);
heap_free(stream);
- }
- return ref;
+}
+static HRESULT WINAPI media_stream_GetEvent(IMFMediaStream *iface, DWORD flags, IMFMediaEvent **event) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%#x, %p)\n", stream, flags, event);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return IMFMediaEventQueue_GetEvent(stream->event_queue, flags, event);
+}
+static HRESULT WINAPI media_stream_BeginGetEvent(IMFMediaStream *iface, IMFAsyncCallback *callback, IUnknown *state) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%p, %p)\n", stream, callback, state);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return IMFMediaEventQueue_BeginGetEvent(stream->event_queue, callback, state);
+}
+static HRESULT WINAPI media_stream_EndGetEvent(IMFMediaStream *iface, IMFAsyncResult *result, IMFMediaEvent **event) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%p, %p)\n", stream, result, event);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return IMFMediaEventQueue_EndGetEvent(stream->event_queue, result, event);
+}
+static HRESULT WINAPI media_stream_QueueEvent(IMFMediaStream *iface, MediaEventType event_type, REFGUID ext_type,
HRESULT hr, const PROPVARIANT *value)
+{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%d, %s, %#x, %p)\n", stream, event_type, debugstr_guid(ext_type), hr, value);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, event_type, ext_type, hr, value);
+}
+static HRESULT WINAPI media_stream_GetMediaSource(IMFMediaStream *iface, IMFMediaSource **source) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- FIXME("stub (%p)->(%p)\n", stream, source);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return E_NOTIMPL;
+}
+static HRESULT WINAPI media_stream_GetStreamDescriptor(IMFMediaStream* iface, IMFStreamDescriptor **descriptor) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%p)\n", stream, descriptor);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return E_NOTIMPL;
+}
+static HRESULT WINAPI media_stream_RequestSample(IMFMediaStream *iface, IUnknown *token) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%p)\n", iface, token);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return E_NOTIMPL;
+}
+static const IMFMediaStreamVtbl media_stream_vtbl = +{
- media_stream_QueryInterface,
- media_stream_AddRef,
- media_stream_Release,
- media_stream_GetEvent,
- media_stream_BeginGetEvent,
- media_stream_EndGetEvent,
- media_stream_QueueEvent,
- media_stream_GetMediaSource,
- media_stream_GetStreamDescriptor,
- media_stream_RequestSample
+};
+/* creates a stub stream */ +static HRESULT new_media_stream(struct media_source *source, GstPad *pad, DWORD stream_id, struct media_stream **out_stream)
You don't use "stream_id" in this patch.
+{
- struct media_stream *object = heap_alloc_zero(sizeof(*object));
- HRESULT hr;
- TRACE("(%p %p)->(%p)\n", source, pad, out_stream);
- object->IMFMediaStream_iface.lpVtbl = &media_stream_vtbl;
- object->ref = 1;
- IMFMediaSource_AddRef(&source->IMFMediaSource_iface);
- object->parent_source = source;
- object->their_src = pad;
- object->state = STREAM_STUB;
- if (FAILED(hr = MFCreateEventQueue(&object->event_queue)))
goto fail;
- if (!(object->appsink = gst_element_factory_make("appsink", NULL)))
- {
hr = E_OUTOFMEMORY;
goto fail;
- }
- gst_bin_add(GST_BIN(object->parent_source->container), object->appsink);
- g_object_set(object->appsink, "emit-signals", TRUE, NULL);
I might defer this line until you actually hook up the signals.
- g_object_set(object->appsink, "sync", FALSE, NULL);
- g_object_set(object->appsink, "max-buffers", 5, NULL);
So, just to clarify for my understanding, the reason we want to buffer is because the downstream renderer will request samples basically as soon as presentation time hits (through some weird roundabout process) and we want to minimize the latency there?
- g_object_set(object->appsink, "wait-on-eos", FALSE, NULL);
If I understand correctly, this means that GStreamer will signal EOS potentially before pushing all buffers; is that what we want?
- object->my_sink = gst_element_get_static_pad(object->appsink, "sink");
- gst_pad_set_element_private(object->my_sink, object);
- gst_pad_link(object->their_src, object->my_sink);
- gst_element_sync_state_with_parent(object->appsink);
- TRACE("->(%p)\n", object);
- *out_stream = object;
- return S_OK;
- fail:
Personally I wouldn't bother using gotos in this function, but if you do, it's at least clearer if you put the label at the beginning of the line.
- WARN("Failed to construct media stream, hr %#x.\n", hr);
- IMFMediaStream_Release(&object->IMFMediaStream_iface);
- return hr;
+}
static HRESULT WINAPI media_source_QueryInterface(IMFMediaSource *iface, REFIID riid, void **out) { struct media_source *source = impl_from_IMFMediaSource(iface); @@ -367,13 +632,34 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
source->state = SOURCE_SHUTDOWN;
if (source->container)
{
gst_element_set_state(source->container, GST_STATE_NULL);
gst_object_unref(GST_OBJECT(source->container));
}
if (source->my_src) gst_object_unref(GST_OBJECT(source->my_src));
if (source->their_sink)
gst_object_unref(GST_OBJECT(source->their_sink));
if (source->event_queue) IMFMediaEventQueue_Shutdown(source->event_queue); if (source->byte_stream) IMFByteStream_Release(source->byte_stream);
for (unsigned int i = 0; i < source->stream_count; i++)
Misplaced variable declaration; please declare "i" at the top of the function rather than inside the for statement.
- {
source->streams[i]->state = STREAM_SHUTDOWN;
IMFMediaStream_Release(&source->streams[i]->IMFMediaStream_iface);
- }
- if (source->stream_count)
heap_free(source->streams);
- if (source->all_streams_event)
CloseHandle(source->all_streams_event);
- return S_OK;
}
@@ -394,6 +680,63 @@ static const IMFMediaSourceVtbl IMFMediaSource_vtbl = media_source_Shutdown, };
+static void source_stream_added(GstElement *element, GstPad *pad, gpointer user) +{
- struct media_source *source = (struct media_source *) user;
- struct media_stream **new_stream_array;
- struct media_stream *stream;
- gchar *g_stream_id;
- DWORD stream_id;
- if (gst_pad_get_direction(pad) != GST_PAD_SRC)
return;
- /* Most/All seen randomly calculate the initial part of the stream id, the last three digits are the only deterministic part */
- g_stream_id = GST_PAD_NAME(pad);
- sscanf(strstr(g_stream_id, "_"), "_%u", &stream_id);
- TRACE("stream-id: %u\n", stream_id);
I don't know what "stream_id" is supposed to be used for, but depending on the pad name seems wrong.
- if (FAILED(new_media_stream(source, pad, stream_id, &stream)))
- {
goto leave;
- }
- if (!(new_stream_array = heap_realloc(source->streams, (source->stream_count + 1) * (sizeof(*new_stream_array)))))
- {
ERR("Failed to add stream to source\n");
goto leave;
- }
- source->streams = new_stream_array;
- source->streams[source->stream_count++] = stream;
- leave:
This label is superfluous; it only precedes a bare return, so the gotos can simply be replaced with "return".
- return;
+}
+static void source_stream_removed(GstElement *element, GstPad *pad, gpointer user) +{
- struct media_source *source = (struct media_source *)user;
- for (unsigned int i = 0; i < source->stream_count; i++)
Misplaced variable declaration; please declare "i" at the top of the function rather than inside the for statement.
- {
struct media_stream *stream = source->streams[i];
if (stream->their_src != pad)
continue;
stream->their_src = NULL;
if (stream->state != STREAM_INACTIVE)
stream->state = STREAM_INACTIVE;
- }
+}
+static void source_all_streams(GstElement *element, gpointer user) +{
- struct media_source *source = (struct media_source *) user;
- SetEvent(source->all_streams_event);
+}
static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_source **out_media_source) { GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE( @@ -404,6 +747,7 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_
struct media_source *object = heap_alloc_zero(sizeof(*object)); HRESULT hr;
int ret;
if (!object) return E_OUTOFMEMORY;
@@ -412,10 +756,16 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ object->ref = 1; object->byte_stream = bytestream; IMFByteStream_AddRef(bytestream);
object->all_streams_event = CreateEventA(NULL, FALSE, FALSE, NULL);
if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) goto fail;
object->container = gst_bin_new(NULL);
object->bus = gst_bus_new();
gst_bus_set_sync_handler(object->bus, watch_source_bus_wrapper, object, NULL);
gst_element_set_bus(object->container, object->bus);
object->my_src = gst_pad_new_from_static_template(&src_template, "mf-src"); gst_pad_set_element_private(object->my_src, object); gst_pad_set_getrange_function(object->my_src, pull_from_bytestream_wrapper);
@@ -423,6 +773,46 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ gst_pad_set_activatemode_function(object->my_src, activate_bytestream_pad_mode_wrapper); gst_pad_set_event_function(object->my_src, process_bytestream_pad_event_wrapper);
- object->decodebin = gst_element_factory_make("decodebin", NULL);
- if (!(object->decodebin))
- {
WARN("Failed to create decodebin for source\n");
hr = E_OUTOFMEMORY;
goto fail;
- }
- /* the appsinks determine the maximum amount of buffering instead, this means that if one stream isn't read, a leak will happen, like on windows */
Very long line, and this comment is rather confusing. What you probably want to do is mention the application and describe the problem that it would run into if the default buffering settings were used.
g_object_set(object->decodebin, "max-size-buffers", 0, NULL);
g_object_set(object->decodebin, "max-size-time", G_GUINT64_CONSTANT(0), NULL);
g_object_set(object->decodebin, "max-size-bytes", 0, NULL);
gst_bin_add(GST_BIN(object->container), object->decodebin);
g_signal_connect(object->decodebin, "pad-added", G_CALLBACK(source_stream_added_wrapper), object);
g_signal_connect(object->decodebin, "pad-removed", G_CALLBACK(source_stream_removed_wrapper), object);
g_signal_connect(object->decodebin, "no-more-pads", G_CALLBACK(source_all_streams_wrapper), object);
object->their_sink = gst_element_get_static_pad(object->decodebin, "sink");
if ((ret = gst_pad_link(object->my_src, object->their_sink)) < 0)
{
WARN("Failed to link our bytestream pad to the demuxer input\n");
hr = E_OUTOFMEMORY;
goto fail;
}
object->state = SOURCE_OPENING;
gst_element_set_state(object->container, GST_STATE_PAUSED);
ret = gst_element_get_state(object->container, NULL, NULL, -1);
if (ret == GST_STATE_CHANGE_FAILURE)
{
ERR("Failed to play source.\n");
hr = E_OUTOFMEMORY;
goto fail;
}
WaitForSingleObject(object->all_streams_event, INFINITE);
object->state = SOURCE_STOPPED;
*out_media_source = object;
@@ -923,6 +1313,30 @@ void perform_cb_media_source(struct cb_data *cbdata) cbdata->u.event_src_data.ret = process_bytestream_pad_event(data->pad, data->parent, data->event); break; }
- case WATCH_SOURCE_BUS:
{
struct watch_bus_data *data = &cbdata->u.watch_bus_data;
cbdata->u.watch_bus_data.ret = watch_source_bus(data->bus, data->msg, data->user);
break;
}
- case SOURCE_STREAM_ADDED:
{
struct pad_added_data *data = &cbdata->u.pad_added_data;
source_stream_added(data->element, data->pad, data->user);
break;
}
- case SOURCE_STREAM_REMOVED:
{
struct pad_removed_data *data = &cbdata->u.pad_removed_data;
source_stream_removed(data->element, data->pad, data->user);
break;
}
- case SOURCE_ALL_STREAMS:
{
struct no_more_pads_data *data = &cbdata->u.no_more_pads_data;
source_all_streams(data->element, data->user);
break;
default: { ERR("Wrong callback forwarder called\n");}