From: Ziqing Hui <zhui@codeweavers.com>
--- dlls/winegstreamer/wg_muxer.c | 135 +++++++++++++++++++++++++++++++++- 1 file changed, 134 insertions(+), 1 deletion(-)
diff --git a/dlls/winegstreamer/wg_muxer.c b/dlls/winegstreamer/wg_muxer.c index 7434aed13fe..ca90fe4440f 100644 --- a/dlls/winegstreamer/wg_muxer.c +++ b/dlls/winegstreamer/wg_muxer.c @@ -42,6 +42,11 @@ struct wg_muxer GstPad *my_sink; GstCaps *my_sink_caps;
+ GstAtomicQueue *output_queue; + + pthread_mutex_t mutex; + guint64 offset; + struct list streams; };
@@ -194,6 +199,19 @@ static bool stream_start(struct wg_muxer_stream *stream) return true; }
+static struct wg_muxer_stream *muxer_get_stream_by_id(struct wg_muxer *muxer, DWORD id) +{ + struct wg_muxer_stream *stream; + + LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry) + { + if (stream->id == id) + return stream; + } + + return NULL; +} + static bool muxer_try(struct wg_muxer *muxer, GstElementFactory *muxer_factory, GstRank min_rank) { char *element_name, *plugin_name = gst_plugin_feature_get_name(GST_PLUGIN_FEATURE(muxer_factory)); @@ -319,6 +337,66 @@ static gboolean muxer_sink_query_cb(GstPad *pad, GstObject *parent, GstQuery *qu } }
+static gboolean muxer_sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) +{ + struct wg_muxer *muxer = gst_pad_get_element_private(pad); + const GstSegment *segment; + + GST_DEBUG("pad %p, parent %p, event %p, muxer %p, type \"%s\".", + pad, parent, event, muxer, GST_EVENT_TYPE_NAME(event)); + + switch (event->type) + { + case GST_EVENT_SEGMENT: + pthread_mutex_lock(&muxer->mutex); + + gst_event_parse_segment(event, &segment); + if (segment->format != GST_FORMAT_BYTES) + { + pthread_mutex_unlock(&muxer->mutex); + GST_FIXME("Unhandled segment format \"%s\".", gst_format_get_name(segment->format)); + break; + } + muxer->offset = segment->start; + + pthread_mutex_unlock(&muxer->mutex); + break; + + default: + GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event)); + break; + } + + gst_event_unref(event); + return TRUE; +} + +static GstFlowReturn muxer_sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer) +{ + struct wg_muxer *muxer = gst_pad_get_element_private(pad); + + GST_DEBUG("pad %p, parent %p, buffer %p, muxer %p.", pad, parent, buffer, muxer); + + pthread_mutex_lock(&muxer->mutex); + + GST_BUFFER_OFFSET(buffer) = GST_BUFFER_OFFSET_NONE; + if (muxer->offset != GST_BUFFER_OFFSET_NONE) + { + GST_BUFFER_OFFSET(buffer) = muxer->offset; + muxer->offset = GST_BUFFER_OFFSET_NONE; + } + gst_atomic_queue_push(muxer->output_queue, buffer); + + GST_LOG("Pushed buffer %p to output queue %p, buffer size %" G_GSIZE_FORMAT ", " + "offset %" G_GUINT64_FORMAT ", %u buffers in queue now.", + buffer, muxer->output_queue, gst_buffer_get_size(buffer), GST_BUFFER_OFFSET(buffer), + gst_atomic_queue_length(muxer->output_queue)); + + pthread_mutex_unlock(&muxer->mutex); + + return GST_FLOW_OK; +} + NTSTATUS wg_muxer_create(void *args) { struct wg_muxer_create_params *params = args; @@ -330,8 +408,12 @@ NTSTATUS wg_muxer_create(void *args) if (!(muxer = calloc(1, sizeof(*muxer)))) return STATUS_NO_MEMORY; list_init(&muxer->streams); + muxer->offset = 
GST_BUFFER_OFFSET_NONE; + pthread_mutex_init(&muxer->mutex, NULL); if (!(muxer->container = gst_bin_new("wg_muxer"))) goto out; + if (!(muxer->output_queue = gst_atomic_queue_new(8))) + goto out;
/* Create sink pad. */ if (!(muxer->my_sink_caps = gst_caps_from_string(params->format))) @@ -346,6 +428,8 @@ NTSTATUS wg_muxer_create(void *args) goto out; gst_pad_set_element_private(muxer->my_sink, muxer); gst_pad_set_query_function(muxer->my_sink, muxer_sink_query_cb); + gst_pad_set_event_function(muxer->my_sink, muxer_sink_event_cb); + gst_pad_set_chain_function(muxer->my_sink, muxer_sink_chain_cb);
gst_object_unref(template);
@@ -361,8 +445,10 @@ out: gst_object_unref(template); if (muxer->my_sink_caps) gst_caps_unref(muxer->my_sink_caps); + gst_atomic_queue_unref(muxer->output_queue); if (muxer->container) gst_object_unref(muxer->container); + pthread_mutex_destroy(&muxer->mutex); free(muxer);
return status; @@ -372,6 +458,7 @@ NTSTATUS wg_muxer_destroy(void *args) { struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args); struct wg_muxer_stream *stream, *next; + GstBuffer *buffer;
LIST_FOR_EACH_ENTRY_SAFE(stream, next, &muxer->streams, struct wg_muxer_stream, entry) { @@ -380,10 +467,18 @@ NTSTATUS wg_muxer_destroy(void *args) gst_caps_unref(stream->my_src_caps); free(stream); } + + while ((buffer = gst_atomic_queue_pop(muxer->output_queue))) + gst_buffer_unref(buffer); + gst_atomic_queue_unref(muxer->output_queue); + gst_object_unref(muxer->my_sink); gst_caps_unref(muxer->my_sink_caps); gst_element_set_state(muxer->container, GST_STATE_NULL); gst_object_unref(muxer->container); + + pthread_mutex_destroy(&muxer->mutex); + free(muxer);
return S_OK; @@ -477,5 +572,43 @@ NTSTATUS wg_muxer_start(void *args)
NTSTATUS wg_muxer_push_sample(void *args) { - return STATUS_NOT_IMPLEMENTED; + struct wg_muxer_push_sample_params *params = args; + struct wg_muxer *muxer = get_muxer(params->muxer); + struct wg_sample *sample = params->sample; + struct wg_muxer_stream *stream; + GstFlowReturn ret; + GstBuffer *buffer; + + if (!(stream = muxer_get_stream_by_id(muxer, params->stream_id))) + return STATUS_NOT_FOUND; + + /* Create sample data buffer. */ + if (!(buffer = gst_buffer_new_and_alloc(sample->size)) + || !gst_buffer_fill(buffer, 0, wg_sample_data(sample), sample->size)) + { + GST_ERROR("Failed to allocate input buffer."); + return STATUS_NO_MEMORY; + } + + GST_INFO("Copied %u bytes from sample %p to buffer %p.", sample->size, sample, buffer); + + /* Set sample properties. */ + if (sample->flags & WG_SAMPLE_FLAG_HAS_PTS) + GST_BUFFER_PTS(buffer) = sample->pts * 100; + if (sample->flags & WG_SAMPLE_FLAG_HAS_DURATION) + GST_BUFFER_DURATION(buffer) = sample->duration * 100; + if (!(sample->flags & WG_SAMPLE_FLAG_SYNC_POINT)) + GST_BUFFER_FLAG_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT); + if (sample->flags & WG_SAMPLE_FLAG_DISCONTINUITY) + GST_BUFFER_FLAG_SET(buffer, GST_BUFFER_FLAG_DISCONT); + + /* Push sample data buffer to stream src pad. */ + if ((ret = gst_pad_push(stream->my_src, buffer)) < 0) + { + GST_ERROR("Failed to push buffer %p to pad %s, reason %s.", + buffer, gst_pad_get_name(stream->my_src), gst_flow_get_name(ret)); + return STATUS_UNSUCCESSFUL; + } + + return STATUS_SUCCESS; }