Instead of waiting for another event from the wg_parser object.
Signed-off-by: Zebediah Figura zfigura@codeweavers.com --- dlls/winegstreamer/quartz_parser.c | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-)
diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index 4afd265bca7..30afd0874ea 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -74,6 +74,7 @@ struct parser_source SourceSeeking seek;
CRITICAL_SECTION flushing_cs; + CONDITION_VARIABLE eos_cv; HANDLE thread;
/* This variable is read and written by both the streaming thread and @@ -81,6 +82,8 @@ struct parser_source * thread when the streaming thread is not running, or when it is blocked * by flushing_cs. */ bool need_segment; + + bool eos; };
static inline struct parser *impl_from_strmbase_filter(struct strmbase_filter *iface) @@ -823,6 +826,13 @@ static DWORD CALLBACK stream_thread(void *arg)
EnterCriticalSection(&pin->flushing_cs);
+ if (pin->eos) + { + SleepConditionVariableCS(&pin->eos_cv, &pin->flushing_cs, INFINITE); + LeaveCriticalSection(&pin->flushing_cs); + continue; + } + if (!wg_parser_stream_get_event(pin->wg_stream, &event)) { LeaveCriticalSection(&pin->flushing_cs); @@ -839,6 +849,7 @@ static DWORD CALLBACK stream_thread(void *arg)
case WG_PARSER_EVENT_EOS: IPin_EndOfStream(pin->pin.pin.peer); + pin->eos = true; break;
case WG_PARSER_EVENT_NONE: @@ -973,17 +984,19 @@ static HRESULT parser_init_stream(struct strmbase_filter *iface)
for (i = 0; i < filter->source_count; ++i) { + struct parser_source *pin = filter->sources[i]; HRESULT hr;
- if (!filter->sources[i]->pin.pin.peer) + if (!pin->pin.pin.peer) continue;
- if (FAILED(hr = IMemAllocator_Commit(filter->sources[i]->pin.pAllocator))) + if (FAILED(hr = IMemAllocator_Commit(pin->pin.pAllocator))) ERR("Failed to commit allocator, hr %#lx.\n", hr);
- filter->sources[i]->need_segment = true; + pin->need_segment = true; + pin->eos = false;
- filter->sources[i]->thread = CreateThread(NULL, 0, stream_thread, filter->sources[i], 0, NULL); + pin->thread = CreateThread(NULL, 0, stream_thread, pin, 0, NULL); }
return S_OK; @@ -1009,6 +1022,7 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface)
IMemAllocator_Decommit(pin->pin.pAllocator);
+ WakeConditionVariable(&pin->eos_cv); WaitForSingleObject(pin->thread, INFINITE); CloseHandle(pin->thread); pin->thread = NULL; @@ -1371,9 +1385,13 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface, struct parser_source *flush_pin = filter->sources[i];
flush_pin->need_segment = true; + flush_pin->eos = false;
if (flush_pin->pin.pin.peer) + { LeaveCriticalSection(&flush_pin->flushing_cs); + WakeConditionVariable(&flush_pin->eos_cv); + } }
return S_OK; @@ -1613,6 +1631,7 @@ static struct parser_source *create_pin(struct parser *filter,
InitializeCriticalSection(&pin->flushing_cs); pin->flushing_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": pin.flushing_cs"); + InitializeConditionVariable(&pin->eos_cv);
filter->sources[filter->source_count++] = pin; return pin;
Aside from EOS logic, which is now handled entirely on the client side, wg_parser_stream_get_event() now only waits for data processing—that is, demuxing, decoding, and format conversion. While unblocking waits in wg_parser_stream_get_event() does allow that function to return immediately, a subsequent seek request in GStreamer will still have to wait for that data processing to complete and for the stream thread to return to the demuxer's main loop. In essence, wg_parser_begin_flush() is only moving costs around.
In theory we could force the GStreamer pipeline to complete faster by actually flushing it. In practice this isn't really true. Individual elements do check whether they are flushing before processing, but even elements which take a relatively long time (i.e. multiple milliseconds) to process data don't periodically check whether they are flushing while doing so. Although there is arguably a benefit to skipping some elements by flushing the GStreamer pipeline, it does not seem worth the added code complexity in Wine.
The real point of flushing in DirectShow or GStreamer is to unblock long or unbounded waits in sink elements (i.e. waits for PTS, or waits for running state while rendering preroll frames). None of these waits apply here. Waits for actual sample processing complete in bounded time, and should ideally take less than the sample DTS to complete (or we are already in trouble).
Signed-off-by: Zebediah Figura zfigura@codeweavers.com --- dlls/winegstreamer/gst_private.h | 5 +--- dlls/winegstreamer/main.c | 14 ++-------- dlls/winegstreamer/media_source.c | 6 +--- dlls/winegstreamer/quartz_parser.c | 12 +------- dlls/winegstreamer/unixlib.h | 3 -- dlls/winegstreamer/wg_parser.c | 44 ++---------------------------- dlls/winegstreamer/wm_reader.c | 7 +---- 7 files changed, 8 insertions(+), 83 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index a63daaf04b9..288d127b23b 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -70,9 +70,6 @@ void wg_parser_destroy(struct wg_parser *parser); HRESULT wg_parser_connect(struct wg_parser *parser, uint64_t file_size); void wg_parser_disconnect(struct wg_parser *parser);
-void wg_parser_begin_flush(struct wg_parser *parser); -void wg_parser_end_flush(struct wg_parser *parser); - bool wg_parser_get_next_read_offset(struct wg_parser *parser, uint64_t *offset, uint32_t *size); void wg_parser_push_data(struct wg_parser *parser, const void *data, uint32_t size);
@@ -83,7 +80,7 @@ void wg_parser_stream_get_preferred_format(struct wg_parser_stream *stream, stru void wg_parser_stream_enable(struct wg_parser_stream *stream, const struct wg_format *format); void wg_parser_stream_disable(struct wg_parser_stream *stream);
-bool wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event); +void wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event); bool wg_parser_stream_copy_buffer(struct wg_parser_stream *stream, void *data, uint32_t offset, uint32_t size); void wg_parser_stream_release_buffer(struct wg_parser_stream *stream); diff --git a/dlls/winegstreamer/main.c b/dlls/winegstreamer/main.c index f85e9995525..13db07c6edb 100644 --- a/dlls/winegstreamer/main.c +++ b/dlls/winegstreamer/main.c @@ -96,16 +96,6 @@ void wg_parser_disconnect(struct wg_parser *parser) __wine_unix_call(unix_handle, unix_wg_parser_disconnect, parser); }
-void wg_parser_begin_flush(struct wg_parser *parser) -{ - __wine_unix_call(unix_handle, unix_wg_parser_begin_flush, parser); -} - -void wg_parser_end_flush(struct wg_parser *parser) -{ - __wine_unix_call(unix_handle, unix_wg_parser_end_flush, parser); -} - bool wg_parser_get_next_read_offset(struct wg_parser *parser, uint64_t *offset, uint32_t *size) { struct wg_parser_get_next_read_offset_params params = @@ -182,7 +172,7 @@ void wg_parser_stream_disable(struct wg_parser_stream *stream) __wine_unix_call(unix_handle, unix_wg_parser_stream_disable, stream); }
-bool wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event) +void wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event) { struct wg_parser_stream_get_event_params params = { @@ -190,7 +180,7 @@ bool wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parse .event = event, };
- return !__wine_unix_call(unix_handle, unix_wg_parser_stream_get_event, ¶ms); + __wine_unix_call(unix_handle, unix_wg_parser_stream_get_event, ¶ms); }
bool wg_parser_stream_copy_buffer(struct wg_parser_stream *stream, diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 32006964f3b..694497da259 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -388,7 +388,6 @@ static void start_pipeline(struct media_source *source, struct source_async_comm if (position->vt == VT_I8) wg_parser_stream_seek(source->streams[0]->wg_stream, 1.0, position->hVal.QuadPart, 0, AM_SEEKING_AbsolutePositioning, AM_SEEKING_NoPositioning); - wg_parser_end_flush(source->wg_parser);
for (i = 0; i < source->stream_count; i++) flush_token_queue(source->streams[i], position->vt == VT_EMPTY); @@ -416,8 +415,6 @@ static void stop_pipeline(struct media_source *source) { unsigned int i;
- wg_parser_begin_flush(source->wg_parser); - for (i = 0; i < source->stream_count; i++) { struct media_stream *stream = source->streams[i]; @@ -543,8 +540,7 @@ static void wait_on_sample(struct media_stream *stream, IUnknown *token)
for (;;) { - if (!wg_parser_stream_get_event(stream->wg_stream, &event)) - return; + wg_parser_stream_get_event(stream->wg_stream, &event);
TRACE("Got event of type %#x.\n", event.type);
diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index 30afd0874ea..628758b838b 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -833,11 +833,7 @@ static DWORD CALLBACK stream_thread(void *arg) continue; }
- if (!wg_parser_stream_get_event(pin->wg_stream, &event)) - { - LeaveCriticalSection(&pin->flushing_cs); - continue; - } + wg_parser_stream_get_event(pin->wg_stream, &event);
TRACE("Got event of type %#x.\n", event.type);
@@ -971,7 +967,6 @@ static HRESULT parser_init_stream(struct strmbase_filter *iface) return S_OK;
filter->streaming = true; - wg_parser_end_flush(filter->wg_parser);
/* DirectShow retains the old seek positions, but resets to them every time * it transitions from stopped -> paused. */ @@ -1011,7 +1006,6 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface) return S_OK;
filter->streaming = false; - wg_parser_begin_flush(filter->wg_parser);
for (i = 0; i < filter->source_count; ++i) { @@ -1336,8 +1330,6 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
if (!(current_flags & AM_SEEKING_NoFlush)) { - wg_parser_begin_flush(filter->wg_parser); - for (i = 0; i < filter->source_count; ++i) { if (filter->sources[i]->pin.pin.peer) @@ -1365,8 +1357,6 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
if (!(current_flags & AM_SEEKING_NoFlush)) { - wg_parser_end_flush(filter->wg_parser); - for (i = 0; i < filter->source_count; ++i) { struct parser_source *flush_pin = filter->sources[i]; diff --git a/dlls/winegstreamer/unixlib.h b/dlls/winegstreamer/unixlib.h index f445fc7ac7e..1ef80991fb4 100644 --- a/dlls/winegstreamer/unixlib.h +++ b/dlls/winegstreamer/unixlib.h @@ -238,9 +238,6 @@ enum unix_funcs unix_wg_parser_connect, unix_wg_parser_disconnect,
- unix_wg_parser_begin_flush, - unix_wg_parser_end_flush, - unix_wg_parser_get_next_read_offset, unix_wg_parser_push_data,
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 85c28895159..61120a72da8 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -87,7 +87,7 @@ struct wg_parser GstFlowReturn ret; } read_request;
- bool flushing, sink_connected; + bool sink_connected;
bool unlimited_buffering; }; @@ -127,35 +127,6 @@ static NTSTATUS wg_parser_get_stream(void *args) return S_OK; }
-static NTSTATUS wg_parser_begin_flush(void *args) -{ - struct wg_parser *parser = args; - unsigned int i; - - pthread_mutex_lock(&parser->mutex); - parser->flushing = true; - pthread_mutex_unlock(&parser->mutex); - - for (i = 0; i < parser->stream_count; ++i) - { - if (parser->streams[i]->enabled) - pthread_cond_signal(&parser->streams[i]->event_cond); - } - - return S_OK; -} - -static NTSTATUS wg_parser_end_flush(void *args) -{ - struct wg_parser *parser = args; - - pthread_mutex_lock(&parser->mutex); - parser->flushing = false; - pthread_mutex_unlock(&parser->mutex); - - return S_OK; -} - static NTSTATUS wg_parser_get_next_read_offset(void *args) { struct wg_parser_get_next_read_offset_params *params = args; @@ -288,16 +259,9 @@ static NTSTATUS wg_parser_stream_get_event(void *args)
pthread_mutex_lock(&parser->mutex);
- while (!parser->flushing && stream->event.type == WG_PARSER_EVENT_NONE) + while (stream->event.type == WG_PARSER_EVENT_NONE) pthread_cond_wait(&stream->event_cond, &parser->mutex);
- if (parser->flushing) - { - pthread_mutex_unlock(&parser->mutex); - GST_DEBUG("Filter is flushing.\n"); - return VFW_E_WRONG_STATE; - } - *params->event = stream->event;
if (stream->event.type != WG_PARSER_EVENT_BUFFER) @@ -1593,7 +1557,6 @@ static NTSTATUS wg_parser_create(void *args) pthread_cond_init(&parser->init_cond, NULL); pthread_cond_init(&parser->read_cond, NULL); pthread_cond_init(&parser->read_done_cond, NULL); - parser->flushing = true; parser->init_gst = init_funcs[params->type]; parser->unlimited_buffering = params->unlimited_buffering;
@@ -1630,9 +1593,6 @@ const unixlib_entry_t __wine_unix_call_funcs[] = X(wg_parser_connect), X(wg_parser_disconnect),
- X(wg_parser_begin_flush), - X(wg_parser_end_flush), - X(wg_parser_get_next_read_offset), X(wg_parser_push_data),
diff --git a/dlls/winegstreamer/wm_reader.c b/dlls/winegstreamer/wm_reader.c index f49d99071e0..6d2b9edbd09 100644 --- a/dlls/winegstreamer/wm_reader.c +++ b/dlls/winegstreamer/wm_reader.c @@ -1512,7 +1512,6 @@ static HRESULT init_stream(struct wm_reader *reader, QWORD file_size) wg_parser_stream_enable(stream->wg_stream, &stream->format); }
- wg_parser_end_flush(reader->wg_parser); /* We probably discarded events because streams weren't enabled yet. * Now that they're all enabled seek back to the start again. */ wg_parser_stream_seek(reader->streams[0].wg_stream, 1.0, 0, 0, @@ -1836,11 +1835,7 @@ HRESULT wm_reader_get_stream_sample(struct wm_stream *stream,
for (;;) { - if (!wg_parser_stream_get_event(wg_stream, &event)) - { - FIXME("Stream is flushing.\n"); - return E_NOTIMPL; - } + wg_parser_stream_get_event(wg_stream, &event);
TRACE("Got event of type %#x for %s stream %p.\n", event.type, get_major_type_string(stream->format.major_type), stream);
Instead of using WG_PARSER_EVENT_EOS.
Signed-off-by: Zebediah Figura zfigura@codeweavers.com --- This obviates 4853f65c844de8277b8b0420df1a2cdb1c5b17c8.
dlls/winegstreamer/gst_private.h | 2 +- dlls/winegstreamer/main.c | 4 +- dlls/winegstreamer/media_source.c | 30 ++---- dlls/winegstreamer/quartz_parser.c | 26 ++--- dlls/winegstreamer/unixlib.h | 1 - dlls/winegstreamer/wg_parser.c | 56 +++++----- dlls/winegstreamer/wm_reader.c | 164 +++++++++++++---------------- 7 files changed, 120 insertions(+), 163 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 288d127b23b..2f37d97378d 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -80,7 +80,7 @@ void wg_parser_stream_get_preferred_format(struct wg_parser_stream *stream, stru void wg_parser_stream_enable(struct wg_parser_stream *stream, const struct wg_format *format); void wg_parser_stream_disable(struct wg_parser_stream *stream);
-void wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event); +bool wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event); bool wg_parser_stream_copy_buffer(struct wg_parser_stream *stream, void *data, uint32_t offset, uint32_t size); void wg_parser_stream_release_buffer(struct wg_parser_stream *stream); diff --git a/dlls/winegstreamer/main.c b/dlls/winegstreamer/main.c index 13db07c6edb..e9e347b0c65 100644 --- a/dlls/winegstreamer/main.c +++ b/dlls/winegstreamer/main.c @@ -172,7 +172,7 @@ void wg_parser_stream_disable(struct wg_parser_stream *stream) __wine_unix_call(unix_handle, unix_wg_parser_stream_disable, stream); }
-void wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event) +bool wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event) { struct wg_parser_stream_get_event_params params = { @@ -180,7 +180,7 @@ void wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parse .event = event, };
- __wine_unix_call(unix_handle, unix_wg_parser_stream_get_event, ¶ms); + return !__wine_unix_call(unix_handle, unix_wg_parser_stream_get_event, ¶ms); }
bool wg_parser_stream_copy_buffer(struct wg_parser_stream *stream, diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 694497da259..92cb27dc5dc 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -532,33 +532,15 @@ static void wait_on_sample(struct media_stream *stream, IUnknown *token)
TRACE("%p, %p\n", stream, token);
- if (stream->eos) + if (wg_parser_stream_get_event(stream->wg_stream, &event)) { - IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, MEError, &GUID_NULL, MF_E_END_OF_STREAM, &empty_var); - return; + send_buffer(stream, &event, token); } - - for (;;) + else { - wg_parser_stream_get_event(stream->wg_stream, &event); - - TRACE("Got event of type %#x.\n", event.type); - - switch (event.type) - { - case WG_PARSER_EVENT_BUFFER: - send_buffer(stream, &event, token); - return; - - case WG_PARSER_EVENT_EOS: - stream->eos = TRUE; - IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, MEEndOfStream, &GUID_NULL, S_OK, &empty_var); - dispatch_end_of_presentation(stream->parent_source); - return; - - case WG_PARSER_EVENT_NONE: - assert(0); - } + stream->eos = TRUE; + IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, MEEndOfStream, &GUID_NULL, S_OK, &empty_var); + dispatch_end_of_presentation(stream->parent_source); } }
diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index 628758b838b..9b0fa4acb93 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -833,25 +833,19 @@ static DWORD CALLBACK stream_thread(void *arg) continue; }
- wg_parser_stream_get_event(pin->wg_stream, &event); - - TRACE("Got event of type %#x.\n", event.type); - - switch (event.type) + if (wg_parser_stream_get_event(pin->wg_stream, &event)) { - case WG_PARSER_EVENT_BUFFER: - send_buffer(pin, &event); - break; - - case WG_PARSER_EVENT_EOS: - IPin_EndOfStream(pin->pin.pin.peer); - pin->eos = true; - break; - - case WG_PARSER_EVENT_NONE: - assert(0); + send_buffer(pin, &event); + } + else + { + TRACE("Got EOS.\n"); + IPin_EndOfStream(pin->pin.pin.peer); + pin->eos = true; }
+ TRACE("Got event of type %#x.\n", event.type); + LeaveCriticalSection(&pin->flushing_cs); }
diff --git a/dlls/winegstreamer/unixlib.h b/dlls/winegstreamer/unixlib.h index 1ef80991fb4..9d75e94cb6a 100644 --- a/dlls/winegstreamer/unixlib.h +++ b/dlls/winegstreamer/unixlib.h @@ -107,7 +107,6 @@ enum wg_parser_event_type { WG_PARSER_EVENT_NONE = 0, WG_PARSER_EVENT_BUFFER, - WG_PARSER_EVENT_EOS, };
struct wg_parser_event diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 61120a72da8..3cdeeeb5374 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -259,19 +259,20 @@ static NTSTATUS wg_parser_stream_get_event(void *args)
pthread_mutex_lock(&parser->mutex);
- while (stream->event.type == WG_PARSER_EVENT_NONE) + while (!stream->eos && stream->event.type == WG_PARSER_EVENT_NONE) pthread_cond_wait(&stream->event_cond, &parser->mutex);
- *params->event = stream->event; - - if (stream->event.type != WG_PARSER_EVENT_BUFFER) + /* Note that we can both have a buffer and stream->eos, in which case we + * must return the buffer. */ + if (stream->event.type != WG_PARSER_EVENT_NONE) { - stream->event.type = WG_PARSER_EVENT_NONE; - pthread_cond_signal(&stream->event_empty_cond); + *params->event = stream->event; + pthread_mutex_unlock(&parser->mutex); + return S_OK; } - pthread_mutex_unlock(&parser->mutex);
- return S_OK; + pthread_mutex_unlock(&parser->mutex); + return S_FALSE; }
static NTSTATUS wg_parser_stream_copy_buffer(void *args) @@ -434,16 +435,15 @@ static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream, GST_DEBUG("Filter is flushing; discarding event."); return GST_FLOW_FLUSHING; } - if (buffer) + + assert(GST_IS_BUFFER(buffer)); + if (!gst_buffer_map(buffer, &stream->map_info, GST_MAP_READ)) { - assert(GST_IS_BUFFER(buffer)); - if (!gst_buffer_map(buffer, &stream->map_info, GST_MAP_READ)) - { - pthread_mutex_unlock(&parser->mutex); - GST_ERROR("Failed to map buffer.\n"); - return GST_FLOW_ERROR; - } + pthread_mutex_unlock(&parser->mutex); + GST_ERROR("Failed to map buffer.\n"); + return GST_FLOW_ERROR; } + stream->event = *event; stream->buffer = buffer; pthread_mutex_unlock(&parser->mutex); @@ -479,20 +479,13 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) break;
case GST_EVENT_EOS: + pthread_mutex_lock(&parser->mutex); + stream->eos = true; + pthread_mutex_unlock(&parser->mutex); if (stream->enabled) - { - struct wg_parser_event stream_event; - - stream_event.type = WG_PARSER_EVENT_EOS; - queue_stream_event(stream, &stream_event, NULL); - } + pthread_cond_signal(&stream->event_cond); else - { - pthread_mutex_lock(&parser->mutex); - stream->eos = true; - pthread_mutex_unlock(&parser->mutex); pthread_cond_signal(&parser->init_cond); - } break;
case GST_EVENT_FLUSH_START: @@ -524,12 +517,13 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) if (reset_time) gst_segment_init(&stream->segment, GST_FORMAT_UNDEFINED);
+ pthread_mutex_lock(&parser->mutex); + + stream->eos = false; if (stream->enabled) - { - pthread_mutex_lock(&parser->mutex); stream->flushing = false; - pthread_mutex_unlock(&parser->mutex); - } + + pthread_mutex_unlock(&parser->mutex); break; }
diff --git a/dlls/winegstreamer/wm_reader.c b/dlls/winegstreamer/wm_reader.c index 6d2b9edbd09..96de6f2a3ba 100644 --- a/dlls/winegstreamer/wm_reader.c +++ b/dlls/winegstreamer/wm_reader.c @@ -1826,6 +1826,10 @@ HRESULT wm_reader_get_stream_sample(struct wm_stream *stream, IWMReaderCallbackAdvanced *callback_advanced = stream->reader->callback_advanced; struct wg_parser_stream *wg_stream = stream->wg_stream; struct wg_parser_event event; + DWORD size, capacity; + INSSBuffer *sample; + HRESULT hr; + BYTE *data;
if (stream->selection == WMT_OFF) return NS_E_INVALID_REQUEST; @@ -1835,104 +1839,88 @@ HRESULT wm_reader_get_stream_sample(struct wm_stream *stream,
for (;;) { - wg_parser_stream_get_event(wg_stream, &event); + if (!wg_parser_stream_get_event(wg_stream, &event)) + { + stream->eos = true; + TRACE("End of stream.\n"); + return NS_E_NO_MORE_SAMPLES; + }
- TRACE("Got event of type %#x for %s stream %p.\n", event.type, - get_major_type_string(stream->format.major_type), stream); + TRACE("Got buffer for '%s' stream %p.\n", get_major_type_string(stream->format.major_type), stream);
- switch (event.type) + if (callback_advanced && stream->read_compressed && stream->allocate_stream) { - case WG_PARSER_EVENT_BUFFER: + if (FAILED(hr = IWMReaderCallbackAdvanced_AllocateForStream(callback_advanced, + stream->index + 1, event.u.buffer.size, &sample, NULL))) { - DWORD size, capacity; - INSSBuffer *sample; - HRESULT hr; - BYTE *data; - - if (callback_advanced && stream->read_compressed && stream->allocate_stream) - { - if (FAILED(hr = IWMReaderCallbackAdvanced_AllocateForStream(callback_advanced, - stream->index + 1, event.u.buffer.size, &sample, NULL))) - { - ERR("Failed to allocate stream sample of %u bytes, hr %#lx.\n", event.u.buffer.size, hr); - wg_parser_stream_release_buffer(wg_stream); - return hr; - } - } - else if (callback_advanced && !stream->read_compressed && stream->allocate_output) - { - if (FAILED(hr = IWMReaderCallbackAdvanced_AllocateForOutput(callback_advanced, - stream->index, event.u.buffer.size, &sample, NULL))) - { - ERR("Failed to allocate output sample of %u bytes, hr %#lx.\n", event.u.buffer.size, hr); - wg_parser_stream_release_buffer(wg_stream); - return hr; - } - } - else - { - struct buffer *object; - - /* FIXME: Should these be pooled? */ - if (!(object = calloc(1, offsetof(struct buffer, data[event.u.buffer.size])))) - { - wg_parser_stream_release_buffer(wg_stream); - return E_OUTOFMEMORY; - } - - object->INSSBuffer_iface.lpVtbl = &buffer_vtbl; - object->refcount = 1; - object->capacity = event.u.buffer.size; - - TRACE("Created buffer %p.\n", object); - sample = &object->INSSBuffer_iface; - } - - if (FAILED(hr = INSSBuffer_GetBufferAndLength(sample, &data, &size))) - ERR("Failed to get data pointer, hr %#lx.\n", hr); - if (FAILED(hr = INSSBuffer_GetMaxLength(sample, &capacity))) - ERR("Failed to get capacity, hr %#lx.\n", hr); - if (event.u.buffer.size > capacity) - ERR("Returned capacity %lu is less than requested capacity %u.\n", - capacity, event.u.buffer.size); - - if (!wg_parser_stream_copy_buffer(wg_stream, data, 0, event.u.buffer.size)) - { - /* The GStreamer pin has been flushed. */ - INSSBuffer_Release(sample); - break; - } - - if (FAILED(hr = INSSBuffer_SetLength(sample, event.u.buffer.size))) - ERR("Failed to set size %u, hr %#lx.\n", event.u.buffer.size, hr); - + ERR("Failed to allocate stream sample of %u bytes, hr %#lx.\n", event.u.buffer.size, hr); + wg_parser_stream_release_buffer(wg_stream); + return hr; + } + } + else if (callback_advanced && !stream->read_compressed && stream->allocate_output) + { + if (FAILED(hr = IWMReaderCallbackAdvanced_AllocateForOutput(callback_advanced, + stream->index, event.u.buffer.size, &sample, NULL))) + { + ERR("Failed to allocate output sample of %u bytes, hr %#lx.\n", event.u.buffer.size, hr); wg_parser_stream_release_buffer(wg_stream); + return hr; + } + } + else + { + struct buffer *object;
- if (!event.u.buffer.has_pts) - FIXME("Missing PTS.\n"); - if (!event.u.buffer.has_duration) - FIXME("Missing duration.\n"); - - *pts = event.u.buffer.pts; - *duration = event.u.buffer.duration; - *flags = 0; - if (event.u.buffer.discontinuity) - *flags |= WM_SF_DISCONTINUITY; - if (!event.u.buffer.delta) - *flags |= WM_SF_CLEANPOINT; - - *ret_sample = sample; - return S_OK; + /* FIXME: Should these be pooled? */ + if (!(object = calloc(1, offsetof(struct buffer, data[event.u.buffer.size])))) + { + wg_parser_stream_release_buffer(wg_stream); + return E_OUTOFMEMORY; }
- case WG_PARSER_EVENT_EOS: - stream->eos = true; - TRACE("End of stream.\n"); - return NS_E_NO_MORE_SAMPLES; + object->INSSBuffer_iface.lpVtbl = &buffer_vtbl; + object->refcount = 1; + object->capacity = event.u.buffer.size; + + TRACE("Created buffer %p.\n", object); + sample = &object->INSSBuffer_iface; + }
- case WG_PARSER_EVENT_NONE: - assert(0); + if (FAILED(hr = INSSBuffer_GetBufferAndLength(sample, &data, &size))) + ERR("Failed to get data pointer, hr %#lx.\n", hr); + if (FAILED(hr = INSSBuffer_GetMaxLength(sample, &capacity))) + ERR("Failed to get capacity, hr %#lx.\n", hr); + if (event.u.buffer.size > capacity) + ERR("Returned capacity %lu is less than requested capacity %u.\n", capacity, event.u.buffer.size); + + if (!wg_parser_stream_copy_buffer(wg_stream, data, 0, event.u.buffer.size)) + { + /* The GStreamer pin has been flushed. */ + INSSBuffer_Release(sample); + continue; } + + if (FAILED(hr = INSSBuffer_SetLength(sample, event.u.buffer.size))) + ERR("Failed to set size %u, hr %#lx.\n", event.u.buffer.size, hr); + + wg_parser_stream_release_buffer(wg_stream); + + if (!event.u.buffer.has_pts) + FIXME("Missing PTS.\n"); + if (!event.u.buffer.has_duration) + FIXME("Missing duration.\n"); + + *pts = event.u.buffer.pts; + *duration = event.u.buffer.duration; + *flags = 0; + if (event.u.buffer.discontinuity) + *flags |= WM_SF_DISCONTINUITY; + if (!event.u.buffer.delta) + *flags |= WM_SF_CLEANPOINT; + + *ret_sample = sample; + return S_OK; } }
Signed-off-by: Zebediah Figura zfigura@codeweavers.com --- dlls/winegstreamer/wg_parser.c | 76 ++++++++++++++++------------------ 1 file changed, 35 insertions(+), 41 deletions(-)
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 3cdeeeb5374..e114a19a26b 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -415,43 +415,6 @@ static void no_more_pads_cb(GstElement *element, gpointer user) pthread_cond_signal(&parser->init_cond); }
-static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream, - const struct wg_parser_event *event, GstBuffer *buffer) -{ - struct wg_parser *parser = stream->parser; - - /* Unlike request_buffer_src() [q.v.], we need to watch for GStreamer - * flushes here. The difference is that we can be blocked by the streaming - * thread not running (or itself flushing on the DirectShow side). - * request_buffer_src() can only be blocked by the upstream source, and that - * is solved by flushing the upstream source. */ - - pthread_mutex_lock(&parser->mutex); - while (!stream->flushing && stream->event.type != WG_PARSER_EVENT_NONE) - pthread_cond_wait(&stream->event_empty_cond, &parser->mutex); - if (stream->flushing) - { - pthread_mutex_unlock(&parser->mutex); - GST_DEBUG("Filter is flushing; discarding event."); - return GST_FLOW_FLUSHING; - } - - assert(GST_IS_BUFFER(buffer)); - if (!gst_buffer_map(buffer, &stream->map_info, GST_MAP_READ)) - { - pthread_mutex_unlock(&parser->mutex); - GST_ERROR("Failed to map buffer.\n"); - return GST_FLOW_ERROR; - } - - stream->event = *event; - stream->buffer = buffer; - pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&stream->event_cond); - GST_LOG("Event queued."); - return GST_FLOW_OK; -} - static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) { struct wg_parser_stream *stream = gst_pad_get_element_private(pad); @@ -550,8 +513,8 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) static GstFlowReturn sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer) { struct wg_parser_stream *stream = gst_pad_get_element_private(pad); + struct wg_parser *parser = stream->parser; struct wg_parser_event stream_event; - GstFlowReturn ret;
GST_LOG("stream %p, buffer %p.", stream, buffer);
@@ -576,10 +539,41 @@ static GstFlowReturn sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *bu stream_event.u.buffer.delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT); stream_event.u.buffer.size = gst_buffer_get_size(buffer);
- /* Transfer our reference to the buffer to the stream object. */ - if ((ret = queue_stream_event(stream, &stream_event, buffer)) != GST_FLOW_OK) + /* Allow this buffer to be flushed by GStreamer. We are effectively + * implementing a queue object here. */ + + pthread_mutex_lock(&parser->mutex); + + while (!stream->flushing && stream->event.type != WG_PARSER_EVENT_NONE) + pthread_cond_wait(&stream->event_empty_cond, &parser->mutex); + if (stream->flushing) + { + pthread_mutex_unlock(&parser->mutex); + GST_DEBUG("Stream is flushing; discarding buffer."); gst_buffer_unref(buffer); - return ret; + return GST_FLOW_FLUSHING; + } + + if (!gst_buffer_map(buffer, &stream->map_info, GST_MAP_READ)) + { + pthread_mutex_unlock(&parser->mutex); + GST_ERROR("Failed to map buffer.\n"); + gst_buffer_unref(buffer); + return GST_FLOW_ERROR; + } + + stream->event = stream_event; + stream->buffer = buffer; + + pthread_mutex_unlock(&parser->mutex); + pthread_cond_signal(&stream->event_cond); + + /* The chain callback is given a reference to the buffer. Transfer that + * reference to the stream object, which will release it in + * wg_parser_stream_release_buffer(). */ + + GST_LOG("Buffer queued."); + return GST_FLOW_OK; }
static gboolean sink_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
Signed-off-by: Zebediah Figura zfigura@codeweavers.com --- dlls/winegstreamer/gst_private.h | 2 +- dlls/winegstreamer/main.c | 8 ++--- dlls/winegstreamer/media_source.c | 18 +++++----- dlls/winegstreamer/quartz_parser.c | 38 ++++++++++----------- dlls/winegstreamer/unixlib.h | 31 +++++------------ dlls/winegstreamer/wg_parser.c | 54 +++++++++++++----------------- dlls/winegstreamer/wm_reader.c | 38 ++++++++++----------- 7 files changed, 84 insertions(+), 105 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 2f37d97378d..2cbba09f9a7 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -80,7 +80,7 @@ void wg_parser_stream_get_preferred_format(struct wg_parser_stream *stream, stru void wg_parser_stream_enable(struct wg_parser_stream *stream, const struct wg_format *format); void wg_parser_stream_disable(struct wg_parser_stream *stream);
-bool wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event); +bool wg_parser_stream_get_buffer(struct wg_parser_stream *stream, struct wg_parser_buffer *buffer); bool wg_parser_stream_copy_buffer(struct wg_parser_stream *stream, void *data, uint32_t offset, uint32_t size); void wg_parser_stream_release_buffer(struct wg_parser_stream *stream); diff --git a/dlls/winegstreamer/main.c b/dlls/winegstreamer/main.c index e9e347b0c65..327d144499e 100644 --- a/dlls/winegstreamer/main.c +++ b/dlls/winegstreamer/main.c @@ -172,15 +172,15 @@ void wg_parser_stream_disable(struct wg_parser_stream *stream) __wine_unix_call(unix_handle, unix_wg_parser_stream_disable, stream); }
-bool wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event) +bool wg_parser_stream_get_buffer(struct wg_parser_stream *stream, struct wg_parser_buffer *buffer) { - struct wg_parser_stream_get_event_params params = + struct wg_parser_stream_get_buffer_params params = { .stream = stream, - .event = event, + .buffer = buffer, };
- return !__wine_unix_call(unix_handle, unix_wg_parser_stream_get_event, ¶ms); + return !__wine_unix_call(unix_handle, unix_wg_parser_stream_get_buffer, ¶ms); }
bool wg_parser_stream_copy_buffer(struct wg_parser_stream *stream, diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index 92cb27dc5dc..c9f95a8a451 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -450,7 +450,7 @@ static void dispatch_end_of_presentation(struct media_source *source) IMFMediaEventQueue_QueueEventParamVar(source->event_queue, MEEndOfPresentation, &GUID_NULL, S_OK, &empty); }
-static void send_buffer(struct media_stream *stream, const struct wg_parser_event *event, IUnknown *token) +static void send_buffer(struct media_stream *stream, const struct wg_parser_buffer *wg_buffer, IUnknown *token) { IMFMediaBuffer *buffer; IMFSample *sample; @@ -463,7 +463,7 @@ static void send_buffer(struct media_stream *stream, const struct wg_parser_even return; }
- if (FAILED(hr = MFCreateMemoryBuffer(event->u.buffer.size, &buffer))) + if (FAILED(hr = MFCreateMemoryBuffer(wg_buffer->size, &buffer))) { ERR("Failed to create buffer, hr %#lx.\n", hr); IMFSample_Release(sample); @@ -476,7 +476,7 @@ static void send_buffer(struct media_stream *stream, const struct wg_parser_even goto out; }
- if (FAILED(hr = IMFMediaBuffer_SetCurrentLength(buffer, event->u.buffer.size))) + if (FAILED(hr = IMFMediaBuffer_SetCurrentLength(buffer, wg_buffer->size))) { ERR("Failed to set size, hr %#lx.\n", hr); goto out; @@ -488,7 +488,7 @@ static void send_buffer(struct media_stream *stream, const struct wg_parser_even goto out; }
- if (!wg_parser_stream_copy_buffer(stream->wg_stream, data, 0, event->u.buffer.size)) + if (!wg_parser_stream_copy_buffer(stream->wg_stream, data, 0, wg_buffer->size)) { wg_parser_stream_release_buffer(stream->wg_stream); IMFMediaBuffer_Unlock(buffer); @@ -502,13 +502,13 @@ static void send_buffer(struct media_stream *stream, const struct wg_parser_even goto out; }
- if (FAILED(hr = IMFSample_SetSampleTime(sample, event->u.buffer.pts))) + if (FAILED(hr = IMFSample_SetSampleTime(sample, wg_buffer->pts))) { ERR("Failed to set sample time, hr %#lx.\n", hr); goto out; }
- if (FAILED(hr = IMFSample_SetSampleDuration(sample, event->u.buffer.duration))) + if (FAILED(hr = IMFSample_SetSampleDuration(sample, wg_buffer->duration))) { ERR("Failed to set sample duration, hr %#lx.\n", hr); goto out; @@ -528,13 +528,13 @@ out: static void wait_on_sample(struct media_stream *stream, IUnknown *token) { PROPVARIANT empty_var = {.vt = VT_EMPTY}; - struct wg_parser_event event; + struct wg_parser_buffer buffer;
TRACE("%p, %p\n", stream, token);
- if (wg_parser_stream_get_event(stream->wg_stream, &event)) + if (wg_parser_stream_get_buffer(stream->wg_stream, &buffer)) { - send_buffer(stream, &event, token); + send_buffer(stream, &buffer, token); } else { diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c index 9b0fa4acb93..d369d4c7f20 100644 --- a/dlls/winegstreamer/quartz_parser.c +++ b/dlls/winegstreamer/quartz_parser.c @@ -686,7 +686,7 @@ static uint64_t scale_uint64(uint64_t value, uint32_t numerator, uint32_t denomi
/* Fill and send a single IMediaSample. */ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample, - const struct wg_parser_event *event, uint32_t offset, uint32_t size, DWORD bytes_per_second) + const struct wg_parser_buffer *buffer, uint32_t offset, uint32_t size, DWORD bytes_per_second) { HRESULT hr; BYTE *ptr = NULL; @@ -707,21 +707,21 @@ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample, return S_OK; }
- if (event->u.buffer.has_pts) + if (buffer->has_pts) { - REFERENCE_TIME start_pts = event->u.buffer.pts; + REFERENCE_TIME start_pts = buffer->pts;
if (offset) start_pts += scale_uint64(offset, 10000000, bytes_per_second); start_pts -= pin->seek.llCurrent; start_pts *= pin->seek.dRate;
- if (event->u.buffer.has_duration) + if (buffer->has_duration) { - REFERENCE_TIME end_pts = event->u.buffer.pts + event->u.buffer.duration; + REFERENCE_TIME end_pts = buffer->pts + buffer->duration;
- if (offset + size < event->u.buffer.size) - end_pts = event->u.buffer.pts + scale_uint64(offset + size, 10000000, bytes_per_second); + if (offset + size < buffer->size) + end_pts = buffer->pts + scale_uint64(offset + size, 10000000, bytes_per_second); end_pts -= pin->seek.llCurrent; end_pts *= pin->seek.dRate;
@@ -740,9 +740,9 @@ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample, IMediaSample_SetMediaTime(sample, NULL, NULL); }
- IMediaSample_SetDiscontinuity(sample, !offset && event->u.buffer.discontinuity); - IMediaSample_SetPreroll(sample, event->u.buffer.preroll); - IMediaSample_SetSyncPoint(sample, !event->u.buffer.delta); + IMediaSample_SetDiscontinuity(sample, !offset && buffer->discontinuity); + IMediaSample_SetPreroll(sample, buffer->preroll); + IMediaSample_SetSyncPoint(sample, !buffer->delta);
if (!pin->pin.pin.peer) return VFW_E_NOT_CONNECTED; @@ -754,7 +754,7 @@ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample,
/* Send a single GStreamer buffer (splitting it into multiple IMediaSamples if * necessary). */ -static void send_buffer(struct parser_source *pin, const struct wg_parser_event *event) +static void send_buffer(struct parser_source *pin, const struct wg_parser_buffer *buffer) { HRESULT hr; IMediaSample *sample; @@ -774,7 +774,7 @@ static void send_buffer(struct parser_source *pin, const struct wg_parser_event WAVEFORMATEX *format = (WAVEFORMATEX *)pin->pin.pin.mt.pbFormat; uint32_t offset = 0;
- while (offset < event->u.buffer.size) + while (offset < buffer->size) { uint32_t advance;
@@ -784,9 +784,9 @@ static void send_buffer(struct parser_source *pin, const struct wg_parser_event break; }
- advance = min(IMediaSample_GetSize(sample), event->u.buffer.size - offset); + advance = min(IMediaSample_GetSize(sample), buffer->size - offset);
- hr = send_sample(pin, sample, event, offset, advance, format->nAvgBytesPerSec); + hr = send_sample(pin, sample, buffer, offset, advance, format->nAvgBytesPerSec);
IMediaSample_Release(sample);
@@ -804,7 +804,7 @@ static void send_buffer(struct parser_source *pin, const struct wg_parser_event } else { - hr = send_sample(pin, sample, event, 0, event->u.buffer.size, 0); + hr = send_sample(pin, sample, buffer, 0, buffer->size, 0);
IMediaSample_Release(sample); } @@ -822,7 +822,7 @@ static DWORD CALLBACK stream_thread(void *arg)
while (filter->streaming) { - struct wg_parser_event event; + struct wg_parser_buffer buffer;
EnterCriticalSection(&pin->flushing_cs);
@@ -833,9 +833,9 @@ static DWORD CALLBACK stream_thread(void *arg) continue; }
- if (wg_parser_stream_get_event(pin->wg_stream, &event)) + if (wg_parser_stream_get_buffer(pin->wg_stream, &buffer)) { - send_buffer(pin, &event); + send_buffer(pin, &buffer); } else { @@ -844,8 +844,6 @@ static DWORD CALLBACK stream_thread(void *arg) pin->eos = true; }
- TRACE("Got event of type %#x.\n", event.type); - LeaveCriticalSection(&pin->flushing_cs); }
diff --git a/dlls/winegstreamer/unixlib.h b/dlls/winegstreamer/unixlib.h index 9d75e94cb6a..3dfa30b4889 100644 --- a/dlls/winegstreamer/unixlib.h +++ b/dlls/winegstreamer/unixlib.h @@ -103,27 +103,14 @@ struct wg_format } u; };
-enum wg_parser_event_type +struct wg_parser_buffer { - WG_PARSER_EVENT_NONE = 0, - WG_PARSER_EVENT_BUFFER, -}; - -struct wg_parser_event -{ - enum wg_parser_event_type type; - union - { - struct - { - /* pts and duration are in 100-nanosecond units. */ - ULONGLONG pts, duration; - uint32_t size; - bool discontinuity, preroll, delta, has_pts, has_duration; - } buffer; - } u; + /* pts and duration are in 100-nanosecond units. */ + UINT64 pts, duration; + UINT32 size; + bool discontinuity, preroll, delta, has_pts, has_duration; }; -C_ASSERT(sizeof(struct wg_parser_event) == 40); +C_ASSERT(sizeof(struct wg_parser_buffer) == 32);
enum wg_parser_type { @@ -185,10 +172,10 @@ struct wg_parser_stream_enable_params const struct wg_format *format; };
-struct wg_parser_stream_get_event_params +struct wg_parser_stream_get_buffer_params { struct wg_parser_stream *stream; - struct wg_parser_event *event; + struct wg_parser_buffer *buffer; };
struct wg_parser_stream_copy_buffer_params @@ -247,7 +234,7 @@ enum unix_funcs unix_wg_parser_stream_enable, unix_wg_parser_stream_disable,
- unix_wg_parser_stream_get_event, + unix_wg_parser_stream_get_buffer, unix_wg_parser_stream_copy_buffer, unix_wg_parser_stream_release_buffer, unix_wg_parser_stream_notify_qos, diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index e114a19a26b..f36feead9c6 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -102,7 +102,6 @@ struct wg_parser_stream struct wg_format preferred_format, current_format;
pthread_cond_t event_cond, event_empty_cond; - struct wg_parser_event event; GstBuffer *buffer; GstMapInfo map_info;
@@ -251,22 +250,37 @@ static NTSTATUS wg_parser_stream_disable(void *args) return S_OK; }
-static NTSTATUS wg_parser_stream_get_event(void *args) +static NTSTATUS wg_parser_stream_get_buffer(void *args) { - const struct wg_parser_stream_get_event_params *params = args; + const struct wg_parser_stream_get_buffer_params *params = args; + struct wg_parser_buffer *wg_buffer = params->buffer; struct wg_parser_stream *stream = params->stream; struct wg_parser *parser = stream->parser; + GstBuffer *buffer;
pthread_mutex_lock(&parser->mutex);
- while (!stream->eos && stream->event.type == WG_PARSER_EVENT_NONE) + while (!stream->eos && !stream->buffer) pthread_cond_wait(&stream->event_cond, &parser->mutex);
/* Note that we can both have a buffer and stream->eos, in which case we * must return the buffer. */ - if (stream->event.type != WG_PARSER_EVENT_NONE) + if ((buffer = stream->buffer)) { - *params->event = stream->event; + /* FIXME: Should we use gst_segment_to_stream_time_full()? Under what + * circumstances is the stream time not equal to the buffer PTS? Note + * that this will need modification to wg_parser_stream_notify_qos() as + * well. */ + + if ((wg_buffer->has_pts = GST_BUFFER_PTS_IS_VALID(buffer))) + wg_buffer->pts = GST_BUFFER_PTS(buffer) / 100; + if ((wg_buffer->has_duration = GST_BUFFER_DURATION_IS_VALID(buffer))) + wg_buffer->duration = GST_BUFFER_DURATION(buffer) / 100; + wg_buffer->discontinuity = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DISCONT); + wg_buffer->preroll = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_LIVE); + wg_buffer->delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT); + wg_buffer->size = gst_buffer_get_size(buffer); + pthread_mutex_unlock(&parser->mutex); return S_OK; } @@ -291,7 +305,6 @@ static NTSTATUS wg_parser_stream_copy_buffer(void *args) return VFW_E_WRONG_STATE; }
- assert(stream->event.type == WG_PARSER_EVENT_BUFFER); assert(offset < stream->map_info.size); assert(offset + size <= stream->map_info.size); memcpy(params->data, stream->map_info.data + offset, size); @@ -307,12 +320,11 @@ static NTSTATUS wg_parser_stream_release_buffer(void *args)
pthread_mutex_lock(&parser->mutex);
- assert(stream->event.type == WG_PARSER_EVENT_BUFFER); + assert(stream->buffer);
gst_buffer_unmap(stream->buffer, &stream->map_info); gst_buffer_unref(stream->buffer); stream->buffer = NULL; - stream->event.type = WG_PARSER_EVENT_NONE;
pthread_mutex_unlock(&parser->mutex); pthread_cond_signal(&stream->event_empty_cond); @@ -459,13 +471,12 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) stream->flushing = true; pthread_cond_signal(&stream->event_empty_cond);
- if (stream->event.type == WG_PARSER_EVENT_BUFFER) + if (stream->buffer) { gst_buffer_unmap(stream->buffer, &stream->map_info); gst_buffer_unref(stream->buffer); stream->buffer = NULL; } - stream->event.type = WG_PARSER_EVENT_NONE;
pthread_mutex_unlock(&parser->mutex); } @@ -514,7 +525,6 @@ static GstFlowReturn sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *bu { struct wg_parser_stream *stream = gst_pad_get_element_private(pad); struct wg_parser *parser = stream->parser; - struct wg_parser_event stream_event;
GST_LOG("stream %p, buffer %p.", stream, buffer);
@@ -524,27 +534,12 @@ static GstFlowReturn sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *bu return GST_FLOW_OK; }
- stream_event.type = WG_PARSER_EVENT_BUFFER; - - /* FIXME: Should we use gst_segment_to_stream_time_full()? Under what - * circumstances is the stream time not equal to the buffer PTS? Note that - * this will need modification to wg_parser_stream_notify_qos() as well. */ - - if ((stream_event.u.buffer.has_pts = GST_BUFFER_PTS_IS_VALID(buffer))) - stream_event.u.buffer.pts = GST_BUFFER_PTS(buffer) / 100; - if ((stream_event.u.buffer.has_duration = GST_BUFFER_DURATION_IS_VALID(buffer))) - stream_event.u.buffer.duration = GST_BUFFER_DURATION(buffer) / 100; - stream_event.u.buffer.discontinuity = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DISCONT); - stream_event.u.buffer.preroll = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_LIVE); - stream_event.u.buffer.delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT); - stream_event.u.buffer.size = gst_buffer_get_size(buffer); - /* Allow this buffer to be flushed by GStreamer. We are effectively * implementing a queue object here. */
pthread_mutex_lock(&parser->mutex);
- while (!stream->flushing && stream->event.type != WG_PARSER_EVENT_NONE) + while (!stream->flushing && stream->buffer) pthread_cond_wait(&stream->event_empty_cond, &parser->mutex); if (stream->flushing) { @@ -562,7 +557,6 @@ static GstFlowReturn sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *bu return GST_FLOW_ERROR; }
- stream->event = stream_event; stream->buffer = buffer;
pthread_mutex_unlock(&parser->mutex); @@ -1591,7 +1585,7 @@ const unixlib_entry_t __wine_unix_call_funcs[] = X(wg_parser_stream_enable), X(wg_parser_stream_disable),
- X(wg_parser_stream_get_event), + X(wg_parser_stream_get_buffer), X(wg_parser_stream_copy_buffer), X(wg_parser_stream_release_buffer), X(wg_parser_stream_notify_qos), diff --git a/dlls/winegstreamer/wm_reader.c b/dlls/winegstreamer/wm_reader.c index 96de6f2a3ba..bfed0d1e0ca 100644 --- a/dlls/winegstreamer/wm_reader.c +++ b/dlls/winegstreamer/wm_reader.c @@ -1825,7 +1825,7 @@ HRESULT wm_reader_get_stream_sample(struct wm_stream *stream, { IWMReaderCallbackAdvanced *callback_advanced = stream->reader->callback_advanced; struct wg_parser_stream *wg_stream = stream->wg_stream; - struct wg_parser_event event; + struct wg_parser_buffer wg_buffer; DWORD size, capacity; INSSBuffer *sample; HRESULT hr; @@ -1839,7 +1839,7 @@ HRESULT wm_reader_get_stream_sample(struct wm_stream *stream,
for (;;) { - if (!wg_parser_stream_get_event(wg_stream, &event)) + if (!wg_parser_stream_get_buffer(wg_stream, &wg_buffer)) { stream->eos = true; TRACE("End of stream.\n"); @@ -1851,9 +1851,9 @@ HRESULT wm_reader_get_stream_sample(struct wm_stream *stream, if (callback_advanced && stream->read_compressed && stream->allocate_stream) { if (FAILED(hr = IWMReaderCallbackAdvanced_AllocateForStream(callback_advanced, - stream->index + 1, event.u.buffer.size, &sample, NULL))) + stream->index + 1, wg_buffer.size, &sample, NULL))) { - ERR("Failed to allocate stream sample of %u bytes, hr %#lx.\n", event.u.buffer.size, hr); + ERR("Failed to allocate stream sample of %u bytes, hr %#lx.\n", wg_buffer.size, hr); wg_parser_stream_release_buffer(wg_stream); return hr; } @@ -1861,9 +1861,9 @@ HRESULT wm_reader_get_stream_sample(struct wm_stream *stream, else if (callback_advanced && !stream->read_compressed && stream->allocate_output) { if (FAILED(hr = IWMReaderCallbackAdvanced_AllocateForOutput(callback_advanced, - stream->index, event.u.buffer.size, &sample, NULL))) + stream->index, wg_buffer.size, &sample, NULL))) { - ERR("Failed to allocate output sample of %u bytes, hr %#lx.\n", event.u.buffer.size, hr); + ERR("Failed to allocate output sample of %u bytes, hr %#lx.\n", wg_buffer.size, hr); wg_parser_stream_release_buffer(wg_stream); return hr; } @@ -1873,7 +1873,7 @@ HRESULT wm_reader_get_stream_sample(struct wm_stream *stream, struct buffer *object;
/* FIXME: Should these be pooled? */ - if (!(object = calloc(1, offsetof(struct buffer, data[event.u.buffer.size])))) + if (!(object = calloc(1, offsetof(struct buffer, data[wg_buffer.size])))) { wg_parser_stream_release_buffer(wg_stream); return E_OUTOFMEMORY; @@ -1881,7 +1881,7 @@ HRESULT wm_reader_get_stream_sample(struct wm_stream *stream,
object->INSSBuffer_iface.lpVtbl = &buffer_vtbl; object->refcount = 1; - object->capacity = event.u.buffer.size; + object->capacity = wg_buffer.size;
TRACE("Created buffer %p.\n", object); sample = &object->INSSBuffer_iface; @@ -1891,32 +1891,32 @@ HRESULT wm_reader_get_stream_sample(struct wm_stream *stream, ERR("Failed to get data pointer, hr %#lx.\n", hr); if (FAILED(hr = INSSBuffer_GetMaxLength(sample, &capacity))) ERR("Failed to get capacity, hr %#lx.\n", hr); - if (event.u.buffer.size > capacity) - ERR("Returned capacity %lu is less than requested capacity %u.\n", capacity, event.u.buffer.size); + if (wg_buffer.size > capacity) + ERR("Returned capacity %lu is less than requested capacity %u.\n", capacity, wg_buffer.size);
- if (!wg_parser_stream_copy_buffer(wg_stream, data, 0, event.u.buffer.size)) + if (!wg_parser_stream_copy_buffer(wg_stream, data, 0, wg_buffer.size)) { /* The GStreamer pin has been flushed. */ INSSBuffer_Release(sample); continue; }
- if (FAILED(hr = INSSBuffer_SetLength(sample, event.u.buffer.size))) - ERR("Failed to set size %u, hr %#lx.\n", event.u.buffer.size, hr); + if (FAILED(hr = INSSBuffer_SetLength(sample, wg_buffer.size))) + ERR("Failed to set size %u, hr %#lx.\n", wg_buffer.size, hr);
wg_parser_stream_release_buffer(wg_stream);
- if (!event.u.buffer.has_pts) + if (!wg_buffer.has_pts) FIXME("Missing PTS.\n"); - if (!event.u.buffer.has_duration) + if (!wg_buffer.has_duration) FIXME("Missing duration.\n");
- *pts = event.u.buffer.pts; - *duration = event.u.buffer.duration; + *pts = wg_buffer.pts; + *duration = wg_buffer.duration; *flags = 0; - if (event.u.buffer.discontinuity) + if (wg_buffer.discontinuity) *flags |= WM_SF_DISCONTINUITY; - if (!event.u.buffer.delta) + if (!wg_buffer.delta) *flags |= WM_SF_CLEANPOINT;
*ret_sample = sample;