Instead of letting _Shutdown and the constructors' failure paths mix together, creating confusion about what should be released where, we can dedicate ::Shutdown/::Release to shutting down fully initialized objects without checks, and keep the cleanup code for partially-created objects in the constructor.
Signed-off-by: Derek Lesho <dlesho@codeweavers.com> --- dlls/winegstreamer/media_source.c | 99 +++++++++++++++++++------------ 1 file changed, 62 insertions(+), 37 deletions(-)
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 01ab626254a..2a0bb83374d 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -733,6 +733,12 @@ static HRESULT new_media_stream(struct media_source *source, object->IMFMediaStream_iface.lpVtbl = &media_stream_vtbl; object->ref = 1;
+ if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) + { + free(object); + return hr; + } + IMFMediaSource_AddRef(&source->IMFMediaSource_iface); object->parent_source = source; object->stream_id = stream_id; @@ -741,20 +747,11 @@ static HRESULT new_media_stream(struct media_source *source, object->eos = FALSE; object->wg_stream = wg_stream;
- if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) - goto fail; - TRACE("Created stream object %p.\n", object);
*out_stream = object;
return S_OK; - -fail: - WARN("Failed to construct media stream, hr %#x.\n", hr); - - IMFMediaStream_Release(&object->IMFMediaStream_iface); - return hr; }
static HRESULT media_stream_init_desc(struct media_stream *stream) @@ -857,6 +854,8 @@ done: IMFMediaTypeHandler_Release(type_handler); for (i = 0; i < type_count; i++) IMFMediaType_Release(stream_types[i]); + if (FAILED(hr)) + IMFStreamDescriptor_Release(stream->descriptor); return hr; }
@@ -1213,19 +1212,13 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
unix_funcs->wg_parser_disconnect(source->wg_parser);
- if (source->read_thread) - { - source->read_thread_shutdown = true; - WaitForSingleObject(source->read_thread, INFINITE); - CloseHandle(source->read_thread); - } + source->read_thread_shutdown = true; + WaitForSingleObject(source->read_thread, INFINITE); + CloseHandle(source->read_thread);
- if (source->pres_desc) - IMFPresentationDescriptor_Release(source->pres_desc); - if (source->event_queue) - IMFMediaEventQueue_Shutdown(source->event_queue); - if (source->byte_stream) - IMFByteStream_Release(source->byte_stream); + IMFPresentationDescriptor_Release(source->pres_desc); + IMFMediaEventQueue_Shutdown(source->event_queue); + IMFByteStream_Release(source->byte_stream);
for (i = 0; i < source->stream_count; i++) { @@ -1233,23 +1226,18 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
stream->state = STREAM_SHUTDOWN;
- if (stream->event_queue) - IMFMediaEventQueue_Shutdown(stream->event_queue); - if (stream->descriptor) - IMFStreamDescriptor_Release(stream->descriptor); - if (stream->parent_source) - IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface); + IMFMediaEventQueue_Shutdown(stream->event_queue); + IMFStreamDescriptor_Release(stream->descriptor); + IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface);
IMFMediaStream_Release(&stream->IMFMediaStream_iface); }
unix_funcs->wg_parser_destroy(source->wg_parser);
- if (source->stream_count) - free(source->streams); + free(source->streams);
- if (source->async_commands_queue) - MFUnlockWorkQueue(source->async_commands_queue); + MFUnlockWorkQueue(source->async_commands_queue);
return S_OK; } @@ -1274,6 +1262,7 @@ static const IMFMediaSourceVtbl IMFMediaSource_vtbl = static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_source **out_media_source) { IMFStreamDescriptor **descriptors = NULL; + unsigned int stream_count = 0; struct media_source *object; UINT64 total_pres_time = 0; struct wg_parser *parser; @@ -1337,15 +1326,15 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ * leak occurs with native. */ unix_funcs->wg_parser_set_unlimited_buffering(parser);
- object->stream_count = unix_funcs->wg_parser_get_stream_count(parser); + stream_count = unix_funcs->wg_parser_get_stream_count(parser);
- if (!(object->streams = calloc(object->stream_count, sizeof(*object->streams)))) + if (!(object->streams = calloc(stream_count, sizeof(*object->streams)))) { hr = E_OUTOFMEMORY; goto fail; }
- for (i = 0; i < object->stream_count; ++i) + for (i = 0; i < stream_count; ++i) { if (FAILED(hr = new_media_stream(object, unix_funcs->wg_parser_get_stream(parser, i), i, &object->streams[i]))) goto fail; @@ -1353,9 +1342,13 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ if (FAILED(hr = media_stream_init_desc(object->streams[i]))) { ERR("Failed to finish initialization of media stream %p, hr %x.\n", object->streams[i], hr); - IMFMediaStream_Release(&object->streams[i]->IMFMediaStream_iface); + IMFMediaSource_Release(&object->streams[i]->parent_source->IMFMediaSource_iface); + IMFMediaEventQueue_Release(object->streams[i]->event_queue); + free(object->streams[i]); goto fail; } + + object->stream_count++; }
/* init presentation descriptor */ @@ -1392,8 +1385,40 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ fail: WARN("Failed to construct MFMediaSource, hr %#x.\n", hr);
- free(descriptors); - IMFMediaSource_Release(&object->IMFMediaSource_iface); + if (descriptors) + { + for(i = 0; i < object->stream_count; i++) + IMFStreamDescriptor_Release(descriptors[i]); + free(descriptors); + } + for (i = 0; i < object->stream_count; i++) + { + struct media_stream *stream = object->streams[i]; + + IMFMediaEventQueue_Release(stream->event_queue); + IMFStreamDescriptor_Release(stream->descriptor); + IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface); + + free(stream); + } + if (object->streams) + free(object->streams); + if (stream_count) + unix_funcs->wg_parser_disconnect(object->wg_parser); + if (object->read_thread) + { + object->read_thread_shutdown = true; + WaitForSingleObject(object->read_thread, INFINITE); + CloseHandle(object->read_thread); + } + if (object->wg_parser) + unix_funcs->wg_parser_destroy(object->wg_parser); + if (object->async_commands_queue) + MFUnlockWorkQueue(object->async_commands_queue); + if (object->event_queue) + IMFMediaEventQueue_Release(object->event_queue); + IMFByteStream_Release(object->byte_stream); + free(object); return hr; }
This adds an allocate/blit for the input data, but this is necessary for both WOW64 support and an internal rework of the source path in wg_parser to use GstAppSrc. Source data is usually compressed anyway, so it shouldn't affect performance too badly.
Signed-off-by: Derek Lesho <dlesho@codeweavers.com> --- v2: - Move EOS detection to client side. - Maintain expanding buffer for uploads instead of reallocating on every request. --- dlls/winegstreamer/gst_private.h | 7 ++++--- dlls/winegstreamer/media_source.c | 29 +++++++++++++++++++++++++---- dlls/winegstreamer/quartz_parser.c | 27 ++++++++++++++++++++++++--- dlls/winegstreamer/wg_parser.c | 26 +++++++++++++------------- 4 files changed, 66 insertions(+), 23 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index c6c99b1dd55..9943682facb 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -169,9 +169,10 @@ struct unix_funcs void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser); void (CDECL *wg_parser_end_flush)(struct wg_parser *parser);
- bool (CDECL *wg_parser_get_read_request)(struct wg_parser *parser, - void **data, uint64_t *offset, uint32_t *size); - void (CDECL *wg_parser_complete_read_request)(struct wg_parser *parser, bool ret); + bool (CDECL *wg_parser_get_next_read_offset)(struct wg_parser *parser, + uint64_t *offset, uint32_t *size); + void (CDECL *wg_parser_push_data)(struct wg_parser *parser, + const void *data, uint32_t size);
void (CDECL *wg_parser_set_unlimited_buffering)(struct wg_parser *parser);
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 2a0bb83374d..0ff0cd73790 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -530,6 +530,11 @@ static DWORD CALLBACK read_thread(void *arg) { struct media_source *source = arg; IMFByteStream *byte_stream = source->byte_stream; + uint32_t buffer_size = 0; + uint64_t file_size; + void *data = NULL; + + IMFByteStream_GetLength(byte_stream, &file_size);
TRACE("Starting read thread for media source %p.\n", source);
@@ -539,18 +544,34 @@ static DWORD CALLBACK read_thread(void *arg) ULONG ret_size; uint32_t size; HRESULT hr; - void *data;
- if (!unix_funcs->wg_parser_get_read_request(source->wg_parser, &data, &offset, &size)) + if (!unix_funcs->wg_parser_get_next_read_offset(source->wg_parser, &offset, &size)) continue;
+ if (offset >= file_size) + size = 0; + else if (offset + size >= file_size) + size = file_size - offset; + + if (size > buffer_size) + { + buffer_size = size; + free(data); + data = malloc(size); + } + + ret_size = 0; + if (SUCCEEDED(hr = IMFByteStream_SetCurrentPosition(byte_stream, offset))) hr = IMFByteStream_Read(byte_stream, data, size, &ret_size); - if (SUCCEEDED(hr) && ret_size != size) + if (FAILED(hr)) + ERR("Failed to read source stream bytes %p+%u. hr=%#x\n", data, size, hr); + else if (ret_size != size) ERR("Unexpected short read: requested %u bytes, got %u.\n", size, ret_size); - unix_funcs->wg_parser_complete_read_request(source->wg_parser, SUCCEEDED(hr)); + unix_funcs->wg_parser_push_data(source->wg_parser, SUCCEEDED(hr) ? data : NULL, ret_size); }
+ free(data); TRACE("Media source is shutting down; exiting.\n"); return 0; } diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index 09a916d7f5c..5544e30dac9 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -785,6 +785,11 @@ static DWORD CALLBACK stream_thread(void *arg) static DWORD CALLBACK read_thread(void *arg) { struct parser *filter = arg; + LONGLONG file_size, unused; + uint32_t buffer_size = 0; + void *data = NULL; + + IAsyncReader_Length(filter->reader, &file_size, &unused);
TRACE("Starting read thread for filter %p.\n", filter);
@@ -793,14 +798,30 @@ static DWORD CALLBACK read_thread(void *arg) uint64_t offset; uint32_t size; HRESULT hr; - void *data;
- if (!unix_funcs->wg_parser_get_read_request(filter->wg_parser, &data, &offset, &size)) + if (!unix_funcs->wg_parser_get_next_read_offset(filter->wg_parser, &offset, &size)) continue; + + if (offset >= file_size) + size = 0; + else if (offset + size >= file_size) + size = file_size - offset; + + if (size > buffer_size) + { + buffer_size = size; + free(data); + data = malloc(size); + } + hr = IAsyncReader_SyncRead(filter->reader, offset, size, data); - unix_funcs->wg_parser_complete_read_request(filter->wg_parser, SUCCEEDED(hr)); + if (FAILED(hr)) + ERR("Async Reader failed to failed to read %p+%u. hr=%#x\n", data, size, hr); + + unix_funcs->wg_parser_push_data(filter->wg_parser, SUCCEEDED(hr) ? data : NULL, size); }
+ free(data); TRACE("Streaming stopped; exiting.\n"); return 0; } diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index cd12a23d0c8..fc3ea49d0a7 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -517,8 +517,8 @@ static void CDECL wg_parser_end_flush(struct wg_parser *parser) pthread_mutex_unlock(&parser->mutex); }
-static bool CDECL wg_parser_get_read_request(struct wg_parser *parser, - void **data, uint64_t *offset, uint32_t *size) +static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, + uint64_t *offset, uint32_t *size) { pthread_mutex_lock(&parser->mutex);
@@ -531,7 +531,6 @@ static bool CDECL wg_parser_get_read_request(struct wg_parser *parser, return false; }
- *data = parser->read_request.data; *offset = parser->read_request.offset; *size = parser->read_request.size;
@@ -539,11 +538,15 @@ static bool CDECL wg_parser_get_read_request(struct wg_parser *parser, return true; }
-static void CDECL wg_parser_complete_read_request(struct wg_parser *parser, bool ret) +static void CDECL wg_parser_push_data(struct wg_parser *parser, + const void *data, uint32_t size) { pthread_mutex_lock(&parser->mutex); + parser->read_request.size = size; parser->read_request.done = true; - parser->read_request.ret = ret; + parser->read_request.ret = !!data; + if (data) + memcpy(parser->read_request.data, data, size); parser->read_request.data = NULL; pthread_mutex_unlock(&parser->mutex); pthread_cond_signal(&parser->read_done_cond); @@ -1190,10 +1193,6 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent, if (offset == GST_BUFFER_OFFSET_NONE) offset = parser->next_pull_offset; parser->next_pull_offset = offset + size; - if (offset >= parser->file_size) - return GST_FLOW_EOS; - if (offset + size >= parser->file_size) - size = parser->file_size - offset;
if (!*buffer) *buffer = new_buffer = gst_buffer_new_and_alloc(size); @@ -1217,6 +1216,7 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent, pthread_cond_wait(&parser->read_done_cond, &parser->mutex);
ret = parser->read_request.ret; + gst_buffer_set_size(*buffer, parser->read_request.size);
pthread_mutex_unlock(&parser->mutex);
@@ -1224,10 +1224,10 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent,
GST_LOG("Request returned %d.", ret);
- if (!ret && new_buffer) + if ((!ret || !size) && new_buffer) gst_buffer_unref(new_buffer);
- return ret ? GST_FLOW_OK : GST_FLOW_ERROR; + return ret ? size ? GST_FLOW_OK : GST_FLOW_EOS : GST_FLOW_ERROR; }
static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) @@ -1918,8 +1918,8 @@ static const struct unix_funcs funcs = wg_parser_begin_flush, wg_parser_end_flush,
- wg_parser_get_read_request, - wg_parser_complete_read_request, + wg_parser_get_next_read_offset, + wg_parser_push_data,
wg_parser_set_unlimited_buffering,
On 9/10/21 12:04 PM, Derek Lesho wrote:
@@ -539,18 +544,34 @@ static DWORD CALLBACK read_thread(void *arg) ULONG ret_size; uint32_t size; HRESULT hr;
void *data;
if (!unix_funcs->wg_parser_get_read_request(source->wg_parser, &data, &offset, &size))
if (!unix_funcs->wg_parser_get_next_read_offset(source->wg_parser, &offset, &size)) continue;
if (offset >= file_size)
size = 0;
else if (offset + size >= file_size)
size = file_size - offset;
if (size > buffer_size)
{
buffer_size = size;
free(data);
data = malloc(size);
}
Any reason not to use realloc() here?
ret_size = 0;
if (SUCCEEDED(hr = IMFByteStream_SetCurrentPosition(byte_stream, offset))) hr = IMFByteStream_Read(byte_stream, data, size, &ret_size);
if (SUCCEEDED(hr) && ret_size != size)
if (FAILED(hr))
ERR("Failed to read source stream bytes %p+%u. hr=%#x\n", data, size, hr);
else if (ret_size != size) ERR("Unexpected short read: requested %u bytes, got %u.\n", size, ret_size);
unix_funcs->wg_parser_complete_read_request(source->wg_parser, SUCCEEDED(hr));
unix_funcs->wg_parser_push_data(source->wg_parser, SUCCEEDED(hr) ? data : NULL, ret_size); }
free(data); TRACE("Media source is shutting down; exiting.\n"); return 0; }
...
@@ -1224,10 +1224,10 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent,
GST_LOG("Request returned %d.", ret);
- if (!ret && new_buffer)
+ if ((!ret || !size) && new_buffer) gst_buffer_unref(new_buffer);
- return ret ? GST_FLOW_OK : GST_FLOW_ERROR;
+ return ret ? size ? GST_FLOW_OK : GST_FLOW_EOS : GST_FLOW_ERROR;
Please don't do this; it's hard to read.
}
static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
Signed-off-by: Derek Lesho <dlesho@codeweavers.com> --- v2: - Remodel read_request to only store a flag and the request size. - Fix race condition documented at https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/937 - Send error message to bus to indicate a read error, instead of using an EOS. (gst_app_src_create will now unblock on shutdown). --- dlls/winegstreamer/wg_parser.c | 395 +++++++++------------------------ 1 file changed, 99 insertions(+), 296 deletions(-)
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index fc3ea49d0a7..68b26d7093c 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -33,6 +33,7 @@ #include <gst/gst.h> #include <gst/video/video.h> #include <gst/audio/audio.h> +#include <gst/app/gstappsrc.h>
/* GStreamer callbacks may be called on threads not created by Wine, and * therefore cannot access the Wine TEB. This means that we must use GStreamer @@ -49,28 +50,22 @@ struct wg_parser struct wg_parser_stream **streams; unsigned int stream_count;
- GstElement *container, *decodebin; + GstElement *container, *appsrc, *decodebin; GstBus *bus; - GstPad *my_src, *their_sink;
- guint64 file_size, start_offset, next_offset, stop_offset; + guint64 file_size; guint64 next_pull_offset;
- pthread_t push_thread; - pthread_mutex_t mutex;
pthread_cond_t init_cond; bool no_more_pads, has_duration, error;
- pthread_cond_t read_cond, read_done_cond; + pthread_cond_t read_cond; struct { - void *data; - uint64_t offset; uint32_t size; - bool done; - bool ret; + bool pending_read; } read_request;
bool flushing, sink_connected; @@ -522,7 +517,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex);
- while (parser->sink_connected && !parser->read_request.data) + while (parser->sink_connected && !parser->read_request.pending_read) pthread_cond_wait(&parser->read_cond, &parser->mutex);
if (!parser->sink_connected) @@ -531,7 +526,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, return false; }
- *offset = parser->read_request.offset; + *offset = parser->next_pull_offset; *size = parser->read_request.size;
pthread_mutex_unlock(&parser->mutex); @@ -541,15 +536,63 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, static void CDECL wg_parser_push_data(struct wg_parser *parser, const void *data, uint32_t size) { + GstBuffer *buffer; + GstFlowReturn ret; + GError *error; + GstMessage *message; + + if (!data) + { + pthread_mutex_lock(&parser->mutex); + + error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->next_pull_offset); + message = gst_message_new_error(NULL, error, ""); + if (!gst_bus_post(parser->bus, message)) + { + GST_ERROR("Failed to post error message to bus!\n"); + gst_message_unref(message); + } + parser->read_request.pending_read = false; + + pthread_mutex_unlock(&parser->mutex); + return; + } + + if (!size) + { + pthread_mutex_lock(&parser->mutex); + + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); + parser->read_request.pending_read = false; + + pthread_mutex_unlock(&parser->mutex); + return; + } + + /* We could avoid this extra copy using gst_buffer_new_wrapped. + However, PE wouldn't know when to release the buffer allocations as the buffer + objects are queued, so we'd have to create a ring buffer the size of the gstappsrc + queue on the PE side and validate that we don't overrun on the unix side. I'm not + yet convinced that trying to reduce copies of compressed data is worth the + complexity. 
*/ + buffer = gst_buffer_new_and_alloc(size); + gst_buffer_fill(buffer, 0, data, size); + pthread_mutex_lock(&parser->mutex); - parser->read_request.size = size; - parser->read_request.done = true; - parser->read_request.ret = !!data; - if (data) - memcpy(parser->read_request.data, data, size); - parser->read_request.data = NULL; + assert(parser->read_request.pending_read); + + GST_BUFFER_OFFSET(buffer) = parser->next_pull_offset; + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret); + + /* In random-access mode, GST_FLOW_EOS shouldn't be returned */ + assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING); + if (ret == GST_FLOW_OK) + parser->next_pull_offset += size; + else + gst_buffer_unref(buffer); + + parser->read_request.pending_read = false; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_done_cond); }
static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser) @@ -1180,194 +1223,42 @@ static void pad_removed_cb(GstElement *element, GstPad *pad, gpointer user) g_free(name); }
-static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent, - guint64 offset, guint size, GstBuffer **buffer) +static void src_need_data(GstElement *appsrc, guint length, gpointer user) { - struct wg_parser *parser = gst_pad_get_element_private(pad); - GstBuffer *new_buffer = NULL; - GstMapInfo map_info; - bool ret; - - GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, length %u, buffer %p.", pad, offset, size, *buffer); - - if (offset == GST_BUFFER_OFFSET_NONE) - offset = parser->next_pull_offset; - parser->next_pull_offset = offset + size; - - if (!*buffer) - *buffer = new_buffer = gst_buffer_new_and_alloc(size); - - gst_buffer_map(*buffer, &map_info, GST_MAP_WRITE); + struct wg_parser *parser = user; + guint64 queued_bytes;
pthread_mutex_lock(&parser->mutex);
- assert(!parser->read_request.data); - parser->read_request.data = map_info.data; - parser->read_request.offset = offset; - parser->read_request.size = size; - parser->read_request.done = false; - pthread_cond_signal(&parser->read_cond); - - /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect - * the upstream pin to flush if necessary. We should never be blocked on - * read_thread() not running. */ - - while (!parser->read_request.done) - pthread_cond_wait(&parser->read_done_cond, &parser->mutex); - - ret = parser->read_request.ret; - gst_buffer_set_size(*buffer, parser->read_request.size); - - pthread_mutex_unlock(&parser->mutex); - - gst_buffer_unmap(*buffer, &map_info); - - GST_LOG("Request returned %d.", ret); - - if ((!ret || !size) && new_buffer) - gst_buffer_unref(new_buffer); - - return ret ? size ? GST_FLOW_OK : GST_FLOW_EOS : GST_FLOW_ERROR; -} - -static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - GstFormat format; - - GST_LOG("parser %p, type %s.", parser, GST_QUERY_TYPE_NAME(query)); - - switch (GST_QUERY_TYPE(query)) + /* Workaround for GStreamer bug: https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/937 + current-level-buffers is a closer fit but it's a recent addition. 
*/ + g_object_get(G_OBJECT(appsrc), "current-level-bytes", &queued_bytes, NULL); + if (queued_bytes) { - case GST_QUERY_DURATION: - gst_query_parse_duration(query, &format, NULL); - if (format == GST_FORMAT_PERCENT) - { - gst_query_set_duration(query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX); - return TRUE; - } - else if (format == GST_FORMAT_BYTES) - { - gst_query_set_duration(query, GST_FORMAT_BYTES, parser->file_size); - return TRUE; - } - return FALSE; - - case GST_QUERY_SEEKING: - gst_query_parse_seeking (query, &format, NULL, NULL, NULL); - if (format != GST_FORMAT_BYTES) - { - GST_WARNING("Cannot seek using format "%s".", gst_format_get_name(format)); - return FALSE; - } - gst_query_set_seeking(query, GST_FORMAT_BYTES, 1, 0, parser->file_size); - return TRUE; - - case GST_QUERY_SCHEDULING: - gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0); - gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH); - gst_query_add_scheduling_mode(query, GST_PAD_MODE_PULL); - return TRUE; - - default: - GST_WARNING("Unhandled query type %s.", GST_QUERY_TYPE_NAME(query)); - return FALSE; - } -} - -static void *push_data(void *arg) -{ - struct wg_parser *parser = arg; - GstBuffer *buffer; - guint max_size; - - GST_DEBUG("Starting push thread."); - - if (!(buffer = gst_buffer_new_allocate(NULL, 16384, NULL))) - { - GST_ERROR("Failed to allocate memory."); - return NULL; - } - - max_size = parser->stop_offset ? 
parser->stop_offset : parser->file_size; - - for (;;) - { - ULONG size; - int ret; - - if (parser->next_offset >= max_size) - break; - size = min(16384, max_size - parser->next_offset); - - if ((ret = src_getrange_cb(parser->my_src, NULL, parser->next_offset, size, &buffer)) < 0) - { - GST_ERROR("Failed to read data, ret %s.", gst_flow_get_name(ret)); - break; - } - - parser->next_offset += size; - - buffer->duration = buffer->pts = -1; - if ((ret = gst_pad_push(parser->my_src, buffer)) < 0) - { - GST_ERROR("Failed to push data, ret %s.", gst_flow_get_name(ret)); - break; - } + pthread_mutex_unlock(&parser->mutex); + return; }
- gst_buffer_unref(buffer); - - gst_pad_push_event(parser->my_src, gst_event_new_eos()); + parser->read_request.pending_read = true; + parser->read_request.size = length;
- GST_DEBUG("Stopping push thread."); + pthread_cond_signal(&parser->read_cond);
- return NULL; + pthread_mutex_unlock(&parser->mutex); }
-static gboolean activate_push(GstPad *pad, gboolean activate) +static gboolean src_seek_data(GstElement *appsrc, guint64 offset, gpointer user) { - struct wg_parser *parser = gst_pad_get_element_private(pad); - - if (!activate) - { - if (parser->push_thread) - { - pthread_join(parser->push_thread, NULL); - parser->push_thread = 0; - } - } - else if (!parser->push_thread) - { - int ret; + struct wg_parser *parser = user;
- if ((ret = pthread_create(&parser->push_thread, NULL, push_data, parser))) - { - GST_ERROR("Failed to create push thread: %s", strerror(errno)); - parser->push_thread = 0; - return FALSE; - } - } - return TRUE; -} + pthread_mutex_lock(&parser->mutex);
-static gboolean src_activate_mode_cb(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); + assert(!parser->read_request.pending_read); + parser->next_pull_offset = offset;
- GST_DEBUG("%s source pad for parser %p in %s mode.", - activate ? "Activating" : "Deactivating", parser, gst_pad_mode_get_name(mode)); + pthread_mutex_unlock(&parser->mutex);
- switch (mode) - { - case GST_PAD_MODE_PULL: - return TRUE; - case GST_PAD_MODE_PUSH: - return activate_push(pad, activate); - case GST_PAD_MODE_NONE: - break; - } - return FALSE; + return true; }
static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer user) @@ -1414,85 +1305,8 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use return GST_BUS_DROP; }
-static gboolean src_perform_seek(struct wg_parser *parser, GstEvent *event) -{ - BOOL thread = !!parser->push_thread; - GstSeekType cur_type, stop_type; - GstFormat seek_format; - GstEvent *flush_event; - GstSeekFlags flags; - gint64 cur, stop; - guint32 seqnum; - gdouble rate; - - gst_event_parse_seek(event, &rate, &seek_format, &flags, - &cur_type, &cur, &stop_type, &stop); - - if (seek_format != GST_FORMAT_BYTES) - { - GST_FIXME("Unhandled format "%s".", gst_format_get_name(seek_format)); - return FALSE; - } - - seqnum = gst_event_get_seqnum(event); - - /* send flush start */ - if (flags & GST_SEEK_FLAG_FLUSH) - { - flush_event = gst_event_new_flush_start(); - gst_event_set_seqnum(flush_event, seqnum); - gst_pad_push_event(parser->my_src, flush_event); - if (thread) - gst_pad_set_active(parser->my_src, 1); - } - - parser->next_offset = parser->start_offset = cur; - - /* and prepare to continue streaming */ - if (flags & GST_SEEK_FLAG_FLUSH) - { - flush_event = gst_event_new_flush_stop(TRUE); - gst_event_set_seqnum(flush_event, seqnum); - gst_pad_push_event(parser->my_src, flush_event); - if (thread) - gst_pad_set_active(parser->my_src, 1); - } - - return TRUE; -} - -static gboolean src_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - gboolean ret = TRUE; - - GST_LOG("parser %p, type "%s".", parser, GST_EVENT_TYPE_NAME(event)); - - switch (event->type) - { - case GST_EVENT_SEEK: - ret = src_perform_seek(parser, event); - break; - - case GST_EVENT_FLUSH_START: - case GST_EVENT_FLUSH_STOP: - case GST_EVENT_QOS: - case GST_EVENT_RECONFIGURE: - break; - - default: - GST_WARNING("Ignoring "%s" event.", GST_EVENT_TYPE_NAME(event)); - ret = FALSE; - break; - } - gst_event_unref(event); - return ret; -} - static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_size) { - GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE("quartz_src", - GST_PAD_SRC, 
GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); unsigned int i;
parser->file_size = file_size; @@ -1507,14 +1321,15 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s parser->container = gst_bin_new(NULL); gst_element_set_bus(parser->container, parser->bus);
- parser->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src"); - gst_pad_set_getrange_function(parser->my_src, src_getrange_cb); - gst_pad_set_query_function(parser->my_src, src_query_cb); - gst_pad_set_activatemode_function(parser->my_src, src_activate_mode_cb); - gst_pad_set_event_function(parser->my_src, src_event_cb); - gst_pad_set_element_private(parser->my_src, parser); + if (!(parser->appsrc = create_element("appsrc", "base"))) + return E_FAIL; + gst_bin_add(GST_BIN(parser->container), parser->appsrc); + + g_object_set(parser->appsrc, "stream-type", GST_APP_STREAM_TYPE_RANDOM_ACCESS, NULL); + g_object_set(parser->appsrc, "size", parser->file_size, NULL); + g_signal_connect(parser->appsrc, "need-data", G_CALLBACK(src_need_data), parser); + g_signal_connect(parser->appsrc, "seek-data", G_CALLBACK(src_seek_data), parser);
- parser->start_offset = parser->next_offset = parser->stop_offset = 0; parser->next_pull_offset = 0;
if (!parser->init_gst(parser)) @@ -1594,7 +1409,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
pthread_mutex_unlock(&parser->mutex);
- parser->next_offset = 0; return S_OK; }
@@ -1636,10 +1450,6 @@ static void CDECL wg_parser_disconnect(struct wg_parser *parser) pthread_mutex_unlock(&parser->mutex);
gst_element_set_state(parser->container, GST_STATE_NULL); - gst_pad_unlink(parser->my_src, parser->their_sink); - gst_object_unref(parser->my_src); - gst_object_unref(parser->their_sink); - parser->my_src = parser->their_sink = NULL;
pthread_mutex_lock(&parser->mutex); parser->sink_connected = false; @@ -1674,15 +1484,13 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser) g_signal_connect(element, "autoplug-select", G_CALLBACK(autoplug_select_cb), parser); g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - pthread_mutex_lock(&parser->mutex); parser->no_more_pads = parser->error = false; pthread_mutex_unlock(&parser->mutex);
- if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, parser->decodebin)) { - GST_ERROR("Failed to link pads, error %d.\n", ret); + GST_ERROR("Failed to link app source.\n"); return FALSE; }
@@ -1721,15 +1529,13 @@ static BOOL avi_parser_init_gst(struct wg_parser *parser) g_signal_connect(element, "pad-removed", G_CALLBACK(pad_removed_cb), parser); g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - pthread_mutex_lock(&parser->mutex); parser->no_more_pads = parser->error = false; pthread_mutex_unlock(&parser->mutex);
- if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, element)) { - GST_ERROR("Failed to link pads, error %d.\n", ret); + GST_ERROR("Failed to link app source.\n"); return FALSE; }
@@ -1765,10 +1571,9 @@ static BOOL mpeg_audio_parser_init_gst(struct wg_parser *parser)
gst_bin_add(GST_BIN(parser->container), element);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, element)) { - GST_ERROR("Failed to link sink pads, error %d.\n", ret); + GST_ERROR("Failed to link app source.\n"); return FALSE; }
@@ -1805,10 +1610,9 @@ static BOOL wave_parser_init_gst(struct wg_parser *parser)
gst_bin_add(GST_BIN(parser->container), element);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, element)) { - GST_ERROR("Failed to link sink pads, error %d.\n", ret); + GST_ERROR("Failed to link app source.\n"); return FALSE; }
@@ -1845,8 +1649,8 @@ static struct wg_parser *wg_parser_create(void) pthread_mutex_init(&parser->mutex, NULL); pthread_cond_init(&parser->init_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); - pthread_cond_init(&parser->read_done_cond, NULL); parser->flushing = true; + parser->read_request.pending_read = false;
GST_DEBUG("Created winegstreamer parser %p.\n", parser); return parser; @@ -1899,7 +1703,6 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser) pthread_mutex_destroy(&parser->mutex); pthread_cond_destroy(&parser->init_cond); pthread_cond_destroy(&parser->read_cond); - pthread_cond_destroy(&parser->read_done_cond);
free(parser); }
On 9/10/21 12:04 PM, Derek Lesho wrote:
Signed-off-by: Derek Lesho dlesho@codeweavers.com
v2:
- Remodel read_request to only store a flag and the request size.
- Fix race condition documented at https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/937
- Send an error message to the bus to indicate a read error, instead of using an EOS (gst_app_src_create will now unblock on shutdown).
dlls/winegstreamer/wg_parser.c | 395 +++++++++------------------------ 1 file changed, 99 insertions(+), 296 deletions(-)
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index fc3ea49d0a7..68b26d7093c 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -33,6 +33,7 @@ #include <gst/gst.h> #include <gst/video/video.h> #include <gst/audio/audio.h> +#include <gst/app/gstappsrc.h>
/* GStreamer callbacks may be called on threads not created by Wine, and
- therefore cannot access the Wine TEB. This means that we must use GStreamer
@@ -49,28 +50,22 @@ struct wg_parser struct wg_parser_stream **streams; unsigned int stream_count;
- GstElement *container, *decodebin;
- GstElement *container, *appsrc, *decodebin; GstBus *bus;
GstPad *my_src, *their_sink;
guint64 file_size, start_offset, next_offset, stop_offset;
- guint64 file_size; guint64 next_pull_offset;
pthread_t push_thread;
pthread_mutex_t mutex; pthread_cond_t init_cond; bool no_more_pads, has_duration, error;
pthread_cond_t read_cond, read_done_cond;
- pthread_cond_t read_cond; struct {
void *data;
uint64_t offset; uint32_t size;
bool done;
bool ret;
bool pending_read; } read_request; bool flushing, sink_connected;
I would personally be inclined to either keep "read_request.offset" rather than "next_pull_offset", or get rid of the read_request structure. Probably the former.
@@ -522,7 +517,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex);
- while (parser->sink_connected && !parser->read_request.data)
while (parser->sink_connected && !parser->read_request.pending_read) pthread_cond_wait(&parser->read_cond, &parser->mutex);
if (!parser->sink_connected)
@@ -531,7 +526,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, return false; }
- *offset = parser->read_request.offset;
*offset = parser->next_pull_offset; *size = parser->read_request.size;
pthread_mutex_unlock(&parser->mutex);
@@ -541,15 +536,63 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, static void CDECL wg_parser_push_data(struct wg_parser *parser, const void *data, uint32_t size) {
- GstBuffer *buffer;
- GstFlowReturn ret;
- GError *error;
- GstMessage *message;
- if (!data)
- {
pthread_mutex_lock(&parser->mutex);
error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->next_pull_offset);
message = gst_message_new_error(NULL, error, "");
if (!gst_bus_post(parser->bus, message))
{
GST_ERROR("Failed to post error message to bus!\n");
gst_message_unref(message);
}
Is there a point in posting a message to the bus?
parser->read_request.pending_read = false;
This is fine temporarily. Ultimately I think we should be terminating the client-side thread on error, and if possible not calling wg_parser_push_data() at all.
pthread_mutex_unlock(&parser->mutex);
return;
- }
- if (!size)
- {
pthread_mutex_lock(&parser->mutex);
g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);
parser->read_request.pending_read = false;
pthread_mutex_unlock(&parser->mutex);
return;
- }
- /* We could avoid this extra copy using gst_buffer_new_wrapped.
However, PE wouldn't know when to release the buffer allocations as the buffer
objects are queued, so we'd have to create a ring buffer the size of the gstappsrc
queue on the PE side and validate that we don't overrun on the unix side. I'm not
yet convinced that trying to reduce copies of compressed data is worth the
complexity. */
- buffer = gst_buffer_new_and_alloc(size);
- gst_buffer_fill(buffer, 0, data, size);
pthread_mutex_lock(&parser->mutex);
- parser->read_request.size = size;
- parser->read_request.done = true;
- parser->read_request.ret = !!data;
- if (data)
memcpy(parser->read_request.data, data, size);
- parser->read_request.data = NULL;
- assert(parser->read_request.pending_read);
- GST_BUFFER_OFFSET(buffer) = parser->next_pull_offset;
- g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret);
- /* In random-access mode, GST_FLOW_EOS shouldn't be returned */
- assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
- if (ret == GST_FLOW_OK)
parser->next_pull_offset += size;
- else
gst_buffer_unref(buffer);
According to my reading of the source code, GstAppSrc will take the reference even if it returns GST_FLOW_FLUSHING. That's a bit unintuitive and should probably be clearly documented on the GStreamer side, but anyway...
- parser->read_request.pending_read = false; pthread_mutex_unlock(&parser->mutex);
pthread_cond_signal(&parser->read_done_cond); }
static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser)
On 9/13/21 1:57 PM, Zebediah Figura wrote:
@@ -541,15 +536,63 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, static void CDECL wg_parser_push_data(struct wg_parser *parser, const void *data, uint32_t size) { + GstBuffer *buffer; + GstFlowReturn ret; + GError *error; + GstMessage *message;
+ if (!data) + { + pthread_mutex_lock(&parser->mutex);
+ error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->next_pull_offset); + message = gst_message_new_error(NULL, error, ""); + if (!gst_bus_post(parser->bus, message)) + { + GST_ERROR("Failed to post error message to bus!\n"); + gst_message_unref(message); + }
Is there a point in posting a message to the bus?
The point would be to have a unified error path when something goes wrong (whether the error is internal to GStreamer or a read failure).
- parser->read_request.pending_read = false;
This is fine temporarily. Ultimately I think we should be terminating the client-side thread on error, and if possible not calling wg_parser_push_data() at all.
It would be a little tricky to do so given that this thread exists throughout initialization, and not signaling the error to GStreamer would mean we have to add yet another shutdown path that doesn't interfere with the others.
+ pthread_mutex_unlock(&parser->mutex); + return; + }
+ if (!size) + { + pthread_mutex_lock(&parser->mutex);
+ g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); + parser->read_request.pending_read = false;
+ pthread_mutex_unlock(&parser->mutex); + return; + }
+ /* We could avoid this extra copy using gst_buffer_new_wrapped. + However, PE wouldn't know when to release the buffer allocations as the buffer + objects are queued, so we'd have to create a ring buffer the size of the gstappsrc + queue on the PE side and validate that we don't overrun on the unix side. I'm not + yet convinced that trying to reduce copies of compressed data is worth the + complexity. */ + buffer = gst_buffer_new_and_alloc(size); + gst_buffer_fill(buffer, 0, data, size);
pthread_mutex_lock(&parser->mutex); - parser->read_request.size = size; - parser->read_request.done = true; - parser->read_request.ret = !!data; - if (data) - memcpy(parser->read_request.data, data, size); - parser->read_request.data = NULL; + assert(parser->read_request.pending_read);
+ GST_BUFFER_OFFSET(buffer) = parser->next_pull_offset; + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret);
+ /* In random-access mode, GST_FLOW_EOS shouldn't be returned */ + assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING); + if (ret == GST_FLOW_OK) + parser->next_pull_offset += size; + else + gst_buffer_unref(buffer);
According to my reading of the source code, GstAppSrc will take the reference even if it returns GST_FLOW_FLUSHING. That's a bit unintuitive and should probably be clearly documented on the GStreamer side, but anyway...
Good catch, thanks.
+ parser->read_request.pending_read = false; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_done_cond); } static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser)
On 9/13/21 1:08 PM, Derek Lesho wrote:
On 9/13/21 1:57 PM, Zebediah Figura wrote:
@@ -541,15 +536,63 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, static void CDECL wg_parser_push_data(struct wg_parser *parser, const void *data, uint32_t size) { + GstBuffer *buffer; + GstFlowReturn ret; + GError *error; + GstMessage *message;
+ if (!data) + { + pthread_mutex_lock(&parser->mutex);
+ error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->next_pull_offset); + message = gst_message_new_error(NULL, error, ""); + if (!gst_bus_post(parser->bus, message)) + { + GST_ERROR("Failed to post error message to bus!\n"); + gst_message_unref(message); + }
Is there a point in posting a message to the bus?
The point would be to have a unified error path when something goes wrong (whether the error is internal to GStreamer or a read failure).
Oh, I guess we would probably need to unblock initialization here; I forgot that we did something other than just printing an error. Okay, that makes sense.
I don't think there's any point checking for failure from gst_bus_post(), though.
- parser->read_request.pending_read = false;
This is fine temporarily. Ultimately I think we should be terminating the client-side thread on error, and if possible not calling wg_parser_push_data() at all.
It would be a little tricky to do so given that this thread exists throughout initialization, and not signaling the error to GStreamer would mean we have to add yet another shutdown path that doesn't interfere with the others.
+ pthread_mutex_unlock(&parser->mutex); + return; + }
+ if (!size) + { + pthread_mutex_lock(&parser->mutex);
+ g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); + parser->read_request.pending_read = false;
+ pthread_mutex_unlock(&parser->mutex); + return; + }
+ /* We could avoid this extra copy using gst_buffer_new_wrapped. + However, PE wouldn't know when to release the buffer allocations as the buffer + objects are queued, so we'd have to create a ring buffer the size of the gstappsrc + queue on the PE side and validate that we don't overrun on the unix side. I'm not + yet convinced that trying to reduce copies of compressed data is worth the + complexity. */ + buffer = gst_buffer_new_and_alloc(size); + gst_buffer_fill(buffer, 0, data, size);
pthread_mutex_lock(&parser->mutex); - parser->read_request.size = size; - parser->read_request.done = true; - parser->read_request.ret = !!data; - if (data) - memcpy(parser->read_request.data, data, size); - parser->read_request.data = NULL; + assert(parser->read_request.pending_read);
+ GST_BUFFER_OFFSET(buffer) = parser->next_pull_offset; + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret);
+ /* In random-access mode, GST_FLOW_EOS shouldn't be returned */ + assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING); + if (ret == GST_FLOW_OK) + parser->next_pull_offset += size; + else + gst_buffer_unref(buffer);
According to my reading of the source code, GstAppSrc will take the reference even if it returns GST_FLOW_FLUSHING. That's a bit unintuitive and should probably be clearly documented on the GStreamer side, but anyway...
Good catch, thanks.
+ parser->read_request.pending_read = false; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_done_cond); } static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser)
On 9/10/21 12:04 PM, Derek Lesho wrote:
@@ -857,6 +854,8 @@ done: IMFMediaTypeHandler_Release(type_handler); for (i = 0; i < type_count; i++) IMFMediaType_Release(stream_types[i]);
+ if (FAILED(hr))
+ IMFStreamDescriptor_Release(stream->descriptor); return hr; }
There are failure paths from media_stream_init_desc() with FAILED(hr) && !stream->descriptor; on those paths this unconditional IMFStreamDescriptor_Release() would dereference a NULL pointer.
...
@@ -1392,8 +1385,40 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ fail: WARN("Failed to construct MFMediaSource, hr %#x.\n", hr);
- free(descriptors);
- IMFMediaSource_Release(&object->IMFMediaSource_iface);
- if (descriptors)
- {
for(i = 0; i < object->stream_count; i++)
IMFStreamDescriptor_Release(descriptors[i]);
free(descriptors);
- }
- for (i = 0; i < object->stream_count; i++)
- {
struct media_stream *stream = object->streams[i];
IMFMediaEventQueue_Release(stream->event_queue);
IMFStreamDescriptor_Release(stream->descriptor);
IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface);
free(stream);
- }
- if (object->streams)
free(object->streams);
As long as we're here, this if() is unnecessary; free(NULL) is a no-op.
- if (stream_count)
unix_funcs->wg_parser_disconnect(object->wg_parser);
This doesn't work; the parser could have zero streams. Granted, that usually means an error condition on the GStreamer side, but it's still worth handling right.
- if (object->read_thread)
- {
object->read_thread_shutdown = true;
WaitForSingleObject(object->read_thread, INFINITE);
CloseHandle(object->read_thread);
- }
- if (object->wg_parser)
unix_funcs->wg_parser_destroy(object->wg_parser);
- if (object->async_commands_queue)
MFUnlockWorkQueue(object->async_commands_queue);
- if (object->event_queue)
IMFMediaEventQueue_Release(object->event_queue);
- IMFByteStream_Release(object->byte_stream);
- free(object); return hr; }