On 9/10/21 12:04 PM, Derek Lesho wrote:
Signed-off-by: Derek Lesho <dlesho@codeweavers.com>
v2:
- Remodel read_request to only store a flag and the request size.
- Fix race condition documented at https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/937
- Send error message to bus to indicate a read error, instead of using an EOS. (gst_app_src_create will now unblock on shutdown).
 dlls/winegstreamer/wg_parser.c | 395 +++++++++------------------------
 1 file changed, 99 insertions(+), 296 deletions(-)
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c
index fc3ea49d0a7..68b26d7093c 100644
--- a/dlls/winegstreamer/wg_parser.c
+++ b/dlls/winegstreamer/wg_parser.c
@@ -33,6 +33,7 @@
 #include <gst/gst.h>
 #include <gst/video/video.h>
 #include <gst/audio/audio.h>
+#include <gst/app/gstappsrc.h>
/* GStreamer callbacks may be called on threads not created by Wine, and
- therefore cannot access the Wine TEB. This means that we must use GStreamer
@@ -49,28 +50,22 @@ struct wg_parser struct wg_parser_stream **streams; unsigned int stream_count;
- GstElement *container, *decodebin;
- GstElement *container, *appsrc, *decodebin; GstBus *bus;
GstPad *my_src, *their_sink;
guint64 file_size, start_offset, next_offset, stop_offset;
- guint64 file_size; guint64 next_pull_offset;
pthread_t push_thread;
pthread_mutex_t mutex; pthread_cond_t init_cond; bool no_more_pads, has_duration, error;
pthread_cond_t read_cond, read_done_cond;
- pthread_cond_t read_cond; struct {
void *data;
uint64_t offset; uint32_t size;
bool done;
bool ret;
bool pending_read; } read_request; bool flushing, sink_connected;
I would personally be inclined to either keep "read_request.offset" rather than "next_pull_offset", or get rid of the read_request structure. Probably the former.
@@ -522,7 +517,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex);
- while (parser->sink_connected && !parser->read_request.data)
while (parser->sink_connected && !parser->read_request.pending_read) pthread_cond_wait(&parser->read_cond, &parser->mutex);
if (!parser->sink_connected)
@@ -531,7 +526,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, return false; }
- *offset = parser->read_request.offset;
*offset = parser->next_pull_offset; *size = parser->read_request.size;
pthread_mutex_unlock(&parser->mutex);
@@ -541,15 +536,63 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, static void CDECL wg_parser_push_data(struct wg_parser *parser, const void *data, uint32_t size) {
- GstBuffer *buffer;
- GstFlowReturn ret;
- GError *error;
- GstMessage *message;
- if (!data)
- {
pthread_mutex_lock(&parser->mutex);
error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->next_pull_offset);
message = gst_message_new_error(NULL, error, "");
if (!gst_bus_post(parser->bus, message))
{
GST_ERROR("Failed to post error message to bus!\n");
gst_message_unref(message);
}
Is there a point in posting a message to the bus?
parser->read_request.pending_read = false;
This is fine temporarily. Ultimately I think we should be terminating the client-side thread on error, and if possible not calling wg_parser_push_data() at all.
pthread_mutex_unlock(&parser->mutex);
return;
- }
- if (!size)
- {
pthread_mutex_lock(&parser->mutex);
g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);
parser->read_request.pending_read = false;
pthread_mutex_unlock(&parser->mutex);
return;
- }
- /* We could avoid this extra copy using gst_buffer_new_wrapped.
However, PE wouldn't know when to release the buffer allocations as the buffer
objects are queued, so we'd have to create a ring buffer the size of the gstappsrc
queue on the PE side and validate that we don't overrun on the unix side. I'm not
yet convinced that trying to reduce copies of compressed data is worth the
complexity. */
- buffer = gst_buffer_new_and_alloc(size);
- gst_buffer_fill(buffer, 0, data, size);
pthread_mutex_lock(&parser->mutex);
- parser->read_request.size = size;
- parser->read_request.done = true;
- parser->read_request.ret = !!data;
- if (data)
memcpy(parser->read_request.data, data, size);
- parser->read_request.data = NULL;
- assert(parser->read_request.pending_read);
- GST_BUFFER_OFFSET(buffer) = parser->next_pull_offset;
- g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret);
- /* In random-access mode, GST_FLOW_EOS shouldn't be returned */
- assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
- if (ret == GST_FLOW_OK)
parser->next_pull_offset += size;
- else
gst_buffer_unref(buffer);
According to my reading of the source code, GstAppSrc will take the reference even if it returns GST_FLOW_FLUSHING. That's a bit unintuitive and should probably be clearly documented on the GStreamer side, but anyway...
- parser->read_request.pending_read = false; pthread_mutex_unlock(&parser->mutex);
pthread_cond_signal(&parser->read_done_cond); }
static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser)