On 9/10/20 11:28 AM, Derek Lesho wrote:
On 9/9/20 6:28 PM, Zebediah Figura wrote:
On 9/8/20 10:47 AM, Derek Lesho wrote:
Signed-off-by: Derek Lesho <dlesho@codeweavers.com>
dlls/winegstreamer/gst_cbs.c | 45 ++++ dlls/winegstreamer/gst_cbs.h | 8 + dlls/winegstreamer/media_source.c | 416 +++++++++++++++++++++++++++++- 3 files changed, 468 insertions(+), 1 deletion(-)
diff --git a/dlls/winegstreamer/gst_cbs.c b/dlls/winegstreamer/gst_cbs.c index 8f48368c96a..4755f5b42f1 100644 --- a/dlls/winegstreamer/gst_cbs.c +++ b/dlls/winegstreamer/gst_cbs.c @@ -359,3 +359,48 @@ gboolean process_bytestream_pad_event_wrapper(GstPad *pad, GstObject *parent, Gs
return cbdata.u.event_src_data.ret;
}
+GstBusSyncReply watch_source_bus_wrapper(GstBus *bus, GstMessage *message, gpointer user) +{
- struct cb_data cbdata = { WATCH_SOURCE_BUS };
- cbdata.u.watch_bus_data.bus = bus;
- cbdata.u.watch_bus_data.msg = message;
- cbdata.u.watch_bus_data.user = user;
- call_cb(&cbdata);
- return cbdata.u.watch_bus_data.ret;
+}
+void source_stream_added_wrapper(GstElement *bin, GstPad *pad, gpointer user) +{
- struct cb_data cbdata = { SOURCE_STREAM_ADDED };
- cbdata.u.pad_added_data.element = bin;
- cbdata.u.pad_added_data.pad = pad;
- cbdata.u.pad_added_data.user = user;
- call_cb(&cbdata);
+}
The function naming is a bit inconsistent. I'd recommend standardizing on the "source_stream_added" style (i.e. object first, then verb), on the principle of narrowing scope. Same thing for 1/3, really.
(In general I wouldn't be too attached to the gstreamer function names; they're not always the best.)
👍
+void source_stream_removed_wrapper(GstElement *element, GstPad *pad, gpointer user) +{
- struct cb_data cbdata = { SOURCE_STREAM_REMOVED };
- cbdata.u.pad_removed_data.element = element;
- cbdata.u.pad_removed_data.pad = pad;
- cbdata.u.pad_removed_data.user = user;
- call_cb(&cbdata);
+}
+void source_all_streams_wrapper(GstElement *element, gpointer user) +{
- struct cb_data cbdata = { SOURCE_ALL_STREAMS };
- cbdata.u.no_more_pads_data.element = element;
- cbdata.u.no_more_pads_data.user = user;
- call_cb(&cbdata);
+}
"all_streams" is a bit confusing at first glance; any reason not to use the GStreamer name "no_more_pads"?
no_more_pads_wrapper is already taken for gstdemux.c
I think the solution to that is to add a namespace discriminator, e.g. "mfplat_source_no_more_pads" (and ideally to both, but renaming all of the quartz callbacks can probably wait for another day), rather than trying to use something nearly synonymous.
[To expand on that a bit, it's probably not a bad idea to be more specific with all of our callback names, and even function names in general. "mfplat_source" or "mf_source" seems better than "source"; I guess quartz should probably get a prefix like "quartz_parser" or "quartz_demuxer", not sure. Naming is hard in winegstreamer.]
diff --git a/dlls/winegstreamer/gst_cbs.h b/dlls/winegstreamer/gst_cbs.h index 10e999feea7..d87cc8c21e9 100644 --- a/dlls/winegstreamer/gst_cbs.h +++ b/dlls/winegstreamer/gst_cbs.h @@ -48,6 +48,10 @@ enum CB_TYPE { QUERY_BYTESTREAM, ACTIVATE_BYTESTREAM_PAD_MODE, PROCESS_BYTESTREAM_PAD_EVENT,
- WATCH_SOURCE_BUS,
- SOURCE_STREAM_ADDED,
- SOURCE_STREAM_REMOVED,
- SOURCE_ALL_STREAMS, MEDIA_SOURCE_MAX, };
@@ -164,5 +168,9 @@ GstFlowReturn pull_from_bytestream_wrapper(GstPad *pad, GstObject *parent, guint gboolean query_bytestream_wrapper(GstPad *pad, GstObject *parent, GstQuery *query) DECLSPEC_HIDDEN; gboolean activate_bytestream_pad_mode_wrapper(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) DECLSPEC_HIDDEN; gboolean process_bytestream_pad_event_wrapper(GstPad *pad, GstObject *parent, GstEvent *event) DECLSPEC_HIDDEN; +GstBusSyncReply watch_source_bus_wrapper(GstBus *bus, GstMessage *message, gpointer user) DECLSPEC_HIDDEN; +void source_stream_added_wrapper(GstElement *bin, GstPad *pad, gpointer user) DECLSPEC_HIDDEN; +void source_stream_removed_wrapper(GstElement *element, GstPad *pad, gpointer user) DECLSPEC_HIDDEN; +void source_all_streams_wrapper(GstElement *element, gpointer user) DECLSPEC_HIDDEN;
#endif diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 6b3bd4a7869..29af2b72def 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -24,6 +24,7 @@ #include "gst_private.h" #include "gst_cbs.h"
+#include <assert.h> #include <stdarg.h>
#define COBJMACROS @@ -40,21 +41,48 @@
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
+struct media_stream +{
- IMFMediaStream IMFMediaStream_iface;
- LONG ref;
- struct media_source *parent_source;
- IMFMediaEventQueue *event_queue;
- GstElement *appsink;
- GstPad *their_src, *my_sink;
- enum
- {
STREAM_STUB,
STREAM_INACTIVE,
These two values are set, but not (really) used in this patch.
👍
STREAM_SHUTDOWN,
- } state;
+};
- struct media_source { IMFMediaSource IMFMediaSource_iface; LONG ref; IMFMediaEventQueue *event_queue; IMFByteStream *byte_stream;
- GstPad *my_src;
- struct media_stream **streams;
- ULONG stream_count;
- GstBus *bus;
- GstElement *container;
- GstElement *decodebin;
- GstPad *my_src, *their_sink; enum { SOURCE_OPENING, SOURCE_STOPPED, SOURCE_SHUTDOWN, } state;
- HANDLE all_streams_event; };
+static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface) +{
- return CONTAINING_RECORD(iface, struct media_stream, IMFMediaStream_iface);
+}
- static inline struct media_source *impl_from_IMFMediaSource(IMFMediaSource *iface) { return CONTAINING_RECORD(iface, struct media_source, IMFMediaSource_iface);
@@ -208,6 +236,243 @@ static gboolean process_bytestream_pad_event(GstPad *pad, GstObject *parent, Gst return TRUE; }
+GstBusSyncReply watch_source_bus(GstBus *bus, GstMessage *message, gpointer user) +{
- struct media_source *source = (struct media_source *) user;
- gchar *dbg_info = NULL;
- GError *err = NULL;
- TRACE("source %p message type %s\n", source, GST_MESSAGE_TYPE_NAME(message));
- switch (message->type)
- {
case GST_MESSAGE_ERROR:
gst_message_parse_error(message, &err, &dbg_info);
ERR("%s: %s\n", GST_OBJECT_NAME(message->src), err->message);
ERR("%s\n", dbg_info);
g_error_free(err);
g_free(dbg_info);
break;
case GST_MESSAGE_WARNING:
gst_message_parse_warning(message, &err, &dbg_info);
WARN("%s: %s\n", GST_OBJECT_NAME(message->src), err->message);
WARN("%s\n", dbg_info);
g_error_free(err);
g_free(dbg_info);
break;
default:
break;
- }
- return GST_BUS_PASS;
+}
There's no async queue used in this patch, so returning GST_BUS_PASS will effectively leak messages.
👍
+static HRESULT WINAPI media_stream_QueryInterface(IMFMediaStream *iface, REFIID riid, void **out) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%s %p)\n", stream, debugstr_guid(riid), out);
- if (IsEqualIID(riid, &IID_IMFMediaStream) ||
IsEqualIID(riid, &IID_IMFMediaEventGenerator) ||
IsEqualIID(riid, &IID_IUnknown))
- {
*out = &stream->IMFMediaStream_iface;
- }
- else
- {
FIXME("(%s, %p)\n", debugstr_guid(riid), out);
*out = NULL;
return E_NOINTERFACE;
- }
- IUnknown_AddRef((IUnknown*)*out);
- return S_OK;
+}
+static ULONG WINAPI media_stream_AddRef(IMFMediaStream *iface) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- ULONG ref = InterlockedIncrement(&stream->ref);
- TRACE("(%p) ref=%u\n", stream, ref);
- return ref;
+}
+static ULONG WINAPI media_stream_Release(IMFMediaStream *iface) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- ULONG ref = InterlockedDecrement(&stream->ref);
- TRACE("(%p) ref=%u\n", stream, ref);
- if (!ref)
- {
if (stream->my_sink)
gst_object_unref(GST_OBJECT(stream->my_sink));
if (stream->event_queue)
IMFMediaEventQueue_Release(stream->event_queue);
if (stream->parent_source)
IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface);
heap_free(stream);
- }
- return ref;
+}
+static HRESULT WINAPI media_stream_GetEvent(IMFMediaStream *iface, DWORD flags, IMFMediaEvent **event) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%#x, %p)\n", stream, flags, event);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return IMFMediaEventQueue_GetEvent(stream->event_queue, flags, event);
+}
+static HRESULT WINAPI media_stream_BeginGetEvent(IMFMediaStream *iface, IMFAsyncCallback *callback, IUnknown *state) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%p, %p)\n", stream, callback, state);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return IMFMediaEventQueue_BeginGetEvent(stream->event_queue, callback, state);
+}
+static HRESULT WINAPI media_stream_EndGetEvent(IMFMediaStream *iface, IMFAsyncResult *result, IMFMediaEvent **event) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%p, %p)\n", stream, result, event);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return IMFMediaEventQueue_EndGetEvent(stream->event_queue, result, event);
+}
+static HRESULT WINAPI media_stream_QueueEvent(IMFMediaStream *iface, MediaEventType event_type, REFGUID ext_type,
HRESULT hr, const PROPVARIANT *value)
+{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%d, %s, %#x, %p)\n", stream, event_type, debugstr_guid(ext_type), hr, value);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, event_type, ext_type, hr, value);
+}
+static HRESULT WINAPI media_stream_GetMediaSource(IMFMediaStream *iface, IMFMediaSource **source) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- FIXME("stub (%p)->(%p)\n", stream, source);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return E_NOTIMPL;
+}
+static HRESULT WINAPI media_stream_GetStreamDescriptor(IMFMediaStream* iface, IMFStreamDescriptor **descriptor) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%p)\n", stream, descriptor);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return E_NOTIMPL;
+}
+static HRESULT WINAPI media_stream_RequestSample(IMFMediaStream *iface, IUnknown *token) +{
- struct media_stream *stream = impl_from_IMFMediaStream(iface);
- TRACE("(%p)->(%p)\n", iface, token);
- if (stream->state == STREAM_SHUTDOWN)
return MF_E_SHUTDOWN;
- return E_NOTIMPL;
+}
+static const IMFMediaStreamVtbl media_stream_vtbl = +{
- media_stream_QueryInterface,
- media_stream_AddRef,
- media_stream_Release,
- media_stream_GetEvent,
- media_stream_BeginGetEvent,
- media_stream_EndGetEvent,
- media_stream_QueueEvent,
- media_stream_GetMediaSource,
- media_stream_GetStreamDescriptor,
- media_stream_RequestSample
+};
+/* creates a stub stream */ +static HRESULT new_media_stream(struct media_source *source, GstPad *pad, DWORD stream_id, struct media_stream **out_stream)
You don't use "stream_id" in this patch.
👍
+{
- struct media_stream *object = heap_alloc_zero(sizeof(*object));
- HRESULT hr;
- TRACE("(%p %p)->(%p)\n", source, pad, out_stream);
- object->IMFMediaStream_iface.lpVtbl = &media_stream_vtbl;
- object->ref = 1;
- IMFMediaSource_AddRef(&source->IMFMediaSource_iface);
- object->parent_source = source;
- object->their_src = pad;
- object->state = STREAM_STUB;
- if (FAILED(hr = MFCreateEventQueue(&object->event_queue)))
goto fail;
- if (!(object->appsink = gst_element_factory_make("appsink", NULL)))
- {
hr = E_OUTOFMEMORY;
goto fail;
- }
- gst_bin_add(GST_BIN(object->parent_source->container), object->appsink);
- g_object_set(object->appsink, "emit-signals", TRUE, NULL);
I might defer this line until you actually hook up the signals.
👍
- g_object_set(object->appsink, "sync", FALSE, NULL);
- g_object_set(object->appsink, "max-buffers", 5, NULL);
So, just to clarify for my understanding, the reason we want to buffer is because the downstream renderer will request samples basically as soon as presentation time hits (through some weird roundabout process) and we want to minimize the latency there?
Nope, the appsink will always buffer, and max-buffers just specifies where to put the limit. 5 is just a random value that felt reasonable, and specifying 1 would probably not cause any problems.
I got the impression from some past communication that the reason for using appsink in the first place (instead of just a solitary sink pad) is so that we can buffer, or is there some other reason?
- g_object_set(object->appsink, "wait-on-eos", FALSE, NULL);
If I understand correctly, this means that GStreamer will signal EOS potentially before pushing all buffers; is that what we want?
Yeah, this shouldn't be here, it's a vestige of my much older media source code.
- object->my_sink = gst_element_get_static_pad(object->appsink, "sink");
- gst_pad_set_element_private(object->my_sink, object);
- gst_pad_link(object->their_src, object->my_sink);
- gst_element_sync_state_with_parent(object->appsink);
- TRACE("->(%p)\n", object);
- *out_stream = object;
- return S_OK;
- fail:
Personally I wouldn't bother using gotos in this function, but if you do, it's at least clearer if you put the label at the beginning of the line.
👍
- WARN("Failed to construct media stream, hr %#x.\n", hr);
- IMFMediaStream_Release(&object->IMFMediaStream_iface);
- return hr;
+}
- static HRESULT WINAPI media_source_QueryInterface(IMFMediaSource *iface, REFIID riid, void **out) { struct media_source *source = impl_from_IMFMediaSource(iface);
@@ -367,13 +632,34 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
source->state = SOURCE_SHUTDOWN;
if (source->container)
{
gst_element_set_state(source->container, GST_STATE_NULL);
gst_object_unref(GST_OBJECT(source->container));
}
if (source->my_src) gst_object_unref(GST_OBJECT(source->my_src));
if (source->their_sink)
gst_object_unref(GST_OBJECT(source->their_sink));
if (source->event_queue) IMFMediaEventQueue_Shutdown(source->event_queue); if (source->byte_stream) IMFByteStream_Release(source->byte_stream);
for (unsigned int i = 0; i < source->stream_count; i++)
Misplaced variable declaration.
👍
- {
source->streams[i]->state = STREAM_SHUTDOWN;
IMFMediaStream_Release(&source->streams[i]->IMFMediaStream_iface);
- }
- if (source->stream_count)
heap_free(source->streams);
- if (source->all_streams_event)
CloseHandle(source->all_streams_event);
}return S_OK;
@@ -394,6 +680,63 @@ static const IMFMediaSourceVtbl IMFMediaSource_vtbl = media_source_Shutdown, };
+static void source_stream_added(GstElement *element, GstPad *pad, gpointer user) +{
- struct media_source *source = (struct media_source *) user;
- struct media_stream **new_stream_array;
- struct media_stream *stream;
- gchar *g_stream_id;
- DWORD stream_id;
- if (gst_pad_get_direction(pad) != GST_PAD_SRC)
return;
- /* Most/All seen randomly calculate the initial part of the stream id, the last three digits are the only deterministic part */
- g_stream_id = GST_PAD_NAME(pad);
- sscanf(strstr(g_stream_id, "_"), "_%u", &stream_id);
- TRACE("stream-id: %u\n", stream_id);
I don't know what "stream_id" is supposed to be used for, but depending on the pad name seems wrong.
Yeah, I'm not sure. Theoretically we could have an input file with media streams which don't last throughout the whole duration. In this case, the approach taken by quartz may not work if, for instance, an application seeked halfway through the source and we started conflating stream-added events with the wrong cached media_stream objects.
I get the vague impression that this is for the identifier returned by IMFStreamDescriptor::GetStreamIdentifier(), in which case I suspect what we want to do is use a source-specific counter, i.e. handle it all on the MF side.
I don't know how Media Foundation handles things like dynamically added streams (frankly; I can't recall offhand what kinds of media files those even show up in, except chained oggs maybe?), so I guess I'd need more context on that to offer any advice there. Of course, if we never need to worry about stream reconnection the way quartz does, it may be a moot point.
[As an aside, I really don't like the way quartz/winegstreamer reconnects its streams; it's very fragile. I haven't looked very deeply into how that could be improved, though.]
- if (FAILED(new_media_stream(source, pad, stream_id, &stream)))
- {
goto leave;
- }
- if (!(new_stream_array = heap_realloc(source->streams, (source->stream_count + 1) * (sizeof(*new_stream_array)))))
- {
ERR("Failed to add stream to source\n");
goto leave;
- }
- source->streams = new_stream_array;
- source->streams[source->stream_count++] = stream;
- leave:
This label is very superfluous.
👍 👍 👍
- return;
+}
+static void source_stream_removed(GstElement *element, GstPad *pad, gpointer user) +{
- struct media_source *source = (struct media_source *)user;
- for (unsigned int i = 0; i < source->stream_count; i++)
Misplaced variable initializer.
👍
- {
struct media_stream *stream = source->streams[i];
if (stream->their_src != pad)
continue;
stream->their_src = NULL;
if (stream->state != STREAM_INACTIVE)
stream->state = STREAM_INACTIVE;
- }
+}
+static void source_all_streams(GstElement *element, gpointer user) +{
- struct media_source *source = (struct media_source *) user;
- SetEvent(source->all_streams_event);
+}
- static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_source **out_media_source) { GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE(
@@ -404,6 +747,7 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_
struct media_source *object = heap_alloc_zero(sizeof(*object)); HRESULT hr;
int ret;
if (!object) return E_OUTOFMEMORY;
@@ -412,10 +756,16 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ object->ref = 1; object->byte_stream = bytestream; IMFByteStream_AddRef(bytestream);
object->all_streams_event = CreateEventA(NULL, FALSE, FALSE, NULL);
if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) goto fail;
object->container = gst_bin_new(NULL);
object->bus = gst_bus_new();
gst_bus_set_sync_handler(object->bus, watch_source_bus_wrapper, object, NULL);
gst_element_set_bus(object->container, object->bus);
object->my_src = gst_pad_new_from_static_template(&src_template, "mf-src"); gst_pad_set_element_private(object->my_src, object); gst_pad_set_getrange_function(object->my_src, pull_from_bytestream_wrapper);
@@ -423,6 +773,46 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ gst_pad_set_activatemode_function(object->my_src, activate_bytestream_pad_mode_wrapper); gst_pad_set_event_function(object->my_src, process_bytestream_pad_event_wrapper);
- object->decodebin = gst_element_factory_make("decodebin", NULL);
- if (!(object->decodebin))
- {
WARN("Failed to create decodebin for source\n");
hr = E_OUTOFMEMORY;
goto fail;
- }
- /* the appsinks determine the maximum amount of buffering instead, this means that if one stream isn't read, a leak will happen, like on windows */
Very long line, and this comment is rather confusing. What you probably want to do is mention the application and describe the problem that it would run into if the default buffering settings were used.
👍
g_object_set(object->decodebin, "max-size-buffers", 0, NULL);
g_object_set(object->decodebin, "max-size-time", G_GUINT64_CONSTANT(0), NULL);
g_object_set(object->decodebin, "max-size-bytes", 0, NULL);
gst_bin_add(GST_BIN(object->container), object->decodebin);
g_signal_connect(object->decodebin, "pad-added", G_CALLBACK(source_stream_added_wrapper), object);
g_signal_connect(object->decodebin, "pad-removed", G_CALLBACK(source_stream_removed_wrapper), object);
g_signal_connect(object->decodebin, "no-more-pads", G_CALLBACK(source_all_streams_wrapper), object);
object->their_sink = gst_element_get_static_pad(object->decodebin, "sink");
if ((ret = gst_pad_link(object->my_src, object->their_sink)) < 0)
{
WARN("Failed to link our bytestream pad to the demuxer input\n");
hr = E_OUTOFMEMORY;
goto fail;
}
object->state = SOURCE_OPENING;
gst_element_set_state(object->container, GST_STATE_PAUSED);
ret = gst_element_get_state(object->container, NULL, NULL, -1);
if (ret == GST_STATE_CHANGE_FAILURE)
{
ERR("Failed to play source.\n");
hr = E_OUTOFMEMORY;
goto fail;
}
WaitForSingleObject(object->all_streams_event, INFINITE);
object->state = SOURCE_STOPPED; *out_media_source = object;
@@ -923,6 +1313,30 @@ void perform_cb_media_source(struct cb_data *cbdata) cbdata->u.event_src_data.ret = process_bytestream_pad_event(data->pad, data->parent, data->event); break; }
- case WATCH_SOURCE_BUS:
{
struct watch_bus_data *data = &cbdata->u.watch_bus_data;
cbdata->u.watch_bus_data.ret = watch_source_bus(data->bus, data->msg, data->user);
break;
}
- case SOURCE_STREAM_ADDED:
{
struct pad_added_data *data = &cbdata->u.pad_added_data;
source_stream_added(data->element, data->pad, data->user);
break;
}
- case SOURCE_STREAM_REMOVED:
{
struct pad_removed_data *data = &cbdata->u.pad_removed_data;
source_stream_removed(data->element, data->pad, data->user);
break;
}
- case SOURCE_ALL_STREAMS:
{
struct no_more_pads_data *data = &cbdata->u.no_more_pads_data;
source_all_streams(data->element, data->user);
break;
} default: { ERR("Wrong callback forwarder called\n");