From: Ziqing Hui zhui@codeweavers.com
--- dlls/winegstreamer/gst_private.h | 1 + dlls/winegstreamer/main.c | 22 ++++++++++++++++++++++ dlls/winegstreamer/media_sink.c | 25 +++++++++++++++++++++++++ dlls/winegstreamer/unix_private.h | 1 + dlls/winegstreamer/unixlib.h | 9 +++++++++ dlls/winegstreamer/wg_muxer.c | 6 ++++++ dlls/winegstreamer/wg_parser.c | 26 ++++++++++++++++++++++++++ 7 files changed, 90 insertions(+)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 8712980092b..26873edb74b 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -115,6 +115,7 @@ void wg_muxer_destroy(wg_muxer_t muxer); HRESULT wg_muxer_add_stream(wg_muxer_t muxer, UINT32 stream_id, const struct wg_format *format); HRESULT wg_muxer_start(wg_muxer_t muxer); HRESULT wg_muxer_push_sample(wg_muxer_t muxer, struct wg_sample *sample, UINT32 stream_id); +HRESULT wg_muxer_read_data(wg_muxer_t muxer, void *buffer, UINT32 *size, UINT64 *offset);
unsigned int wg_format_get_bytes_for_uncompressed(wg_video_format format, unsigned int width, unsigned int height); unsigned int wg_format_get_max_size(const struct wg_format *format); diff --git a/dlls/winegstreamer/main.c b/dlls/winegstreamer/main.c index 3eb82bdccf5..c460fcfb037 100644 --- a/dlls/winegstreamer/main.c +++ b/dlls/winegstreamer/main.c @@ -544,6 +544,28 @@ HRESULT wg_muxer_push_sample(wg_muxer_t muxer, struct wg_sample *sample, UINT32 return S_OK; }
+HRESULT wg_muxer_read_data(wg_muxer_t muxer, void *buffer, UINT32 *size, UINT64 *offset) +{ + struct wg_muxer_read_data_params params = + { + .muxer = muxer, + .buffer = buffer, + .size = *size, + .offset = UINT64_MAX, + }; + NTSTATUS status; + + TRACE("muxer %#I64x, buffer %p, size %u.\n", muxer, buffer, *size); + + if (SUCCEEDED(status = WINE_UNIX_CALL(unix_wg_muxer_read_data, ¶ms))) + { + *size = params.size; + *offset = params.offset; + } + + return HRESULT_FROM_NT(status); +} + #define ALIGN(n, alignment) (((n) + (alignment) - 1) & ~((alignment) - 1))
unsigned int wg_format_get_stride(const struct wg_format *format) diff --git a/dlls/winegstreamer/media_sink.c b/dlls/winegstreamer/media_sink.c index d9c15291b61..6996d689e83 100644 --- a/dlls/winegstreamer/media_sink.c +++ b/dlls/winegstreamer/media_sink.c @@ -542,6 +542,28 @@ static HRESULT media_sink_queue_stream_event(struct media_sink *media_sink, Medi return S_OK; }
+static HRESULT media_sink_write_stream(struct media_sink *media_sink) +{ + BYTE buffer[1024]; + UINT32 size = sizeof(buffer); + ULONG written; + QWORD offset; + HRESULT hr; + + while (SUCCEEDED(hr = wg_muxer_read_data(media_sink->muxer, buffer, &size, &offset))) + { + if (offset != UINT64_MAX && FAILED(hr = IMFByteStream_SetCurrentPosition(media_sink->bytestream, offset))) + return hr; + + if (FAILED(hr = IMFByteStream_Write(media_sink->bytestream, buffer, size, &written))) + return hr; + + size = sizeof(buffer); + } + + return S_OK; +} + static HRESULT media_sink_start(struct media_sink *media_sink) { HRESULT hr; @@ -577,6 +599,9 @@ static HRESULT media_sink_process(struct media_sink *media_sink, IMFSample *samp
TRACE("media_sink %p, sample %p, stream_id %u.\n", media_sink, sample, stream_id);
+ if (FAILED(hr = media_sink_write_stream(media_sink))) + WARN("Failed to write output samples to stream, hr %#lx.\n", hr); + if (FAILED(hr = wg_sample_create_mf(sample, &wg_sample))) return hr;
diff --git a/dlls/winegstreamer/unix_private.h b/dlls/winegstreamer/unix_private.h index f639b03bcce..070e1285b38 100644 --- a/dlls/winegstreamer/unix_private.h +++ b/dlls/winegstreamer/unix_private.h @@ -70,6 +70,7 @@ extern NTSTATUS wg_muxer_destroy(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_muxer_add_stream(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_muxer_start(void *args) DECLSPEC_HIDDEN; extern NTSTATUS wg_muxer_push_sample(void *args) DECLSPEC_HIDDEN; +extern NTSTATUS wg_muxer_read_data(void *args) DECLSPEC_HIDDEN;
/* wg_allocator.c */
diff --git a/dlls/winegstreamer/unixlib.h b/dlls/winegstreamer/unixlib.h index 6c0c61f76bb..7a2a4a8da2f 100644 --- a/dlls/winegstreamer/unixlib.h +++ b/dlls/winegstreamer/unixlib.h @@ -392,6 +392,14 @@ struct wg_muxer_push_sample_params UINT32 stream_id; };
+struct wg_muxer_read_data_params +{ + wg_muxer_t muxer; + void *buffer; + UINT32 size; + UINT64 offset; +}; + enum unix_funcs { unix_wg_init_gstreamer, @@ -437,6 +445,7 @@ enum unix_funcs unix_wg_muxer_add_stream, unix_wg_muxer_start, unix_wg_muxer_push_sample, + unix_wg_muxer_read_data,
unix_wg_funcs_count, }; diff --git a/dlls/winegstreamer/wg_muxer.c b/dlls/winegstreamer/wg_muxer.c index 7402ba91a58..0d25687a7d1 100644 --- a/dlls/winegstreamer/wg_muxer.c +++ b/dlls/winegstreamer/wg_muxer.c @@ -451,3 +451,9 @@ NTSTATUS wg_muxer_push_sample(void *args)
return STATUS_SUCCESS; } + +NTSTATUS wg_muxer_read_data(void *args) +{ + GST_FIXME("Not implemented."); + return STATUS_NOT_IMPLEMENTED; +} diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 5a526b15712..4a513d66f77 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -1924,6 +1924,7 @@ const unixlib_entry_t __wine_unix_call_funcs[] = X(wg_muxer_add_stream), X(wg_muxer_start), X(wg_muxer_push_sample), + X(wg_muxer_read_data), };
C_ASSERT(ARRAYSIZE(__wine_unix_call_funcs) == unix_wg_funcs_count); @@ -2184,6 +2185,30 @@ NTSTATUS wow64_wg_muxer_push_sample(void *args) return wg_muxer_push_sample(¶ms); }
+NTSTATUS wow64_wg_muxer_read_data(void *args) +{ + struct + { + wg_muxer_t muxer; + PTR32 buffer; + UINT32 size; + UINT64 offset; + } *params32 = args; + struct wg_muxer_read_data_params params = + { + .muxer = params32->muxer, + .buffer = ULongToPtr(params32->buffer), + .size = params32->size, + .offset = params32->offset, + }; + NTSTATUS ret; + + ret = wg_muxer_read_data(¶ms); + params32->size = params.size; + params32->offset = params.offset; + return ret; +} + const unixlib_entry_t __wine_unix_call_wow64_funcs[] = { #define X64(name) [unix_ ## name] = wow64_ ## name @@ -2230,6 +2255,7 @@ const unixlib_entry_t __wine_unix_call_wow64_funcs[] = X64(wg_muxer_add_stream), X(wg_muxer_start), X64(wg_muxer_push_sample), + X64(wg_muxer_read_data), };
C_ASSERT(ARRAYSIZE(__wine_unix_call_wow64_funcs) == unix_wg_funcs_count);
From: Ziqing Hui zhui@codeweavers.com
--- dlls/winegstreamer/wg_muxer.c | 88 +++++++++++++++++++++++++++++++++-- 1 file changed, 83 insertions(+), 5 deletions(-)
diff --git a/dlls/winegstreamer/wg_muxer.c b/dlls/winegstreamer/wg_muxer.c index 0d25687a7d1..bf4e8090dcf 100644 --- a/dlls/winegstreamer/wg_muxer.c +++ b/dlls/winegstreamer/wg_muxer.c @@ -59,8 +59,13 @@ struct wg_muxer GstPad *my_sink; GstCaps *my_sink_caps;
+ GstAtomicQueue *output_queue; + GstBuffer *buffer; + GstMapInfo buffer_map; + size_t buffer_read; + pthread_mutex_t mutex; - guint64 offset; + guint64 offset; /* Write offset of the output buffer generated by muxer. */
struct list streams; }; @@ -198,8 +203,31 @@ static gboolean muxer_sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *ev
static GstFlowReturn muxer_sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer) { - GST_FIXME("Stub."); - return GST_FLOW_ERROR; + GstBuffer *buffer_writable= gst_buffer_make_writable(buffer); + struct wg_muxer *muxer = gst_pad_get_element_private(pad); + + GST_DEBUG("muxer %p, pad %"GST_PTR_FORMAT", parent %"GST_PTR_FORMAT", buffer %"GST_PTR_FORMAT".", + muxer, pad, parent, buffer); + + pthread_mutex_lock(&muxer->mutex); + + GST_BUFFER_OFFSET(buffer_writable) = GST_BUFFER_OFFSET_NONE; + if (muxer->offset != GST_BUFFER_OFFSET_NONE) + { + GST_BUFFER_OFFSET(buffer_writable) = muxer->offset; + muxer->offset = GST_BUFFER_OFFSET_NONE; + } + + gst_atomic_queue_push(muxer->output_queue, buffer_writable); + + GST_DEBUG("Pushed buffer %"GST_PTR_FORMAT" to output queue %p," + " buffer size %" G_GSIZE_FORMAT ", offset %" G_GUINT64_FORMAT ", %u buffers in queue now.", + buffer_writable, muxer->output_queue, gst_buffer_get_size(buffer_writable), + GST_BUFFER_OFFSET(buffer_writable), gst_atomic_queue_length(muxer->output_queue)); + + pthread_mutex_unlock(&muxer->mutex); + + return GST_FLOW_OK; }
static void stream_free(struct wg_muxer_stream *stream) @@ -269,6 +297,7 @@ NTSTATUS wg_muxer_destroy(void *args) { struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args); struct wg_muxer_stream *stream, *next; + GstBuffer *buffer;
LIST_FOR_EACH_ENTRY_SAFE(stream, next, &muxer->streams, struct wg_muxer_stream, entry) { @@ -276,6 +305,16 @@ NTSTATUS wg_muxer_destroy(void *args) stream_free(stream); }
+ if (muxer->buffer) + { + gst_buffer_unmap(muxer->buffer, &muxer->buffer_map); + gst_buffer_unref(muxer->buffer); + } + + while ((buffer = gst_atomic_queue_pop(muxer->output_queue))) + gst_buffer_unref(buffer); + gst_atomic_queue_unref(muxer->output_queue); + gst_object_unref(muxer->my_sink); gst_caps_unref(muxer->my_sink_caps); gst_element_set_state(muxer->container, GST_STATE_NULL); @@ -454,6 +493,45 @@ NTSTATUS wg_muxer_push_sample(void *args)
NTSTATUS wg_muxer_read_data(void *args) { - GST_FIXME("Not implemented."); - return STATUS_NOT_IMPLEMENTED; + struct wg_muxer_read_data_params *params = args; + struct wg_muxer *muxer = get_muxer(params->muxer); + size_t size; + + /* Pop buffer from output queue, and map it. */ + if (!muxer->buffer) + { + if (!(muxer->buffer = gst_atomic_queue_pop(muxer->output_queue))) + return STATUS_NO_MEMORY; + + if (!gst_buffer_map(muxer->buffer, &muxer->buffer_map, GST_MAP_READ)) + { + GST_ERROR("Failed to map buffer %p, dropped.", muxer->buffer); + gst_buffer_unref(muxer->buffer); + muxer->buffer = NULL; + return STATUS_UNSUCCESSFUL; + } + muxer->buffer_read = 0; + } + + /* Copy data. */ + size = min(muxer->buffer_map.size - muxer->buffer_read, params->size); + memcpy(params->buffer, muxer->buffer_map.data + muxer->buffer_read, size); + + /* Set offset and size. + * We may continuously read data from a same buffer multiple times. + * But we only need to set the offset at the first reading. */ + params->size = size; + if (GST_BUFFER_OFFSET_IS_VALID(muxer->buffer) && !muxer->buffer_read) + params->offset = GST_BUFFER_OFFSET(muxer->buffer); + + /* Unmap buffer if all data is read. */ + muxer->buffer_read += size; + if (muxer->buffer_read == muxer->buffer_map.size) + { + gst_buffer_unmap(muxer->buffer, &muxer->buffer_map); + gst_buffer_unref(muxer->buffer); + muxer->buffer = NULL; + } + + return STATUS_SUCCESS; }
Rémi Bernon (@rbernon) commented about dlls/winegstreamer/wg_muxer.c:
GstPad *my_sink; GstCaps *my_sink_caps;
- GstAtomicQueue *output_queue;
- GstBuffer *buffer;
- GstMapInfo buffer_map;
- size_t buffer_read;
I don't think you should keep the buffer mapped across reads. If the client stops reading in the middle for some reason the buffer will stay mapped until the sink is destroyed. GStreamer doc specifically states that buffer should only be mapped for short period of times.
Also, instead of having to keep the number of bytes read here you could use `gst_buffer_resize` to update the buffer data offset after it has been partially read.
Rémi Bernon (@rbernon) commented about dlls/winegstreamer/wg_muxer.c:
- size_t size;
- /* Pop buffer from output queue, and map it. */
- if (!muxer->buffer)
- {
if (!(muxer->buffer = gst_atomic_queue_pop(muxer->output_queue)))
return STATUS_NO_MEMORY;
if (!gst_buffer_map(muxer->buffer, &muxer->buffer_map, GST_MAP_READ))
{
GST_ERROR("Failed to map buffer %p, dropped.", muxer->buffer);
gst_buffer_unref(muxer->buffer);
muxer->buffer = NULL;
return STATUS_UNSUCCESSFUL;
}
muxer->buffer_read = 0;
And you could set params->offset here instead.
On Thu Nov 9 10:28:19 2023 +0000, Rémi Bernon wrote:
I don't think you should keep the buffer mapped across reads. If the client stops reading in the middle for some reason the buffer will stay mapped until the sink is destroyed. GStreamer doc specifically states that buffer should only be mapped for short period of times. Also, instead of having to keep the number of bytes read here you could use `gst_buffer_resize` to update the buffer data offset after it has been partially read.
Well I can't find where I read that, maybe it's not that definite. Still IMO it'd be nicer not to keep buffer mapped.
On Thu Nov 9 10:50:40 2023 +0000, Rémi Bernon wrote:
Well I can't find where I read that, maybe it's not that definite. Still IMO it'd be nicer not to keep buffer mapped.
It probably doesn't matter, but I think it would likely be less code to just map it every time, yeah.
On Thu Nov 9 19:16:49 2023 +0000, Zebediah Figura wrote:
It probably doesn't matter, but I think it would likely be less code to just map it every time, yeah.
What about using gst_buffer_extract()? It looks like the simplest way, it map/unmap the buffer inside the function itself, and we don't need to map/unmap by ourselves.