Instead of having _Shutdown and the constructors' failures paths mix together creating confusion about what should be released where, we can designate ::Shutdown/::Release to shutting down fully initialized objects without checks, and keep the partially-created object cleanup code in the constructor.
Signed-off-by: Derek Lesho dlesho@codeweavers.com --- v3: Fix the noted oversights in the constructor cleanup path. --- dlls/winegstreamer/media_source.c | 102 +++++++++++++++++++----------- 1 file changed, 65 insertions(+), 37 deletions(-)
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 01ab626254a..7efa3cc06ba 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -733,6 +733,12 @@ static HRESULT new_media_stream(struct media_source *source, object->IMFMediaStream_iface.lpVtbl = &media_stream_vtbl; object->ref = 1;
+ if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) + { + free(object); + return hr; + } + IMFMediaSource_AddRef(&source->IMFMediaSource_iface); object->parent_source = source; object->stream_id = stream_id; @@ -741,20 +747,11 @@ static HRESULT new_media_stream(struct media_source *source, object->eos = FALSE; object->wg_stream = wg_stream;
- if (FAILED(hr = MFCreateEventQueue(&object->event_queue))) - goto fail; - TRACE("Created stream object %p.\n", object);
*out_stream = object;
return S_OK; - -fail: - WARN("Failed to construct media stream, hr %#x.\n", hr); - - IMFMediaStream_Release(&object->IMFMediaStream_iface); - return hr; }
static HRESULT media_stream_init_desc(struct media_stream *stream) @@ -847,10 +844,16 @@ static HRESULT media_stream_init_desc(struct media_stream *stream) goto done;
if (FAILED(hr = IMFStreamDescriptor_GetMediaTypeHandler(stream->descriptor, &type_handler))) + { + IMFStreamDescriptor_Release(stream->descriptor); goto done; + }
if (FAILED(hr = IMFMediaTypeHandler_SetCurrentMediaType(type_handler, stream_types[0]))) + { + IMFStreamDescriptor_Release(stream->descriptor); goto done; + }
done: if (type_handler) @@ -1213,19 +1216,13 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
unix_funcs->wg_parser_disconnect(source->wg_parser);
- if (source->read_thread) - { - source->read_thread_shutdown = true; - WaitForSingleObject(source->read_thread, INFINITE); - CloseHandle(source->read_thread); - } + source->read_thread_shutdown = true; + WaitForSingleObject(source->read_thread, INFINITE); + CloseHandle(source->read_thread);
- if (source->pres_desc) - IMFPresentationDescriptor_Release(source->pres_desc); - if (source->event_queue) - IMFMediaEventQueue_Shutdown(source->event_queue); - if (source->byte_stream) - IMFByteStream_Release(source->byte_stream); + IMFPresentationDescriptor_Release(source->pres_desc); + IMFMediaEventQueue_Shutdown(source->event_queue); + IMFByteStream_Release(source->byte_stream);
for (i = 0; i < source->stream_count; i++) { @@ -1233,23 +1230,18 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
stream->state = STREAM_SHUTDOWN;
- if (stream->event_queue) - IMFMediaEventQueue_Shutdown(stream->event_queue); - if (stream->descriptor) - IMFStreamDescriptor_Release(stream->descriptor); - if (stream->parent_source) - IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface); + IMFMediaEventQueue_Shutdown(stream->event_queue); + IMFStreamDescriptor_Release(stream->descriptor); + IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface);
IMFMediaStream_Release(&stream->IMFMediaStream_iface); }
unix_funcs->wg_parser_destroy(source->wg_parser);
- if (source->stream_count) - free(source->streams); + free(source->streams);
- if (source->async_commands_queue) - MFUnlockWorkQueue(source->async_commands_queue); + MFUnlockWorkQueue(source->async_commands_queue);
return S_OK; } @@ -1274,6 +1266,7 @@ static const IMFMediaSourceVtbl IMFMediaSource_vtbl = static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_source **out_media_source) { IMFStreamDescriptor **descriptors = NULL; + unsigned int stream_count = UINT32_MAX; struct media_source *object; UINT64 total_pres_time = 0; struct wg_parser *parser; @@ -1337,15 +1330,15 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ * leak occurs with native. */ unix_funcs->wg_parser_set_unlimited_buffering(parser);
- object->stream_count = unix_funcs->wg_parser_get_stream_count(parser); + stream_count = unix_funcs->wg_parser_get_stream_count(parser);
- if (!(object->streams = calloc(object->stream_count, sizeof(*object->streams)))) + if (!(object->streams = calloc(stream_count, sizeof(*object->streams)))) { hr = E_OUTOFMEMORY; goto fail; }
- for (i = 0; i < object->stream_count; ++i) + for (i = 0; i < stream_count; ++i) { if (FAILED(hr = new_media_stream(object, unix_funcs->wg_parser_get_stream(parser, i), i, &object->streams[i]))) goto fail; @@ -1353,9 +1346,13 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ if (FAILED(hr = media_stream_init_desc(object->streams[i]))) { ERR("Failed to finish initialization of media stream %p, hr %x.\n", object->streams[i], hr); - IMFMediaStream_Release(&object->streams[i]->IMFMediaStream_iface); + IMFMediaSource_Release(&object->streams[i]->parent_source->IMFMediaSource_iface); + IMFMediaEventQueue_Release(object->streams[i]->event_queue); + free(object->streams[i]); goto fail; } + + object->stream_count++; }
/* init presentation descriptor */ @@ -1392,8 +1389,39 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ fail: WARN("Failed to construct MFMediaSource, hr %#x.\n", hr);
- free(descriptors); - IMFMediaSource_Release(&object->IMFMediaSource_iface); + if (descriptors) + { + for(i = 0; i < object->stream_count; i++) + IMFStreamDescriptor_Release(descriptors[i]); + free(descriptors); + } + for (i = 0; i < object->stream_count; i++) + { + struct media_stream *stream = object->streams[i]; + + IMFMediaEventQueue_Release(stream->event_queue); + IMFStreamDescriptor_Release(stream->descriptor); + IMFMediaSource_Release(&stream->parent_source->IMFMediaSource_iface); + + free(stream); + } + free(object->streams); + if (stream_count != UINT32_MAX) + unix_funcs->wg_parser_disconnect(object->wg_parser); + if (object->read_thread) + { + object->read_thread_shutdown = true; + WaitForSingleObject(object->read_thread, INFINITE); + CloseHandle(object->read_thread); + } + if (object->wg_parser) + unix_funcs->wg_parser_destroy(object->wg_parser); + if (object->async_commands_queue) + MFUnlockWorkQueue(object->async_commands_queue); + if (object->event_queue) + IMFMediaEventQueue_Release(object->event_queue); + IMFByteStream_Release(object->byte_stream); + free(object); return hr; }
This adds a allocate/blit for the input data, but this is necessary for both WOW64 support and an internal rework of the source path in wg_parser to use GstAppSrc. Source data is usually compressed anyway, so it shouldn't affect performance too badly.
Signed-off-by: Derek Lesho dlesho@codeweavers.com --- v3: Addressed comments. --- dlls/winegstreamer/gst_private.h | 7 ++++--- dlls/winegstreamer/media_source.c | 28 ++++++++++++++++++++++++---- dlls/winegstreamer/quartz_parser.c | 26 +++++++++++++++++++++++--- dlls/winegstreamer/wg_parser.c | 28 +++++++++++++++------------- 4 files changed, 66 insertions(+), 23 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index c6c99b1dd55..9943682facb 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -169,9 +169,10 @@ struct unix_funcs void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser); void (CDECL *wg_parser_end_flush)(struct wg_parser *parser);
- bool (CDECL *wg_parser_get_read_request)(struct wg_parser *parser, - void **data, uint64_t *offset, uint32_t *size); - void (CDECL *wg_parser_complete_read_request)(struct wg_parser *parser, bool ret); + bool (CDECL *wg_parser_get_next_read_offset)(struct wg_parser *parser, + uint64_t *offset, uint32_t *size); + void (CDECL *wg_parser_push_data)(struct wg_parser *parser, + const void *data, uint32_t size);
void (CDECL *wg_parser_set_unlimited_buffering)(struct wg_parser *parser);
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 7efa3cc06ba..db5548adbd0 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -530,6 +530,11 @@ static DWORD CALLBACK read_thread(void *arg) { struct media_source *source = arg; IMFByteStream *byte_stream = source->byte_stream; + uint32_t buffer_size = 0; + uint64_t file_size; + void *data = NULL; + + IMFByteStream_GetLength(byte_stream, &file_size);
TRACE("Starting read thread for media source %p.\n", source);
@@ -539,18 +544,33 @@ static DWORD CALLBACK read_thread(void *arg) ULONG ret_size; uint32_t size; HRESULT hr; - void *data;
- if (!unix_funcs->wg_parser_get_read_request(source->wg_parser, &data, &offset, &size)) + if (!unix_funcs->wg_parser_get_next_read_offset(source->wg_parser, &offset, &size)) continue;
+ if (offset >= file_size) + size = 0; + else if (offset + size >= file_size) + size = file_size - offset; + + if (size > buffer_size) + { + buffer_size = size; + data = realloc(data, size); + } + + ret_size = 0; + if (SUCCEEDED(hr = IMFByteStream_SetCurrentPosition(byte_stream, offset))) hr = IMFByteStream_Read(byte_stream, data, size, &ret_size); - if (SUCCEEDED(hr) && ret_size != size) + if (FAILED(hr)) + ERR("Failed to read source stream bytes %p+%u. hr=%#x\n", data, size, hr); + else if (ret_size != size) ERR("Unexpected short read: requested %u bytes, got %u.\n", size, ret_size); - unix_funcs->wg_parser_complete_read_request(source->wg_parser, SUCCEEDED(hr)); + unix_funcs->wg_parser_push_data(source->wg_parser, SUCCEEDED(hr) ? data : NULL, ret_size); }
+ free(data); TRACE("Media source is shutting down; exiting.\n"); return 0; } diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index 09a916d7f5c..f6179c742f7 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -785,6 +785,11 @@ static DWORD CALLBACK stream_thread(void *arg) static DWORD CALLBACK read_thread(void *arg) { struct parser *filter = arg; + LONGLONG file_size, unused; + uint32_t buffer_size = 0; + void *data = NULL; + + IAsyncReader_Length(filter->reader, &file_size, &unused);
TRACE("Starting read thread for filter %p.\n", filter);
@@ -793,14 +798,29 @@ static DWORD CALLBACK read_thread(void *arg) uint64_t offset; uint32_t size; HRESULT hr; - void *data;
- if (!unix_funcs->wg_parser_get_read_request(filter->wg_parser, &data, &offset, &size)) + if (!unix_funcs->wg_parser_get_next_read_offset(filter->wg_parser, &offset, &size)) continue; + + if (offset >= file_size) + size = 0; + else if (offset + size >= file_size) + size = file_size - offset; + + if (size > buffer_size) + { + buffer_size = size; + data = realloc(data, size); + } + hr = IAsyncReader_SyncRead(filter->reader, offset, size, data); - unix_funcs->wg_parser_complete_read_request(filter->wg_parser, SUCCEEDED(hr)); + if (FAILED(hr)) + ERR("Async Reader failed to failed to read %p+%u. hr=%#x\n", data, size, hr); + + unix_funcs->wg_parser_push_data(filter->wg_parser, SUCCEEDED(hr) ? data : NULL, size); }
+ free(data); TRACE("Streaming stopped; exiting.\n"); return 0; } diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index cd12a23d0c8..b9e74c55b08 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -517,8 +517,8 @@ static void CDECL wg_parser_end_flush(struct wg_parser *parser) pthread_mutex_unlock(&parser->mutex); }
-static bool CDECL wg_parser_get_read_request(struct wg_parser *parser, - void **data, uint64_t *offset, uint32_t *size) +static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, + uint64_t *offset, uint32_t *size) { pthread_mutex_lock(&parser->mutex);
@@ -531,7 +531,6 @@ static bool CDECL wg_parser_get_read_request(struct wg_parser *parser, return false; }
- *data = parser->read_request.data; *offset = parser->read_request.offset; *size = parser->read_request.size;
@@ -539,11 +538,15 @@ static bool CDECL wg_parser_get_read_request(struct wg_parser *parser, return true; }
-static void CDECL wg_parser_complete_read_request(struct wg_parser *parser, bool ret) +static void CDECL wg_parser_push_data(struct wg_parser *parser, + const void *data, uint32_t size) { pthread_mutex_lock(&parser->mutex); + parser->read_request.size = size; parser->read_request.done = true; - parser->read_request.ret = ret; + parser->read_request.ret = !!data; + if (data) + memcpy(parser->read_request.data, data, size); parser->read_request.data = NULL; pthread_mutex_unlock(&parser->mutex); pthread_cond_signal(&parser->read_done_cond); @@ -1190,10 +1193,6 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent, if (offset == GST_BUFFER_OFFSET_NONE) offset = parser->next_pull_offset; parser->next_pull_offset = offset + size; - if (offset >= parser->file_size) - return GST_FLOW_EOS; - if (offset + size >= parser->file_size) - size = parser->file_size - offset;
if (!*buffer) *buffer = new_buffer = gst_buffer_new_and_alloc(size); @@ -1217,6 +1216,7 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent, pthread_cond_wait(&parser->read_done_cond, &parser->mutex);
ret = parser->read_request.ret; + gst_buffer_set_size(*buffer, parser->read_request.size);
pthread_mutex_unlock(&parser->mutex);
@@ -1224,10 +1224,12 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent,
GST_LOG("Request returned %d.", ret);
- if (!ret && new_buffer) + if ((!ret || !size) && new_buffer) gst_buffer_unref(new_buffer);
- return ret ? GST_FLOW_OK : GST_FLOW_ERROR; + if (ret) + return size ? GST_FLOW_OK : GST_FLOW_EOS; + return GST_FLOW_ERROR; }
static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) @@ -1918,8 +1920,8 @@ static const struct unix_funcs funcs = wg_parser_begin_flush, wg_parser_end_flush,
- wg_parser_get_read_request, - wg_parser_complete_read_request, + wg_parser_get_next_read_offset, + wg_parser_push_data,
wg_parser_set_unlimited_buffering,
Signed-off-by: Derek Lesho dlesho@codeweavers.com --- v3: Fix noted double-free in response to GST_FLOW_FLUSHING from push-buffer. --- dlls/winegstreamer/wg_parser.c | 391 ++++++++------------------------- 1 file changed, 93 insertions(+), 298 deletions(-)
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index b9e74c55b08..1317ef82b11 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -33,6 +33,7 @@ #include <gst/gst.h> #include <gst/video/video.h> #include <gst/audio/audio.h> +#include <gst/app/gstappsrc.h>
/* GStreamer callbacks may be called on threads not created by Wine, and * therefore cannot access the Wine TEB. This means that we must use GStreamer @@ -49,28 +50,22 @@ struct wg_parser struct wg_parser_stream **streams; unsigned int stream_count;
- GstElement *container, *decodebin; + GstElement *container, *appsrc, *decodebin; GstBus *bus; - GstPad *my_src, *their_sink;
- guint64 file_size, start_offset, next_offset, stop_offset; - guint64 next_pull_offset; - - pthread_t push_thread; + guint64 file_size;
pthread_mutex_t mutex;
pthread_cond_t init_cond; bool no_more_pads, has_duration, error;
- pthread_cond_t read_cond, read_done_cond; + pthread_cond_t read_cond; struct { - void *data; uint64_t offset; uint32_t size; - bool done; - bool ret; + bool pending_read; } read_request;
bool flushing, sink_connected; @@ -522,7 +517,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex);
- while (parser->sink_connected && !parser->read_request.data) + while (parser->sink_connected && !parser->read_request.pending_read) pthread_cond_wait(&parser->read_cond, &parser->mutex);
if (!parser->sink_connected) @@ -541,15 +536,57 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, static void CDECL wg_parser_push_data(struct wg_parser *parser, const void *data, uint32_t size) { + GstBuffer *buffer; + GstFlowReturn ret; + GError *error; + GstMessage *message; + + if (!data) + { + pthread_mutex_lock(&parser->mutex); + + error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->read_request.offset); + message = gst_message_new_error(NULL, error, ""); + gst_bus_post(parser->bus, message); + parser->read_request.pending_read = false; + + pthread_mutex_unlock(&parser->mutex); + return; + } + + if (!size) + { + pthread_mutex_lock(&parser->mutex); + + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); + parser->read_request.pending_read = false; + + pthread_mutex_unlock(&parser->mutex); + return; + } + + /* We could avoid this extra copy using gst_buffer_new_wrapped. + However, PE wouldn't know when to release the buffer allocations as the buffer + objects are queued, so we'd have to create a ring buffer the size of the gstappsrc + queue on the PE side and validate that we don't overrun on the unix side. I'm not + yet convinced that trying to reduce copies of compressed data is worth the + complexity. */ + buffer = gst_buffer_new_and_alloc(size); + gst_buffer_fill(buffer, 0, data, size); + pthread_mutex_lock(&parser->mutex); - parser->read_request.size = size; - parser->read_request.done = true; - parser->read_request.ret = !!data; - if (data) - memcpy(parser->read_request.data, data, size); - parser->read_request.data = NULL; + assert(parser->read_request.pending_read); + + GST_BUFFER_OFFSET(buffer) = parser->read_request.offset; + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret); + + /* In random-access mode, GST_FLOW_EOS shouldn't be returned */ + assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING); + if (ret == GST_FLOW_OK) + parser->read_request.offset += size; + + parser->read_request.pending_read = false; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_done_cond); }
static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser) @@ -1180,196 +1217,42 @@ static void pad_removed_cb(GstElement *element, GstPad *pad, gpointer user) g_free(name); }
-static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent, - guint64 offset, guint size, GstBuffer **buffer) +static void src_need_data(GstElement *appsrc, guint length, gpointer user) { - struct wg_parser *parser = gst_pad_get_element_private(pad); - GstBuffer *new_buffer = NULL; - GstMapInfo map_info; - bool ret; - - GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, length %u, buffer %p.", pad, offset, size, *buffer); - - if (offset == GST_BUFFER_OFFSET_NONE) - offset = parser->next_pull_offset; - parser->next_pull_offset = offset + size; - - if (!*buffer) - *buffer = new_buffer = gst_buffer_new_and_alloc(size); - - gst_buffer_map(*buffer, &map_info, GST_MAP_WRITE); + struct wg_parser *parser = user; + guint64 queued_bytes;
pthread_mutex_lock(&parser->mutex);
- assert(!parser->read_request.data); - parser->read_request.data = map_info.data; - parser->read_request.offset = offset; - parser->read_request.size = size; - parser->read_request.done = false; - pthread_cond_signal(&parser->read_cond); - - /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect - * the upstream pin to flush if necessary. We should never be blocked on - * read_thread() not running. */ - - while (!parser->read_request.done) - pthread_cond_wait(&parser->read_done_cond, &parser->mutex); - - ret = parser->read_request.ret; - gst_buffer_set_size(*buffer, parser->read_request.size); - - pthread_mutex_unlock(&parser->mutex); - - gst_buffer_unmap(*buffer, &map_info); - - GST_LOG("Request returned %d.", ret); - - if ((!ret || !size) && new_buffer) - gst_buffer_unref(new_buffer); - - if (ret) - return size ? GST_FLOW_OK : GST_FLOW_EOS; - return GST_FLOW_ERROR; -} - -static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - GstFormat format; - - GST_LOG("parser %p, type %s.", parser, GST_QUERY_TYPE_NAME(query)); - - switch (GST_QUERY_TYPE(query)) - { - case GST_QUERY_DURATION: - gst_query_parse_duration(query, &format, NULL); - if (format == GST_FORMAT_PERCENT) - { - gst_query_set_duration(query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX); - return TRUE; - } - else if (format == GST_FORMAT_BYTES) - { - gst_query_set_duration(query, GST_FORMAT_BYTES, parser->file_size); - return TRUE; - } - return FALSE; - - case GST_QUERY_SEEKING: - gst_query_parse_seeking (query, &format, NULL, NULL, NULL); - if (format != GST_FORMAT_BYTES) - { - GST_WARNING("Cannot seek using format "%s".", gst_format_get_name(format)); - return FALSE; - } - gst_query_set_seeking(query, GST_FORMAT_BYTES, 1, 0, parser->file_size); - return TRUE; - - case GST_QUERY_SCHEDULING: - gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0); - gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH); - gst_query_add_scheduling_mode(query, GST_PAD_MODE_PULL); - return TRUE; - - default: - GST_WARNING("Unhandled query type %s.", GST_QUERY_TYPE_NAME(query)); - return FALSE; - } -} - -static void *push_data(void *arg) -{ - struct wg_parser *parser = arg; - GstBuffer *buffer; - guint max_size; - - GST_DEBUG("Starting push thread."); - - if (!(buffer = gst_buffer_new_allocate(NULL, 16384, NULL))) + /* Workaround for GStreamer bug: https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/937 + current-level-buffers is a closer fit but it's a recent addition. */ + g_object_get(G_OBJECT(appsrc), "current-level-bytes", &queued_bytes, NULL); + if (queued_bytes) { - GST_ERROR("Failed to allocate memory."); - return NULL; - } - - max_size = parser->stop_offset ? parser->stop_offset : parser->file_size; - - for (;;) - { - ULONG size; - int ret; - - if (parser->next_offset >= max_size) - break; - size = min(16384, max_size - parser->next_offset); - - if ((ret = src_getrange_cb(parser->my_src, NULL, parser->next_offset, size, &buffer)) < 0) - { - GST_ERROR("Failed to read data, ret %s.", gst_flow_get_name(ret)); - break; - } - - parser->next_offset += size; - - buffer->duration = buffer->pts = -1; - if ((ret = gst_pad_push(parser->my_src, buffer)) < 0) - { - GST_ERROR("Failed to push data, ret %s.", gst_flow_get_name(ret)); - break; - } + pthread_mutex_unlock(&parser->mutex); + return; }
- gst_buffer_unref(buffer); + parser->read_request.pending_read = true; + parser->read_request.size = length;
- gst_pad_push_event(parser->my_src, gst_event_new_eos()); - - GST_DEBUG("Stopping push thread."); + pthread_cond_signal(&parser->read_cond);
- return NULL; + pthread_mutex_unlock(&parser->mutex); }
-static gboolean activate_push(GstPad *pad, gboolean activate) +static gboolean src_seek_data(GstElement *appsrc, guint64 offset, gpointer user) { - struct wg_parser *parser = gst_pad_get_element_private(pad); - - if (!activate) - { - if (parser->push_thread) - { - pthread_join(parser->push_thread, NULL); - parser->push_thread = 0; - } - } - else if (!parser->push_thread) - { - int ret; + struct wg_parser *parser = user;
- if ((ret = pthread_create(&parser->push_thread, NULL, push_data, parser))) - { - GST_ERROR("Failed to create push thread: %s", strerror(errno)); - parser->push_thread = 0; - return FALSE; - } - } - return TRUE; -} + pthread_mutex_lock(&parser->mutex);
-static gboolean src_activate_mode_cb(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); + assert(!parser->read_request.pending_read); + parser->read_request.offset = offset;
- GST_DEBUG("%s source pad for parser %p in %s mode.", - activate ? "Activating" : "Deactivating", parser, gst_pad_mode_get_name(mode)); + pthread_mutex_unlock(&parser->mutex);
- switch (mode) - { - case GST_PAD_MODE_PULL: - return TRUE; - case GST_PAD_MODE_PUSH: - return activate_push(pad, activate); - case GST_PAD_MODE_NONE: - break; - } - return FALSE; + return true; }
static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer user) @@ -1416,85 +1299,8 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use return GST_BUS_DROP; }
-static gboolean src_perform_seek(struct wg_parser *parser, GstEvent *event) -{ - BOOL thread = !!parser->push_thread; - GstSeekType cur_type, stop_type; - GstFormat seek_format; - GstEvent *flush_event; - GstSeekFlags flags; - gint64 cur, stop; - guint32 seqnum; - gdouble rate; - - gst_event_parse_seek(event, &rate, &seek_format, &flags, - &cur_type, &cur, &stop_type, &stop); - - if (seek_format != GST_FORMAT_BYTES) - { - GST_FIXME("Unhandled format "%s".", gst_format_get_name(seek_format)); - return FALSE; - } - - seqnum = gst_event_get_seqnum(event); - - /* send flush start */ - if (flags & GST_SEEK_FLAG_FLUSH) - { - flush_event = gst_event_new_flush_start(); - gst_event_set_seqnum(flush_event, seqnum); - gst_pad_push_event(parser->my_src, flush_event); - if (thread) - gst_pad_set_active(parser->my_src, 1); - } - - parser->next_offset = parser->start_offset = cur; - - /* and prepare to continue streaming */ - if (flags & GST_SEEK_FLAG_FLUSH) - { - flush_event = gst_event_new_flush_stop(TRUE); - gst_event_set_seqnum(flush_event, seqnum); - gst_pad_push_event(parser->my_src, flush_event); - if (thread) - gst_pad_set_active(parser->my_src, 1); - } - - return TRUE; -} - -static gboolean src_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - gboolean ret = TRUE; - - GST_LOG("parser %p, type "%s".", parser, GST_EVENT_TYPE_NAME(event)); - - switch (event->type) - { - case GST_EVENT_SEEK: - ret = src_perform_seek(parser, event); - break; - - case GST_EVENT_FLUSH_START: - case GST_EVENT_FLUSH_STOP: - case GST_EVENT_QOS: - case GST_EVENT_RECONFIGURE: - break; - - default: - GST_WARNING("Ignoring "%s" event.", GST_EVENT_TYPE_NAME(event)); - ret = FALSE; - break; - } - gst_event_unref(event); - return ret; -} - static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_size) { - GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE("quartz_src", - GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); unsigned int i;
parser->file_size = file_size; @@ -1509,15 +1315,16 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s parser->container = gst_bin_new(NULL); gst_element_set_bus(parser->container, parser->bus);
- parser->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src"); - gst_pad_set_getrange_function(parser->my_src, src_getrange_cb); - gst_pad_set_query_function(parser->my_src, src_query_cb); - gst_pad_set_activatemode_function(parser->my_src, src_activate_mode_cb); - gst_pad_set_event_function(parser->my_src, src_event_cb); - gst_pad_set_element_private(parser->my_src, parser); + if (!(parser->appsrc = create_element("appsrc", "base"))) + return E_FAIL; + gst_bin_add(GST_BIN(parser->container), parser->appsrc); + + g_object_set(parser->appsrc, "stream-type", GST_APP_STREAM_TYPE_RANDOM_ACCESS, NULL); + g_object_set(parser->appsrc, "size", parser->file_size, NULL); + g_signal_connect(parser->appsrc, "need-data", G_CALLBACK(src_need_data), parser); + g_signal_connect(parser->appsrc, "seek-data", G_CALLBACK(src_seek_data), parser);
- parser->start_offset = parser->next_offset = parser->stop_offset = 0; - parser->next_pull_offset = 0; + parser->read_request.offset = 0;
if (!parser->init_gst(parser)) return E_FAIL; @@ -1596,7 +1403,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
pthread_mutex_unlock(&parser->mutex);
- parser->next_offset = 0; return S_OK; }
@@ -1638,10 +1444,6 @@ static void CDECL wg_parser_disconnect(struct wg_parser *parser) pthread_mutex_unlock(&parser->mutex);
gst_element_set_state(parser->container, GST_STATE_NULL); - gst_pad_unlink(parser->my_src, parser->their_sink); - gst_object_unref(parser->my_src); - gst_object_unref(parser->their_sink); - parser->my_src = parser->their_sink = NULL;
pthread_mutex_lock(&parser->mutex); parser->sink_connected = false; @@ -1676,15 +1478,13 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser) g_signal_connect(element, "autoplug-select", G_CALLBACK(autoplug_select_cb), parser); g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - pthread_mutex_lock(&parser->mutex); parser->no_more_pads = parser->error = false; pthread_mutex_unlock(&parser->mutex);
- if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, parser->decodebin)) { - GST_ERROR("Failed to link pads, error %d.\n", ret); + GST_ERROR("Failed to link app source.\n"); return FALSE; }
@@ -1723,15 +1523,13 @@ static BOOL avi_parser_init_gst(struct wg_parser *parser) g_signal_connect(element, "pad-removed", G_CALLBACK(pad_removed_cb), parser); g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - pthread_mutex_lock(&parser->mutex); parser->no_more_pads = parser->error = false; pthread_mutex_unlock(&parser->mutex);
- if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, element)) { - GST_ERROR("Failed to link pads, error %d.\n", ret); + GST_ERROR("Failed to link app source.\n"); return FALSE; }
@@ -1767,10 +1565,9 @@ static BOOL mpeg_audio_parser_init_gst(struct wg_parser *parser)
gst_bin_add(GST_BIN(parser->container), element);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, element)) { - GST_ERROR("Failed to link sink pads, error %d.\n", ret); + GST_ERROR("Failed to link app source.\n"); return FALSE; }
@@ -1807,10 +1604,9 @@ static BOOL wave_parser_init_gst(struct wg_parser *parser)
gst_bin_add(GST_BIN(parser->container), element);
- parser->their_sink = gst_element_get_static_pad(element, "sink"); - if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0) + if (!gst_element_link(parser->appsrc, element)) { - GST_ERROR("Failed to link sink pads, error %d.\n", ret); + GST_ERROR("Failed to link app source.\n"); return FALSE; }
@@ -1847,8 +1643,8 @@ static struct wg_parser *wg_parser_create(void) pthread_mutex_init(&parser->mutex, NULL); pthread_cond_init(&parser->init_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); - pthread_cond_init(&parser->read_done_cond, NULL); parser->flushing = true; + parser->read_request.pending_read = false;
GST_DEBUG("Created winegstreamer parser %p.\n", parser); return parser; @@ -1901,7 +1697,6 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser) pthread_mutex_destroy(&parser->mutex); pthread_cond_destroy(&parser->init_cond); pthread_cond_destroy(&parser->read_cond); - pthread_cond_destroy(&parser->read_done_cond);
free(parser); }
I got this running tests:
leslie@terabithia:~/git/wine64/dlls/quartz/tests$ make && ~/git/wine/tools/runtest waveparser make[1]: Entering directory '/home/leslie/git/wine64' make[1]: Nothing to be done for 'dlls/quartz/tests/all'. make[1]: Leaving directory '/home/leslie/git/wine64' waveparser.c:154: Test marked todo: Got hr 0x80004002, expected 0. 0118:fixme:ole:CoCreateInstanceEx no instance created for interface {56a86895-0ad4-11ce-b03a-0020af0ba770} of class {d51bd5a1-7548-11cf-a520-0080c77ef58a}, hr 0x80004002.
(wine:329482): GLib-GObject-WARNING **: 15:30:24.992: instance with invalid (NULL) class pointer
(wine:329482): GLib-GObject-CRITICAL **: 15:30:24.992: g_signal_emit_valist: assertion 'G_TYPE_CHECK_INSTANCE (instance)' failed quartz\tests\quartz_test.exe: ../wine/dlls/winegstreamer/wg_parser.c:584: wg_parser_push_data: Assertion `ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING' failed.
It can be reproduced every time if you add a g_usleep(100000) right before the gst_buffer_new_and_alloc() call.
The problem is that you're (implicitly) destroying the appsrc in wg_parser_disconnect(), but the read thread isn't shut down until later.
There are a few ways to solve this. We have to keep the disconnect -> shutdown read thread -> destroy order as-is (because the read thread needs to be unblocked if it's blocked in wg_parser_get_next_read_offset, and we can't destroy the parser while anything still has a pointer to it), but given that, I think the right solution here is probably to return from wg_parser_push_data() if !parser->sink_connected.