-- v2: winegstreamer: Implement wg_muxer_read_data. winegstreamer: Introduce media_sink_write_stream.
From: Ziqing Hui zhui@codeweavers.com
--- dlls/winegstreamer/gst_private.h | 1 + dlls/winegstreamer/main.c | 23 +++++++++++++++++++++++ dlls/winegstreamer/media_sink.c | 25 +++++++++++++++++++++++++ dlls/winegstreamer/unix_private.h | 1 + dlls/winegstreamer/unixlib.h | 9 +++++++++ dlls/winegstreamer/wg_muxer.c | 6 ++++++ dlls/winegstreamer/wg_parser.c | 26 ++++++++++++++++++++++++++ 7 files changed, 91 insertions(+)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 8712980092b..26873edb74b 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -115,6 +115,7 @@ void wg_muxer_destroy(wg_muxer_t muxer); HRESULT wg_muxer_add_stream(wg_muxer_t muxer, UINT32 stream_id, const struct wg_format *format); HRESULT wg_muxer_start(wg_muxer_t muxer); HRESULT wg_muxer_push_sample(wg_muxer_t muxer, struct wg_sample *sample, UINT32 stream_id); +HRESULT wg_muxer_read_data(wg_muxer_t muxer, void *buffer, UINT32 *size, UINT64 *offset);
unsigned int wg_format_get_bytes_for_uncompressed(wg_video_format format, unsigned int width, unsigned int height); unsigned int wg_format_get_max_size(const struct wg_format *format); diff --git a/dlls/winegstreamer/main.c b/dlls/winegstreamer/main.c index 3eb82bdccf5..900905be0d9 100644 --- a/dlls/winegstreamer/main.c +++ b/dlls/winegstreamer/main.c @@ -544,6 +544,29 @@ HRESULT wg_muxer_push_sample(wg_muxer_t muxer, struct wg_sample *sample, UINT32 return S_OK; }
+HRESULT wg_muxer_read_data(wg_muxer_t muxer, void *buffer, UINT32 *size, UINT64 *offset) +{ + struct wg_muxer_read_data_params params = + { + .muxer = muxer, + .buffer = buffer, + .size = *size, + .offset = UINT64_MAX, + }; + NTSTATUS status; + + TRACE("muxer %#I64x, buffer %p, size %u.\n", muxer, buffer, *size); + + if (SUCCEEDED(status = WINE_UNIX_CALL(unix_wg_muxer_read_data, &params))) + { + *size = params.size; + *offset = params.offset; + TRACE("Read %u bytes, offset %#I64x.\n", *size, *offset); + } + + return HRESULT_FROM_NT(status); +} + #define ALIGN(n, alignment) (((n) + (alignment) - 1) & ~((alignment) - 1))
unsigned int wg_format_get_stride(const struct wg_format *format) diff --git a/dlls/winegstreamer/media_sink.c b/dlls/winegstreamer/media_sink.c index d9c15291b61..6996d689e83 100644 --- a/dlls/winegstreamer/media_sink.c +++ b/dlls/winegstreamer/media_sink.c @@ -542,6 +542,28 @@ static HRESULT media_sink_queue_stream_event(struct media_sink *media_sink, Medi return S_OK; }
+static HRESULT media_sink_write_stream(struct media_sink *media_sink) +{ + BYTE buffer[1024]; + UINT32 size = sizeof(buffer); + ULONG written; + QWORD offset; + HRESULT hr; + + while (SUCCEEDED(hr = wg_muxer_read_data(media_sink->muxer, buffer, &size, &offset))) + { + if (offset != UINT64_MAX && FAILED(hr = IMFByteStream_SetCurrentPosition(media_sink->bytestream, offset))) + return hr; + + if (FAILED(hr = IMFByteStream_Write(media_sink->bytestream, buffer, size, &written))) + return hr; + + size = sizeof(buffer); + } + + return S_OK; +} + static HRESULT media_sink_start(struct media_sink *media_sink) { HRESULT hr; @@ -577,6 +599,9 @@ static HRESULT media_sink_process(struct media_sink *media_sink, IMFSample *samp
TRACE("media_sink %p, sample %p, stream_id %u.\n", media_sink, sample, stream_id);
+ if (FAILED(hr = media_sink_write_stream(media_sink))) + WARN("Failed to write output samples to stream, hr %#lx.\n", hr); + if (FAILED(hr = wg_sample_create_mf(sample, &wg_sample))) return hr;
diff --git a/dlls/winegstreamer/unix_private.h b/dlls/winegstreamer/unix_private.h index f639b03bcce..070e1285b38 100644 --- a/dlls/winegstreamer/unix_private.h +++ b/dlls/winegstreamer/unix_private.h @@ -70,6 +70,7 @@ extern NTSTATUS wg_muxer_destroy(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_muxer_add_stream(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_muxer_start(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_muxer_push_sample(void *args) DECLSPEC_HIDDEN; +extern NTSTATUS wg_muxer_read_data(void *args) DECLSPEC_HIDDEN;
/* wg_allocator.c */
diff --git a/dlls/winegstreamer/unixlib.h b/dlls/winegstreamer/unixlib.h index 6c0c61f76bb..7a2a4a8da2f 100644 --- a/dlls/winegstreamer/unixlib.h +++ b/dlls/winegstreamer/unixlib.h @@ -392,6 +392,14 @@ struct wg_muxer_push_sample_params UINT32 stream_id; };
+struct wg_muxer_read_data_params +{ + wg_muxer_t muxer; + void *buffer; + UINT32 size; + UINT64 offset; +}; + enum unix_funcs { unix_wg_init_gstreamer, @@ -437,6 +445,7 @@ enum unix_funcs unix_wg_muxer_add_stream, unix_wg_muxer_start, unix_wg_muxer_push_sample, + unix_wg_muxer_read_data,
unix_wg_funcs_count, }; diff --git a/dlls/winegstreamer/wg_muxer.c b/dlls/winegstreamer/wg_muxer.c index 7402ba91a58..0d25687a7d1 100644 --- a/dlls/winegstreamer/wg_muxer.c +++ b/dlls/winegstreamer/wg_muxer.c @@ -451,3 +451,9 @@ NTSTATUS wg_muxer_push_sample(void *args)
return STATUS_SUCCESS; } + +NTSTATUS wg_muxer_read_data(void *args) +{ + GST_FIXME("Not implemented."); + return STATUS_NOT_IMPLEMENTED; +} diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 5a526b15712..4a513d66f77 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -1924,6 +1924,7 @@ const unixlib_entry_t __wine_unix_call_funcs[] = X(wg_muxer_add_stream), X(wg_muxer_start), X(wg_muxer_push_sample), + X(wg_muxer_read_data), };
C_ASSERT(ARRAYSIZE(__wine_unix_call_funcs) == unix_wg_funcs_count); @@ -2184,6 +2185,30 @@ NTSTATUS wow64_wg_muxer_push_sample(void *args) return wg_muxer_push_sample(&params); }
+NTSTATUS wow64_wg_muxer_read_data(void *args) +{ + struct + { + wg_muxer_t muxer; + PTR32 buffer; + UINT32 size; + UINT64 offset; + } *params32 = args; + struct wg_muxer_read_data_params params = + { + .muxer = params32->muxer, + .buffer = ULongToPtr(params32->buffer), + .size = params32->size, + .offset = params32->offset, + }; + NTSTATUS ret; + + ret = wg_muxer_read_data(&params); + params32->size = params.size; + params32->offset = params.offset; + return ret; +} + const unixlib_entry_t __wine_unix_call_wow64_funcs[] = { #define X64(name) [unix_ ## name] = wow64_ ## name @@ -2230,6 +2255,7 @@ const unixlib_entry_t __wine_unix_call_wow64_funcs[] = X64(wg_muxer_add_stream), X(wg_muxer_start), X64(wg_muxer_push_sample), + X64(wg_muxer_read_data), };
C_ASSERT(ARRAYSIZE(__wine_unix_call_wow64_funcs) == unix_wg_funcs_count);
From: Ziqing Hui zhui@codeweavers.com
--- dlls/winegstreamer/wg_muxer.c | 76 ++++++++++++++++++++++++++++++++--- 1 file changed, 71 insertions(+), 5 deletions(-)
diff --git a/dlls/winegstreamer/wg_muxer.c b/dlls/winegstreamer/wg_muxer.c index 0d25687a7d1..c0614807459 100644 --- a/dlls/winegstreamer/wg_muxer.c +++ b/dlls/winegstreamer/wg_muxer.c @@ -59,8 +59,11 @@ struct wg_muxer GstPad *my_sink; GstCaps *my_sink_caps;
+ GstAtomicQueue *output_queue; + GstBuffer *buffer; + pthread_mutex_t mutex; - guint64 offset; + guint64 offset; /* Write offset of the output buffer generated by muxer. */
struct list streams; }; @@ -198,8 +201,29 @@ static gboolean muxer_sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *ev
static GstFlowReturn muxer_sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer) { - GST_FIXME("Stub."); - return GST_FLOW_ERROR; + GstBuffer *buffer_writable= gst_buffer_make_writable(buffer); + struct wg_muxer *muxer = gst_pad_get_element_private(pad); + + GST_DEBUG("muxer %p, pad %"GST_PTR_FORMAT", parent %"GST_PTR_FORMAT", buffer <%"GST_PTR_FORMAT">.", + muxer, pad, parent, buffer); + + pthread_mutex_lock(&muxer->mutex); + + GST_BUFFER_OFFSET(buffer_writable) = GST_BUFFER_OFFSET_NONE; + if (muxer->offset != GST_BUFFER_OFFSET_NONE) + { + GST_BUFFER_OFFSET(buffer_writable) = muxer->offset; + muxer->offset = GST_BUFFER_OFFSET_NONE; + } + + gst_atomic_queue_push(muxer->output_queue, buffer_writable); + + GST_DEBUG("Pushed writable buffer <%"GST_PTR_FORMAT"> to output queue %p, %u buffers in queue now.", + buffer_writable, muxer->output_queue, gst_atomic_queue_length(muxer->output_queue)); + + pthread_mutex_unlock(&muxer->mutex); + + return GST_FLOW_OK; }
static void stream_free(struct wg_muxer_stream *stream) @@ -226,6 +250,8 @@ NTSTATUS wg_muxer_create(void *args) pthread_mutex_init(&muxer->mutex, NULL); if (!(muxer->container = gst_bin_new("wg_muxer"))) goto out; + if (!(muxer->output_queue = gst_atomic_queue_new(8))) + goto out;
/* Create sink pad. */ if (!(muxer->my_sink_caps = gst_caps_from_string(params->format))) @@ -257,6 +283,8 @@ out: gst_object_unref(template); if (muxer->my_sink_caps) gst_caps_unref(muxer->my_sink_caps); + if (muxer->output_queue) + gst_atomic_queue_unref(muxer->output_queue); if (muxer->container) gst_object_unref(muxer->container); pthread_mutex_destroy(&muxer->mutex); @@ -269,6 +297,7 @@ NTSTATUS wg_muxer_destroy(void *args) { struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args); struct wg_muxer_stream *stream, *next; + GstBuffer *buffer;
LIST_FOR_EACH_ENTRY_SAFE(stream, next, &muxer->streams, struct wg_muxer_stream, entry) { @@ -276,6 +305,13 @@ NTSTATUS wg_muxer_destroy(void *args) stream_free(stream); }
+ if (muxer->buffer) + gst_buffer_unref(muxer->buffer); + + while ((buffer = gst_atomic_queue_pop(muxer->output_queue))) + gst_buffer_unref(buffer); + gst_atomic_queue_unref(muxer->output_queue); + gst_object_unref(muxer->my_sink); gst_caps_unref(muxer->my_sink_caps); gst_element_set_state(muxer->container, GST_STATE_NULL); @@ -454,6 +490,36 @@ NTSTATUS wg_muxer_push_sample(void *args)
NTSTATUS wg_muxer_read_data(void *args) { - GST_FIXME("Not implemented."); - return STATUS_NOT_IMPLEMENTED; + struct wg_muxer_read_data_params *params = args; + struct wg_muxer *muxer = get_muxer(params->muxer); + gsize size, copied; + + /* Pop buffer from output queue. */ + if (!muxer->buffer) + { + if (!(muxer->buffer = gst_atomic_queue_pop(muxer->output_queue))) + return STATUS_NO_MEMORY; + + /* We may continuously read data from a same buffer multiple times. + * But we only need to set the offset at the first reading. */ + if (GST_BUFFER_OFFSET_IS_VALID(muxer->buffer)) + params->offset = GST_BUFFER_OFFSET(muxer->buffer); + } + + /* Copy data. */ + size = min(gst_buffer_get_size(muxer->buffer), params->size); + copied = gst_buffer_extract(muxer->buffer, 0, params->buffer, size); + params->size = copied; + + GST_INFO("Copied %"G_GSIZE_FORMAT" bytes from buffer <%"GST_PTR_FORMAT">", copied, muxer->buffer); + + /* Unref buffer if all data is read. */ + gst_buffer_resize(muxer->buffer, (gssize)copied, -1); + if (!gst_buffer_get_size(muxer->buffer)) + { + gst_buffer_unref(muxer->buffer); + muxer->buffer = NULL; + } + + return STATUS_SUCCESS; }
This merge request was approved by Rémi Bernon.
This merge request was approved by Zebediah Figura.