Signed-off-by: Derek Lesho dlesho@codeweavers.com --- dlls/winegstreamer/quartz_parser.c | 3 +++ 1 file changed, 3 insertions(+)
diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index 5299f4dc2ed..a8e7e3d979f 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -982,7 +982,10 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons goto err;
if (!filter->init_gst(filter)) + { + hr = E_FAIL; goto err; + }
for (i = 0; i < filter->source_count; ++i) {
Signed-off-by: Derek Lesho dlesho@codeweavers.com --- dlls/winegstreamer/gst_private.h | 1 - dlls/winegstreamer/media_source.c | 17 ++--- dlls/winegstreamer/quartz_parser.c | 39 ++++------ dlls/winegstreamer/wg_parser.c | 119 +++++++++++++---------------- 4 files changed, 70 insertions(+), 106 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 49e06b31369..22d9547ed72 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -165,7 +165,6 @@ struct unix_funcs void (CDECL *wg_parser_destroy)(struct wg_parser *parser);
HRESULT (CDECL *wg_parser_connect)(struct wg_parser *parser, uint64_t file_size); - void (CDECL *wg_parser_disconnect)(struct wg_parser *parser);
void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser); void (CDECL *wg_parser_end_flush)(struct wg_parser *parser); diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 825bad8da27..6c2bf92e2a2 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -103,7 +103,6 @@ struct media_source LONGLONG start_time;
HANDLE read_thread; - bool read_thread_shutdown; };
static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface) @@ -538,7 +537,7 @@ static DWORD CALLBACK read_thread(void *arg)
TRACE("Starting read thread for media source %p.\n", source);
- while (!source->read_thread_shutdown) + for (;;) { uint64_t offset; ULONG ret_size; @@ -546,7 +545,7 @@ static DWORD CALLBACK read_thread(void *arg) HRESULT hr;
if (!unix_funcs->wg_parser_get_next_read_offset(source->wg_parser, &offset, &size)) - continue; + break;
if (offset >= file_size) size = 0; @@ -1234,9 +1233,8 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
source->state = SOURCE_SHUTDOWN;
- unix_funcs->wg_parser_disconnect(source->wg_parser); + unix_funcs->wg_parser_destroy(source->wg_parser);
- source->read_thread_shutdown = true; WaitForSingleObject(source->read_thread, INFINITE); CloseHandle(source->read_thread);
@@ -1257,8 +1255,6 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface) IMFMediaStream_Release(&stream->IMFMediaStream_iface); }
- unix_funcs->wg_parser_destroy(source->wg_parser); - free(source->streams);
MFUnlockWorkQueue(source->async_commands_queue); @@ -1426,16 +1422,13 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ free(stream); } free(object->streams); - if (stream_count != UINT_MAX) - unix_funcs->wg_parser_disconnect(object->wg_parser); + if (object->wg_parser) + unix_funcs->wg_parser_destroy(object->wg_parser); if (object->read_thread) { - object->read_thread_shutdown = true; WaitForSingleObject(object->read_thread, INFINITE); CloseHandle(object->read_thread); } - if (object->wg_parser) - unix_funcs->wg_parser_destroy(object->wg_parser); if (object->async_commands_queue) MFUnlockWorkQueue(object->async_commands_queue); if (object->event_queue) diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index a8e7e3d979f..bf69a881d57 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -59,6 +59,7 @@ struct parser
HANDLE read_thread;
+ struct wg_parser * (*parser_create)(void); BOOL (*init_gst)(struct parser *filter); HRESULT (*source_query_accept)(struct parser_source *pin, const AM_MEDIA_TYPE *mt); HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt); @@ -793,14 +794,14 @@ static DWORD CALLBACK read_thread(void *arg)
TRACE("Starting read thread for filter %p.\n", filter);
- while (filter->sink_connected) + for(;;) { uint64_t offset; uint32_t size; HRESULT hr;
if (!unix_funcs->wg_parser_get_next_read_offset(filter->wg_parser, &offset, &size)) - continue; + break;
if (offset >= file_size) size = 0; @@ -869,8 +870,6 @@ static void parser_destroy(struct strmbase_filter *iface) IAsyncReader_Release(filter->reader); filter->reader = NULL;
- unix_funcs->wg_parser_destroy(filter->wg_parser); - strmbase_sink_cleanup(&filter->sink); strmbase_filter_cleanup(&filter->filter); free(filter); @@ -975,6 +974,12 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons
IAsyncReader_Length(filter->reader, &file_size, &unused);
+ if (!(filter->wg_parser = filter->parser_create())) + { + hr = E_OUTOFMEMORY; + goto err; + } + filter->sink_connected = true; filter->read_thread = CreateThread(NULL, 0, read_thread, filter, 0, NULL);
@@ -1115,11 +1120,7 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- if (!(object->wg_parser = unix_funcs->wg_decodebin_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_decodebin_parser_create;
strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &sink_ops, NULL); @@ -1550,7 +1551,7 @@ static HRESULT GST_RemoveOutputPins(struct parser *This) if (!This->sink_connected) return S_OK;
- unix_funcs->wg_parser_disconnect(This->wg_parser); + unix_funcs->wg_parser_destroy(This->wg_parser);
/* read_thread() needs to stay alive to service any read requests GStreamer * sends, so we can only shut it down after GStreamer stops. */ @@ -1646,11 +1647,7 @@ HRESULT wave_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- if (!(object->wg_parser = unix_funcs->wg_wave_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_wave_parser_create;
strmbase_filter_init(&object->filter, outer, &CLSID_WAVEParser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &wave_parser_sink_ops, NULL); @@ -1732,11 +1729,7 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- if (!(object->wg_parser = unix_funcs->wg_avi_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_avi_parser_create;
strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &avi_splitter_sink_ops, NULL); @@ -1839,11 +1832,7 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- if (!(object->wg_parser = unix_funcs->wg_mpeg_audio_parser_create())) - { - free(object); - return E_OUTOFMEMORY; - } + object->parser_create = unix_funcs->wg_mpeg_audio_parser_create;
strmbase_filter_init(&object->filter, outer, &CLSID_MPEG1Splitter, &mpeg_splitter_ops); strmbase_sink_init(&object->sink, &object->filter, L"Input", &mpeg_splitter_sink_ops, NULL); diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 6b6b033b879..5ab7991b0f2 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -55,8 +55,8 @@ struct wg_parser
pthread_mutex_t mutex;
- pthread_cond_t init_cond; - bool no_more_pads, has_duration, error; + pthread_cond_t state_cond; + bool no_more_pads, has_duration, error, shutdown;
pthread_cond_t read_cond; struct @@ -515,12 +515,13 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex);
- while (parser->sink_connected && !parser->read_request.pending) + while (!parser->shutdown && !parser->read_request.pending) pthread_cond_wait(&parser->read_cond, &parser->mutex);
- if (!parser->sink_connected) + if (parser->shutdown) { pthread_mutex_unlock(&parser->mutex); + pthread_cond_signal(&parser->state_cond); return false; }
@@ -543,13 +544,10 @@ static void CDECL wg_parser_push_data(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex);
- if (parser->sink_connected) - { - error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->read_request.offset); - message = gst_message_new_error(NULL, error, ""); - gst_bus_post(parser->bus, message); - parser->read_request.pending = false; - } + error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->read_request.offset); + message = gst_message_new_error(NULL, error, ""); + gst_bus_post(parser->bus, message); + parser->read_request.pending = false;
pthread_mutex_unlock(&parser->mutex); return; @@ -559,8 +557,7 @@ static void CDECL wg_parser_push_data(struct wg_parser *parser, { pthread_mutex_lock(&parser->mutex);
- if (parser->sink_connected) - g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret); parser->read_request.pending = false;
pthread_mutex_unlock(&parser->mutex); @@ -577,14 +574,6 @@ static void CDECL wg_parser_push_data(struct wg_parser *parser, gst_buffer_fill(buffer, 0, data, size);
pthread_mutex_lock(&parser->mutex); - - if (!parser->sink_connected) - { - pthread_mutex_unlock(&parser->mutex); - gst_buffer_unref(buffer); - return; - } - assert(parser->read_request.pending);
GST_BUFFER_OFFSET(buffer) = parser->read_request.offset; @@ -798,7 +787,7 @@ static void no_more_pads_cb(GstElement *element, gpointer user) pthread_mutex_lock(&parser->mutex); parser->no_more_pads = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); }
static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream, @@ -885,7 +874,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) pthread_mutex_lock(&parser->mutex); stream->eos = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); } break;
@@ -936,7 +925,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) wg_format_from_caps(&stream->preferred_format, caps); stream->has_caps = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break; }
@@ -1322,7 +1311,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use pthread_mutex_lock(&parser->mutex); parser->error = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break;
case GST_MESSAGE_WARNING: @@ -1337,7 +1326,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use pthread_mutex_lock(&parser->mutex); parser->has_duration = true; pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); + pthread_cond_signal(&parser->state_cond); break;
default: @@ -1352,8 +1341,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s unsigned int i; int ret;
- parser->sink_connected = true; - if (!parser->bus) { parser->bus = gst_bus_new(); @@ -1402,7 +1389,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s gint64 duration;
while (!stream->has_caps && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex);
/* GStreamer doesn't actually provide any guarantees about when duration * is available, even for seekable streams. It's basically built for @@ -1461,13 +1448,14 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s } else { - pthread_cond_wait(&parser->init_cond, &parser->mutex); + pthread_cond_wait(&parser->state_cond, &parser->mutex); } } }
pthread_mutex_unlock(&parser->mutex);
+ parser->sink_connected = true; return S_OK;
out: @@ -1495,38 +1483,6 @@ out: return E_FAIL; }
-static void CDECL wg_parser_disconnect(struct wg_parser *parser) -{ - unsigned int i; - - /* Unblock all of our streams. */ - pthread_mutex_lock(&parser->mutex); - for (i = 0; i < parser->stream_count; ++i) - { - parser->streams[i]->flushing = true; - pthread_cond_signal(&parser->streams[i]->event_empty_cond); - } - pthread_mutex_unlock(&parser->mutex); - - gst_element_set_state(parser->container, GST_STATE_NULL); - - pthread_mutex_lock(&parser->mutex); - parser->sink_connected = false; - pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_cond); - - for (i = 0; i < parser->stream_count; ++i) - free_stream(parser->streams[i]); - - parser->stream_count = 0; - free(parser->streams); - parser->streams = NULL; - - gst_element_set_bus(parser->container, NULL); - gst_object_unref(parser->container); - parser->container = NULL; -} - static BOOL decodebin_parser_init_gst(struct wg_parser *parser) { GstElement *element; @@ -1656,7 +1612,7 @@ static struct wg_parser *wg_parser_create(void) return NULL;
pthread_mutex_init(&parser->mutex, NULL); - pthread_cond_init(&parser->init_cond, NULL); + pthread_cond_init(&parser->state_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); parser->flushing = true;
@@ -1702,14 +1658,42 @@ static struct wg_parser * CDECL wg_wave_parser_create(void)
static void CDECL wg_parser_destroy(struct wg_parser *parser) { - if (parser->bus) + unsigned int i; + + pthread_mutex_lock(&parser->mutex); + parser->shutdown = true; + pthread_cond_signal(&parser->read_cond); + pthread_cond_wait(&parser->state_cond, &parser->mutex); + pthread_mutex_unlock(&parser->mutex); + + if (parser->sink_connected) { - gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL); - gst_object_unref(parser->bus); + /* Unblock all of our streams. */ + pthread_mutex_lock(&parser->mutex); + for (i = 0; i < parser->stream_count; ++i) + { + parser->streams[i]->flushing = true; + pthread_cond_signal(&parser->streams[i]->event_empty_cond); + } + pthread_mutex_unlock(&parser->mutex); + + gst_element_set_state(parser->container, GST_STATE_NULL); + + for (i = 0; i < parser->stream_count; ++i) + free_stream(parser->streams[i]); + + parser->stream_count = 0; + free(parser->streams); + + gst_element_set_bus(parser->container, NULL); + gst_object_unref(parser->container); }
+ gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL); + gst_object_unref(parser->bus); + pthread_mutex_destroy(&parser->mutex); - pthread_cond_destroy(&parser->init_cond); + pthread_cond_destroy(&parser->state_cond); pthread_cond_destroy(&parser->read_cond);
free(parser); @@ -1724,7 +1708,6 @@ static const struct unix_funcs funcs = wg_parser_destroy,
wg_parser_connect, - wg_parser_disconnect,
wg_parser_begin_flush, wg_parser_end_flush,
Hi,
While running your changed tests, I think I found new failures. Being a bot and all I'm not very good at pattern recognition, so I might be wrong, but could you please double-check?
Full results can be found at: https://testbot.winehq.org/JobDetails.pl?Key=98179
Your paranoid android.
=== debiant2 (build log) ===
../wine/dlls/winegstreamer/wg_parser.c:1379:34: error: ‘struct wg_parser’ has no member named ‘init_cond’ Task: The win32 Wine build failed
=== debiant2 (build log) ===
../wine/dlls/winegstreamer/wg_parser.c:1379:34: error: ‘struct wg_parser’ has no member named ‘init_cond’ Task: The wow64 Wine build failed
On 9/16/21 4:00 PM, Derek Lesho wrote:
@@ -1702,14 +1658,42 @@ static struct wg_parser * CDECL wg_wave_parser_create(void)
static void CDECL wg_parser_destroy(struct wg_parser *parser) {
- if (parser->bus)
- unsigned int i;
- pthread_mutex_lock(&parser->mutex);
- parser->shutdown = true;
- pthread_cond_signal(&parser->read_cond);
- pthread_cond_wait(&parser->state_cond, &parser->mutex);
Well, you can't just call pthread_cond_wait() not in a loop like this.
The obvious course of action is to track whether we've returned false to the application in wg_parser_get_next_read_offset. I don't hate that course of action, although I don't love it either.
There's some other options here:
(2) Use a global mutex to prevent wg_parser_get_next_read_offset() from dereferencing a dead parser structure. This would be a decent solution if we wanted to be a lot more paranoid about the PE side, although I don't really think we do.
(3) Reference counting.
(4) Keep the disconnect/destroy split, except that we move all of the actual teardown to destroy, and disconnect serves only to unblock the read and streaming threads. We might even go so far as to get rid of sink_connected and just call pthread_cond_signal(), and say that wg_parser_get_next_read_offset() and wg_parser_stream_get_event() might spuriously return false. This is roughly the way libusb works, when using libusb_interrupt_event_handler().
I'm inclined to like (4) best, although I'm not married to it.
Signed-off-by: Derek Lesho dlesho@codeweavers.com --- dlls/winegstreamer/gst_private.h | 17 +- dlls/winegstreamer/media_source.c | 11 +- dlls/winegstreamer/quartz_parser.c | 40 +-- dlls/winegstreamer/wg_parser.c | 405 ++++++++++++++--------------- 4 files changed, 238 insertions(+), 235 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 22d9547ed72..23d800f6247 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -156,16 +156,21 @@ struct wg_parser_event }; C_ASSERT(sizeof(struct wg_parser_event) == 40);
+enum wg_parser_type +{ + WG_DECODEBIN_PARSER, + WG_AVI_PARSER, + WG_MPEG_AUDIO_PARSER, + WG_WAVE_PARSER, +}; + +struct wg_parser; + struct unix_funcs { - struct wg_parser *(CDECL *wg_decodebin_parser_create)(void); - struct wg_parser *(CDECL *wg_avi_parser_create)(void); - struct wg_parser *(CDECL *wg_mpeg_audio_parser_create)(void); - struct wg_parser *(CDECL *wg_wave_parser_create)(void); + struct wg_parser *(CDECL *wg_parser_create)(enum wg_parser_type parser_type, uint64_t file_size); void (CDECL *wg_parser_destroy)(struct wg_parser *parser);
- HRESULT (CDECL *wg_parser_connect)(struct wg_parser *parser, uint64_t file_size); - void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser); void (CDECL *wg_parser_end_flush)(struct wg_parser *parser);
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 6c2bf92e2a2..fa1ed002404 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -1324,7 +1324,7 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ if (FAILED(hr = MFAllocateWorkQueue(&object->async_commands_queue))) goto fail;
- if (!(parser = unix_funcs->wg_decodebin_parser_create())) + if (!(parser = unix_funcs->wg_parser_create(WG_DECODEBIN_PARSER, file_size))) { hr = E_OUTOFMEMORY; goto fail; @@ -1335,9 +1335,6 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_
object->state = SOURCE_OPENING;
- if (FAILED(hr = unix_funcs->wg_parser_connect(parser, file_size))) - goto fail; - /* In Media Foundation, sources may read from any media source stream * without fear of blocking due to buffering limits on another. Trailmakers, * a Unity3D Engine game, only reads one sample from the audio stream (and @@ -1346,7 +1343,11 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ * leak occurs with native. */ unix_funcs->wg_parser_set_unlimited_buffering(parser);
- stream_count = unix_funcs->wg_parser_get_stream_count(parser); + if (!(stream_count = unix_funcs->wg_parser_get_stream_count(parser))) + { + hr = E_FAIL; + goto fail; + }
if (!(object->streams = calloc(stream_count, sizeof(*object->streams)))) { diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index bf69a881d57..f21c820b21a 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -59,7 +59,7 @@ struct parser
HANDLE read_thread;
- struct wg_parser * (*parser_create)(void); + enum wg_parser_type parser_type; BOOL (*init_gst)(struct parser *filter); HRESULT (*source_query_accept)(struct parser_source *pin, const AM_MEDIA_TYPE *mt); HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt); @@ -974,17 +974,14 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons
IAsyncReader_Length(filter->reader, &file_size, &unused);
- if (!(filter->wg_parser = filter->parser_create())) + if (!(filter->wg_parser = unix_funcs->wg_parser_create(filter->parser_type, file_size))) { hr = E_OUTOFMEMORY; goto err; }
- filter->sink_connected = true; filter->read_thread = CreateThread(NULL, 0, read_thread, filter, 0, NULL);
- if (FAILED(hr = unix_funcs->wg_parser_connect(filter->wg_parser, file_size))) - goto err;
if (!filter->init_gst(filter)) { @@ -1000,6 +997,8 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons pin->seek.llCurrent = 0; }
+ filter->sink_connected = true; + return S_OK; err: GST_RemoveOutputPins(filter); @@ -1039,7 +1038,7 @@ static BOOL decodebin_parser_filter_init_gst(struct parser *filter) return FALSE; }
- return TRUE; + return !!stream_count; }
static HRESULT decodebin_parser_source_query_accept(struct parser_source *pin, const AM_MEDIA_TYPE *mt) @@ -1120,7 +1119,7 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- object->parser_create = unix_funcs->wg_decodebin_parser_create; + object->parser_type = WG_DECODEBIN_PARSER;
strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &sink_ops, NULL); @@ -1553,9 +1552,6 @@ static HRESULT GST_RemoveOutputPins(struct parser *This)
unix_funcs->wg_parser_destroy(This->wg_parser);
- /* read_thread() needs to stay alive to service any read requests GStreamer - * sends, so we can only shut it down after GStreamer stops. */ - This->sink_connected = false; WaitForSingleObject(This->read_thread, INFINITE); CloseHandle(This->read_thread);
@@ -1568,6 +1564,7 @@ static HRESULT GST_RemoveOutputPins(struct parser *This) This->source_count = 0; free(This->sources); This->sources = NULL; + This->sink_connected = false;
BaseFilterImpl_IncrementPinVersion(&This->filter); return S_OK; @@ -1603,11 +1600,12 @@ static const struct strmbase_sink_ops wave_parser_sink_ops = static BOOL wave_parser_filter_init_gst(struct parser *filter) { struct wg_parser *parser = filter->wg_parser; + struct wg_parser_stream *stream = unix_funcs->wg_parser_get_stream(parser, 0);
- if (!create_pin(filter, unix_funcs->wg_parser_get_stream(parser, 0), L"output")) + if (!stream) return FALSE;
- return TRUE; + return !!create_pin(filter, stream, L"output"); }
static HRESULT wave_parser_source_query_accept(struct parser_source *pin, const AM_MEDIA_TYPE *mt) @@ -1647,7 +1645,7 @@ HRESULT wave_parser_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- object->parser_create = unix_funcs->wg_wave_parser_create; + object->parser_type = WG_WAVE_PARSER;
strmbase_filter_init(&object->filter, outer, &CLSID_WAVEParser, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &wave_parser_sink_ops, NULL); @@ -1678,18 +1676,21 @@ static const struct strmbase_sink_ops avi_splitter_sink_ops = static BOOL avi_splitter_filter_init_gst(struct parser *filter) { struct wg_parser *parser = filter->wg_parser; + struct wg_parser_stream *stream; uint32_t i, stream_count; WCHAR source_name[20];
stream_count = unix_funcs->wg_parser_get_stream_count(parser); for (i = 0; i < stream_count; ++i) { + if (!(stream = unix_funcs->wg_parser_get_stream(parser, i))) + return FALSE; swprintf(source_name, ARRAY_SIZE(source_name), L"Stream %02u", i); - if (!create_pin(filter, unix_funcs->wg_parser_get_stream(parser, i), source_name)) + if (!create_pin(filter, stream, source_name)) return FALSE; }
- return TRUE; + return !!stream_count; }
static HRESULT avi_splitter_source_query_accept(struct parser_source *pin, const AM_MEDIA_TYPE *mt) @@ -1729,7 +1730,7 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- object->parser_create = unix_funcs->wg_avi_parser_create; + object->parser_type = WG_AVI_PARSER;
strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops); strmbase_sink_init(&object->sink, &object->filter, L"input pin", &avi_splitter_sink_ops, NULL); @@ -1765,11 +1766,12 @@ static const struct strmbase_sink_ops mpeg_splitter_sink_ops = static BOOL mpeg_splitter_filter_init_gst(struct parser *filter) { struct wg_parser *parser = filter->wg_parser; + struct wg_parser_stream *stream = unix_funcs->wg_parser_get_stream(parser, 0);
- if (!create_pin(filter, unix_funcs->wg_parser_get_stream(parser, 0), L"Audio")) + if (!stream) return FALSE;
- return TRUE; + return !!create_pin(filter, stream, L"Audio"); }
static HRESULT mpeg_splitter_source_query_accept(struct parser_source *pin, const AM_MEDIA_TYPE *mt) @@ -1832,7 +1834,7 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out) if (!(object = calloc(1, sizeof(*object)))) return E_OUTOFMEMORY;
- object->parser_create = unix_funcs->wg_mpeg_audio_parser_create; + object->parser_type = WG_MPEG_AUDIO_PARSER;
strmbase_filter_init(&object->filter, outer, &CLSID_MPEG1Splitter, &mpeg_splitter_ops); strmbase_sink_init(&object->sink, &object->filter, L"Input", &mpeg_splitter_sink_ops, NULL); diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 5ab7991b0f2..2b2c8852303 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -45,8 +45,6 @@ GST_DEBUG_CATEGORY_STATIC(wine);
struct wg_parser { - BOOL (*init_gst)(struct wg_parser *parser); - struct wg_parser_stream **streams; unsigned int stream_count;
@@ -66,7 +64,7 @@ struct wg_parser bool pending; } read_request;
- bool flushing, sink_connected; + bool flushing; };
struct wg_parser_stream @@ -478,13 +476,59 @@ static bool wg_format_compare(const struct wg_format *a, const struct wg_format return false; }
+static bool wg_parser_initialize(struct wg_parser *parser) +{ + unsigned int i; + int ret; + + ret = gst_element_get_state(parser->container, NULL, NULL, -1); + if (ret == GST_STATE_CHANGE_FAILURE) + { + GST_ERROR("Failed to play stream.\n"); + return 0; + } + + pthread_mutex_lock(&parser->mutex); + while (!parser->no_more_pads && !parser->error) + pthread_cond_wait(&parser->state_cond, &parser->mutex); + if (parser->error) + { + pthread_mutex_unlock(&parser->mutex); + GST_ERROR("Failed to play stream.\n"); + return 0; + } + + for (i = 0; i < parser->stream_count; i++) + { + struct wg_parser_stream *stream = parser->streams[i]; + + while (!stream->has_caps && !parser->error) + pthread_cond_wait(&parser->state_cond, &parser->mutex); + + if (parser->error) + { + pthread_mutex_unlock(&parser->mutex); + return 0; + } + } + + pthread_mutex_unlock(&parser->mutex); + return 1; +} + static uint32_t CDECL wg_parser_get_stream_count(struct wg_parser *parser) { + if (!wg_parser_initialize(parser)) + return 0; + return parser->stream_count; }
static struct wg_parser_stream * CDECL wg_parser_get_stream(struct wg_parser *parser, uint32_t index) { + if (!wg_parser_initialize(parser)) + return 0; + return parser->streams[index]; }
@@ -492,6 +536,9 @@ static void CDECL wg_parser_begin_flush(struct wg_parser *parser) { unsigned int i;
+ if (!wg_parser_initialize(parser)) + return; + pthread_mutex_lock(&parser->mutex); parser->flushing = true; pthread_mutex_unlock(&parser->mutex); @@ -708,6 +755,87 @@ static void CDECL wg_parser_stream_release_buffer(struct wg_parser_stream *strea
static uint64_t CDECL wg_parser_stream_get_duration(struct wg_parser_stream *stream) { + struct wg_parser *parser = stream->parser; + gint64 duration; + + pthread_mutex_lock(&parser->mutex); + + if (stream->duration != UINT64_MAX) + { + pthread_mutex_unlock(&parser->mutex); + return stream->duration; + } + + if (!parser->flushing) + { + GST_WARNING("Parser must be flushing to retrieve duration.\n"); + pthread_mutex_unlock(&parser->mutex); + return UINT64_MAX; + } + + /* GStreamer doesn't actually provide any guarantees about when duration + * is available, even for seekable streams. It's basically built for + * applications that don't care, e.g. movie players that can display + * a duration once it's available, and update it visually if a better + * estimate is found. This doesn't really match well with DirectShow or + * Media Foundation, which both expect duration to be available + * immediately on connecting, so we have to use some complex heuristics + * to try to actually get a usable duration. + * + * Some elements (avidemux, wavparse, qtdemux) record duration almost + * immediately, before fixing caps. Such elements don't send + * duration-changed messages. Therefore always try querying duration + * after caps have been found. + * + * Some elements (mpegaudioparse) send duration-changed. In the case of + * a mp3 stream without seek tables it will not be sent immediately, but + * only after enough frames have been parsed to form an estimate. They + * may send it multiple times with increasingly accurate estimates, but + * unfortunately we have no way of knowing whether another estimate will + * be sent, so we always take the first one. We assume that if the + * duration is not immediately available then the element will always + * send duration-changed. + */ + + for (;;) + { + if (parser->error) + { + pthread_mutex_unlock(&parser->mutex); + return UINT64_MAX; + } + if (gst_pad_query_duration(stream->their_src, GST_FORMAT_TIME, &duration)) + { + stream->duration = duration / 100; + break; + } + + if (stream->eos) + { + stream->duration = 0; + GST_WARNING("Failed to query duration.\n"); + break; + } + + /* Elements based on GstBaseParse send duration-changed before + * actually updating the duration in GStreamer versions prior + * to 1.17.1. See gstreamer.git:d28e0b4147fe7073b2. So after + * receiving duration-changed we have to continue polling until + * the query succeeds. */ + if (parser->has_duration) + { + pthread_mutex_unlock(&parser->mutex); + g_usleep(10000); + pthread_mutex_lock(&parser->mutex); + } + else + { + pthread_cond_wait(&parser->state_cond, &parser->mutex); + } + } + + pthread_mutex_unlock(&parser->mutex); + return stream->duration; }
@@ -1069,6 +1197,8 @@ static struct wg_parser_stream *create_stream(struct wg_parser *parser) gst_pad_set_event_function(stream->my_sink, sink_event_cb); gst_pad_set_query_function(stream->my_sink, sink_query_cb);
+ stream->duration = UINT64_MAX; + parser->streams[parser->stream_count++] = stream; return stream; } @@ -1336,153 +1466,6 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use return GST_BUS_DROP; }
-static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_size) -{ - unsigned int i; - int ret; - - if (!parser->bus) - { - parser->bus = gst_bus_new(); - gst_bus_set_sync_handler(parser->bus, bus_handler_cb, parser, NULL); - } - - parser->container = gst_bin_new(NULL); - gst_element_set_bus(parser->container, parser->bus); - - if (!(parser->appsrc = create_element("appsrc", "base"))) - return E_FAIL; - gst_bin_add(GST_BIN(parser->container), parser->appsrc); - - g_object_set(parser->appsrc, "stream-type", GST_APP_STREAM_TYPE_RANDOM_ACCESS, NULL); - g_object_set(parser->appsrc, "size", file_size, NULL); - g_signal_connect(parser->appsrc, "need-data", G_CALLBACK(src_need_data), parser); - g_signal_connect(parser->appsrc, "seek-data", G_CALLBACK(src_seek_data), parser); - - parser->read_request.offset = 0; - parser->error = false; - - if (!parser->init_gst(parser)) - goto out; - - gst_element_set_state(parser->container, GST_STATE_PAUSED); - ret = gst_element_get_state(parser->container, NULL, NULL, -1); - if (ret == GST_STATE_CHANGE_FAILURE) - { - GST_ERROR("Failed to play stream.\n"); - goto out; - } - - pthread_mutex_lock(&parser->mutex); - - while (!parser->no_more_pads && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); - if (parser->error) - { - pthread_mutex_unlock(&parser->mutex); - goto out; - } - - for (i = 0; i < parser->stream_count; ++i) - { - struct wg_parser_stream *stream = parser->streams[i]; - gint64 duration; - - while (!stream->has_caps && !parser->error) - pthread_cond_wait(&parser->state_cond, &parser->mutex); - - /* GStreamer doesn't actually provide any guarantees about when duration - * is available, even for seekable streams. It's basically built for - * applications that don't care, e.g. movie players that can display - * a duration once it's available, and update it visually if a better - * estimate is found. This doesn't really match well with DirectShow or - * Media Foundation, which both expect duration to be available - * immediately on connecting, so we have to use some complex heuristics - * to try to actually get a usable duration. - * - * Some elements (avidemux, wavparse, qtdemux) record duration almost - * immediately, before fixing caps. Such elements don't send - * duration-changed messages. Therefore always try querying duration - * after caps have been found. - * - * Some elements (mpegaudioparse) send duration-changed. In the case of - * a mp3 stream without seek tables it will not be sent immediately, but - * only after enough frames have been parsed to form an estimate. They - * may send it multiple times with increasingly accurate estimates, but - * unfortunately we have no way of knowing whether another estimate will - * be sent, so we always take the first one. We assume that if the - * duration is not immediately available then the element will always - * send duration-changed. - */ - - for (;;) - { - if (parser->error) - { - pthread_mutex_unlock(&parser->mutex); - goto out; - } - if (gst_pad_query_duration(stream->their_src, GST_FORMAT_TIME, &duration)) - { - stream->duration = duration / 100; - break; - } - - if (stream->eos) - { - stream->duration = 0; - GST_WARNING("Failed to query duration.\n"); - break; - } - - /* Elements based on GstBaseParse send duration-changed before - * actually updating the duration in GStreamer versions prior - * to 1.17.1. See gstreamer.git:d28e0b4147fe7073b2. So after - * receiving duration-changed we have to continue polling until - * the query succeeds. */ - if (parser->has_duration) - { - pthread_mutex_unlock(&parser->mutex); - g_usleep(10000); - pthread_mutex_lock(&parser->mutex); - } - else - { - pthread_cond_wait(&parser->state_cond, &parser->mutex); - } - } - } - - pthread_mutex_unlock(&parser->mutex); - - parser->sink_connected = true; - return S_OK; - -out: - if (parser->container) - gst_element_set_state(parser->container, GST_STATE_NULL); - - for (i = 0; i < parser->stream_count; ++i) - free_stream(parser->streams[i]); - parser->stream_count = 0; - free(parser->streams); - parser->streams = NULL; - - if (parser->container) - { - gst_element_set_bus(parser->container, NULL); - gst_object_unref(parser->container); - parser->container = NULL; - } - - pthread_mutex_lock(&parser->mutex); - parser->sink_connected = false; - pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_cond); - - return E_FAIL; -} - static BOOL decodebin_parser_init_gst(struct wg_parser *parser) { GstElement *element; @@ -1564,7 +1547,6 @@ static BOOL mpeg_audio_parser_init_gst(struct wg_parser *parser) return FALSE; } gst_pad_set_active(stream->my_sink, 1); - parser->no_more_pads = true;
return TRUE; @@ -1598,61 +1580,82 @@ static BOOL wave_parser_init_gst(struct wg_parser *parser) return FALSE; } gst_pad_set_active(stream->my_sink, 1); - parser->no_more_pads = true;
return TRUE; }
-static struct wg_parser *wg_parser_create(void) +static struct wg_parser * CDECL wg_parser_create(enum wg_parser_type parser_type, uint64_t file_size) { struct wg_parser *parser; + BOOL ret;
if (!(parser = calloc(1, sizeof(*parser)))) return NULL;
+ if (!(parser->appsrc = create_element("appsrc", "base"))) + { + free(parser); + return NULL; + } + pthread_mutex_init(&parser->mutex, NULL); pthread_cond_init(&parser->state_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); parser->flushing = true;
- GST_DEBUG("Created winegstreamer parser %p.\n", parser); - return parser; -} + parser->bus = gst_bus_new(); + gst_bus_set_sync_handler(parser->bus, bus_handler_cb, parser, NULL);
-static struct wg_parser * CDECL wg_decodebin_parser_create(void) -{ - struct wg_parser *parser; + parser->container = gst_bin_new(NULL); + gst_element_set_bus(parser->container, parser->bus);
- if ((parser = wg_parser_create())) - parser->init_gst = decodebin_parser_init_gst; - return parser; -} + gst_bin_add(GST_BIN(parser->container), parser->appsrc);
-static struct wg_parser * CDECL wg_avi_parser_create(void) -{ - struct wg_parser *parser; + g_object_set(parser->appsrc, "stream-type", GST_APP_STREAM_TYPE_RANDOM_ACCESS, NULL); + g_object_set(parser->appsrc, "size", file_size, NULL); + g_signal_connect(parser->appsrc, "need-data", G_CALLBACK(src_need_data), parser); + g_signal_connect(parser->appsrc, "seek-data", G_CALLBACK(src_seek_data), parser);
- if ((parser = wg_parser_create())) - parser->init_gst = avi_parser_init_gst; - return parser; -} + parser->read_request.offset = 0;
-static struct wg_parser * CDECL wg_mpeg_audio_parser_create(void) -{ - struct wg_parser *parser; + switch (parser_type) + { + case WG_DECODEBIN_PARSER: + ret = decodebin_parser_init_gst(parser); + break; + case WG_AVI_PARSER: + ret = avi_parser_init_gst(parser); + break; + case WG_MPEG_AUDIO_PARSER: + ret = mpeg_audio_parser_init_gst(parser); + break; + case WG_WAVE_PARSER: + ret = wave_parser_init_gst(parser); + break; + default: + assert(0); + }
- if ((parser = wg_parser_create())) - parser->init_gst = mpeg_audio_parser_init_gst; - return parser; -} + if (!ret) + { + gst_element_set_bus(parser->container, NULL); + gst_object_unref(parser->container);
-static struct wg_parser * CDECL wg_wave_parser_create(void) -{ - struct wg_parser *parser; + gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL); + gst_object_unref(parser->bus); + + pthread_mutex_destroy(&parser->mutex); + pthread_cond_destroy(&parser->state_cond); + pthread_cond_destroy(&parser->read_cond); + + free(parser); + return NULL; + } + + gst_element_set_state(parser->container, GST_STATE_PAUSED);
- if ((parser = wg_parser_create())) - parser->init_gst = wave_parser_init_gst; + GST_DEBUG("Created winegstreamer parser %p.\n", parser); return parser; }
@@ -1666,28 +1669,25 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser) pthread_cond_wait(&parser->state_cond, &parser->mutex); pthread_mutex_unlock(&parser->mutex);
- if (parser->sink_connected) + /* Unblock all of our streams. */ + pthread_mutex_lock(&parser->mutex); + for (i = 0; i < parser->stream_count; ++i) { - /* Unblock all of our streams. */ - pthread_mutex_lock(&parser->mutex); - for (i = 0; i < parser->stream_count; ++i) - { - parser->streams[i]->flushing = true; - pthread_cond_signal(&parser->streams[i]->event_empty_cond); - } - pthread_mutex_unlock(&parser->mutex); + parser->streams[i]->flushing = true; + pthread_cond_signal(&parser->streams[i]->event_empty_cond); + } + pthread_mutex_unlock(&parser->mutex);
- gst_element_set_state(parser->container, GST_STATE_NULL); + gst_element_set_state(parser->container, GST_STATE_NULL);
- for (i = 0; i < parser->stream_count; ++i) - free_stream(parser->streams[i]); + for (i = 0; i < parser->stream_count; ++i) + free_stream(parser->streams[i]);
- parser->stream_count = 0; - free(parser->streams); + parser->stream_count = 0; + free(parser->streams);
- gst_element_set_bus(parser->container, NULL); - gst_object_unref(parser->container); - } + gst_element_set_bus(parser->container, NULL); + gst_object_unref(parser->container);
gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL); gst_object_unref(parser->bus); @@ -1701,14 +1701,9 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser)
static const struct unix_funcs funcs = { - wg_decodebin_parser_create, - wg_avi_parser_create, - wg_mpeg_audio_parser_create, - wg_wave_parser_create, + wg_parser_create, wg_parser_destroy,
- wg_parser_connect, - wg_parser_begin_flush, wg_parser_end_flush,
On 9/16/21 4:00 PM, Derek Lesho wrote:
Signed-off-by: Derek Lesho dlesho@codeweavers.com
dlls/winegstreamer/gst_private.h | 17 +- dlls/winegstreamer/media_source.c | 11 +- dlls/winegstreamer/quartz_parser.c | 40 +-- dlls/winegstreamer/wg_parser.c | 405 ++++++++++++++--------------- 4 files changed, 238 insertions(+), 235 deletions(-)
This patch does an awful lot of things at once, e.g.:
(1) waiting for initialization in some other functions instead of in wg_parser_connect()
(2) waiting for duration in wg_parser_stream_get_duration()
(3) merging the separate wg_*_parser_create() functions together
(4) merging create with connect
Can it be split up?
It would also be nice to have a decent explanation for (1). I know we discussed this in private, but that kind of thing should show up in the patch itself as well. It's also not clear to me how much we need to have initialized, and when, for push-mode clients; that kind of context would be pretty helpful when reviewing.