WMSyncReader starts a background read thread that reads from the IStream passed to IWMSyncReader::OpenStream. This means it could use the IStream in the background even when no IWMSyncReader methods are being called.
For well-behaved applications, this is probably OK. However, AQUARIUM (Steam 2515070) frees the IStream it passes to WMSyncReader _before_ it calls IWMSyncReader::Close, which is what stops the read thread. This causes the read thread to access freed memory. This is improper, but not unreasonable: IWMSyncReader is supposed to be a synchronous interface, so one might assume that the IStream won't be used while no IWMSyncReader methods are being called.
This commit adds a `wg_parser_dont_read` function, which can be used to stop wg_parser from issuing read requests. This is used by IWMSyncReader to make sure read requests are only issued when IWMSyncReader methods are being called.
-- v3: winegstreamer: Make sure WMSyncReader never reads in the background.
From: Yuxuan Shui <yshui@codeweavers.com>
WMSyncReader starts a background read thread that reads from the IStream passed to IWMSyncReader::OpenStream. This means it could use the IStream in the background even when no IWMSyncReader methods are being called.
For well-behaved applications, this is probably OK. However, AQUARIUM (Steam 2515070) frees the IStream it passes to WMSyncReader _before_ it calls IWMSyncReader::Close, which is what stops the read thread. This causes the read thread to access freed memory. This is improper, but not unreasonable: IWMSyncReader is supposed to be a synchronous interface, so one might assume that the IStream won't be used while no IWMSyncReader methods are being called.
This commit adds a semaphore around the read operations in the WMSyncReader read thread, which is released when an IWMSyncReader method is called to allow reading, and is re-acquired before exiting from the IWMSyncReader method to make sure reads cannot happen in the background.
---
 dlls/winegstreamer/wm_reader.c | 60 +++++++++++++++++++++++++++++++---
 1 file changed, 56 insertions(+), 4 deletions(-)
diff --git a/dlls/winegstreamer/wm_reader.c b/dlls/winegstreamer/wm_reader.c
index b3e2c4b46e3..23b5d259b89 100644
--- a/dlls/winegstreamer/wm_reader.c
+++ b/dlls/winegstreamer/wm_reader.c
@@ -61,6 +61,7 @@ struct wm_reader
     IStream *source_stream;
     HANDLE file;
     HANDLE read_thread;
+    HANDLE read_sem;
     bool read_thread_shutdown;
     wg_parser_t wg_parser;

@@ -641,8 +642,10 @@ static DWORD CALLBACK read_thread(void *arg)
         }

         ret_size = 0;
-
+        hr = S_OK;
         large_offset.QuadPart = offset;
+
+        WaitForSingleObject(reader->read_sem, INFINITE);
         if (file)
         {
             if (!SetFilePointerEx(file, large_offset, NULL, FILE_BEGIN)
@@ -650,7 +653,7 @@ static DWORD CALLBACK read_thread(void *arg)
             {
                 ERR("Failed to read %u bytes at offset %I64u, error %lu.\n", size, offset, GetLastError());
                 wg_parser_push_data(reader->wg_parser, NULL, 0);
-                continue;
+                hr = E_FAIL;
             }
         }
         else
@@ -661,9 +664,10 @@ static DWORD CALLBACK read_thread(void *arg)
             {
                 ERR("Failed to read %u bytes at offset %I64u, hr %#lx.\n", size, offset, hr);
                 wg_parser_push_data(reader->wg_parser, NULL, 0);
-                continue;
             }
         }
+        ReleaseSemaphore(reader->read_sem, 1, NULL);
+        if (FAILED(hr)) continue;

         if (ret_size != size)
             ERR("Unexpected short read: requested %u bytes, got %lu.\n", size, ret_size);
@@ -1479,6 +1483,12 @@ static HRESULT init_stream(struct wm_reader *reader)
     reader->wg_parser = wg_parser;
     reader->read_thread_shutdown = false;

+    if (!(reader->read_sem = CreateSemaphoreA(NULL, 1, LONG_MAX, NULL)))
+    {
+        hr = E_OUTOFMEMORY;
+        goto out_destroy_parser;
+    }
+
     if (!(reader->read_thread = CreateThread(NULL, 0, read_thread, reader, 0, NULL)))
     {
         hr = E_OUTOFMEMORY;
@@ -1539,7 +1549,12 @@ static HRESULT init_stream(struct wm_reader *reader)
      * Now that they're all enabled seek back to the start again. */
     wg_parser_stream_seek(reader->streams[0].wg_stream, 1.0, 0, 0,
             AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning);
-
+    /* Pause the read thread */
+    if (WaitForSingleObject(reader->read_sem, INFINITE) != WAIT_OBJECT_0)
+    {
+        ERR("Failed to wait for read semaphore.\n");
+        goto out_disconnect_parser;
+    }
     return S_OK;

 out_disconnect_parser:
@@ -1554,6 +1569,11 @@ out_shutdown_thread:
     reader->read_thread = NULL;

 out_destroy_parser:
+    if (reader->read_sem)
+    {
+        CloseHandle(reader->read_sem);
+        reader->read_sem = NULL;
+    }
     wg_parser_destroy(reader->wg_parser);
     reader->wg_parser = 0;

@@ -1566,6 +1586,7 @@ static HRESULT reinit_stream(struct wm_reader *reader, bool read_compressed)
     HRESULT hr;
     WORD i;

+    ReleaseSemaphore(reader->read_sem, 1, NULL);
     wg_parser_disconnect(reader->wg_parser);

     EnterCriticalSection(&reader->shutdown_cs);
@@ -1573,7 +1594,9 @@ static HRESULT reinit_stream(struct wm_reader *reader, bool read_compressed)
     LeaveCriticalSection(&reader->shutdown_cs);
     WaitForSingleObject(reader->read_thread, INFINITE);
     CloseHandle(reader->read_thread);
+    CloseHandle(reader->read_sem);
     reader->read_thread = NULL;
+    reader->read_sem = NULL;

     destroy_stream(reader);
     wg_parser_destroy(reader->wg_parser);
@@ -1584,6 +1607,11 @@ static HRESULT reinit_stream(struct wm_reader *reader, bool read_compressed)

     reader->wg_parser = wg_parser;
     reader->read_thread_shutdown = false;
+    if (!(reader->read_sem = CreateSemaphoreA(NULL, 1, LONG_MAX, NULL)))
+    {
+        hr = E_OUTOFMEMORY;
+        goto out_destroy_parser;
+    }

     if (!(reader->read_thread = CreateThread(NULL, 0, read_thread, reader, 0, NULL)))
     {
@@ -1615,6 +1643,11 @@ static HRESULT reinit_stream(struct wm_reader *reader, bool read_compressed)
      * Now that they're all enabled seek back to the start again. */
     wg_parser_stream_seek(reader->streams[0].wg_stream, 1.0, 0, 0,
             AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning);
+    if (WaitForSingleObject(reader->read_sem, INFINITE) != WAIT_OBJECT_0)
+    {
+        ERR("Failed to wait for read semaphore.\n");
+        goto out_shutdown_thread;
+    }

     return S_OK;

@@ -1627,6 +1660,11 @@ out_shutdown_thread:
     reader->read_thread = NULL;

 out_destroy_parser:
+    if (reader->read_sem)
+    {
+        CloseHandle(reader->read_sem);
+        reader->read_sem = NULL;
+    }
     destroy_stream(reader);
     wg_parser_destroy(reader->wg_parser);
     reader->wg_parser = 0;
@@ -1906,6 +1944,7 @@ static HRESULT WINAPI reader_Close(IWMSyncReader2 *iface)
         return NS_E_INVALID_REQUEST;
     }

+    ReleaseSemaphore(reader->read_sem, 1, NULL);
     wg_parser_disconnect(reader->wg_parser);

     EnterCriticalSection(&reader->shutdown_cs);
@@ -1913,7 +1952,9 @@ static HRESULT WINAPI reader_Close(IWMSyncReader2 *iface)
     LeaveCriticalSection(&reader->shutdown_cs);
     WaitForSingleObject(reader->read_thread, INFINITE);
     CloseHandle(reader->read_thread);
+    CloseHandle(reader->read_sem);
     reader->read_thread = NULL;
+    reader->read_sem = NULL;

     destroy_stream(reader);
     wg_parser_destroy(reader->wg_parser);
@@ -1989,6 +2030,7 @@ static HRESULT WINAPI reader_GetNextSample(IWMSyncReader2 *iface,
     if (!stream_number && !output_number && !ret_stream_number)
         return E_INVALIDARG;

+    ReleaseSemaphore(reader->read_sem, 1, NULL);
     EnterCriticalSection(&reader->cs);

     if (!stream_number)
@@ -2028,6 +2070,8 @@ static HRESULT WINAPI reader_GetNextSample(IWMSyncReader2 *iface,
         *ret_stream_number = stream_number;

     LeaveCriticalSection(&reader->cs);
+    if (WaitForSingleObject(reader->read_sem, INFINITE) != WAIT_OBJECT_0)
+        ERR("Failed to wait for read thread to pause.\n");
     return hr;
 }

@@ -2471,6 +2515,7 @@ static HRESULT WINAPI reader_SetRange(IWMSyncReader2 *iface, QWORD start, LONGLO

     TRACE("reader %p, start %I64u, duration %I64d.\n", reader, start, duration);

+    ReleaseSemaphore(reader->read_sem, 1, NULL);
     EnterCriticalSection(&reader->cs);

     reader->start_time = start;
@@ -2482,6 +2527,8 @@ static HRESULT WINAPI reader_SetRange(IWMSyncReader2 *iface, QWORD start, LONGLO
         reader->streams[i].eos = false;

     LeaveCriticalSection(&reader->cs);
+    if (WaitForSingleObject(reader->read_sem, INFINITE) != WAIT_OBJECT_0)
+        ERR("Failed to wait for read thread to pause.\n");
     return S_OK;
 }

@@ -2528,6 +2575,7 @@ static HRESULT WINAPI reader_SetStreamsSelected(IWMSyncReader2 *iface,
     if (!count)
         return E_INVALIDARG;

+    ReleaseSemaphore(reader->read_sem, 1, NULL);
     EnterCriticalSection(&reader->cs);

     for (i = 0; i < count; ++i)
@@ -2536,6 +2584,8 @@ static HRESULT WINAPI reader_SetStreamsSelected(IWMSyncReader2 *iface,
         {
             LeaveCriticalSection(&reader->cs);
             WARN("Invalid stream number %u; returning NS_E_INVALID_REQUEST.\n", stream_numbers[i]);
+            if (WaitForSingleObject(reader->read_sem, INFINITE) != WAIT_OBJECT_0)
+                ERR("Failed to wait for read thread to pause.\n");
             return NS_E_INVALID_REQUEST;
         }
     }
@@ -2569,6 +2619,8 @@ static HRESULT WINAPI reader_SetStreamsSelected(IWMSyncReader2 *iface,
     }

     LeaveCriticalSection(&reader->cs);
+    if (WaitForSingleObject(reader->read_sem, INFINITE) != WAIT_OBJECT_0)
+        ERR("Failed to wait for read thread to pause.\n");
     return S_OK;
 }
On Wed Apr 9 22:50:49 2025 +0000, Elizabeth Figura wrote:
> > ... getting rid of the read thread would basically mean a full
> > redesign of wg_parser.
> It really wouldn't, as I tried to say over and over again, and this is why the entire fiasco was so frustrating. Nobody listened.
> > Unless you think there's a way to use `wg_parser` without the read thread?
> It would require modification, but less than you might think. There's nothing preventing you from calling wg_parser_get_next_read_offset / wg_parser_push_data from the same thread that you're pulling samples from. The reason this doesn't work is that both wg_parser_get_next_read_offset and wg_parser_stream_get_buffer are currently blocking calls, and since both can be triggered by asynchronous work, you don't know whether you need more data or not. In order to fix this, you'd probably want to modify wg_parser_stream_get_buffer() to allow being interrupted if a read request comes in, whereupon it'd probably return a special status to signify that.
> > Not sure exactly how you would suggest using a PE-side mutex to
> > implement this. One way I tried couldn't account for `WMSyncReader` methods being called concurrently. This MR makes sure that the read thread can read iff there are >= 1 threads inside `WMSyncReader` methods.
> Sorry, I probably wasn't clear enough. The idea would not be to use the mutex as a normal mutex, but basically invert it. I guess you'd actually have to do it with a semaphore or auto-reset event, though, since you don't want to tie it to any specific thread. Basically, you'd want the semaphore to start out in acquired state; acquire the semaphore in the read thread before doing any read operations and release it afterwards; and *release* the semaphore when entering a function like GetNextSample() and acquire it when exiting.
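The inverted-semaphore scheme described in the message above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the MR: it uses POSIX semaphores and pthreads instead of the Win32 calls in the patch, every name in it (`gated_reader`, `gated_reader_enter`, and so on) is hypothetical, and the "read" is reduced to bumping a counter. Unlike the patch, which creates the semaphore with a count of 1 and acquires it at the end of `init_stream`, the sketch simply starts with a count of 0.

```c
#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for struct wm_reader; none of these names exist in Wine. */
struct gated_reader
{
    sem_t read_sem;         /* nonzero only while some API call is in progress */
    pthread_t thread;
    atomic_bool shutdown;
    atomic_int reads_done;  /* stands in for real reads from the IStream */
};

/* sem_wait() may be interrupted by a signal; retry in that case. */
static void sem_wait_retry(sem_t *sem)
{
    while (sem_wait(sem) == -1 && errno == EINTR)
        ;
}

static void *gated_read_thread(void *arg)
{
    struct gated_reader *r = arg;

    while (!atomic_load(&r->shutdown))
    {
        sem_wait_retry(&r->read_sem);       /* blocks while no API call is active */
        if (!atomic_load(&r->shutdown))
            atomic_fetch_add(&r->reads_done, 1);    /* the "read" happens here */
        sem_post(&r->read_sem);
        sched_yield();
    }
    return NULL;
}

static int gated_reader_init(struct gated_reader *r)
{
    atomic_init(&r->shutdown, false);
    atomic_init(&r->reads_done, 0);
    if (sem_init(&r->read_sem, 0, 0))       /* start "acquired": reads not allowed */
        return -1;
    if (pthread_create(&r->thread, NULL, gated_read_thread, r))
    {
        sem_destroy(&r->read_sem);
        return -1;
    }
    return 0;
}

/* Call on entry to an API method: allow the read thread to run. */
static void gated_reader_enter(struct gated_reader *r)
{
    sem_post(&r->read_sem);
}

/* Call before returning from an API method: waits out any in-flight read. */
static void gated_reader_leave(struct gated_reader *r)
{
    sem_wait_retry(&r->read_sem);
}

static void gated_reader_close(struct gated_reader *r)
{
    atomic_store(&r->shutdown, true);
    sem_post(&r->read_sem);                 /* wake the thread so it sees shutdown */
    pthread_join(r->thread, NULL);
    sem_destroy(&r->read_sem);
}
```

Because each caller posts once on entry and waits once on exit, the count is above zero only while at least one caller is inside a method, which is how concurrent callers are accounted for without tying the gate to a specific thread.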
@zfigura Hi I updated this to use a semaphore like you suggested. Is this enough to get this MR merged?
I feel I kinda got caught in the middle of a debate :sweat_smile:. If you insist the read thread has to go, then I'd like to wait for a consensus of what the future of gstreamer in wine is. And in that case I hope @rbernon can chime in as well.
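For reference, the read-thread-free alternative raised earlier in the thread (pulling samples and servicing read requests from the same thread, with the "get buffer" call returning a special status when data is needed) might look something like the toy model below. This is a sketch under heavy assumptions: `toy_parser` and all `toy_*` functions are hypothetical and entirely synchronous, whereas the real `wg_parser` is driven by asynchronous GStreamer work, which is exactly the part that would need modification.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_FRAME_SIZE 4

enum toy_status { TOY_BUFFER, TOY_NEED_DATA, TOY_EOS };

/* Toy parser: every TOY_FRAME_SIZE input bytes become one "sample" (their sum). */
struct toy_parser
{
    size_t total_size;              /* size of the source, known up front */
    size_t next_offset;             /* where the parser wants to read next */
    uint8_t frame[TOY_FRAME_SIZE];
    size_t frame_len;               /* 0 when no frame is pending */
};

/* Non-blocking: reports a pending read request instead of waiting for it. */
static enum toy_status toy_parser_get_buffer(struct toy_parser *p, unsigned int *sample)
{
    if (p->frame_len)
    {
        *sample = 0;
        for (size_t i = 0; i < p->frame_len; ++i)
            *sample += p->frame[i];
        p->frame_len = 0;
        return TOY_BUFFER;
    }
    return p->next_offset < p->total_size ? TOY_NEED_DATA : TOY_EOS;
}

static void toy_parser_get_next_read_offset(const struct toy_parser *p,
        size_t *offset, size_t *size)
{
    size_t left = p->total_size - p->next_offset;

    *offset = p->next_offset;
    *size = left < TOY_FRAME_SIZE ? left : TOY_FRAME_SIZE;
}

static void toy_parser_push_data(struct toy_parser *p, const uint8_t *data, size_t size)
{
    memcpy(p->frame, data, size);
    p->frame_len = size;
    p->next_offset += size;
}

/* Caller-side pump: all source reads happen on the calling thread, inside this call. */
static bool toy_get_next_sample(struct toy_parser *p, const uint8_t *source,
        unsigned int *sample)
{
    for (;;)
    {
        size_t offset, size;

        switch (toy_parser_get_buffer(p, sample))
        {
        case TOY_BUFFER:
            return true;
        case TOY_EOS:
            return false;
        case TOY_NEED_DATA:
            toy_parser_get_next_read_offset(p, &offset, &size);
            /* Stands in for IStream_Seek + IStream_Read on the caller's thread. */
            toy_parser_push_data(p, source + offset, size);
            break;
        }
    }
}
```

In this shape the source is only ever touched between entry to and return from `toy_get_next_sample`, so no gating of a background thread is needed; the cost is the parser-side change to make the buffer call interruptible.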