-- v2: winegstreamer: Implement wg_muxer_push_sample. winegstreamer: Implement ProcessSample for media sink. winegstreamer: Implement wg_muxer_start. winegstreamer: Implement wg_muxer_add_stream. winegstreamer: Introduce link_src_to_sink. winegstreamer: Use NTSTATUS value in wg_muxer_create.
From: Ziqing Hui zhui@codeweavers.com
--- dlls/winegstreamer/main.c | 8 ++++++-- dlls/winegstreamer/wg_muxer.c | 6 +++--- 2 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/dlls/winegstreamer/main.c b/dlls/winegstreamer/main.c index 853907e1825..beb48fa3731 100644 --- a/dlls/winegstreamer/main.c +++ b/dlls/winegstreamer/main.c @@ -466,13 +466,17 @@ HRESULT wg_muxer_create(const char *format, wg_muxer_t *muxer)
TRACE("format %p, muxer %p.\n", format, muxer);
- if (SUCCEEDED(status = WINE_UNIX_CALL(unix_wg_muxer_create, &params))) + if (!(status = WINE_UNIX_CALL(unix_wg_muxer_create, &params))) { *muxer = params.muxer; TRACE("Created wg_muxer %#I64x.\n", params.muxer); } + else + { + WARN("Failed to create muxer, status %#lx.\n", status); + }
- return status; + return HRESULT_FROM_NT(status); }
void wg_muxer_destroy(wg_muxer_t muxer) diff --git a/dlls/winegstreamer/wg_muxer.c b/dlls/winegstreamer/wg_muxer.c index 6887bc015ba..601c5f03d31 100644 --- a/dlls/winegstreamer/wg_muxer.c +++ b/dlls/winegstreamer/wg_muxer.c @@ -61,14 +61,14 @@ NTSTATUS wg_muxer_create(void *args) { struct wg_muxer_create_params *params = args; GstElement *first = NULL, *last = NULL; + NTSTATUS status = STATUS_UNSUCCESSFUL; GstPadTemplate *template = NULL; GstCaps *sink_caps = NULL; - NTSTATUS status = E_FAIL; struct wg_muxer *muxer;
/* Create wg_muxer object. */ if (!(muxer = calloc(1, sizeof(*muxer)))) - return E_OUTOFMEMORY; + return STATUS_NO_MEMORY; if (!(muxer->container = gst_bin_new("wg_muxer"))) goto out;
@@ -110,7 +110,7 @@ NTSTATUS wg_muxer_create(void *args) GST_INFO("Created winegstreamer muxer %p.", muxer); params->muxer = (wg_transform_t)(ULONG_PTR)muxer;
- return S_OK; + return STATUS_SUCCESS;
out: if (muxer->my_sink)
From: Ziqing Hui zhui@codeweavers.com
--- dlls/winegstreamer/unix_private.h | 1 + dlls/winegstreamer/unixlib.c | 44 ++++++++++++++++++------------- dlls/winegstreamer/wg_parser.c | 7 ++--- 3 files changed, 29 insertions(+), 23 deletions(-)
diff --git a/dlls/winegstreamer/unix_private.h b/dlls/winegstreamer/unix_private.h index 305d69c12a8..eefd3cdb259 100644 --- a/dlls/winegstreamer/unix_private.h +++ b/dlls/winegstreamer/unix_private.h @@ -37,6 +37,7 @@ extern GstStreamType stream_type_from_caps(GstCaps *caps) DECLSPEC_HIDDEN; extern GstElement *create_element(const char *name, const char *plugin_set) DECLSPEC_HIDDEN; extern GstElement *find_element(GstElementFactoryListType type, GstCaps *src_caps, GstCaps *sink_caps) DECLSPEC_HIDDEN; extern bool append_element(GstElement *container, GstElement *element, GstElement **first, GstElement **last) DECLSPEC_HIDDEN; +extern bool link_src_to_sink(GstPad *src_pad, GstPad *sink_pad) DECLSPEC_HIDDEN; extern bool link_src_to_element(GstPad *src_pad, GstElement *element) DECLSPEC_HIDDEN; extern bool link_element_to_sink(GstElement *element, GstPad *sink_pad) DECLSPEC_HIDDEN; extern bool push_event(GstPad *pad, GstEvent *event) DECLSPEC_HIDDEN; diff --git a/dlls/winegstreamer/unixlib.c b/dlls/winegstreamer/unixlib.c index 513ece95a90..4b83983d975 100644 --- a/dlls/winegstreamer/unixlib.c +++ b/dlls/winegstreamer/unixlib.c @@ -162,10 +162,30 @@ bool append_element(GstElement *container, GstElement *element, GstElement **fir return success; }
-bool link_src_to_element(GstPad *src_pad, GstElement *element) +bool link_src_to_sink(GstPad *src_pad, GstPad *sink_pad) { GstPadLinkReturn ret; + + if ((ret = gst_pad_link(src_pad, sink_pad)) != GST_PAD_LINK_OK) + { + gchar *src_name = gst_pad_get_name(src_pad), *sink_name = gst_pad_get_name(sink_pad); + + GST_ERROR("Failed to link src pad %s to sink pad %s, reason: %s", + src_name, sink_name, gst_pad_link_get_name(ret)); + + g_free(sink_name); + g_free(src_name); + + return false; + } + + return true; +} + +bool link_src_to_element(GstPad *src_pad, GstElement *element) +{ GstPad *sink_pad; + bool ret;
if (!(sink_pad = gst_element_get_static_pad(element, "sink"))) { @@ -174,21 +194,15 @@ bool link_src_to_element(GstPad *src_pad, GstElement *element) g_free(name); return false; } - if ((ret = gst_pad_link(src_pad, sink_pad))) - { - gchar *src_name = gst_pad_get_name(src_pad), *sink_name = gst_pad_get_name(sink_pad); - GST_ERROR("Failed to link element pad %s with pad %s", src_name, sink_name); - g_free(sink_name); - g_free(src_name); - } + ret = link_src_to_sink(src_pad, sink_pad); gst_object_unref(sink_pad); - return !ret; + return ret; }
bool link_element_to_sink(GstElement *element, GstPad *sink_pad) { - GstPadLinkReturn ret; GstPad *src_pad; + bool ret;
if (!(src_pad = gst_element_get_static_pad(element, "src"))) { @@ -197,15 +211,9 @@ bool link_element_to_sink(GstElement *element, GstPad *sink_pad) g_free(name); return false; } - if ((ret = gst_pad_link(src_pad, sink_pad))) - { - gchar *src_name = gst_pad_get_name(src_pad), *sink_name = gst_pad_get_name(sink_pad); - GST_ERROR("Failed to link pad %s with element pad %s", src_name, sink_name); - g_free(sink_name); - g_free(src_name); - } + ret = link_src_to_sink(src_pad, sink_pad); gst_object_unref(src_pad); - return !ret; + return ret; }
bool push_event(GstPad *pad, GstEvent *event) diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 299eea09c90..ebdefb6a7f9 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -849,7 +849,6 @@ static bool stream_create_post_processing_elements(GstPad *pad, struct wg_parser struct wg_parser *parser = stream->parser; const char *name; GstCaps *caps; - int ret;
caps = gst_pad_query_caps(pad, NULL); name = gst_structure_get_name(gst_caps_get_structure(caps, 0)); @@ -898,11 +897,9 @@ static bool stream_create_post_processing_elements(GstPad *pad, struct wg_parser if (!link_src_to_element(pad, first) || !link_element_to_sink(last, stream->my_sink)) return false; } - else if ((ret = gst_pad_link(pad, stream->my_sink)) < 0) + else { - GST_ERROR("Failed to link decodebin source pad to our sink pad, error %s.", - gst_pad_link_get_name(ret)); - return false; + return link_src_to_sink(pad, stream->my_sink); }
return true;
From: Ziqing Hui zhui@codeweavers.com
--- dlls/winegstreamer/gst_private.h | 1 + dlls/winegstreamer/main.c | 21 +++++++ dlls/winegstreamer/media_sink.c | 9 +++ dlls/winegstreamer/unix_private.h | 1 + dlls/winegstreamer/unixlib.h | 8 +++ dlls/winegstreamer/wg_muxer.c | 94 +++++++++++++++++++++++++++++++ dlls/winegstreamer/wg_parser.c | 19 +++++++ 7 files changed, 153 insertions(+)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 8cfadd10bfc..23b59766d0c 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -112,6 +112,7 @@ HRESULT wg_transform_flush(wg_transform_t transform);
HRESULT wg_muxer_create(const char *format, wg_muxer_t *muxer); void wg_muxer_destroy(wg_muxer_t muxer); +HRESULT wg_muxer_add_stream(wg_muxer_t muxer, UINT32 stream_id, const struct wg_format *format);
unsigned int wg_format_get_max_size(const struct wg_format *format);
diff --git a/dlls/winegstreamer/main.c b/dlls/winegstreamer/main.c index beb48fa3731..55817922e9b 100644 --- a/dlls/winegstreamer/main.c +++ b/dlls/winegstreamer/main.c @@ -486,6 +486,27 @@ void wg_muxer_destroy(wg_muxer_t muxer) WINE_UNIX_CALL(unix_wg_muxer_destroy, &muxer); }
+HRESULT wg_muxer_add_stream(wg_muxer_t muxer, UINT32 stream_id, const struct wg_format *format) +{ + struct wg_muxer_add_stream_params params = + { + .muxer = muxer, + .stream_id = stream_id, + .format = format, + }; + NTSTATUS status; + + TRACE("muxer %#I64x, stream_id %u, format %p.\n", muxer, stream_id, format); + + if ((status = WINE_UNIX_CALL(unix_wg_muxer_add_stream, &params))) + { + WARN("Failed to add stream, status %#lx.\n", status); + return HRESULT_FROM_NT(status); + } + + return S_OK; +} + #define ALIGN(n, alignment) (((n) + (alignment) - 1) & ~((alignment) - 1))
unsigned int wg_format_get_stride(const struct wg_format *format) diff --git a/dlls/winegstreamer/media_sink.c b/dlls/winegstreamer/media_sink.c index 6bd9fdcfb7d..344134d1633 100644 --- a/dlls/winegstreamer/media_sink.c +++ b/dlls/winegstreamer/media_sink.c @@ -588,6 +588,7 @@ static HRESULT WINAPI media_sink_AddStreamSink(IMFFinalizableMediaSink *iface, D { struct media_sink *media_sink = impl_from_IMFFinalizableMediaSink(iface); struct stream_sink *object; + struct wg_format format; HRESULT hr;
TRACE("iface %p, stream_sink_id %#lx, media_type %p, stream_sink %p.\n", @@ -608,6 +609,14 @@ static HRESULT WINAPI media_sink_AddStreamSink(IMFFinalizableMediaSink *iface, D return hr; }
+ mf_media_type_to_wg_format(media_type, &format); + if (FAILED(hr = wg_muxer_add_stream(media_sink->muxer, stream_sink_id, &format))) + { + LeaveCriticalSection(&media_sink->cs); + IMFStreamSink_Release(&object->IMFStreamSink_iface); + return hr; + } + list_add_tail(&media_sink->stream_sinks, &object->entry);
LeaveCriticalSection(&media_sink->cs); diff --git a/dlls/winegstreamer/unix_private.h b/dlls/winegstreamer/unix_private.h index eefd3cdb259..1f594200c2e 100644 --- a/dlls/winegstreamer/unix_private.h +++ b/dlls/winegstreamer/unix_private.h @@ -63,6 +63,7 @@ extern NTSTATUS wg_transform_flush(void *args) DECLSPEC_HIDDEN;
extern NTSTATUS wg_muxer_create(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_muxer_destroy(void *args) DECLSPEC_HIDDEN; +extern NTSTATUS wg_muxer_add_stream(void *args) DECLSPEC_HIDDEN;
/* wg_allocator.c */
diff --git a/dlls/winegstreamer/unixlib.h b/dlls/winegstreamer/unixlib.h index a3131e9f789..d00790c6b05 100644 --- a/dlls/winegstreamer/unixlib.h +++ b/dlls/winegstreamer/unixlib.h @@ -372,6 +372,13 @@ struct wg_muxer_create_params const char *format; };
+struct wg_muxer_add_stream_params +{ + wg_muxer_t muxer; + UINT32 stream_id; + const struct wg_format *format; +}; + enum unix_funcs { unix_wg_init_gstreamer, @@ -414,6 +421,7 @@ enum unix_funcs
unix_wg_muxer_create, unix_wg_muxer_destroy, + unix_wg_muxer_add_stream,
unix_wg_funcs_count, }; diff --git a/dlls/winegstreamer/wg_muxer.c b/dlls/winegstreamer/wg_muxer.c index 601c5f03d31..c6e56793dc2 100644 --- a/dlls/winegstreamer/wg_muxer.c +++ b/dlls/winegstreamer/wg_muxer.c @@ -22,16 +22,34 @@ #pragma makedep unix #endif
+#include <stdio.h> + #include "ntstatus.h" #define WIN32_NO_STATUS #include "winternl.h"
#include "unix_private.h"
+#include "wine/list.h" + struct wg_muxer { GstElement *container, *muxer; GstPad *my_sink; + struct list streams; +}; + +struct wg_muxer_stream +{ + struct wg_muxer *muxer; + struct wg_format format; + uint32_t id; + + GstPad *my_src; + GstCaps *my_src_caps; + GstSegment segment; + + struct list entry; };
static struct wg_muxer *get_muxer(wg_muxer_t muxer) @@ -39,6 +57,23 @@ static struct wg_muxer *get_muxer(wg_muxer_t muxer) return (struct wg_muxer *)(ULONG_PTR)muxer; }
+static gboolean muxer_src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) +{ + struct wg_muxer_stream *stream = gst_pad_get_element_private(pad); + + GST_DEBUG("pad %p, parent %p, query %p, stream %u, type \"%s\".", + pad, parent, query, stream->id, gst_query_type_get_name(query->type)); + + switch (query->type) + { + case GST_QUERY_LATENCY: + gst_query_set_latency(query, true, 0, 0); + return true; + default: + return gst_pad_query_default(pad, parent, query); + } +} + static gboolean muxer_sink_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) { struct wg_muxer *muxer = gst_pad_get_element_private(pad); @@ -69,6 +104,7 @@ NTSTATUS wg_muxer_create(void *args) /* Create wg_muxer object. */ if (!(muxer = calloc(1, sizeof(*muxer)))) return STATUS_NO_MEMORY; + list_init(&muxer->streams); if (!(muxer->container = gst_bin_new("wg_muxer"))) goto out;
@@ -132,7 +168,15 @@ out: NTSTATUS wg_muxer_destroy(void *args) { struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args); + struct wg_muxer_stream *stream, *next;
+ LIST_FOR_EACH_ENTRY_SAFE(stream, next, &muxer->streams, struct wg_muxer_stream, entry) + { + list_remove(&stream->entry); + gst_object_unref(stream->my_src); + gst_caps_unref(stream->my_src_caps); + free(stream); + } gst_object_unref(muxer->my_sink); gst_element_set_state(muxer->container, GST_STATE_NULL); gst_object_unref(muxer->container); @@ -140,3 +184,53 @@ NTSTATUS wg_muxer_destroy(void *args)
return S_OK; } + +NTSTATUS wg_muxer_add_stream(void *args) +{ + struct wg_muxer_add_stream_params *params = args; + struct wg_muxer *muxer = get_muxer(params->muxer); + NTSTATUS status = STATUS_UNSUCCESSFUL; + GstPadTemplate *template = NULL; + struct wg_muxer_stream *stream; + char src_pad_name[64]; + + GST_DEBUG("muxer %p, stream_id %u, format %p.", muxer, params->stream_id, params->format); + + /* Create stream object. */ + if (!(stream = calloc(1, sizeof(*stream)))) + return STATUS_NO_MEMORY; + stream->muxer = muxer; + stream->format = *params->format; + stream->id = params->stream_id; + + /* Create stream my_src pad. */ + if (!(stream->my_src_caps = wg_format_to_caps(params->format))) + goto out; + if (!(template = gst_pad_template_new("src", GST_PAD_SRC, GST_PAD_ALWAYS, stream->my_src_caps))) + goto out; + sprintf(src_pad_name, "wg_muxer_stream_src_%u", stream->id); + if (!(stream->my_src = gst_pad_new_from_template(template, src_pad_name))) + goto out; + gst_pad_set_element_private(stream->my_src, stream); + gst_pad_set_query_function(stream->my_src, muxer_src_query_cb); + + /* Add to muxer stream list. */ + list_add_tail(&muxer->streams, &stream->entry); + + gst_object_unref(template); + + GST_INFO("Created winegstreamer muxer stream %p.", stream); + + return STATUS_SUCCESS; + +out: + if (stream->my_src) + gst_object_unref(stream->my_src); + if (template) + gst_object_unref(template); + if (stream->my_src_caps) + gst_caps_unref(stream->my_src_caps); + free(stream); + + return status; +} diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index ebdefb6a7f9..8fb39bdcb05 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -1945,6 +1945,7 @@ const unixlib_entry_t __wine_unix_call_funcs[] =
X(wg_muxer_create), X(wg_muxer_destroy), + X(wg_muxer_add_stream), };
C_ASSERT(ARRAYSIZE(__wine_unix_call_funcs) == unix_wg_funcs_count); @@ -2171,6 +2172,23 @@ NTSTATUS wow64_wg_muxer_create(void *args) return ret; }
+NTSTATUS wow64_wg_muxer_add_stream(void *args) +{ + struct + { + wg_muxer_t muxer; + DWORD stream_id; + PTR32 format; + } *params32 = args; + struct wg_muxer_add_stream_params params = + { + .muxer = params32->muxer, + .stream_id = params32->stream_id, + .format = ULongToPtr(params32->format), + }; + return wg_muxer_add_stream(&params); +} + const unixlib_entry_t __wine_unix_call_wow64_funcs[] = { #define X64(name) [unix_ ## name] = wow64_ ## name @@ -2214,6 +2232,7 @@ const unixlib_entry_t __wine_unix_call_wow64_funcs[] =
X64(wg_muxer_create), X(wg_muxer_destroy), + X64(wg_muxer_add_stream), };
C_ASSERT(ARRAYSIZE(__wine_unix_call_wow64_funcs) == unix_wg_funcs_count);
From: Ziqing Hui zhui@codeweavers.com
wg_muxer_start will autoplug gstreamer muxer and parser elements. It creates a pipeline like this:
              -------------------       -------
my_src 1 ==> |parser 1 (optional)| ==> |       |
              -------------------      |       |
                                       |       |
              -------------------      |       |
my_src 2 ==> |parser 2 (optional)| ==> |       |
              -------------------      |       |
                                       | muxer | ==> my_sink
                                       |       |
[......]                               |       |
                                       |       |
                                       |       |
              -------------------      |       |
my_src n ==> |parser n (optional)| ==> |       |
              -------------------       -------
---
 dlls/winegstreamer/gst_private.h  |   1 +
 dlls/winegstreamer/main.c         |  15 ++
 dlls/winegstreamer/media_sink.c   |   9 +-
 dlls/winegstreamer/unix_private.h |   1 +
 dlls/winegstreamer/unixlib.h      |   1 +
 dlls/winegstreamer/wg_muxer.c     | 296 +++++++++++++++++++++++++++---
 dlls/winegstreamer/wg_parser.c    |   2 +
 7 files changed, 296 insertions(+), 29 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 23b59766d0c..38516ba9024 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -113,6 +113,7 @@ HRESULT wg_transform_flush(wg_transform_t transform); HRESULT wg_muxer_create(const char *format, wg_muxer_t *muxer); void wg_muxer_destroy(wg_muxer_t muxer); HRESULT wg_muxer_add_stream(wg_muxer_t muxer, UINT32 stream_id, const struct wg_format *format); +HRESULT wg_muxer_start(wg_muxer_t muxer);
unsigned int wg_format_get_max_size(const struct wg_format *format);
diff --git a/dlls/winegstreamer/main.c b/dlls/winegstreamer/main.c index 55817922e9b..95c4296812b 100644 --- a/dlls/winegstreamer/main.c +++ b/dlls/winegstreamer/main.c @@ -507,6 +507,21 @@ HRESULT wg_muxer_add_stream(wg_muxer_t muxer, UINT32 stream_id, const struct wg_ return S_OK; }
+HRESULT wg_muxer_start(wg_muxer_t muxer) +{ + NTSTATUS status; + + TRACE("muxer %#I64x.\n", muxer); + + if ((status = WINE_UNIX_CALL(unix_wg_muxer_start, &muxer))) + { + WARN("Failed to start muxer, status %#lx.\n", status); + return HRESULT_FROM_NT(status); + } + + return S_OK; +} + #define ALIGN(n, alignment) (((n) + (alignment) - 1) & ~((alignment) - 1))
unsigned int wg_format_get_stride(const struct wg_format *format) diff --git a/dlls/winegstreamer/media_sink.c b/dlls/winegstreamer/media_sink.c index 344134d1633..489ede1d26a 100644 --- a/dlls/winegstreamer/media_sink.c +++ b/dlls/winegstreamer/media_sink.c @@ -498,7 +498,13 @@ static HRESULT media_sink_queue_stream_event(struct media_sink *media_sink, Medi
static HRESULT media_sink_start(struct media_sink *media_sink) { + HRESULT hr; + + if (FAILED(hr = wg_muxer_start(media_sink->muxer))) + return hr; + media_sink->state = STATE_STARTED; + return media_sink_queue_stream_event(media_sink, MEStreamSinkStarted); }
@@ -1007,7 +1013,8 @@ static HRESULT WINAPI media_sink_callback_Invoke(IMFAsyncCallback *iface, IMFAsy switch (command->op) { case ASYNC_START: - hr = media_sink_start(media_sink); + if (FAILED(hr = media_sink_start(media_sink))) + WARN("Failed to start media sink.\n"); break; case ASYNC_STOP: hr = media_sink_stop(media_sink); diff --git a/dlls/winegstreamer/unix_private.h b/dlls/winegstreamer/unix_private.h index 1f594200c2e..01fdc310bda 100644 --- a/dlls/winegstreamer/unix_private.h +++ b/dlls/winegstreamer/unix_private.h @@ -64,6 +64,7 @@ extern NTSTATUS wg_transform_flush(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_muxer_create(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_muxer_destroy(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_muxer_add_stream(void *args) DECLSPEC_HIDDEN; +extern NTSTATUS wg_muxer_start(void *args) DECLSPEC_HIDDEN;
/* wg_allocator.c */
diff --git a/dlls/winegstreamer/unixlib.h b/dlls/winegstreamer/unixlib.h index d00790c6b05..9cb6359af26 100644 --- a/dlls/winegstreamer/unixlib.h +++ b/dlls/winegstreamer/unixlib.h @@ -422,6 +422,7 @@ enum unix_funcs unix_wg_muxer_create, unix_wg_muxer_destroy, unix_wg_muxer_add_stream, + unix_wg_muxer_start,
unix_wg_funcs_count, }; diff --git a/dlls/winegstreamer/wg_muxer.c b/dlls/winegstreamer/wg_muxer.c index c6e56793dc2..73d6443566c 100644 --- a/dlls/winegstreamer/wg_muxer.c +++ b/dlls/winegstreamer/wg_muxer.c @@ -32,10 +32,16 @@
#include "wine/list.h"
+#if !GST_CHECK_VERSION(1,20,0) +#define gst_element_request_pad_simple gst_element_get_request_pad +#endif + struct wg_muxer { GstElement *container, *muxer; GstPad *my_sink; + GstCaps *my_sink_caps; + struct list streams; };
@@ -47,6 +53,7 @@ struct wg_muxer_stream
 
     GstPad *my_src;
     GstCaps *my_src_caps;
+    GstElement *parser;
     GstSegment segment;
 
     struct list entry;
@@ -57,6 +64,253 @@ static struct wg_muxer *get_muxer(wg_muxer_t muxer)
     return (struct wg_muxer *)(ULONG_PTR)muxer;
 }
 
+/* Return the request pad name template used for muxer sink pads of the given
+ * stream type, or NULL if the stream type is not handled. */
+static const char *get_sink_name(GstStreamType type)
+{
+    switch (type)
+    {
+        case GST_STREAM_TYPE_VIDEO:
+            return "video_%u";
+        case GST_STREAM_TYPE_AUDIO:
+            return "audio_%u";
+        case GST_STREAM_TYPE_TEXT:
+            return "subtitle_%u";
+        default:
+            return NULL;
+    }
+}
+
+/* Return the caps of the first static sink pad template of factory that is
+ * either an always pad or a request pad named after the given stream type.
+ * Returns NULL if there is no such template; callers release the result with
+ * gst_caps_unref(). */
+static GstCaps *element_factory_get_caps(GstElementFactory *factory, GstStreamType type)
+{
+    const char *sink_name = get_sink_name(type);
+    const GList *iter;
+
+    for (iter = gst_element_factory_get_static_pad_templates(factory); iter; iter = iter->next)
+    {
+        GstStaticPadTemplate *static_template = iter->data;
+
+        if (static_template->direction != GST_PAD_SINK)
+            continue;
+
+        /* sink_name is NULL for unhandled stream types, don't pass it to strcmp(). */
+        if ((static_template->presence == GST_PAD_REQUEST && sink_name
+                && !strcmp(static_template->name_template, sink_name))
+                || static_template->presence == GST_PAD_ALWAYS)
+            return gst_static_pad_template_get_caps(static_template);
+    }
+
+    return NULL;
+}
+
+/* Select a parser factory of at least min_rank whose sink side matches the
+ * stream's my_src_caps and whose src side matches the sink caps of muxer_factory.
+ * On success the created element is stored in stream->parser and true is returned. */
+static bool stream_select_parser(struct wg_muxer_stream *stream, GstElementFactory *muxer_factory, GstRank min_rank)
+{
+    GList *parser_list, *tmp;
+    GstCaps *muxer_sink_caps;
+    bool ret = false;
+
+    GST_DEBUG("stream %u %p, factory %s, min_rank %u, stream my_src_caps: %"GST_PTR_FORMAT".",
+            stream->id, stream, gst_plugin_feature_get_name(GST_PLUGIN_FEATURE(muxer_factory)),
+            min_rank, stream->my_src_caps);
+
+    if (!(parser_list = gst_element_factory_list_get_elements(GST_ELEMENT_FACTORY_TYPE_PARSER, min_rank)))
+        return false;
+
+    /* Filter parsers by my_src_caps. */
+    tmp = gst_element_factory_list_filter(parser_list, stream->my_src_caps, GST_PAD_SINK, FALSE);
+    gst_plugin_feature_list_free(parser_list);
+    if (!(parser_list = tmp))
+        return false;
+
+    if (!(muxer_sink_caps = element_factory_get_caps(muxer_factory, stream_type_from_caps(stream->my_src_caps))))
+    {
+        gst_plugin_feature_list_free(parser_list);
+        return false;
+    }
+
+    /* Filter parsers by muxer sink caps. */
+    if ((tmp = gst_element_factory_list_filter(parser_list, muxer_sink_caps, GST_PAD_SRC, FALSE)))
+    {
+        const gchar *parser_plugin_name = gst_plugin_feature_get_name(GST_PLUGIN_FEATURE(tmp->data));
+
+        if ((stream->parser = gst_element_factory_create(GST_ELEMENT_FACTORY(tmp->data), NULL)))
+        {
+            gchar *element_name = gst_element_get_name(stream->parser);
+            GST_DEBUG("Selected parser %s, created element %s.", parser_plugin_name, element_name);
+            g_free(element_name);
+            ret = true;
+        }
+        else
+        {
+            GST_WARNING("Failed to create %s element.", parser_plugin_name);
+        }
+
+        gst_plugin_feature_list_free(tmp);
+    }
+
+    gst_plugin_feature_list_free(parser_list);
+    gst_caps_unref(muxer_sink_caps);
+
+    return ret;
+}
+
+/* Link the stream's my_src pad to the muxer element (through stream->parser when
+ * one was selected), then activate the pad and push the stream-start, caps and
+ * segment events. Returns false on any failure. */
+static bool stream_start(struct wg_muxer_stream *stream)
+{
+    GstStreamType type = stream_type_from_caps(stream->my_src_caps);
+    struct wg_muxer *muxer = stream->muxer;
+    GstPad *muxer_sink_pad = NULL;
+    char buffer[64];
+    bool link_ok;
+
+    /* Get muxer sink pad. */
+    if (!(muxer_sink_pad = gst_element_get_static_pad(muxer->muxer, "sink")))
+    {
+        const char *sink_name = get_sink_name(type);
+
+        /* No request pad name is known for unhandled stream types. */
+        if (!sink_name || !(muxer_sink_pad = gst_element_request_pad_simple(muxer->muxer, sink_name)))
+            return false;
+    }
+
+    /* Link my_src to muxer. */
+    if (stream->parser)
+    {
+        link_ok = gst_bin_add(GST_BIN(muxer->container), stream->parser)
+                && link_src_to_element(stream->my_src, stream->parser)
+                && link_element_to_sink(stream->parser, muxer_sink_pad);
+    }
+    else
+    {
+        /* link_src_to_sink() returns true on success, like the other link helpers. */
+        link_ok = link_src_to_sink(stream->my_src, muxer_sink_pad);
+    }
+    gst_object_unref(muxer_sink_pad);
+    if (!link_ok)
+        return false;
+
+    /* Activate pad and push events to prepare for streaming. */
+    snprintf(buffer, sizeof(buffer), "wg_muxer_stream_src_%u", stream->id);
+    gst_segment_init(&stream->segment, GST_FORMAT_BYTES);
+    if (!gst_pad_set_active(stream->my_src, 1))
+        return false;
+    if (!push_event(stream->my_src, gst_event_new_stream_start(buffer))
+            || !push_event(stream->my_src, gst_event_new_caps(stream->my_src_caps))
+            || !push_event(stream->my_src, gst_event_new_segment(&stream->segment)))
+        return false;
+
+    GST_DEBUG("Started stream %u %p.", stream->id, stream);
+
+    return true;
+}
+
+/* Check whether every stream of muxer can be connected to an element created by
+ * muxer_factory, selecting a parser for each stream that cannot be linked
+ * directly. On success the muxer element is created and stored in muxer->muxer;
+ * on failure all selected parsers are released and false is returned. */
+static bool muxer_try(struct wg_muxer *muxer, GstElementFactory *muxer_factory, GstRank min_rank)
+{
+    const char *plugin_name = gst_plugin_feature_get_name(GST_PLUGIN_FEATURE(muxer_factory));
+    struct wg_muxer_stream *stream;
+    unsigned int stream_index;
+    GstCaps *muxer_sink_caps;
+
+    GST_DEBUG("Try muxer %s, muxer %p, min_rank %u.", plugin_name, muxer, min_rank);
+
+    /* Investigate if all streams can connect to muxer_factory. */
+    stream_index = 0;
+    LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
+    {
+        bool can_link_directly;
+
+        if (!(muxer_sink_caps = element_factory_get_caps(muxer_factory, stream_type_from_caps(stream->my_src_caps))))
+            break;
+        can_link_directly = gst_caps_can_intersect(stream->my_src_caps, muxer_sink_caps);
+        gst_caps_unref(muxer_sink_caps);
+
+        /* Unable to link stream to muxer element directly, try inserting a parser. */
+        if (!can_link_directly
+                && !stream_select_parser(stream, muxer_factory, min_rank))
+            break;
+
+        ++stream_index;
+    }
+
+    /* All streams are satisfied, create the muxer element. A creation failure is
+     * handled like an unusable factory, so that the next candidate gets tried. */
+    if (stream_index == list_count(&muxer->streams)
+            && (muxer->muxer = gst_element_factory_create(GST_ELEMENT_FACTORY(muxer_factory), NULL)))
+    {
+        gchar *element_name = gst_element_get_name(muxer->muxer);
+
+        GST_DEBUG("Selected muxer %s, created element %s.", plugin_name, element_name);
+        g_free(element_name);
+        return true;
+    }
+
+    /* Cannot use this factory, release the parsers selected for it. */
+    LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
+    {
+        if (stream->parser)
+        {
+            gst_object_unref(stream->parser);
+            stream->parser = NULL;
+        }
+    }
+    GST_DEBUG("Cannot use %s.", plugin_name);
+
+    return false;
+}
+
+/* Find a muxer factory whose src side matches my_sink_caps and which passes
+ * muxer_try(), lowering the minimum rank on each pass. Returns true on success. */
+static bool muxer_autoplug(struct wg_muxer *muxer)
+{
+    GstElementFactoryListType muxer_type = GST_ELEMENT_FACTORY_TYPE_MUXER | GST_ELEMENT_FACTORY_TYPE_FORMATTER;
+    GstRank min_rank[] = {GST_RANK_PRIMARY, GST_RANK_SECONDARY, GST_RANK_MARGINAL, GST_RANK_NONE};
+    GList *muxers, *tmp;
+    unsigned int i;
+
+    GST_DEBUG("muxer %p.", muxer);
+
+    /* Decreased min_rank in each cycle. */
+    for (i = 0; i < ARRAY_SIZE(min_rank); ++i)
+    {
+        if (!(muxers = gst_element_factory_list_get_elements(muxer_type, min_rank[i])))
+            continue;
+
+        /* Filter muxers by my_sink_caps. */
+        tmp = gst_element_factory_list_filter(muxers, muxer->my_sink_caps, GST_PAD_SRC, FALSE);
+        gst_plugin_feature_list_free(muxers);
+        if (!(muxers = tmp))
+            continue;
+        muxers = g_list_sort(muxers, gst_plugin_feature_rank_compare_func);
+
+        /* Try each muxer. */
+        for (tmp = muxers; tmp; tmp = tmp->next)
+        {
+            if (muxer_try(muxer, GST_ELEMENT_FACTORY(tmp->data), min_rank[i]))
+            {
+                gst_plugin_feature_list_free(muxers);
+                return true;
+            }
+        }
+
+        gst_plugin_feature_list_free(muxers);
+    }
+
+    return false;
+}
+
 static gboolean muxer_src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
 {
     struct wg_muxer_stream *stream = gst_pad_get_element_private(pad);
@@ -95,10 +349,8 @@ static gboolean muxer_sink_query_cb(GstPad *pad, GstObject *parent, GstQuery *qu
 NTSTATUS wg_muxer_create(void *args)
 {
     struct wg_muxer_create_params *params = args;
-    GstElement *first = NULL, *last = NULL;
     NTSTATUS status = STATUS_UNSUCCESSFUL;
     GstPadTemplate *template = NULL;
-    GstCaps *sink_caps = NULL;
     struct wg_muxer *muxer;
/* Create wg_muxer object. */ @@ -109,12 +334,12 @@ NTSTATUS wg_muxer_create(void *args) goto out;
/* Create sink pad. */ - if (!(sink_caps = gst_caps_from_string(params->format))) + if (!(muxer->my_sink_caps = gst_caps_from_string(params->format))) { GST_ERROR("Failed to get caps from format string: \"%s\".", params->format); goto out; } - if (!(template = gst_pad_template_new("sink", GST_PAD_SINK, GST_PAD_ALWAYS, sink_caps))) + if (!(template = gst_pad_template_new("sink", GST_PAD_SINK, GST_PAD_ALWAYS, muxer->my_sink_caps))) goto out; muxer->my_sink = gst_pad_new_from_template(template, "wg_muxer_sink"); if (!muxer->my_sink) @@ -122,26 +347,7 @@ NTSTATUS wg_muxer_create(void *args) gst_pad_set_element_private(muxer->my_sink, muxer); gst_pad_set_query_function(muxer->my_sink, muxer_sink_query_cb);
- /* Create gstreamer muxer element. */ - if (!(muxer->muxer = find_element(GST_ELEMENT_FACTORY_TYPE_MUXER | GST_ELEMENT_FACTORY_TYPE_FORMATTER, - NULL, sink_caps))) - goto out; - if (!append_element(muxer->container, muxer->muxer, &first, &last)) - goto out; - - /* Link muxer to sink pad. */ - if (!link_element_to_sink(muxer->muxer, muxer->my_sink)) - goto out; - if (!gst_pad_set_active(muxer->my_sink, 1)) - goto out; - - /* Set to pause state. */ - gst_element_set_state(muxer->container, GST_STATE_PAUSED); - if (!gst_element_get_state(muxer->container, NULL, NULL, -1)) - goto out; - gst_object_unref(template); - gst_caps_unref(sink_caps);
GST_INFO("Created winegstreamer muxer %p.", muxer); params->muxer = (wg_transform_t)(ULONG_PTR)muxer; @@ -153,13 +359,10 @@ out: gst_object_unref(muxer->my_sink); if (template) gst_object_unref(template); - if (sink_caps) - gst_caps_unref(sink_caps); + if (muxer->my_sink_caps) + gst_caps_unref(muxer->my_sink_caps); if (muxer->container) - { - gst_element_set_state(muxer->container, GST_STATE_NULL); gst_object_unref(muxer->container); - } free(muxer);
return status; @@ -178,6 +381,7 @@ NTSTATUS wg_muxer_destroy(void *args) free(stream); } gst_object_unref(muxer->my_sink); + gst_caps_unref(muxer->my_sink_caps); gst_element_set_state(muxer->container, GST_STATE_NULL); gst_object_unref(muxer->container); free(muxer); @@ -234,3 +438,39 @@ out:
return status; } + +NTSTATUS wg_muxer_start(void *args) +{ + struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args); + NTSTATUS status = STATUS_UNSUCCESSFUL; + struct wg_muxer_stream *stream; + + GST_DEBUG("muxer %p.", muxer); + + /* Autoplug gstreamer elements */ + if (!muxer_autoplug(muxer)) + return status; + + /* Link muxer element to my_sink */ + if (!gst_bin_add(GST_BIN(muxer->container), muxer->muxer) + || !link_element_to_sink(muxer->muxer, muxer->my_sink) + || !gst_pad_set_active(muxer->my_sink, 1)) + return status; + + /* Link my_src of each stream to muxer element. */ + LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry) + { + if (!stream_start(stream)) + return status; + } + + /* Set to pause state. */ + if (gst_element_set_state(muxer->container, GST_STATE_PAUSED) == GST_STATE_CHANGE_FAILURE) + return status; + if (gst_element_get_state(muxer->container, NULL, NULL, -1) == GST_STATE_CHANGE_FAILURE) + return status; + + GST_DEBUG("Started muxer %p.", muxer); + + return STATUS_SUCCESS; +} diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 8fb39bdcb05..9821bab961b 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -1946,6 +1946,7 @@ const unixlib_entry_t __wine_unix_call_funcs[] = X(wg_muxer_create), X(wg_muxer_destroy), X(wg_muxer_add_stream), + X(wg_muxer_start), };
C_ASSERT(ARRAYSIZE(__wine_unix_call_funcs) == unix_wg_funcs_count); @@ -2233,6 +2234,7 @@ const unixlib_entry_t __wine_unix_call_wow64_funcs[] = X64(wg_muxer_create), X(wg_muxer_destroy), X64(wg_muxer_add_stream), + X(wg_muxer_start), };
C_ASSERT(ARRAYSIZE(__wine_unix_call_wow64_funcs) == unix_wg_funcs_count);
From: Ziqing Hui zhui@codeweavers.com
--- dlls/winegstreamer/gst_private.h | 1 + dlls/winegstreamer/main.c | 21 ++++++++ dlls/winegstreamer/media_sink.c | 88 ++++++++++++++++++++++++++++++- dlls/winegstreamer/unix_private.h | 1 + dlls/winegstreamer/unixlib.h | 8 +++ dlls/winegstreamer/wg_muxer.c | 5 ++ dlls/winegstreamer/wg_parser.c | 19 +++++++ 7 files changed, 141 insertions(+), 2 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 38516ba9024..1355171c751 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -114,6 +114,7 @@ HRESULT wg_muxer_create(const char *format, wg_muxer_t *muxer); void wg_muxer_destroy(wg_muxer_t muxer); HRESULT wg_muxer_add_stream(wg_muxer_t muxer, UINT32 stream_id, const struct wg_format *format); HRESULT wg_muxer_start(wg_muxer_t muxer); +HRESULT wg_muxer_push_sample(wg_muxer_t muxer, struct wg_sample *sample, UINT32 stream_id);
unsigned int wg_format_get_max_size(const struct wg_format *format);
diff --git a/dlls/winegstreamer/main.c b/dlls/winegstreamer/main.c
index 95c4296812b..eddd2a45661 100644
--- a/dlls/winegstreamer/main.c
+++ b/dlls/winegstreamer/main.c
@@ -522,6 +522,28 @@ HRESULT wg_muxer_start(wg_muxer_t muxer)
     return S_OK;
 }
 
+/* Issue the unix_wg_muxer_push_sample call for a sample of stream_id; a non-zero NTSTATUS is returned as HRESULT. */
+HRESULT wg_muxer_push_sample(wg_muxer_t muxer, struct wg_sample *sample, UINT32 stream_id)
+{
+    struct wg_muxer_push_sample_params params =
+    {
+        .muxer = muxer,
+        .sample = sample,
+        .stream_id = stream_id,
+    };
+    NTSTATUS status;
+
+    TRACE("muxer %#I64x, sample %p, stream_id %u.\n", muxer, sample, stream_id);
+
+    if ((status = WINE_UNIX_CALL(unix_wg_muxer_push_sample, &params)))
+    {
+        WARN("Failed to push sample, status %#lx.\n", status);
+        return HRESULT_FROM_NT(status);
+    }
+
+    return S_OK;
+}
+
 #define ALIGN(n, alignment) (((n) + (alignment) - 1) & ~((alignment) - 1))
unsigned int wg_format_get_stride(const struct wg_format *format) diff --git a/dlls/winegstreamer/media_sink.c b/dlls/winegstreamer/media_sink.c index 489ede1d26a..d9c15291b61 100644 --- a/dlls/winegstreamer/media_sink.c +++ b/dlls/winegstreamer/media_sink.c @@ -31,6 +31,7 @@ enum async_op ASYNC_START, ASYNC_STOP, ASYNC_PAUSE, + ASYNC_PROCESS, };
struct async_command @@ -39,6 +40,15 @@ struct async_command LONG refcount;
enum async_op op; + + union + { + struct + { + IMFSample *sample; + UINT32 stream_id; + } process; + } u; };
struct stream_sink @@ -142,7 +152,11 @@ static ULONG WINAPI async_command_Release(IUnknown *iface) ULONG refcount = InterlockedDecrement(&command->refcount);
if (!refcount) + { + if (command->op == ASYNC_PROCESS && command->u.process.sample) + IMFSample_Release(command->u.process.sample); free(command); + }
return refcount; } @@ -301,9 +315,41 @@ static HRESULT WINAPI stream_sink_GetMediaTypeHandler(IMFStreamSink *iface, IMFM
static HRESULT WINAPI stream_sink_ProcessSample(IMFStreamSink *iface, IMFSample *sample) { - FIXME("iface %p, sample %p stub!\n", iface, sample); + struct stream_sink *stream_sink = impl_from_IMFStreamSink(iface); + struct media_sink *media_sink = impl_from_IMFFinalizableMediaSink(stream_sink->media_sink); + struct async_command *command; + HRESULT hr;
- return E_NOTIMPL; + TRACE("iface %p, sample %p.\n", iface, sample); + + EnterCriticalSection(&media_sink->cs); + + if (media_sink->state == STATE_SHUTDOWN) + { + LeaveCriticalSection(&media_sink->cs); + return MF_E_SHUTDOWN; + } + + if (media_sink->state != STATE_STARTED && media_sink->state != STATE_PAUSED) + { + LeaveCriticalSection(&media_sink->cs); + return MF_E_INVALIDREQUEST; + } + + if (FAILED(hr = (async_command_create(ASYNC_PROCESS, &command)))) + { + LeaveCriticalSection(&media_sink->cs); + return hr; + } + IMFSample_AddRef((command->u.process.sample = sample)); + command->u.process.stream_id = stream_sink->id; + + if (FAILED(hr = MFPutWorkItem(MFASYNC_CALLBACK_QUEUE_STANDARD, &media_sink->async_callback, &command->IUnknown_iface))) + IUnknown_Release(&command->IUnknown_iface); + + LeaveCriticalSection(&media_sink->cs); + + return hr; }
static HRESULT WINAPI stream_sink_PlaceMarker(IMFStreamSink *iface, MFSTREAMSINK_MARKER_TYPE marker_type, @@ -521,6 +567,40 @@ static HRESULT media_sink_pause(struct media_sink *media_sink) return media_sink_queue_stream_event(media_sink, MEStreamSinkPaused); }
+static HRESULT media_sink_process(struct media_sink *media_sink, IMFSample *sample, UINT32 stream_id) +{ + wg_muxer_t muxer = media_sink->muxer; + struct wg_sample *wg_sample; + LONGLONG time, duration; + UINT32 value; + HRESULT hr; + + TRACE("media_sink %p, sample %p, stream_id %u.\n", media_sink, sample, stream_id); + + if (FAILED(hr = wg_sample_create_mf(sample, &wg_sample))) + return hr; + + if (SUCCEEDED(IMFSample_GetSampleTime(sample, &time))) + { + wg_sample->flags |= WG_SAMPLE_FLAG_HAS_PTS; + wg_sample->pts = time; + } + if (SUCCEEDED(IMFSample_GetSampleDuration(sample, &duration))) + { + wg_sample->flags |= WG_SAMPLE_FLAG_HAS_DURATION; + wg_sample->duration = duration; + } + if (SUCCEEDED(IMFSample_GetUINT32(sample, &MFSampleExtension_CleanPoint, &value)) && value) + wg_sample->flags |= WG_SAMPLE_FLAG_SYNC_POINT; + if (SUCCEEDED(IMFSample_GetUINT32(sample, &MFSampleExtension_Discontinuity, &value)) && value) + wg_sample->flags |= WG_SAMPLE_FLAG_DISCONTINUITY; + + hr = wg_muxer_push_sample(muxer, wg_sample, stream_id); + wg_sample_release(wg_sample); + + return hr; +} + static HRESULT WINAPI media_sink_QueryInterface(IMFFinalizableMediaSink *iface, REFIID riid, void **obj) { struct media_sink *media_sink = impl_from_IMFFinalizableMediaSink(iface); @@ -1022,6 +1102,10 @@ static HRESULT WINAPI media_sink_callback_Invoke(IMFAsyncCallback *iface, IMFAsy case ASYNC_PAUSE: hr = media_sink_pause(media_sink); break; + case ASYNC_PROCESS: + if (FAILED(hr = media_sink_process(media_sink, command->u.process.sample, command->u.process.stream_id))) + WARN("Failed to process sample, hr %#lx.\n", hr); + break; default: WARN("Unsupported op %u.\n", command->op); break; diff --git a/dlls/winegstreamer/unix_private.h b/dlls/winegstreamer/unix_private.h index 01fdc310bda..2fff70c9ee1 100644 --- a/dlls/winegstreamer/unix_private.h +++ b/dlls/winegstreamer/unix_private.h @@ -65,6 +65,7 @@ extern NTSTATUS wg_muxer_create(void *args) DECLSPEC_HIDDEN; extern NTSTATUS 
wg_muxer_destroy(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_muxer_add_stream(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_muxer_start(void *args) DECLSPEC_HIDDEN; +extern NTSTATUS wg_muxer_push_sample(void *args) DECLSPEC_HIDDEN;
/* wg_allocator.c */
diff --git a/dlls/winegstreamer/unixlib.h b/dlls/winegstreamer/unixlib.h index 9cb6359af26..d20b8907925 100644 --- a/dlls/winegstreamer/unixlib.h +++ b/dlls/winegstreamer/unixlib.h @@ -379,6 +379,13 @@ struct wg_muxer_add_stream_params const struct wg_format *format; };
+struct wg_muxer_push_sample_params +{ + wg_muxer_t muxer; + struct wg_sample *sample; + UINT32 stream_id; +}; + enum unix_funcs { unix_wg_init_gstreamer, @@ -423,6 +430,7 @@ enum unix_funcs unix_wg_muxer_destroy, unix_wg_muxer_add_stream, unix_wg_muxer_start, + unix_wg_muxer_push_sample,
unix_wg_funcs_count, }; diff --git a/dlls/winegstreamer/wg_muxer.c b/dlls/winegstreamer/wg_muxer.c index 73d6443566c..7434aed13fe 100644 --- a/dlls/winegstreamer/wg_muxer.c +++ b/dlls/winegstreamer/wg_muxer.c @@ -474,3 +474,8 @@ NTSTATUS wg_muxer_start(void *args)
return STATUS_SUCCESS; } + +NTSTATUS wg_muxer_push_sample(void *args) +{ + return STATUS_NOT_IMPLEMENTED; +} diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 9821bab961b..c9d74e87d72 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -1947,6 +1947,7 @@ const unixlib_entry_t __wine_unix_call_funcs[] = X(wg_muxer_destroy), X(wg_muxer_add_stream), X(wg_muxer_start), + X(wg_muxer_push_sample), };
C_ASSERT(ARRAYSIZE(__wine_unix_call_funcs) == unix_wg_funcs_count); @@ -2190,6 +2191,23 @@ NTSTATUS wow64_wg_muxer_add_stream(void *args) return wg_muxer_add_stream(&params); }
+NTSTATUS wow64_wg_muxer_push_sample(void *args) +{ + struct + { + wg_muxer_t muxer; + PTR32 sample; + UINT32 stream_id; + } *params32 = args; + struct wg_muxer_push_sample_params params = + { + .muxer = params32->muxer, + .sample = ULongToPtr(params32->sample), + .stream_id = params32->stream_id, + }; + return wg_muxer_push_sample(&params); +} + const unixlib_entry_t __wine_unix_call_wow64_funcs[] = { #define X64(name) [unix_ ## name] = wow64_ ## name @@ -2235,6 +2253,7 @@ const unixlib_entry_t __wine_unix_call_wow64_funcs[] = X(wg_muxer_destroy), X64(wg_muxer_add_stream), X(wg_muxer_start), + X64(wg_muxer_push_sample), };
C_ASSERT(ARRAYSIZE(__wine_unix_call_wow64_funcs) == unix_wg_funcs_count);
From: Ziqing Hui zhui@codeweavers.com
--- dlls/winegstreamer/wg_muxer.c | 135 +++++++++++++++++++++++++++++++++- 1 file changed, 134 insertions(+), 1 deletion(-)
diff --git a/dlls/winegstreamer/wg_muxer.c b/dlls/winegstreamer/wg_muxer.c index 7434aed13fe..ca90fe4440f 100644 --- a/dlls/winegstreamer/wg_muxer.c +++ b/dlls/winegstreamer/wg_muxer.c @@ -42,6 +42,11 @@ struct wg_muxer GstPad *my_sink; GstCaps *my_sink_caps;
+ GstAtomicQueue *output_queue; + + pthread_mutex_t mutex; + guint64 offset; + struct list streams; };
@@ -194,6 +199,19 @@ static bool stream_start(struct wg_muxer_stream *stream) return true; }
+static struct wg_muxer_stream *muxer_get_stream_by_id(struct wg_muxer *muxer, DWORD id) +{ + struct wg_muxer_stream *stream; + + LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry) + { + if (stream->id == id) + return stream; + } + + return NULL; +} + static bool muxer_try(struct wg_muxer *muxer, GstElementFactory *muxer_factory, GstRank min_rank) { char *element_name, *plugin_name = gst_plugin_feature_get_name(GST_PLUGIN_FEATURE(muxer_factory)); @@ -319,6 +337,66 @@ static gboolean muxer_sink_query_cb(GstPad *pad, GstObject *parent, GstQuery *qu } }
+static gboolean muxer_sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) +{ + struct wg_muxer *muxer = gst_pad_get_element_private(pad); + const GstSegment *segment; + + GST_DEBUG("pad %p, parent %p, event %p, muxer %p, type "%s".", + pad, parent, event, muxer, GST_EVENT_TYPE_NAME(event)); + + switch (event->type) + { + case GST_EVENT_SEGMENT: + pthread_mutex_lock(&muxer->mutex); + + gst_event_parse_segment(event, &segment); + if (segment->format != GST_FORMAT_BYTES) + { + pthread_mutex_unlock(&muxer->mutex); + GST_FIXME("Unhandled segment format "%s".", gst_format_get_name(segment->format)); + break; + } + muxer->offset = segment->start; + + pthread_mutex_unlock(&muxer->mutex); + break; + + default: + GST_WARNING("Ignoring "%s" event.", GST_EVENT_TYPE_NAME(event)); + break; + } + + gst_event_unref(event); + return TRUE; +} + +static GstFlowReturn muxer_sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer) +{ + struct wg_muxer *muxer = gst_pad_get_element_private(pad); + + GST_DEBUG("pad %p, parent %p, buffer %p, muxer %p.", pad, parent, buffer, muxer); + + pthread_mutex_lock(&muxer->mutex); + + GST_BUFFER_OFFSET(buffer) = GST_BUFFER_OFFSET_NONE; + if (muxer->offset != GST_BUFFER_OFFSET_NONE) + { + GST_BUFFER_OFFSET(buffer) = muxer->offset; + muxer->offset = GST_BUFFER_OFFSET_NONE; + } + gst_atomic_queue_push(muxer->output_queue, buffer); + + GST_LOG("Pushed buffer %p to output queue %p, buffer size %" G_GSIZE_FORMAT ", " + "offset %" G_GUINT64_FORMAT ", %u buffers in queue now.", + buffer, muxer->output_queue, gst_buffer_get_size(buffer), GST_BUFFER_OFFSET(buffer), + gst_atomic_queue_length(muxer->output_queue)); + + pthread_mutex_unlock(&muxer->mutex); + + return GST_FLOW_OK; +} + NTSTATUS wg_muxer_create(void *args) { struct wg_muxer_create_params *params = args; @@ -330,8 +408,12 @@ NTSTATUS wg_muxer_create(void *args) if (!(muxer = calloc(1, sizeof(*muxer)))) return STATUS_NO_MEMORY; list_init(&muxer->streams); + muxer->offset = 
GST_BUFFER_OFFSET_NONE; + pthread_mutex_init(&muxer->mutex, NULL); if (!(muxer->container = gst_bin_new("wg_muxer"))) goto out; + if (!(muxer->output_queue = gst_atomic_queue_new(8))) + goto out;
/* Create sink pad. */ if (!(muxer->my_sink_caps = gst_caps_from_string(params->format))) @@ -346,6 +428,8 @@ NTSTATUS wg_muxer_create(void *args) goto out; gst_pad_set_element_private(muxer->my_sink, muxer); gst_pad_set_query_function(muxer->my_sink, muxer_sink_query_cb); + gst_pad_set_event_function(muxer->my_sink, muxer_sink_event_cb); + gst_pad_set_chain_function(muxer->my_sink, muxer_sink_chain_cb);
gst_object_unref(template);
@@ -361,8 +445,10 @@ out: gst_object_unref(template); if (muxer->my_sink_caps) gst_caps_unref(muxer->my_sink_caps); + gst_atomic_queue_unref(muxer->output_queue); if (muxer->container) gst_object_unref(muxer->container); + pthread_mutex_destroy(&muxer->mutex); free(muxer);
return status; @@ -372,6 +458,7 @@ NTSTATUS wg_muxer_destroy(void *args) { struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args); struct wg_muxer_stream *stream, *next; + GstBuffer *buffer;
LIST_FOR_EACH_ENTRY_SAFE(stream, next, &muxer->streams, struct wg_muxer_stream, entry) { @@ -380,10 +467,18 @@ NTSTATUS wg_muxer_destroy(void *args) gst_caps_unref(stream->my_src_caps); free(stream); } + + while ((buffer = gst_atomic_queue_pop(muxer->output_queue))) + gst_buffer_unref(buffer); + gst_atomic_queue_unref(muxer->output_queue); + gst_object_unref(muxer->my_sink); gst_caps_unref(muxer->my_sink_caps); gst_element_set_state(muxer->container, GST_STATE_NULL); gst_object_unref(muxer->container); + + pthread_mutex_destroy(&muxer->mutex); + free(muxer);
return S_OK; @@ -477,5 +572,43 @@ NTSTATUS wg_muxer_start(void *args)
NTSTATUS wg_muxer_push_sample(void *args) { - return STATUS_NOT_IMPLEMENTED; + struct wg_muxer_push_sample_params *params = args; + struct wg_muxer *muxer = get_muxer(params->muxer); + struct wg_sample *sample = params->sample; + struct wg_muxer_stream *stream; + GstFlowReturn ret; + GstBuffer *buffer; + + if (!(stream = muxer_get_stream_by_id(muxer, params->stream_id))) + return STATUS_NOT_FOUND; + + /* Create sample data buffer. */ + if (!(buffer = gst_buffer_new_and_alloc(sample->size)) + || !gst_buffer_fill(buffer, 0, wg_sample_data(sample), sample->size)) + { + GST_ERROR("Failed to allocate input buffer."); + return STATUS_NO_MEMORY; + } + + GST_INFO("Copied %u bytes from sample %p to buffer %p.", sample->size, sample, buffer); + + /* Set sample properties. */ + if (sample->flags & WG_SAMPLE_FLAG_HAS_PTS) + GST_BUFFER_PTS(buffer) = sample->pts * 100; + if (sample->flags & WG_SAMPLE_FLAG_HAS_DURATION) + GST_BUFFER_DURATION(buffer) = sample->duration * 100; + if (!(sample->flags & WG_SAMPLE_FLAG_SYNC_POINT)) + GST_BUFFER_FLAG_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT); + if (sample->flags & WG_SAMPLE_FLAG_DISCONTINUITY) + GST_BUFFER_FLAG_SET(buffer, GST_BUFFER_FLAG_DISCONT); + + /* Push sample data buffer to stream src pad. */ + if ((ret = gst_pad_push(stream->my_src, buffer)) < 0) + { + GST_ERROR("Failed to push buffer %p to pad %s, reason %s.", + buffer, gst_pad_get_name(stream->my_src), gst_flow_get_name(ret)); + return STATUS_UNSUCCESSFUL; + } + + return STATUS_SUCCESS; }
https://gitlab.winehq.org/wine/wine/-/merge_requests/3810/diffs?commit_id=8b... This commit implements autoplug.
Zebediah Figura (@zfigura) commented about dlls/winegstreamer/wg_muxer.c:
struct wg_muxer { GstElement *container, *muxer; GstPad *my_sink;
- struct list streams;
+};
+struct wg_muxer_stream +{
- struct wg_muxer *muxer;
- struct wg_format format;
- uint32_t id;
- GstPad *my_src;
- GstCaps *my_src_caps;
- GstSegment segment;
This isn't used yet in this commit.
Zebediah Figura (@zfigura) commented about dlls/winegstreamer/wg_muxer.c:
return (struct wg_muxer *)(ULONG_PTR)muxer;
}
+static gboolean muxer_src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) +{
- struct wg_muxer_stream *stream = gst_pad_get_element_private(pad);
- GST_DEBUG("pad %p, parent %p, query %p, stream %u, type "%s".",
pad, parent, query, stream->id, gst_query_type_get_name(query->type));
- switch (query->type)
- {
case GST_QUERY_LATENCY:
gst_query_set_latency(query, true, 0, 0);
return true;
Why are we responding to latency queries?
Zebediah Figura (@zfigura) commented about dlls/winegstreamer/wg_muxer.c:
}
}
GST_DEBUG("Cannot use %s.", plugin_name);
return false;
- }
- muxer->muxer = gst_element_factory_create(GST_ELEMENT_FACTORY(muxer_factory), NULL);
- element_name = gst_element_get_name(muxer->muxer);
- GST_DEBUG("Selected muxer %s, created element %s.", plugin_name, element_name);
- g_free(element_name);
- return true;
+}
+static bool muxer_autoplug(struct wg_muxer *muxer)
The amount of autoplugging code that we're implementing here is rather unfortunate. Have you tried using encodebin?
Zebediah Figura (@zfigura) commented about dlls/winegstreamer/wg_muxer.c:
- if (stream->parser)
- {
link_ok = gst_bin_add(GST_BIN(muxer->container), stream->parser)
&& link_src_to_element(stream->my_src, stream->parser)
&& link_element_to_sink(stream->parser, muxer_sink_pad);
- }
- else
- {
link_ok = !link_src_to_sink(stream->my_src, muxer_sink_pad);
- }
- gst_object_unref(muxer_sink_pad);
- if (!link_ok)
return false;
- /* Active pad and push events to prepare for streaming. */
- sprintf(buffer, "wg_muxer_stream_src_%u", stream->id);
This is supposed to have a slash in it. I'm not sure whether any element actually cares, but I'd feel more comfortable following that guideline. Probably we should just use gst_pad_create_stream_id().
(We may also consider using appsrc instead of a bare src pad, although I don't know if it'll actually save us a significant amount of work. It will take care of the start-of-stream events for us, but that's not really that much code all things considered.)
Zebediah Figura (@zfigura) commented about dlls/winegstreamer/wg_muxer.c:
- {
link_ok = !link_src_to_sink(stream->my_src, muxer_sink_pad);
- }
- gst_object_unref(muxer_sink_pad);
- if (!link_ok)
return false;
- /* Active pad and push events to prepare for streaming. */
- sprintf(buffer, "wg_muxer_stream_src_%u", stream->id);
- gst_segment_init(&stream->segment, GST_FORMAT_BYTES);
- if (!gst_pad_set_active(stream->my_src, 1))
return false;
- if (!push_event(stream->my_src, gst_event_new_stream_start(buffer))
|| !push_event(stream->my_src, gst_event_new_caps(stream->my_src_caps))
|| !push_event(stream->my_src, gst_event_new_segment(&stream->segment)))
return false;
I don't know if this code was tested, but I'm under the impression that this part needs to be done *after* setting the bin to paused, or these events will be dropped.
Zebediah Figura (@zfigura) commented about dlls/winegstreamer/wg_parser.c:
return ret;
}
+NTSTATUS wow64_wg_muxer_add_stream(void *args) +{
- struct
- {
wg_muxer_t muxer;
DWORD stream_id;
This is UINT32 elsewhere.
Zebediah Figura (@zfigura) commented about dlls/winegstreamer/wg_muxer.c:
break;
- }
- gst_event_unref(event);
- return TRUE;
+}
+static GstFlowReturn muxer_sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer) +{
- struct wg_muxer *muxer = gst_pad_get_element_private(pad);
- GST_DEBUG("pad %p, parent %p, buffer %p, muxer %p.", pad, parent, buffer, muxer);
- pthread_mutex_lock(&muxer->mutex);
- GST_BUFFER_OFFSET(buffer) = GST_BUFFER_OFFSET_NONE;
You can't do that without first calling gst_buffer_make_writable().
It's not clear to me why you want to, though.
Zebediah Figura (@zfigura) commented about dlls/winegstreamer/wg_muxer.c:
+static GstFlowReturn muxer_sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer) +{
- struct wg_muxer *muxer = gst_pad_get_element_private(pad);
- GST_DEBUG("pad %p, parent %p, buffer %p, muxer %p.", pad, parent, buffer, muxer);
- pthread_mutex_lock(&muxer->mutex);
- GST_BUFFER_OFFSET(buffer) = GST_BUFFER_OFFSET_NONE;
- if (muxer->offset != GST_BUFFER_OFFSET_NONE)
- {
GST_BUFFER_OFFSET(buffer) = muxer->offset;
muxer->offset = GST_BUFFER_OFFSET_NONE;
- }
- gst_atomic_queue_push(muxer->output_queue, buffer);
It's not clear to me what we're going to do with the output queue.
Perhaps for this commit we should just stub the chain function.
On Fri Oct 6 04:50:08 2023 +0000, Zebediah Figura wrote:
The amount of autoplugging code that we're implementing here is rather unfortunate. Have you tried using encodebin?
I didn't try using the encodebin element directly, because I think in our situation it would be better to keep the pipeline fully under our control. There is a high possibility that we will change how it works in the future, e.g. we may replace h264parse with our own implementation like Rémi said, or something else. Anyway, I think using our own implementation is more future-proof.
On Fri Oct 6 04:50:08 2023 +0000, Zebediah Figura wrote:
Why are we responding to latency queries?
OK, we can remove it. It's not necessary. mp4mux will query latency, and it outputs a warning in its debug channel if latency is not supported by downstream pads, but that doesn't affect anything so far.
On Fri Oct 6 04:50:09 2023 +0000, Zebediah Figura wrote:
This is supposed to have a slash in it. I'm not sure whether any element actually cares, but I'd feel more comfortable following that guideline. Probably we should just use gst_pad_create_stream_id(). (We may also consider using appsrc instead of a bare src pad, although I don't know if it'll actually save us a significant amount of work. It will take care of the start-of-stream events for us, but that's not really that much code all things considered.)
gst_pad_create_stream_id() requires a parent element parameter. But our my_src pads don't belong to a parent element.
On Mon Oct 9 02:40:25 2023 +0000, Ziqing Hui wrote:
OK, we can remove it. It's not necessary. mp4mux will query latency, and it outputs a warning in its debug channel if latency is not supported by downstream pads, but that doesn't affect anything so far.
Right. Being able to respond to a latency query would be useful for mp4mux, but I don't think we can meaningfully respond. We don't know anything about the upstream source.