Module: wine Branch: master Commit: 1617bd3f874ea96b2757723d82a0bd1b656f3d2f URL: https://gitlab.winehq.org/wine/wine/-/commit/1617bd3f874ea96b2757723d82a0bd1...
Author: Ziqing Hui zhui@codeweavers.com Date: Mon Oct 30 10:29:08 2023 +0800
winegstreamer: Implement wg_muxer_read_data.
---
dlls/winegstreamer/wg_muxer.c | 76 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 71 insertions(+), 5 deletions(-)
diff --git a/dlls/winegstreamer/wg_muxer.c b/dlls/winegstreamer/wg_muxer.c index 0d25687a7d1..c0614807459 100644 --- a/dlls/winegstreamer/wg_muxer.c +++ b/dlls/winegstreamer/wg_muxer.c @@ -59,8 +59,11 @@ struct wg_muxer GstPad *my_sink; GstCaps *my_sink_caps;
+ GstAtomicQueue *output_queue; + GstBuffer *buffer; + pthread_mutex_t mutex; - guint64 offset; + guint64 offset; /* Write offset of the output buffer generated by muxer. */
struct list streams; }; @@ -198,8 +201,29 @@ static gboolean muxer_sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *ev
static GstFlowReturn muxer_sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer) { - GST_FIXME("Stub."); - return GST_FLOW_ERROR; + GstBuffer *buffer_writable= gst_buffer_make_writable(buffer); + struct wg_muxer *muxer = gst_pad_get_element_private(pad); + + GST_DEBUG("muxer %p, pad %"GST_PTR_FORMAT", parent %"GST_PTR_FORMAT", buffer <%"GST_PTR_FORMAT">.", + muxer, pad, parent, buffer); + + pthread_mutex_lock(&muxer->mutex); + + GST_BUFFER_OFFSET(buffer_writable) = GST_BUFFER_OFFSET_NONE; + if (muxer->offset != GST_BUFFER_OFFSET_NONE) + { + GST_BUFFER_OFFSET(buffer_writable) = muxer->offset; + muxer->offset = GST_BUFFER_OFFSET_NONE; + } + + gst_atomic_queue_push(muxer->output_queue, buffer_writable); + + GST_DEBUG("Pushed writable buffer <%"GST_PTR_FORMAT"> to output queue %p, %u buffers in queue now.", + buffer_writable, muxer->output_queue, gst_atomic_queue_length(muxer->output_queue)); + + pthread_mutex_unlock(&muxer->mutex); + + return GST_FLOW_OK; }
static void stream_free(struct wg_muxer_stream *stream) @@ -226,6 +250,8 @@ NTSTATUS wg_muxer_create(void *args) pthread_mutex_init(&muxer->mutex, NULL); if (!(muxer->container = gst_bin_new("wg_muxer"))) goto out; + if (!(muxer->output_queue = gst_atomic_queue_new(8))) + goto out;
/* Create sink pad. */ if (!(muxer->my_sink_caps = gst_caps_from_string(params->format))) @@ -257,6 +283,8 @@ out: gst_object_unref(template); if (muxer->my_sink_caps) gst_caps_unref(muxer->my_sink_caps); + if (muxer->output_queue) + gst_atomic_queue_unref(muxer->output_queue); if (muxer->container) gst_object_unref(muxer->container); pthread_mutex_destroy(&muxer->mutex); @@ -269,6 +297,7 @@ NTSTATUS wg_muxer_destroy(void *args) { struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args); struct wg_muxer_stream *stream, *next; + GstBuffer *buffer;
LIST_FOR_EACH_ENTRY_SAFE(stream, next, &muxer->streams, struct wg_muxer_stream, entry) { @@ -276,6 +305,13 @@ NTSTATUS wg_muxer_destroy(void *args) stream_free(stream); }
+ if (muxer->buffer) + gst_buffer_unref(muxer->buffer); + + while ((buffer = gst_atomic_queue_pop(muxer->output_queue))) + gst_buffer_unref(buffer); + gst_atomic_queue_unref(muxer->output_queue); + gst_object_unref(muxer->my_sink); gst_caps_unref(muxer->my_sink_caps); gst_element_set_state(muxer->container, GST_STATE_NULL); @@ -454,6 +490,36 @@ NTSTATUS wg_muxer_push_sample(void *args)
NTSTATUS wg_muxer_read_data(void *args) { - GST_FIXME("Not implemented."); - return STATUS_NOT_IMPLEMENTED; + struct wg_muxer_read_data_params *params = args; + struct wg_muxer *muxer = get_muxer(params->muxer); + gsize size, copied; + + /* Pop buffer from output queue. */ + if (!muxer->buffer) + { + if (!(muxer->buffer = gst_atomic_queue_pop(muxer->output_queue))) + return STATUS_NO_MEMORY; + + /* We may continuously read data from a same buffer multiple times. + * But we only need to set the offset at the first reading. */ + if (GST_BUFFER_OFFSET_IS_VALID(muxer->buffer)) + params->offset = GST_BUFFER_OFFSET(muxer->buffer); + } + + /* Copy data. */ + size = min(gst_buffer_get_size(muxer->buffer), params->size); + copied = gst_buffer_extract(muxer->buffer, 0, params->buffer, size); + params->size = copied; + + GST_INFO("Copied %"G_GSIZE_FORMAT" bytes from buffer <%"GST_PTR_FORMAT">", copied, muxer->buffer); + + /* Unref buffer if all data is read. */ + gst_buffer_resize(muxer->buffer, (gssize)copied, -1); + if (!gst_buffer_get_size(muxer->buffer)) + { + gst_buffer_unref(muxer->buffer); + muxer->buffer = NULL; + } + + return STATUS_SUCCESS; }