On 9/10/21 12:04 PM, Derek Lesho wrote:
Signed-off-by: Derek Lesho <dlesho(a)codeweavers.com> --- v2: - Remodel read_request to only store a flag and the request size. - Fix race condition documented at https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/937 - Send error message to bus to indicate a read error, instead of using an EOS. (gst_app_src_create will now unblock on shutdown). --- dlls/winegstreamer/wg_parser.c | 395 +++++++++------------------------ 1 file changed, 99 insertions(+), 296 deletions(-)
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index fc3ea49d0a7..68b26d7093c 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -33,6 +33,7 @@ #include <gst/gst.h> #include <gst/video/video.h> #include <gst/audio/audio.h> +#include <gst/app/gstappsrc.h>
/* GStreamer callbacks may be called on threads not created by Wine, and * therefore cannot access the Wine TEB. This means that we must use GStreamer @@ -49,28 +50,22 @@ struct wg_parser struct wg_parser_stream **streams; unsigned int stream_count;
- GstElement *container, *decodebin; + GstElement *container, *appsrc, *decodebin; GstBus *bus; - GstPad *my_src, *their_sink;
- guint64 file_size, start_offset, next_offset, stop_offset; + guint64 file_size; guint64 next_pull_offset;
- pthread_t push_thread; - pthread_mutex_t mutex;
pthread_cond_t init_cond; bool no_more_pads, has_duration, error;
- pthread_cond_t read_cond, read_done_cond; + pthread_cond_t read_cond; struct { - void *data; - uint64_t offset; uint32_t size; - bool done; - bool ret; + bool pending_read; } read_request;
bool flushing, sink_connected;
I would personally be inclined either to keep "read_request.offset" (instead of using "next_pull_offset"), or to get rid of the read_request structure entirely. Probably the former.
@@ -522,7 +517,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex);
- while (parser->sink_connected && !parser->read_request.data) + while (parser->sink_connected && !parser->read_request.pending_read) pthread_cond_wait(&parser->read_cond, &parser->mutex);
if (!parser->sink_connected) @@ -531,7 +526,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, return false; }
- *offset = parser->read_request.offset; + *offset = parser->next_pull_offset; *size = parser->read_request.size;
pthread_mutex_unlock(&parser->mutex); @@ -541,15 +536,63 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, static void CDECL wg_parser_push_data(struct wg_parser *parser, const void *data, uint32_t size) { + GstBuffer *buffer; + GstFlowReturn ret; + GError *error; + GstMessage *message; + + if (!data) + { + pthread_mutex_lock(&parser->mutex); + + error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->next_pull_offset); + message = gst_message_new_error(NULL, error, ""); + if (!gst_bus_post(parser->bus, message)) + { + GST_ERROR("Failed to post error message to bus!\n"); + gst_message_unref(message); + }
Is there a point in posting a message to the bus?
+ parser->read_request.pending_read = false;
This is fine as a temporary measure. Ultimately I think we should be terminating the client-side thread on error, and if possible not calling wg_parser_push_data() at all.
+ + pthread_mutex_unlock(&parser->mutex); + return; + } + + if (!size) + { + pthread_mutex_lock(&parser->mutex); + + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); + parser->read_request.pending_read = false; + + pthread_mutex_unlock(&parser->mutex); + return; + } + + /* We could avoid this extra copy using gst_buffer_new_wrapped. + However, PE wouldn't know when to release the buffer allocations as the buffer + objects are queued, so we'd have to create a ring buffer the size of the gstappsrc + queue on the PE side and validate that we don't overrun on the unix side. I'm not + yet convinced that trying to reduce copies of compressed data is worth the + complexity. */ + buffer = gst_buffer_new_and_alloc(size); + gst_buffer_fill(buffer, 0, data, size); + pthread_mutex_lock(&parser->mutex); - parser->read_request.size = size; - parser->read_request.done = true; - parser->read_request.ret = !!data; - if (data) - memcpy(parser->read_request.data, data, size); - parser->read_request.data = NULL; + assert(parser->read_request.pending_read); + + GST_BUFFER_OFFSET(buffer) = parser->next_pull_offset; + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret); + + /* In random-access mode, GST_FLOW_EOS shouldn't be returned */ + assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING); + if (ret == GST_FLOW_OK) + parser->next_pull_offset += size; + else + gst_buffer_unref(buffer);
According to my reading of the source code, GstAppSrc will take the reference even if it returns GST_FLOW_FLUSHING. That's a bit unintuitive and should probably be clearly documented on the GStreamer side, but anyway...
+ + parser->read_request.pending_read = false; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_done_cond); }
static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser)