WMSyncReader starts a background read thread that reads from the IStream passed to IWMSyncReader::OpenStream. This means it could use the IStream in the background even when no IWMSyncReader methods are being called.
For well-behaved applications, this is probably OK. However, AQUARIUM (Steam 2515070) frees the IStream it passes to WMSyncReader _before_ it calls IWMSyncReader::Close, which is what stops the read thread. As a result, the read thread accesses freed memory. This is improper, but not unreasonable: IWMSyncReader is supposed to be a synchronous interface, so one might assume that the IStream is not used while no IWMSyncReader methods are being called.
This commit adds a `wg_parser_dont_read` function, which can be used to stop wg_parser from issuing read requests. This is used by IWMSyncReader to make sure read requests are only issued when IWMSyncReader methods are being called.
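The token scheme described above can be sketched in isolation. This is a simplified, hypothetical model rather than the patch itself: `struct read_gate` and the `read_gate_*` helpers are invented names, and an explicit `read_in_flight` flag stands in for the patch's `read_request.done` bookkeeping. Reads are permitted while the token count is nonzero, and dropping the last token waits for any read that is already running.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the read gating state inside struct wg_parser. */
struct read_gate
{
    pthread_mutex_t mutex;
    pthread_cond_t tokens_cond; /* signalled when tokens goes from 0 to 1 */
    pthread_cond_t idle_cond;   /* signalled when a read finishes */
    uint32_t tokens;            /* number of callers currently allowing reads */
    bool read_in_flight;
};

static void read_gate_init(struct read_gate *gate)
{
    pthread_mutex_init(&gate->mutex, NULL);
    pthread_cond_init(&gate->tokens_cond, NULL);
    pthread_cond_init(&gate->idle_cond, NULL);
    gate->tokens = 1; /* reads start out allowed, as in the patch */
    gate->read_in_flight = false;
}

/* API method entry: allow reads (like wg_parser_dont_read(parser, false)). */
static void read_gate_allow(struct read_gate *gate)
{
    pthread_mutex_lock(&gate->mutex);
    if (gate->tokens++ == 0)
        pthread_cond_broadcast(&gate->tokens_cond);
    pthread_mutex_unlock(&gate->mutex);
}

/* API method exit: forbid reads (like wg_parser_dont_read(parser, true)).
 * Returns false if the calls were unbalanced. */
static bool read_gate_forbid(struct read_gate *gate)
{
    bool ok = true;

    pthread_mutex_lock(&gate->mutex);
    if (gate->tokens == 0)
        ok = false;
    else if (--gate->tokens == 0)
    {
        /* Last token dropped: wait for a read that is already running. */
        while (gate->read_in_flight)
            pthread_cond_wait(&gate->idle_cond, &gate->mutex);
    }
    pthread_mutex_unlock(&gate->mutex);
    return ok;
}

/* Read thread: block until at least one token is held, then start a read. */
static void read_gate_begin_read(struct read_gate *gate)
{
    pthread_mutex_lock(&gate->mutex);
    while (gate->tokens == 0)
        pthread_cond_wait(&gate->tokens_cond, &gate->mutex);
    gate->read_in_flight = true;
    pthread_mutex_unlock(&gate->mutex);
}

/* Read thread: mark the read finished and wake anyone waiting in forbid. */
static void read_gate_end_read(struct read_gate *gate)
{
    pthread_mutex_lock(&gate->mutex);
    assert(gate->read_in_flight);
    gate->read_in_flight = false;
    pthread_cond_broadcast(&gate->idle_cond);
    pthread_mutex_unlock(&gate->mutex);
}

/* Non-blocking probe, only used to observe the state. */
static bool read_gate_reads_allowed(struct read_gate *gate)
{
    bool allowed;

    pthread_mutex_lock(&gate->mutex);
    allowed = gate->tokens > 0;
    pthread_mutex_unlock(&gate->mutex);
    return allowed;
}
```

Because the gate is a counter rather than a flag, two threads inside reader methods at the same time each hold a token, and reads stay allowed until both have left.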
-- v2: winegstreamer: Make sure WMSyncReader never reads in the background.
From: Yuxuan Shui <yshui@codeweavers.com>
WMSyncReader starts a background read thread that reads from the IStream passed to IWMSyncReader::OpenStream. This means it could use the IStream in the background even when no IWMSyncReader methods are being called.
For well-behaved applications, this is probably OK. However, AQUARIUM (Steam 2515070) frees the IStream it passes to WMSyncReader _before_ it calls IWMSyncReader::Close, which is what stops the read thread. As a result, the read thread accesses freed memory. This is improper, but not unreasonable: IWMSyncReader is supposed to be a synchronous interface, so one might assume that the IStream is not used while no IWMSyncReader methods are being called.
This commit adds a `wg_parser_dont_read` function, which can be used to stop wg_parser from issuing read requests. This is used by IWMSyncReader to make sure read requests are only issued when IWMSyncReader methods are being called.
---
 dlls/winegstreamer/gst_private.h |  1 +
 dlls/winegstreamer/main.c        | 12 ++++++++++
 dlls/winegstreamer/unixlib.h     |  8 +++++++
 dlls/winegstreamer/wg_parser.c   | 38 ++++++++++++++++++++++++++++++--
 dlls/winegstreamer/wm_reader.c   | 12 +++++++++-
 5 files changed, 68 insertions(+), 3 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h
index 9db484c033f..05a1bccd958 100644
--- a/dlls/winegstreamer/gst_private.h
+++ b/dlls/winegstreamer/gst_private.h
@@ -74,6 +74,7 @@ void wg_sample_queue_flush(struct wg_sample_queue *queue, bool all);

 wg_parser_t wg_parser_create(bool output_compressed);
 void wg_parser_destroy(wg_parser_t parser);
+void wg_parser_dont_read(wg_parser_t parser, bool dont_read);

 HRESULT wg_parser_connect(wg_parser_t parser, uint64_t file_size, const WCHAR *uri);
 void wg_parser_disconnect(wg_parser_t parser);
diff --git a/dlls/winegstreamer/main.c b/dlls/winegstreamer/main.c
index d18db8b21e8..fa9510ab1f2 100644
--- a/dlls/winegstreamer/main.c
+++ b/dlls/winegstreamer/main.c
@@ -189,6 +189,18 @@ void wg_parser_destroy(wg_parser_t parser)
     WINE_UNIX_CALL(unix_wg_parser_destroy, &parser);
 }

+void wg_parser_dont_read(wg_parser_t parser, bool dont_read)
+{
+    struct wg_parser_dont_read_params params =
+    {
+        .parser = parser,
+        .dont_read = dont_read,
+    };
+    TRACE("%#I64x %d\n", parser, dont_read);
+
+    WINE_UNIX_CALL(unix_wg_parser_dont_read, &params);
+}
+
 HRESULT wg_parser_connect(wg_parser_t parser, uint64_t file_size, const WCHAR *uri)
 {
     struct wg_parser_connect_params params =
diff --git a/dlls/winegstreamer/unixlib.h b/dlls/winegstreamer/unixlib.h
index 19270bd731b..e24702b4a03 100644
--- a/dlls/winegstreamer/unixlib.h
+++ b/dlls/winegstreamer/unixlib.h
@@ -223,6 +223,12 @@ struct wg_parser_create_params
     UINT8 warn_on;
 };

+struct wg_parser_dont_read_params
+{
+    wg_parser_t parser;
+    UINT8 dont_read;
+};
+
 struct wg_parser_connect_params
 {
     wg_parser_t parser;
@@ -421,6 +427,8 @@ enum unix_funcs
     unix_wg_parser_create,
     unix_wg_parser_destroy,

+    unix_wg_parser_dont_read,
+
     unix_wg_parser_connect,
     unix_wg_parser_disconnect,
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c
index 710cfe6a0a5..a23fc95beea 100644
--- a/dlls/winegstreamer/wg_parser.c
+++ b/dlls/winegstreamer/wg_parser.c
@@ -81,13 +81,14 @@ struct wg_parser
     bool no_more_pads, has_duration, error;
     bool err_on, warn_on;

-    pthread_cond_t read_cond, read_done_cond;
+    pthread_cond_t read_cond, read_done_cond, dont_read_cond;
     struct
     {
         GstBuffer *buffer;
         uint64_t offset;
         uint32_t size;
         bool done;
+        uint32_t n_read_tokens;
         GstFlowReturn ret;
     } read_request;

@@ -221,7 +222,7 @@ static NTSTATUS wg_parser_push_data(void *args)
     parser->read_request.size = 0;

     pthread_mutex_unlock(&parser->mutex);
-    pthread_cond_signal(&parser->read_done_cond);
+    pthread_cond_broadcast(&parser->read_done_cond);

     return S_OK;
 }
@@ -1058,6 +1059,8 @@ static GstFlowReturn issue_read_request(struct wg_parser *parser, guint64 offset
     GstFlowReturn ret;

     pthread_mutex_lock(&parser->mutex);
+    while (parser->read_request.n_read_tokens == 0)
+        pthread_cond_wait(&parser->dont_read_cond, &parser->mutex);

     assert(!parser->read_request.size);
     parser->read_request.buffer = *buffer;
@@ -1564,6 +1567,30 @@ static void query_tags(struct wg_parser_stream *stream)
     gst_object_unref(peer);
 }

+static NTSTATUS wg_parser_dont_read(void *args)
+{
+    const struct wg_parser_dont_read_params *params = args;
+    struct wg_parser *parser = get_parser(params->parser);
+    pthread_mutex_lock(&parser->mutex);
+
+    if (!params->dont_read)
+    {
+        if ((parser->read_request.n_read_tokens++) == 0)
+            pthread_cond_signal(&parser->dont_read_cond);
+    }
+    else if (parser->read_request.n_read_tokens == 0)
+        GST_ERROR("Trying to stop read when it's already stopped!");
+    else if ((--parser->read_request.n_read_tokens) == 0)
+    {
+        /* wait for current read condition to finish */
+        while (!parser->read_request.done)
+            pthread_cond_wait(&parser->read_done_cond, &parser->mutex);
+    }
+
+    pthread_mutex_unlock(&parser->mutex);
+    return S_OK;
+}
+
 static NTSTATUS wg_parser_connect(void *args)
 {
     GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE("quartz_src",
@@ -1828,9 +1855,11 @@ static NTSTATUS wg_parser_create(void *args)
     pthread_cond_init(&parser->init_cond, NULL);
     pthread_cond_init(&parser->read_cond, NULL);
     pthread_cond_init(&parser->read_done_cond, NULL);
+    pthread_cond_init(&parser->dont_read_cond, NULL);
     parser->output_compressed = params->output_compressed;
     parser->err_on = params->err_on;
     parser->warn_on = params->warn_on;
+    parser->read_request.n_read_tokens = 1;
     GST_DEBUG("Created winegstreamer parser %p.", parser);
     params->parser = (wg_parser_t)(ULONG_PTR)parser;
     return S_OK;
@@ -1850,6 +1879,7 @@ static NTSTATUS wg_parser_destroy(void *args)
     pthread_cond_destroy(&parser->init_cond);
     pthread_cond_destroy(&parser->read_cond);
     pthread_cond_destroy(&parser->read_done_cond);
+    pthread_cond_destroy(&parser->dont_read_cond);

     free(parser->uri);
     free(parser);
@@ -1864,6 +1894,8 @@ const unixlib_entry_t __wine_unix_call_funcs[] =
     X(wg_parser_create),
     X(wg_parser_destroy),

+    X(wg_parser_dont_read),
+
     X(wg_parser_connect),
     X(wg_parser_disconnect),

@@ -2261,6 +2293,8 @@ const unixlib_entry_t __wine_unix_call_wow64_funcs[] =
     X(wg_parser_create),
     X(wg_parser_destroy),

+    X(wg_parser_dont_read),
+
     X64(wg_parser_connect),
     X(wg_parser_disconnect),
diff --git a/dlls/winegstreamer/wm_reader.c b/dlls/winegstreamer/wm_reader.c
index b3e2c4b46e3..07f27cc58ea 100644
--- a/dlls/winegstreamer/wm_reader.c
+++ b/dlls/winegstreamer/wm_reader.c
@@ -1539,7 +1539,7 @@ static HRESULT init_stream(struct wm_reader *reader)
      * Now that they're all enabled seek back to the start again. */
     wg_parser_stream_seek(reader->streams[0].wg_stream, 1.0, 0, 0,
             AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning);
-
+    wg_parser_dont_read(reader->wg_parser, true);
     return S_OK;

 out_disconnect_parser:
@@ -1566,6 +1566,7 @@ static HRESULT reinit_stream(struct wm_reader *reader, bool read_compressed)
     HRESULT hr;
     WORD i;

+    wg_parser_dont_read(reader->wg_parser, false);
     wg_parser_disconnect(reader->wg_parser);

     EnterCriticalSection(&reader->shutdown_cs);
@@ -1615,6 +1616,7 @@ static HRESULT reinit_stream(struct wm_reader *reader, bool read_compressed)
      * Now that they're all enabled seek back to the start again. */
     wg_parser_stream_seek(reader->streams[0].wg_stream, 1.0, 0, 0,
             AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning);
+    wg_parser_dont_read(reader->wg_parser, true);

     return S_OK;

@@ -1906,6 +1908,7 @@ static HRESULT WINAPI reader_Close(IWMSyncReader2 *iface)
         return NS_E_INVALID_REQUEST;
     }

+    wg_parser_dont_read(reader->wg_parser, false);
     wg_parser_disconnect(reader->wg_parser);

     EnterCriticalSection(&reader->shutdown_cs);
@@ -1989,6 +1992,7 @@ static HRESULT WINAPI reader_GetNextSample(IWMSyncReader2 *iface,
     if (!stream_number && !output_number && !ret_stream_number)
         return E_INVALIDARG;

+    wg_parser_dont_read(reader->wg_parser, false);
     EnterCriticalSection(&reader->cs);

     if (!stream_number)
@@ -2028,6 +2032,7 @@ static HRESULT WINAPI reader_GetNextSample(IWMSyncReader2 *iface,
         *ret_stream_number = stream_number;

     LeaveCriticalSection(&reader->cs);
+    wg_parser_dont_read(reader->wg_parser, true);
     return hr;
 }

@@ -2471,6 +2476,7 @@ static HRESULT WINAPI reader_SetRange(IWMSyncReader2 *iface, QWORD start, LONGLO

     TRACE("reader %p, start %I64u, duration %I64d.\n", reader, start, duration);

+    wg_parser_dont_read(reader->wg_parser, false);
     EnterCriticalSection(&reader->cs);

     reader->start_time = start;
@@ -2482,6 +2488,7 @@ static HRESULT WINAPI reader_SetRange(IWMSyncReader2 *iface, QWORD start, LONGLO
         reader->streams[i].eos = false;

     LeaveCriticalSection(&reader->cs);
+    wg_parser_dont_read(reader->wg_parser, true);
     return S_OK;
 }

@@ -2528,6 +2535,7 @@ static HRESULT WINAPI reader_SetStreamsSelected(IWMSyncReader2 *iface,
     if (!count)
         return E_INVALIDARG;

+    wg_parser_dont_read(reader->wg_parser, false);
     EnterCriticalSection(&reader->cs);

     for (i = 0; i < count; ++i)
@@ -2536,6 +2544,7 @@ static HRESULT WINAPI reader_SetStreamsSelected(IWMSyncReader2 *iface,
         {
             LeaveCriticalSection(&reader->cs);
             WARN("Invalid stream number %u; returning NS_E_INVALID_REQUEST.\n", stream_numbers[i]);
+            wg_parser_dont_read(reader->wg_parser, true);
             return NS_E_INVALID_REQUEST;
         }
     }
@@ -2569,6 +2578,7 @@ static HRESULT WINAPI reader_SetStreamsSelected(IWMSyncReader2 *iface,
     }

     LeaveCriticalSection(&reader->cs);
+    wg_parser_dont_read(reader->wg_parser, true);
     return S_OK;
 }
Why do you need to plumb anything into the Unix library?
Generally speaking I would imagine you can achieve this by using a PE-side mutex, which is acquired by the Unix thread, and released only when calling functions.
Better—assuming I remember correctly that the synchronous reader is indeed supposed to synchronously read from the same thread—would probably be to get rid of the read thread, add the ability to check for read requests without blocking, and actually perform reads directly when waiting for samples. I made this proposal some time ago, but it was ignored, like many others.
On Tue Apr 8 23:18:26 2025 +0000, Elizabeth Figura wrote:
I did consider removing the read thread. Quoting myself from the CodeWeavers tracker, in reply to a comment from @rbernon:
> ... getting rid of the read thread would basically mean a full redesign of wg_parser. and from what I heard from you a redesign/alternative is already in the works? (although i am not clear what the plan/current progress is). so it doesn't make sense to duplicate the work.
Unless you think there's a way to use `wg_parser` without the read thread?
Not sure exactly how you would suggest using a PE-side mutex to implement this. One way I tried couldn't account for `WMSyncReader` methods being called concurrently. This MR makes sure that the read thread can read iff at least one thread is inside a `WMSyncReader` method.
> ... getting rid of the read thread would basically mean a full redesign of wg_parser.
It really wouldn't, as I tried to say over and over again, and this is why the entire fiasco was so frustrating. Nobody listened.
> Unless you think there's a way to use `wg_parser` without the read thread?
It would require modification, but less than you might think. There's nothing preventing you from calling wg_parser_get_next_read_offset / wg_parser_push_data from the same thread that you're pulling samples from.
The reason this doesn't work today is that both wg_parser_get_next_read_offset and wg_parser_stream_get_buffer are currently blocking calls, and since both can be triggered by asynchronous work, you don't know whether you need more data. To fix this, you'd probably want to modify wg_parser_stream_get_buffer() so that it can be interrupted when a read request comes in, whereupon it would return a special status to signify that.
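The shape of that proposal can be illustrated with a toy model. Everything here is hypothetical: `struct toy_parser`, `toy_parser_poll()` and `POLL_NEED_DATA` are invented for the sketch and are not winegstreamer APIs. The point is only the control flow: the call that waits for a sample returns a special status when the parser wants data, and the caller services the read on its own thread before retrying.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define CHUNK_SIZE 4

/* Hypothetical status codes; POLL_NEED_DATA is the "special status". */
enum poll_status { POLL_SAMPLE_READY, POLL_NEED_DATA, POLL_EOS };

/* Toy parser: every chunk of input becomes one output sample. */
struct toy_parser
{
    size_t file_size;
    size_t next_offset;        /* next offset the parser wants to read */
    uint8_t chunk[CHUNK_SIZE]; /* data pushed but not yet consumed */
    size_t chunk_size;
};

static void toy_parser_init(struct toy_parser *parser, size_t file_size)
{
    memset(parser, 0, sizeof(*parser));
    parser->file_size = file_size;
}

/* Never blocks: reports a ready sample, a pending read request, or EOS. */
static enum poll_status toy_parser_poll(struct toy_parser *parser,
        size_t *offset, size_t *size)
{
    if (parser->chunk_size)
        return POLL_SAMPLE_READY;
    if (parser->next_offset >= parser->file_size)
        return POLL_EOS;
    *offset = parser->next_offset;
    *size = parser->file_size - parser->next_offset;
    if (*size > CHUNK_SIZE)
        *size = CHUNK_SIZE;
    return POLL_NEED_DATA;
}

/* Completes the pending read request with data read by the caller. */
static void toy_parser_push_data(struct toy_parser *parser,
        const uint8_t *data, size_t size)
{
    assert(size <= CHUNK_SIZE);
    memcpy(parser->chunk, data, size);
    parser->chunk_size = size;
    parser->next_offset += size;
}

/* Synchronous "get next sample": all source reads happen on this thread.
 * Returns the sample size, or 0 at end of stream. */
static size_t get_next_sample(struct toy_parser *parser,
        const uint8_t *source, uint8_t *sample)
{
    size_t offset = 0, size = 0;

    for (;;)
    {
        switch (toy_parser_poll(parser, &offset, &size))
        {
        case POLL_NEED_DATA:
            /* Service the read request directly, then poll again. */
            toy_parser_push_data(parser, source + offset, size);
            break;
        case POLL_SAMPLE_READY:
            size = parser->chunk_size;
            memcpy(sample, parser->chunk, size);
            parser->chunk_size = 0;
            return size;
        case POLL_EOS:
            return 0;
        }
    }
}
```

In this model the source is only ever touched from inside `get_next_sample()`, which is the property the synchronous reader needs.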
> Not sure exactly how you would suggest using a PE-side mutex to implement this. One way I tried couldn't account for `WMSyncReader` methods being called concurrently. This MR makes sure that the read thread can read iff at least one thread is inside a `WMSyncReader` method.
Sorry, I probably wasn't clear enough. The idea would not be to use the mutex as a normal mutex, but basically to invert it. I guess you'd actually have to do it with a semaphore or auto-reset event, though, since you don't want to tie it to any specific thread. You'd want the semaphore to start out in the acquired state; acquire the semaphore in the read thread before doing any read operations and release it afterwards; and *release* the semaphore when entering a function like GetNextSample() and acquire it when exiting.
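A minimal sketch of the inverted semaphore, under stated assumptions: it uses a POSIX `sem_t` for brevity, whereas the suggestion above is for a PE-side object, and `struct read_permit` and its helpers are hypothetical names. The semaphore starts at zero ("acquired"), API entry posts it, API exit takes it back, and the read thread must hold it for the duration of each read.

```c
#include <assert.h>
#include <errno.h>
#include <semaphore.h>
#include <stdbool.h>

/* Hypothetical wrapper around the inverted semaphore. */
struct read_permit
{
    sem_t sem;
};

/* Wait on the semaphore, retrying if interrupted by a signal. */
static void permit_wait(sem_t *sem)
{
    while (sem_wait(sem) == -1 && errno == EINTR)
        ;
}

/* The semaphore starts at 0, i.e. in the acquired state: no reads allowed. */
static void read_permit_init(struct read_permit *permit)
{
    int ret = sem_init(&permit->sem, 0, 0);
    assert(ret == 0);
    (void)ret;
}

/* Entering a method such as GetNextSample(): release, so reads may run. */
static void api_enter(struct read_permit *permit)
{
    sem_post(&permit->sem);
}

/* Leaving the method: take the permit back. This blocks while the read
 * thread holds it, so no read is in flight once the method returns. */
static void api_exit(struct read_permit *permit)
{
    permit_wait(&permit->sem);
}

/* Read thread: acquire before reading from the stream. */
static void reader_acquire(struct read_permit *permit)
{
    permit_wait(&permit->sem);
}

/* Non-blocking variant, useful for observing the state. */
static bool reader_try_acquire(struct read_permit *permit)
{
    return sem_trywait(&permit->sem) == 0;
}

/* Read thread: release once the read has completed. */
static void reader_release(struct read_permit *permit)
{
    sem_post(&permit->sem);
}
```

Since a semaphore counts, concurrent callers each post once on entry and each take one unit back on exit, so the read thread can only acquire while at least one caller is inside a method.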