From: Rémi Bernon <rbernon@codeweavers.com>
---
 dlls/winegstreamer/wm_reader.c | 46 ++++++++++++++++++----------------
 1 file changed, 25 insertions(+), 21 deletions(-)

diff --git a/dlls/winegstreamer/wm_reader.c b/dlls/winegstreamer/wm_reader.c
index e68e0a7d0ca..9d683d6dfc4 100644
--- a/dlls/winegstreamer/wm_reader.c
+++ b/dlls/winegstreamer/wm_reader.c
@@ -1570,6 +1570,30 @@ static const char *get_major_type_string(enum wg_major_type type)
     return NULL;
 }
 
+static HRESULT wm_stream_allocate_sample(struct wm_stream *stream, DWORD size, INSSBuffer **sample)
+{
+    struct buffer *buffer;
+
+    if (!stream->read_compressed && stream->output_allocator)
+        return IWMReaderAllocatorEx_AllocateForOutputEx(stream->output_allocator, stream->index,
+                size, sample, 0, 0, 0, NULL);
+
+    if (stream->read_compressed && stream->stream_allocator)
+        return IWMReaderAllocatorEx_AllocateForStreamEx(stream->stream_allocator, stream->index + 1,
+                size, sample, 0, 0, 0, NULL);
+
+    /* FIXME: Should these be pooled? */
+    if (!(buffer = calloc(1, offsetof(struct buffer, data[size]))))
+        return E_OUTOFMEMORY;
+    buffer->INSSBuffer_iface.lpVtbl = &buffer_vtbl;
+    buffer->refcount = 1;
+    buffer->capacity = size;
+
+    TRACE("Created buffer %p.\n", buffer);
+    *sample = &buffer->INSSBuffer_iface;
+    return S_OK;
+}
+
 /* Find the earliest buffer by PTS.
  *
  * Native seems to behave similarly to this with the async reader, although our
@@ -1619,7 +1643,6 @@ static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream
     struct wg_parser_stream *wg_stream;
     struct wg_parser_buffer wg_buffer;
     struct wm_stream *stream;
-    struct buffer *object;
     DWORD size, capacity;
     INSSBuffer *sample;
     HRESULT hr = S_OK;
@@ -1666,26 +1689,7 @@ static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream
 
         TRACE("Got buffer for '%s' stream %p.\n", get_major_type_string(stream->format.major_type), stream);
 
-        if (!stream->read_compressed && stream->output_allocator)
-            hr = IWMReaderAllocatorEx_AllocateForOutputEx(stream->output_allocator, stream->index,
-                    wg_buffer.size, &sample, 0, 0, 0, NULL);
-        else if (stream->read_compressed && stream->stream_allocator)
-            hr = IWMReaderAllocatorEx_AllocateForStreamEx(stream->stream_allocator, stream->index + 1,
-                    wg_buffer.size, &sample, 0, 0, 0, NULL);
-        /* FIXME: Should these be pooled? */
-        else if (!(object = calloc(1, offsetof(struct buffer, data[wg_buffer.size]))))
-            hr = E_OUTOFMEMORY;
-        else
-        {
-            object->INSSBuffer_iface.lpVtbl = &buffer_vtbl;
-            object->refcount = 1;
-            object->capacity = wg_buffer.size;
-
-            TRACE("Created buffer %p.\n", object);
-            sample = &object->INSSBuffer_iface;
-        }
-
-        if (FAILED(hr))
+        if (FAILED(hr = wm_stream_allocate_sample(stream, wg_buffer.size, &sample)))
         {
             ERR("Failed to allocate sample of %u bytes, hr %#lx.\n", wg_buffer.size, hr);
             wg_parser_stream_release_buffer(wg_stream);
From: Rémi Bernon <rbernon@codeweavers.com>
---
 dlls/winegstreamer/wm_reader.c | 110 +++++++++++++++++----------------
 1 file changed, 58 insertions(+), 52 deletions(-)

diff --git a/dlls/winegstreamer/wm_reader.c b/dlls/winegstreamer/wm_reader.c
index 9d683d6dfc4..003ccd87d4e 100644
--- a/dlls/winegstreamer/wm_reader.c
+++ b/dlls/winegstreamer/wm_reader.c
@@ -1594,6 +1594,58 @@ static HRESULT wm_stream_allocate_sample(struct wm_stream *stream, DWORD size, I
     return S_OK;
 }
 
+static HRESULT wm_reader_read_stream_sample(struct wm_reader *reader, struct wm_stream *stream,
+        struct wg_parser_buffer *buffer, INSSBuffer **sample, QWORD *pts, QWORD *duration, DWORD *flags)
+{
+    DWORD size, capacity;
+    HRESULT hr;
+    BYTE *data;
+
+    TRACE("Got buffer for '%s' stream %p.\n", get_major_type_string(stream->format.major_type), stream);
+
+    if (FAILED(hr = wm_stream_allocate_sample(stream, buffer->size, sample)))
+    {
+        ERR("Failed to allocate sample of %u bytes, hr %#lx.\n", buffer->size, hr);
+        wg_parser_stream_release_buffer(stream->wg_stream);
+        return hr;
+    }
+
+    if (FAILED(hr = INSSBuffer_GetBufferAndLength(*sample, &data, &size)))
+        ERR("Failed to get data pointer, hr %#lx.\n", hr);
+    if (FAILED(hr = INSSBuffer_GetMaxLength(*sample, &capacity)))
+        ERR("Failed to get capacity, hr %#lx.\n", hr);
+    if (buffer->size > capacity)
+        ERR("Returned capacity %lu is less than requested capacity %u.\n", capacity, buffer->size);
+
+    if (!wg_parser_stream_copy_buffer(stream->wg_stream, data, 0, buffer->size))
+    {
+        /* The GStreamer pin has been flushed. */
+        INSSBuffer_Release(*sample);
+        *sample = NULL;
+        return S_FALSE;
+    }
+
+    if (FAILED(hr = INSSBuffer_SetLength(*sample, buffer->size)))
+        ERR("Failed to set size %u, hr %#lx.\n", buffer->size, hr);
+
+    wg_parser_stream_release_buffer(stream->wg_stream);
+
+    if (!buffer->has_pts)
+        FIXME("Missing PTS.\n");
+    if (!buffer->has_duration)
+        FIXME("Missing duration.\n");
+
+    *pts = buffer->pts;
+    *duration = buffer->duration;
+    *flags = 0;
+    if (buffer->discontinuity)
+        *flags |= WM_SF_DISCONTINUITY;
+    if (!buffer->delta)
+        *flags |= WM_SF_CLEANPOINT;
+
+    return S_OK;
+}
+
 /* Find the earliest buffer by PTS.
  *
  * Native seems to behave similarly to this with the async reader, although our
@@ -1640,15 +1692,11 @@ static WORD get_earliest_buffer(struct wm_reader *reader, struct wg_parser_buffe
 static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream_number,
         INSSBuffer **ret_sample, QWORD *pts, QWORD *duration, DWORD *flags, WORD *ret_stream_number)
 {
-    struct wg_parser_stream *wg_stream;
     struct wg_parser_buffer wg_buffer;
     struct wm_stream *stream;
-    DWORD size, capacity;
-    INSSBuffer *sample;
     HRESULT hr = S_OK;
-    BYTE *data;
 
-    for (;;)
+    do
     {
         if (!stream_number)
         {
@@ -1659,7 +1707,6 @@ static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream
             }
 
             stream = wm_reader_get_stream_by_stream_number(reader, stream_number);
-            wg_stream = stream->wg_stream;
         }
         else
         {
@@ -1668,7 +1715,6 @@ static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream
                 WARN("Invalid stream number %u; returning E_INVALIDARG.\n", stream_number);
                 return E_INVALIDARG;
             }
-            wg_stream = stream->wg_stream;
 
             if (stream->selection == WMT_OFF)
             {
@@ -1679,7 +1725,7 @@ static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream
             if (stream->eos)
                 return NS_E_NO_MORE_SAMPLES;
 
-            if (!wg_parser_stream_get_buffer(wg_stream, &wg_buffer))
+            if (!wg_parser_stream_get_buffer(stream->wg_stream, &wg_buffer))
             {
                 stream->eos = true;
                 TRACE("End of stream.\n");
@@ -1687,51 +1733,11 @@ static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream
             }
         }
 
-        TRACE("Got buffer for '%s' stream %p.\n", get_major_type_string(stream->format.major_type), stream);
-
-        if (FAILED(hr = wm_stream_allocate_sample(stream, wg_buffer.size, &sample)))
-        {
-            ERR("Failed to allocate sample of %u bytes, hr %#lx.\n", wg_buffer.size, hr);
-            wg_parser_stream_release_buffer(wg_stream);
-            return hr;
-        }
-
-        if (FAILED(hr = INSSBuffer_GetBufferAndLength(sample, &data, &size)))
-            ERR("Failed to get data pointer, hr %#lx.\n", hr);
-        if (FAILED(hr = INSSBuffer_GetMaxLength(sample, &capacity)))
-            ERR("Failed to get capacity, hr %#lx.\n", hr);
-        if (wg_buffer.size > capacity)
-            ERR("Returned capacity %lu is less than requested capacity %u.\n", capacity, wg_buffer.size);
-
-        if (!wg_parser_stream_copy_buffer(wg_stream, data, 0, wg_buffer.size))
-        {
-            /* The GStreamer pin has been flushed. */
-            INSSBuffer_Release(sample);
-            continue;
-        }
-
-        if (FAILED(hr = INSSBuffer_SetLength(sample, wg_buffer.size)))
-            ERR("Failed to set size %u, hr %#lx.\n", wg_buffer.size, hr);
-
-        wg_parser_stream_release_buffer(wg_stream);
-
-        if (!wg_buffer.has_pts)
-            FIXME("Missing PTS.\n");
-        if (!wg_buffer.has_duration)
-            FIXME("Missing duration.\n");
+        if (SUCCEEDED(hr = wm_reader_read_stream_sample(reader, stream, &wg_buffer, ret_sample, pts, duration, flags)))
+            *ret_stream_number = stream_number;
+    } while (hr == S_FALSE);
 
-        *pts = wg_buffer.pts;
-        *duration = wg_buffer.duration;
-        *flags = 0;
-        if (wg_buffer.discontinuity)
-            *flags |= WM_SF_DISCONTINUITY;
-        if (!wg_buffer.delta)
-            *flags |= WM_SF_CLEANPOINT;
-
-        *ret_sample = sample;
-        *ret_stream_number = stream_number;
-        return S_OK;
-    }
+    return hr;
 }
static struct wm_reader *impl_from_IUnknown(IUnknown *iface)
From: Rémi Bernon <rbernon@codeweavers.com>
For the WM reader, returning the earliest buffer.
---
 dlls/winegstreamer/gst_private.h   |  3 +-
 dlls/winegstreamer/main.c          |  6 +-
 dlls/winegstreamer/media_source.c  |  3 +-
 dlls/winegstreamer/quartz_parser.c |  2 +-
 dlls/winegstreamer/unixlib.h       |  2 +
 dlls/winegstreamer/wg_parser.c     | 92 +++++++++++++++++++++++-------
 dlls/winegstreamer/wm_reader.c     | 61 ++++----------
 7 files changed, 91 insertions(+), 78 deletions(-)

diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h
index fed5766d3a4..a5f23742234 100644
--- a/dlls/winegstreamer/gst_private.h
+++ b/dlls/winegstreamer/gst_private.h
@@ -86,7 +86,8 @@ void wg_parser_stream_get_preferred_format(struct wg_parser_stream *stream, stru
 void wg_parser_stream_enable(struct wg_parser_stream *stream, const struct wg_format *format);
 void wg_parser_stream_disable(struct wg_parser_stream *stream);
 
-bool wg_parser_stream_get_buffer(struct wg_parser_stream *stream, struct wg_parser_buffer *buffer);
+bool wg_parser_stream_get_buffer(struct wg_parser *parser, struct wg_parser_stream *stream,
+        struct wg_parser_buffer *buffer);
 bool wg_parser_stream_copy_buffer(struct wg_parser_stream *stream,
         void *data, uint32_t offset, uint32_t size);
 void wg_parser_stream_release_buffer(struct wg_parser_stream *stream);
diff --git a/dlls/winegstreamer/main.c b/dlls/winegstreamer/main.c
index 40507a4b929..6cc9b80ef8f 100644
--- a/dlls/winegstreamer/main.c
+++ b/dlls/winegstreamer/main.c
@@ -201,15 +201,17 @@ void wg_parser_stream_disable(struct wg_parser_stream *stream)
 {
     __wine_unix_call(unix_handle, unix_wg_parser_stream_disable, stream);
 }
 
-bool wg_parser_stream_get_buffer(struct wg_parser_stream *stream, struct wg_parser_buffer *buffer)
+bool wg_parser_stream_get_buffer(struct wg_parser *parser, struct wg_parser_stream *stream,
+        struct wg_parser_buffer *buffer)
 {
     struct wg_parser_stream_get_buffer_params params =
     {
+        .parser = parser,
         .stream = stream,
         .buffer = buffer,
     };
 
-    TRACE("stream %p, buffer %p.\n", stream, buffer);
+    TRACE("parser %p, stream %p, buffer %p.\n", parser, stream, buffer);
 
     return !__wine_unix_call(unix_handle, unix_wg_parser_stream_get_buffer, &params);
 }
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c
index 43beb71838a..9bb7a441a8f 100644
--- a/dlls/winegstreamer/media_source.c
+++ b/dlls/winegstreamer/media_source.c
@@ -528,12 +528,13 @@ out:
 
 static void wait_on_sample(struct media_stream *stream, IUnknown *token)
 {
+    struct media_source *source = stream->parent_source;
     PROPVARIANT empty_var = {.vt = VT_EMPTY};
     struct wg_parser_buffer buffer;
 
     TRACE("%p, %p\n", stream, token);
 
-    if (wg_parser_stream_get_buffer(stream->wg_stream, &buffer))
+    if (wg_parser_stream_get_buffer(source->wg_parser, stream->wg_stream, &buffer))
     {
         send_buffer(stream, &buffer, token);
     }
diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c
index 55e5988e880..6166c173c27 100644
--- a/dlls/winegstreamer/quartz_parser.c
+++ b/dlls/winegstreamer/quartz_parser.c
@@ -936,7 +936,7 @@ static DWORD CALLBACK stream_thread(void *arg)
             continue;
         }
 
-        if (wg_parser_stream_get_buffer(pin->wg_stream, &buffer))
+        if (wg_parser_stream_get_buffer(filter->wg_parser, pin->wg_stream, &buffer))
         {
             send_buffer(pin, &buffer);
         }
diff --git a/dlls/winegstreamer/unixlib.h b/dlls/winegstreamer/unixlib.h
index 8e6c5e4cc0e..d21c45efae9 100644
--- a/dlls/winegstreamer/unixlib.h
+++ b/dlls/winegstreamer/unixlib.h
@@ -147,6 +147,7 @@ struct wg_parser_buffer
     /* pts and duration are in 100-nanosecond units. */
     UINT64 pts, duration;
     UINT32 size;
+    UINT32 stream;
     bool discontinuity, preroll, delta, has_pts, has_duration;
 };
 C_ASSERT(sizeof(struct wg_parser_buffer) == 32);
@@ -215,6 +216,7 @@ struct wg_parser_stream_enable_params
 
 struct wg_parser_stream_get_buffer_params
 {
+    struct wg_parser *parser;
     struct wg_parser_stream *stream;
     struct wg_parser_buffer *buffer;
 };
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c
index 0573c99858b..d1230c89f2e 100644
--- a/dlls/winegstreamer/wg_parser.c
+++ b/dlls/winegstreamer/wg_parser.c
@@ -96,6 +96,7 @@ struct wg_parser
 struct wg_parser_stream
 {
     struct wg_parser *parser;
+    uint32_t number;
 
     GstPad *their_src, *post_sink, *post_src, *my_sink;
     GstElement *flip;
@@ -260,43 +261,89 @@ static NTSTATUS wg_parser_stream_disable(void *args)
     return S_OK;
 }
 
+static GstBuffer *wait_parser_stream_buffer(struct wg_parser *parser, struct wg_parser_stream *stream)
+{
+    GstBuffer *buffer = NULL;
+
+    /* Note that we can both have a buffer and stream->eos, in which case we
+     * must return the buffer. */
+
+    while (!(buffer = stream->buffer) && !stream->eos)
+        pthread_cond_wait(&stream->event_cond, &parser->mutex);
+
+    return buffer;
+}
+
 static NTSTATUS wg_parser_stream_get_buffer(void *args)
 {
     const struct wg_parser_stream_get_buffer_params *params = args;
     struct wg_parser_buffer *wg_buffer = params->buffer;
     struct wg_parser_stream *stream = params->stream;
-    struct wg_parser *parser = stream->parser;
+    struct wg_parser *parser = params->parser;
     GstBuffer *buffer;
+    unsigned int i;
 
     pthread_mutex_lock(&parser->mutex);
 
-    while (!stream->eos && !stream->buffer)
-        pthread_cond_wait(&stream->event_cond, &parser->mutex);
-
-    /* Note that we can both have a buffer and stream->eos, in which case we
-     * must return the buffer. */
-    if ((buffer = stream->buffer))
+    if (stream)
+        buffer = wait_parser_stream_buffer(parser, stream);
+    else
     {
-        /* FIXME: Should we use gst_segment_to_stream_time_full()? Under what
-         * circumstances is the stream time not equal to the buffer PTS? Note
-         * that this will need modification to wg_parser_stream_notify_qos() as
-         * well. */
-
-        if ((wg_buffer->has_pts = GST_BUFFER_PTS_IS_VALID(buffer)))
-            wg_buffer->pts = GST_BUFFER_PTS(buffer) / 100;
-        if ((wg_buffer->has_duration = GST_BUFFER_DURATION_IS_VALID(buffer)))
-            wg_buffer->duration = GST_BUFFER_DURATION(buffer) / 100;
-        wg_buffer->discontinuity = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DISCONT);
-        wg_buffer->preroll = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_LIVE);
-        wg_buffer->delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
-        wg_buffer->size = gst_buffer_get_size(buffer);
+        /* Find the earliest buffer by PTS.
+         *
+         * Native seems to behave similarly to this with the wm async reader, although our
+         * unit tests show that it's not entirely consistent—some frames are received
+         * slightly out of order. It's possible that one stream is being manually offset
+         * to account for decoding latency.
+         *
+         * The behaviour with the wm sync reader, when stream 0 is requested, seems
+         * consistent with this hypothesis, but with a much larger offset—the video
+         * stream seems to be "behind" by about 150 ms.
+         *
+         * The main reason for doing this is that the video and audio stream probably
+         * don't have quite the same "frame rate", and we don't want to force one stream
+         * to decode faster just to keep up with the other. Delivering samples in PTS
+         * order should avoid that problem. */
+        GstBuffer *earliest = NULL;
+
+        for (i = 0; i < parser->stream_count; ++i)
+        {
+            if (!parser->streams[i]->enabled || !(buffer = wait_parser_stream_buffer(parser, parser->streams[i])))
+                continue;
+            /* invalid PTS is GST_CLOCK_TIME_NONE == (guint64)-1, so this will prefer valid timestamps. */
+            if (!earliest || GST_BUFFER_PTS(buffer) < GST_BUFFER_PTS(earliest))
+            {
+                stream = parser->streams[i];
+                earliest = buffer;
+            }
+        }
+
+        buffer = earliest;
+    }
+
+    if (!buffer)
+    {
         pthread_mutex_unlock(&parser->mutex);
-        return S_OK;
+        return S_FALSE;
     }
 
+    /* FIXME: Should we use gst_segment_to_stream_time_full()? Under what
+     * circumstances is the stream time not equal to the buffer PTS? Note
+     * that this will need modification to wg_parser_stream_notify_qos() as
+     * well. */
+
+    if ((wg_buffer->has_pts = GST_BUFFER_PTS_IS_VALID(buffer)))
+        wg_buffer->pts = GST_BUFFER_PTS(buffer) / 100;
+    if ((wg_buffer->has_duration = GST_BUFFER_DURATION_IS_VALID(buffer)))
+        wg_buffer->duration = GST_BUFFER_DURATION(buffer) / 100;
+    wg_buffer->discontinuity = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DISCONT);
+    wg_buffer->preroll = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_LIVE);
+    wg_buffer->delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
+    wg_buffer->size = gst_buffer_get_size(buffer);
+    wg_buffer->stream = stream->number;
+
     pthread_mutex_unlock(&parser->mutex);
-    return S_FALSE;
+    return S_OK;
 }
 
 static NTSTATUS wg_parser_stream_copy_buffer(void *args)
@@ -691,6 +738,7 @@ static struct wg_parser_stream *create_stream(struct wg_parser *parser)
     gst_segment_init(&stream->segment, GST_FORMAT_UNDEFINED);
 
     stream->parser = parser;
+    stream->number = parser->stream_count;
     stream->current_format.major_type = WG_MAJOR_TYPE_UNKNOWN;
     pthread_cond_init(&stream->event_cond, NULL);
     pthread_cond_init(&stream->event_empty_cond, NULL);
diff --git a/dlls/winegstreamer/wm_reader.c b/dlls/winegstreamer/wm_reader.c
index 003ccd87d4e..1395e5193ef 100644
--- a/dlls/winegstreamer/wm_reader.c
+++ b/dlls/winegstreamer/wm_reader.c
@@ -1594,13 +1594,17 @@ static HRESULT wm_stream_allocate_sample(struct wm_stream *stream, DWORD size, I
     return S_OK;
 }
 
-static HRESULT wm_reader_read_stream_sample(struct wm_reader *reader, struct wm_stream *stream,
-        struct wg_parser_buffer *buffer, INSSBuffer **sample, QWORD *pts, QWORD *duration, DWORD *flags)
+static HRESULT wm_reader_read_stream_sample(struct wm_reader *reader, struct wg_parser_buffer *buffer,
+        INSSBuffer **sample, QWORD *pts, QWORD *duration, DWORD *flags)
 {
+    struct wm_stream *stream;
     DWORD size, capacity;
     HRESULT hr;
     BYTE *data;
 
+    if (!(stream = wm_reader_get_stream_by_stream_number(reader, buffer->stream + 1)))
+        return E_INVALIDARG;
+
     TRACE("Got buffer for '%s' stream %p.\n", get_major_type_string(stream->format.major_type), stream);
 
     if (FAILED(hr = wm_stream_allocate_sample(stream, buffer->size, sample)))
@@ -1646,49 +1650,6 @@ static HRESULT wm_reader_read_stream_sample(struct wm_reader *reader, struct wm_
     return S_OK;
 }
 
-/* Find the earliest buffer by PTS.
- *
- * Native seems to behave similarly to this with the async reader, although our
- * unit tests show that it's not entirely consistent—some frames are received
- * slightly out of order. It's possible that one stream is being manually offset
- * to account for decoding latency.
- *
- * The behaviour with the synchronous reader, when stream 0 is requested, seems
- * consistent with this hypothesis, but with a much larger offset—the video
- * stream seems to be "behind" by about 150 ms.
- *
- * The main reason for doing this is that the video and audio stream probably
- * don't have quite the same "frame rate", and we don't want to force one stream
- * to decode faster just to keep up with the other. Delivering samples in PTS
- * order should avoid that problem. */
-static WORD get_earliest_buffer(struct wm_reader *reader, struct wg_parser_buffer *ret_buffer)
-{
-    struct wg_parser_buffer buffer;
-    QWORD earliest_pts = UI64_MAX;
-    WORD stream_number = 0;
-    WORD i;
-
-    for (i = 0; i < reader->stream_count; ++i)
-    {
-        struct wm_stream *stream = &reader->streams[i];
-
-        if (stream->selection == WMT_OFF)
-            continue;
-
-        if (!wg_parser_stream_get_buffer(stream->wg_stream, &buffer))
-            continue;
-
-        if (buffer.has_pts && buffer.pts < earliest_pts)
-        {
-            stream_number = i + 1;
-            earliest_pts = buffer.pts;
-            *ret_buffer = buffer;
-        }
-    }
-
-    return stream_number;
-}
-
 static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream_number,
         INSSBuffer **ret_sample, QWORD *pts, QWORD *duration, DWORD *flags, WORD *ret_stream_number)
 {
@@ -1700,13 +1661,11 @@ static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream
     {
         if (!stream_number)
         {
-            if (!(stream_number = get_earliest_buffer(reader, &wg_buffer)))
+            if (!wg_parser_stream_get_buffer(reader->wg_parser, NULL, &wg_buffer))
             {
                 /* All streams are disabled or EOS. */
                 return NS_E_NO_MORE_SAMPLES;
             }
-
-            stream = wm_reader_get_stream_by_stream_number(reader, stream_number);
         }
         else
         {
@@ -1725,7 +1684,7 @@ static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream
             if (stream->eos)
                 return NS_E_NO_MORE_SAMPLES;
 
-            if (!wg_parser_stream_get_buffer(stream->wg_stream, &wg_buffer))
+            if (!wg_parser_stream_get_buffer(reader->wg_parser, stream->wg_stream, &wg_buffer))
             {
                 stream->eos = true;
                 TRACE("End of stream.\n");
@@ -1733,8 +1692,8 @@ static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream
             }
         }
 
-        if (SUCCEEDED(hr = wm_reader_read_stream_sample(reader, stream, &wg_buffer, ret_sample, pts, duration, flags)))
-            *ret_stream_number = stream_number;
+        if (SUCCEEDED(hr = wm_reader_read_stream_sample(reader, &wg_buffer, ret_sample, pts, duration, flags)))
+            *ret_stream_number = wg_buffer.stream + 1;
     } while (hr == S_FALSE);
 
     return hr;
 }
From: Rémi Bernon <rbernon@codeweavers.com>
---
 dlls/winegstreamer/wg_parser.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c
index d1230c89f2e..14970d327c8 100644
--- a/dlls/winegstreamer/wg_parser.c
+++ b/dlls/winegstreamer/wg_parser.c
@@ -268,7 +268,7 @@ static GstBuffer *wait_parser_stream_buffer(struct wg_parser *parser, struct wg_
     /* Note that we can both have a buffer and stream->eos, in which case we
      * must return the buffer. */
 
-    while (!(buffer = stream->buffer) && !stream->eos)
+    while (stream->enabled && !(buffer = stream->buffer) && !stream->eos)
         pthread_cond_wait(&stream->event_cond, &parser->mutex);
 
     return buffer;
 }
@@ -308,7 +308,7 @@ static NTSTATUS wg_parser_stream_get_buffer(void *args)
 
         for (i = 0; i < parser->stream_count; ++i)
         {
-            if (!parser->streams[i]->enabled || !(buffer = wait_parser_stream_buffer(parser, parser->streams[i])))
+            if (!(buffer = wait_parser_stream_buffer(parser, parser->streams[i])))
                 continue;
             /* invalid PTS is GST_CLOCK_TIME_NONE == (guint64)-1, so this will prefer valid timestamps. */
             if (!earliest || GST_BUFFER_PTS(buffer) < GST_BUFFER_PTS(earliest))
From: Rémi Bernon <rbernon@codeweavers.com>
---
 dlls/winegstreamer/wm_reader.c | 74 +++++++++++-----------------
 1 file changed, 23 insertions(+), 51 deletions(-)

diff --git a/dlls/winegstreamer/wm_reader.c b/dlls/winegstreamer/wm_reader.c
index 1395e5193ef..b84852f583e 100644
--- a/dlls/winegstreamer/wm_reader.c
+++ b/dlls/winegstreamer/wm_reader.c
@@ -1650,55 +1650,6 @@ static HRESULT wm_reader_read_stream_sample(struct wm_reader *reader, struct wg_
     return S_OK;
 }
 
-static HRESULT wm_reader_get_stream_sample(struct wm_reader *reader, WORD stream_number,
-        INSSBuffer **ret_sample, QWORD *pts, QWORD *duration, DWORD *flags, WORD *ret_stream_number)
-{
-    struct wg_parser_buffer wg_buffer;
-    struct wm_stream *stream;
-    HRESULT hr = S_OK;
-
-    do
-    {
-        if (!stream_number)
-        {
-            if (!wg_parser_stream_get_buffer(reader->wg_parser, NULL, &wg_buffer))
-            {
-                /* All streams are disabled or EOS. */
-                return NS_E_NO_MORE_SAMPLES;
-            }
-        }
-        else
-        {
-            if (!(stream = wm_reader_get_stream_by_stream_number(reader, stream_number)))
-            {
-                WARN("Invalid stream number %u; returning E_INVALIDARG.\n", stream_number);
-                return E_INVALIDARG;
-            }
-
-            if (stream->selection == WMT_OFF)
-            {
-                WARN("Stream %u is deselected; returning NS_E_INVALID_REQUEST.\n", stream_number);
-                return NS_E_INVALID_REQUEST;
-            }
-
-            if (stream->eos)
-                return NS_E_NO_MORE_SAMPLES;
-
-            if (!wg_parser_stream_get_buffer(reader->wg_parser, stream->wg_stream, &wg_buffer))
-            {
-                stream->eos = true;
-                TRACE("End of stream.\n");
-                return NS_E_NO_MORE_SAMPLES;
-            }
-        }
-
-        if (SUCCEEDED(hr = wm_reader_read_stream_sample(reader, &wg_buffer, ret_sample, pts, duration, flags)))
-            *ret_stream_number = wg_buffer.stream + 1;
-    } while (hr == S_FALSE);
-
-    return hr;
-}
-
 static struct wm_reader *impl_from_IUnknown(IUnknown *iface)
 {
     return CONTAINING_RECORD(iface, struct wm_reader, IUnknown_inner);
@@ -1867,7 +1818,8 @@ static HRESULT WINAPI reader_GetNextSample(IWMSyncReader2 *iface,
         DWORD *flags, DWORD *output_number, WORD *ret_stream_number)
 {
     struct wm_reader *reader = impl_from_IWMSyncReader2(iface);
-    HRESULT hr = NS_E_NO_MORE_SAMPLES;
+    struct wm_stream *stream;
+    HRESULT hr = S_FALSE;
 
     TRACE("reader %p, stream_number %u, sample %p, pts %p, duration %p,"
             " flags %p, output_number %p, ret_stream_number %p.\n",
@@ -1878,7 +1830,27 @@ static HRESULT WINAPI reader_GetNextSample(IWMSyncReader2 *iface,
 
     EnterCriticalSection(&reader->cs);
 
-    hr = wm_reader_get_stream_sample(reader, stream_number, sample, pts, duration, flags, &stream_number);
+    if (!stream_number)
+        stream = NULL;
+    else if (!(stream = wm_reader_get_stream_by_stream_number(reader, stream_number)))
+        hr = E_INVALIDARG;
+    else if (stream->selection == WMT_OFF)
+        hr = NS_E_INVALID_REQUEST;
+    else if (stream->eos)
+        hr = NS_E_NO_MORE_SAMPLES;
+
+    while (hr == S_FALSE)
+    {
+        struct wg_parser_buffer wg_buffer;
+        if (!wg_parser_stream_get_buffer(reader->wg_parser, stream ? stream->wg_stream : NULL, &wg_buffer))
+            hr = NS_E_NO_MORE_SAMPLES;
+        else if (SUCCEEDED(hr = wm_reader_read_stream_sample(reader, &wg_buffer, sample, pts, duration, flags)))
+            stream_number = wg_buffer.stream + 1;
+    }
+
+    if (stream && hr == NS_E_NO_MORE_SAMPLES)
+        stream->eos = true;
+
     if (output_number && hr == S_OK)
         *output_number = stream_number - 1;
     if (ret_stream_number && (hr == S_OK || stream_number))
Hi,
It looks like your patch introduced the new failures shown below. Please investigate and fix them before resubmitting your patch. If they are not new, fixing them anyway would help a lot. Otherwise please ask for the known failures list to be updated.
The tests also ran into some preexisting test failures. If you know how to fix them that would be helpful. See the TestBot job for the details:
The full results can be found at: https://testbot.winehq.org/JobDetails.pl?Key=124942
Your paranoid android.
=== debian11 (32 bit report) ===
ddraw:
ddraw1.c:14319: Test failed: Expect clip rect (0,0)-(1024,737), got (0,0)-(2048,737).
ddraw1.c:14333: Test failed: Expect clip rect (0,0)-(640,480), got (0,0)-(1664,737).
ddraw1.c:14343: Test failed: Expect clip rect (0,0)-(1024,737), got (0,0)-(2048,737).
ddraw2.c:15302: Test failed: Expect clip rect (0,0)-(1024,737), got (0,0)-(2048,737).
ddraw2.c:15316: Test failed: Expect clip rect (0,0)-(640,480), got (0,0)-(1664,737).
ddraw2.c:15326: Test failed: Expect clip rect (0,0)-(1024,737), got (0,0)-(2048,737).
ddraw4.c:18352: Test failed: Expect clip rect (0,0)-(1024,737), got (0,0)-(2048,737).
ddraw4.c:18366: Test failed: Expect clip rect (0,0)-(640,480), got (0,0)-(1664,737).
ddraw4.c:18376: Test failed: Expect clip rect (0,0)-(1024,737), got (0,0)-(2048,737).
ddraw7.c:18619: Test failed: Expect clip rect (0,0)-(1024,737), got (0,0)-(2048,737).
ddraw7.c:18633: Test failed: Expect clip rect (0,0)-(640,480), got (0,0)-(1664,737).
ddraw7.c:18643: Test failed: Expect clip rect (0,0)-(1024,737), got (0,0)-(2048,737).
=== debian11 (build log) ===
Use of uninitialized value $Flaky in addition (+) at /home/testbot/lib/WineTestBot/LogUtils.pm line 720, <$LogFile> line 24723.
Use of uninitialized value $Flaky in addition (+) at /home/testbot/lib/WineTestBot/LogUtils.pm line 720, <$LogFile> line 24723.
Use of uninitialized value $Flaky in addition (+) at /home/testbot/lib/WineTestBot/LogUtils.pm line 720, <$LogFile> line 24723.
What's the motivation for pushing this down to the backend?
On Thu Oct 13 18:38:04 2022 +0000, **** wrote:
> Zebediah Figura replied on the mailing list:
>
> What's the motivation for pushing this down to the backend?
Simplifying the interface, so that it's just about pushing data to the parser / reading data out of it, in the same way as the wg_transform is, and ultimately using the same `wg_sample` primitive for zero-copy.

Most of the wg_parser_buffer fields are really only there for the purpose of this logic; moving it to the unix side will make them unnecessary. It also saves syscalls.
On Thu Oct 13 18:43:17 2022 +0000, Rémi Bernon wrote:
> Simplifying the interface, so that it's just about pushing data to the parser / reading data out of it, in the same way as the wg_transform is, and ultimately using the same `wg_sample` primitive for zero-copy. Most of the wg_parser_buffer fields are really only there for the purpose of this logic; moving it to the unix side will make them unnecessary. It also saves syscalls.
This doesn't really simplify the interface, though. It simplifies the wmvcore glue at the expense of making the wg_parser interface more complicated, but it's not clear to me that's a worthwhile trade.
I think that given the wmvcore interface, it should be possible to implement zero-copy while still dealing with the alternation logic in the wmvcore frontend. In particular we'd end up caching the whole sample on the PE side.
That said, it's possible this is less pretty than caching on the Unix side; I haven't tried the whole way through...
On Mon Oct 17 03:20:51 2022 +0000, Zebediah Figura wrote:
> This doesn't really simplify the interface, though. It simplifies the wmvcore glue at the expense of making the wg_parser interface more complicated, but it's not clear to me that's a worthwhile trade. I think that given the wmvcore interface, it should be possible to implement zero-copy while still dealing with the alternation logic in the wmvcore frontend. In particular we'd end up caching the whole sample on the PE side. That said, it's possible this is less pretty than caching on the Unix side; I haven't tried the whole way through...
There could perhaps be alternative ways of doing this, but I don't see any reason for this specific change to be unacceptable. It simplifies the interface, as there will be fewer calls needed for the same thing.
The wg_parser_stream_get_buffer function will need to be changed anyway, in order for sample allocation requests to eventually be returned in addition to buffer output requests. This can hardly be done differently, as the allocation needs to be done from a specific stream thread and may also be blocking.
The whole series is at https://gitlab.winehq.org/rbernon/wine/-/commits/wip/zerocopy/v2 if you want to have a look.
> There could perhaps be alternative ways of doing this, but I don't see any reason for this specific change to be unacceptable. It simplifies the interface, as there will be fewer calls needed for the same thing.
No, not unacceptable, but my point is to ask about other ways of approaching the problem, when something doesn't seem like clearly the best way.
I think there is value in having the winegstreamer unixlib interface, i.e. the API, be simple and minimal, to aid its adoption in other places. Adding the "NULL = stream with the least pts" option to that interface makes it more complex. (Of course, with the monster that is zero-copy, this may be a losing battle. Just recently someone was put off from using it in new code...) It also makes the wg_parser backend glue more complex, which is probably the worst place to put more complexity right now.
On the other hand, this is not really that bad of a change, at least comparatively, and it doesn't affect any of the other frontends, so I'm not flat-out opposed to it—just trying to see if another approach, which may not have been considered, might end up being preferable.
> The wg_parser_stream_get_buffer function will need to be changed anyway, in order for sample allocation requests to eventually be returned in addition to buffer output requests. This can hardly be done differently, as the allocation needs to be done from a specific stream thread and may also be blocking.
>
> The whole series is at https://gitlab.winehq.org/rbernon/wine/-/commits/wip/zerocopy/v2 if you want to have a look.
I've only looked briefly at the branch, but this seems very complicated. Do we need to call back to the PE side to allocate? Could we pool instead (and fall back to copying if the pool is empty)?
I also don't quite see how that's relevant to this patch.
On Tue Oct 18 05:03:29 2022 +0000, Zebediah Figura wrote:
> > There could perhaps be alternative ways of doing this, but I don't see any reason for this specific change to be unacceptable. It simplifies the interface, as there will be fewer calls needed for the same thing.
>
> No, not unacceptable, but my point is to ask about other ways of approaching the problem, when something doesn't seem like clearly the best way. I think there is value in having the winegstreamer unixlib interface, i.e. the API, be simple and minimal, to aid its adoption in other places. Adding the "NULL = stream with the least pts" option to that interface makes it more complex. (Of course, with the monster that is zero-copy, this may be a losing battle. Just recently someone was put off from using it in new code...) It also makes the wg_parser backend glue more complex, which is probably the worst place to put more complexity right now. On the other hand, this is not really that bad of a change, at least comparatively, and it doesn't affect any of the other frontends, so I'm not flat-out opposed to it—just trying to see if another approach, which may not have been considered, might end up being preferable.
>
> > The wg_parser_stream_get_buffer function will need to be changed anyway, in order for sample allocation requests to eventually be returned in addition to buffer output requests. This can hardly be done differently, as the allocation needs to be done from a specific stream thread and may also be blocking.
> >
> > The whole series is at https://gitlab.winehq.org/rbernon/wine/-/commits/wip/zerocopy/v2 if you want to have a look.
>
> I've only looked briefly at the branch, but this seems very complicated. Do we need to call back to the PE side to allocate? Could we pool instead (and fall back to copying if the pool is empty)?
>
> I also don't quite see how that's relevant to this patch.
I don't think a preallocated pool can work. Not with allocation callbacks like in wmvcore and quartz. Especially as these may block and as they are supposed to be called at specific times and in specific threads.

And yes, I have considered and tried it, and some alternate approaches, and I believe the one implemented here is good, and that it is also not so complex. It brings the wg_transform and wg_parser interfaces closer to each other and uses the same zero-copy primitives for both.
> Just recently someone was put off from using it in new code...
I've missed that probably, in which context? Was the discussion public?
On Tue Oct 18 06:42:23 2022 +0000, Rémi Bernon wrote:
> I don't think a preallocated pool can work. Not with allocation callbacks like in wmvcore and quartz. Especially as these may block and as they are supposed to be called at specific times and in specific threads. And yes, I have considered and tried it, and some alternate approaches, and I believe the one implemented here is good, and that it is also not so complex. It brings the wg_transform and wg_parser interfaces closer to each other and uses the same zero-copy primitives for both.
>
> > Just recently someone was put off from using it in new code...
>
> I've missed that probably, in which context? Was the discussion public?
Sorry for the slow response.
> I don't think a preallocated pool can work. Not with allocation callbacks like in wmvcore and quartz. Especially as these may block and as they are supposed to be called at specific times and in specific threads.
Sorry, "pool" was the wrong word to use. I mean rather that we should be able to provide a specific buffer to the winegstreamer backend, just like with the transform. We don't exactly know the frame size beforehand, but we can make a good guess in pretty much every case (after all, quartz basically has to), and if we fail then we can fall back to a blit.
I know there was discussion about this in merge request 197 already, but I'd like to bring this up again. (Partly because that discussion operated under some false premises on my part, e.g. we don't actually have to establish a fixed size pool beforehand, at least not for mfplat and wmvcore.)
Ultimately it'd be nice if this can act *exactly* like wg_transform, i.e. pass in a pre-allocated sample, let the backend fill that and zero-copy if it can, with the frontend agnostic as to whether zero-copy actually happened. That way we only have the one really complex pattern.
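A minimal sketch of what that could look like, assuming a wg_sample along the lines of the one the transform already uses; all names and signatures below are illustrative, not taken from any actual branch:

```c
/* Hypothetical sketch: the frontend allocates (or size-guesses) a sample up
 * front and passes it in; the backend fills it, zero-copying when it can. */
struct wg_sample
{
    BYTE *data;
    UINT32 max_size;  /* capacity provided by the frontend */
    UINT32 size;      /* bytes actually written by the backend */
};

/* Could fail with something like MF_E_BUFFERTOOSMALL so that the caller can
 * retry with a larger sample, i.e. fall back to a blit. */
HRESULT wg_parser_stream_read_data(struct wg_parser_stream *stream,
        struct wg_sample *sample);
```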
> And yes, I have considered and tried it, and some alternate approaches, and I believe the one implemented here is good, and that it is also not so complex. It brings the wg_transform and wg_parser interfaces closer to each other and uses the same zero-copy primitives for both.
I understand it's frustrating to have design decisions questioned, or have approaches suggested that you've already tried and found inviable, but from my perspective, it's also frustrating to have no explanation of that. If this patch series is a worthwhile tradeoff, I'd like to understand why. I currently don't understand why. I also feel like this (and other design decisions made in winegstreamer, including those I've made) are not as simple as they're made out to be, and I'm not sure if that's an effect of the submitter's then-familiarity with the code, or a failing on my part to understand and remember the structure. To be sure, sometimes it's necessary to accept complex code, because of things like PE conversion or zero-copy not working well with the winegstreamer threading model, but it's not as clear to me that this is one of those times.
Having taken some more time to look at this, and examine the context around it: on its own, this change seems arguably like an improvement, on the grounds that we already need to buffer one sample on the wg_parser side anyway, and my counter-proposal would involve adding extra buffering. My concern, however, is that this buffering will make zero-copy support in the parser harder, rather than easier. Partly because it's more code to work around, but partly because I think that it would interact badly with a world where we don't call back to the PE side to allocate new samples, and it's really not clear to me that calling back to the PE side is what we want. But again, these are only the interactions that I can foresee; if there's others, it's not really clear to me what they are and why.
> > Just recently someone was put off from using it in new code...
>
> I've missed that probably, in which context? Was the discussion public?
It wasn't public, no. The context was basically for a VfW decompressor, and the question was brought up to me privately about how we should implement it.
> > I don't think a preallocated pool can work. Not with allocation callbacks like in wmvcore and quartz. Especially as these may block and as they are supposed to be called at specific times and in specific threads.
>
> Sorry, "pool" was the wrong word to use. I mean rather that we should be able to provide a specific buffer to the winegstreamer backend, just like with the transform. We don't exactly know the frame size beforehand, but we can make a good guess in pretty much every case (after all, quartz basically has to), and if we fail then we can fall back to a blit.
>
> I know there was discussion about this in merge request 197 already, but I'd like to bring this up again. (Partly because that discussion operated under some false premises on my part, e.g. we don't actually have to establish a fixed size pool beforehand, at least not for mfplat and wmvcore.)
>
> Ultimately it'd be nice if this can act *exactly* like wg_transform, i.e. pass in a pre-allocated sample, let the backend fill that and zero-copy if it can, with the frontend agnostic as to whether zero-copy actually happened. That way we only have the one really complex pattern.
I don't think this can work. Contrary to the wg_transform, which pretty much works in a serialized way (we buffer the input samples and only provide them to GStreamer when we're ready to receive at least one output), the wg_parser is asynchronous in nature. Input buffers are provided on GStreamer requests, and we should do the same for buffer allocations. I don't think we can reliably assume that buffer allocations are quickly followed by buffer output, or that it's safe to wait for the output once we have allocated a buffer.
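For context, the reading-thread pattern under discussion looks roughly like this on the PE side (a sketch from memory of the unixlib calls involved; treat the details, such as the `running` flag, as approximate):

```c
/* Approximate shape of the PE-side read loop that services the parser's
 * input requests; the actual source read and error handling are elided. */
static DWORD CALLBACK read_thread(void *arg)
{
    struct wm_reader *reader = arg;
    uint64_t offset;
    uint32_t size;
    void *data;

    while (reader->running)
    {
        if (!wg_parser_get_next_read_offset(reader->wg_parser, &offset, &size))
            continue;
        /* ... read `size` bytes at `offset` from the source stream into a
         * buffer `data` allocated for the purpose, then feed the parser ... */
        wg_parser_push_data(reader->wg_parser, data, size);
    }
    return 0;
}
```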
> > And yes, I have considered and tried it, and some alternate approaches, and I believe the one implemented here is good, and that it is also not so complex. It brings the wg_transform and wg_parser interfaces closer to each other and uses the same zero-copy primitives for both.
>
> I understand it's frustrating to have design decisions questioned, or have approaches suggested that you've already tried and found inviable, but from my perspective, it's also frustrating to have no explanation of that. If this patch series is a worthwhile tradeoff, I'd like to understand why. I currently don't understand why. I also feel like this (and other design decisions made in winegstreamer, including those I've made) are not as simple as they're made out to be, and I'm not sure if that's an effect of the submitter's then-familiarity with the code, or a failing on my part to understand and remember the structure. To be sure, sometimes it's necessary to accept complex code, because of things like PE conversion or zero-copy not working well with the winegstreamer threading model, but it's not as clear to me that this is one of those times.
>
> Having taken some more time to look at this, and examine the context around it: on its own, this change seems arguably like an improvement, on the grounds that we already need to buffer one sample on the wg_parser side anyway, and my counter-proposal would involve adding extra buffering. My concern, however, is that this buffering will make zero-copy support in the parser harder, rather than easier. Partly because it's more code to work around, but partly because I think that it would interact badly with a world where we don't call back to the PE side to allocate new samples, and it's really not clear to me that calling back to the PE side is what we want. But again, these are only the interactions that I can foresee; if there's others, it's not really clear to me what they are and why.
There's no PE callback at all in my implementation. I also don't think it's as complex as you make it out to be. I think it's actually simpler, and more flexible to support some future changes, than the current design:
The design is based on a black-box wg_parser whose interface boils down to a request / reply model.

There's a wait_request function (currently also a wait_stream_request, for an implementation detail, but both could be merged), and reply functions for each request type (see the sketch below). Waiting for a request can be done for a specific stream and with a specific request type mask.

Requests are basically what you would expect: 'Input' / 'Allocation' / 'Output'.
* Input requests are received when the wg_parser needs data; the request carries the offset and size of the data that needs to be fed.
* Allocation requests are received when GStreamer needs to allocate a sample for a future output. They can be replied to with a sample, or with no sample, which will let GStreamer allocate its own memory.
* Output requests are received when a stream wants to output some data, with a token that can be used by the client to find a sample it previously allocated. If the sample was previously allocated, zero-copy happens when the client replies to the request. If not, then a copy happens.
This currently works with a 1-n stream model, like the wg_parser currently uses, but it will very easily scale to an n-n stream model if, in the future, we want to support muxers for instance.

For that, Input requests would come from input streams, and the wait_request / wait_stream_request functions could then be merged.
This design complements the simpler wg_transform which is meant to stay 1-1 and synchronous, but which otherwise works with the same wg_sample primitive.
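To make the shape of that interface concrete, here is a hypothetical sketch following the description above (wait_request, the three request types, tokens, and the wg_sample sketched earlier); none of these declarations are the branch's actual ones:

```c
enum wg_request_type
{
    WG_REQUEST_INPUT  = 0x1, /* the parser needs data at a given offset */
    WG_REQUEST_ALLOC  = 0x2, /* GStreamer wants a sample for a future output */
    WG_REQUEST_OUTPUT = 0x4, /* a stream has data ready to deliver */
};

struct wg_request
{
    enum wg_request_type type;
    UINT32 stream;  /* the stream the request concerns */
    UINT64 token;   /* lets an output reply find a previously allocated sample */
    UINT64 offset;  /* WG_REQUEST_INPUT: position to read from */
    UINT32 size;    /* bytes to feed, or sample size to allocate */
};

/* Block until a request matching the stream / request type masks is pending. */
NTSTATUS wg_parser_wait_request(struct wg_parser *parser, UINT32 stream_mask,
        UINT32 type_mask, struct wg_request *request);

/* Reply to an allocation request; a NULL sample lets GStreamer allocate its
 * own memory, in which case the eventual output is copied instead. */
NTSTATUS wg_parser_done_alloc(struct wg_parser *parser, UINT64 token,
        struct wg_sample *sample);
```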
> I don't think this can work. Contrary to the wg_transform, which pretty much works in a serialized way (we buffer the input samples and only provide them to GStreamer when we're ready to receive at least one output), the wg_parser is asynchronous in nature. Input buffers are provided on GStreamer requests, and we should do the same for buffer allocations.
Input buffers are different, yes, but I don't understand why output buffers need to be different?
wg_parser isn't really asynchronous in nature, at least not the way I designed it, and I don't think it makes sense for it to be asynchronous. The goal was to make a synchronous API that hides the asynchronous parts of GStreamer. The synchronous API was motivated by the fact that both mfplat and wmvcore have synchronous APIs, the need to control threads with the PE/Unix split, and the fact that I think synchronous APIs are just generally easier to reason with.
> I don't think we can reliably assume that buffer allocations are quickly followed by buffer output, or that it's safe to wait for the output once we have allocated a buffer.
I don't understand these statements either. Why do we care that buffer allocations are quickly followed by buffer output, and why would that assumption be different for transforms vs parsers? What do you mean by "wait for the output when we allocated a buffer"?
> Input buffers are different, yes, but I don't understand why output buffers need to be different?
Because GStreamer may want to allocate buffers at times when the only active thread is the reading thread, for instance. This is the case with the media source, and allocating buffers from the MF sample request callbacks doesn't work.
> wg_parser isn't really asynchronous in nature, at least not the way I designed it, and I don't think it makes sense for it to be asynchronous. The goal was to make a synchronous API that hides the asynchronous parts of GStreamer. The synchronous API was motivated by the fact that both mfplat and wmvcore have synchronous APIs, the need to control threads with the PE/Unix split, and the fact that I think synchronous APIs are just generally easier to reason with.
I don't see how it can **not** be asynchronous when it needs at least two threads: one reading thread that pushes data, and at least another that asynchronously waits and receives the output buffers.

I don't know if mfplat is really supposed to be synchronous and without a reading thread, but I'm pretty sure the wmvcore sync reader isn't, and with the current API it's simply not possible to do (remove the reading thread and use only one thread). With the input / allocation / output requests, it would be.
> I don't understand these statements either. Why do we care that buffer allocations are quickly followed by buffer output, and why would that assumption be different for transforms vs parsers? What do you mean by "wait for the output when we allocated a buffer"?
Because the wg_transform pushes input buffers after we've allocated and prepared an output buffer, and because it all happens synchronously: we never provide input concurrently, and if the GStreamer pipeline is synchronous we can be sure that no allocation will be requested before or after the wg_transform_read_data call. If the GStreamer pipeline is asynchronous, any allocation that happens before or after that moment can be denied, though we'd like to avoid that case from the zero-copy perspective.

For the wg_parser, the GStreamer pipeline is always asynchronous, and we always provide input concurrently. Allocations are then unpredictable and, as I described above, sometimes required before the PE side is ready to handle them.
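The serialized wg_transform pattern being contrasted here is roughly the following (a sketch only: `next_input_sample()` is a placeholder, and the exact signatures and error codes may differ from the tree):

```c
/* Serialized wg_transform usage: the output sample is prepared first, and
 * input is fed only until the transform produces output, so an allocation
 * request can only arrive while we sit inside wg_transform_read_data(). */
while ((hr = wg_transform_read_data(transform, output_sample, &format))
        == MF_E_TRANSFORM_NEED_MORE_INPUT)
{
    /* feed the next queued input sample and try again */
    if (FAILED(hr = wg_transform_push_data(transform, next_input_sample())))
        break;
}
```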
> > wg_parser isn't really asynchronous in nature, at least not the way I designed it, and I don't think it makes sense for it to be asynchronous. The goal was to make a synchronous API that hides the asynchronous parts of GStreamer. The synchronous API was motivated by the fact that both mfplat and wmvcore have synchronous APIs, the need to control threads with the PE/Unix split, and the fact that I think synchronous APIs are just generally easier to reason with.
>
> I don't see how it can **not** be asynchronous when it needs at least two threads: one reading thread that pushes data, and at least another that asynchronously waits and receives the output buffers.
With the exception of the read thread, yes. But the read thread is conceptually just a hack that exists because we can't just do callbacks. The point I was trying to make is that wg_parser is synchronous (like the wmvcore sync reader, or mfplat) and not asynchronous (like the wmvcore async reader, or quartz).
That said, I see what you mean by synchronous vs asynchronous, and the issue you're getting at, so thanks for bearing with me.
However, I don't like exposing that asynchronicity to the PE side if we can avoid it, or having a different API between parsers and transforms when we don't need to. That could mean waiting in parser_request_sample() until we get wg_parser_stream_get_sample().
The disadvantage there is that we don't get any buffering on the GStreamer side, though. Maybe that's not a problem, but it's not great to get rid of either.
I still don't like the generalized "request" mechanism; I still maintain it's a lot more complicated than it seems to you. I think my preferred approach, assuming I'm considering everything, is that we'd just introduce an allocation callback which is handled like read callbacks. (Possibly it could use the same thread, and hence also turn read requests back into the gst_cbs infrastructure...)
> With the exception of the read thread, yes. But the read thread is conceptually just a hack that exists because we can't just do callbacks. The point I was trying to make is that wg_parser is synchronous (like the wmvcore sync reader, or mfplat) and not asynchronous (like the wmvcore async reader, or quartz).
>
> That said, I see what you mean by synchronous vs asynchronous, and the issue you're getting at, so thanks for bearing with me.
>
> However, I don't like exposing that asynchronicity to the PE side if we can avoid it, or having a different API between parsers and transforms when we don't need to. That could mean waiting in parser_request_sample() until we get wg_parser_stream_get_sample().
>
> The disadvantage there is that we don't get any buffering on the GStreamer side, though. Maybe that's not a problem, but it's not great to get rid of either.
>
> I still don't like the generalized "request" mechanism; I still maintain it's a lot more complicated than it seems to you. I think my preferred approach, assuming I'm considering everything, is that we'd just introduce an allocation callback which is handled like read callbacks. (Possibly it could use the same thread, and hence also turn read requests back into the gst_cbs infrastructure...)
Like I said above, having allocations done in the reading thread context (as opposed to the output threads context) doesn't work for several use cases: it works and is somehow required for mf, but quartz and wmvcore aren't supposed to do that. Quartz in particular quickly ends up with deadlocks, because its allocator callbacks block the read thread.

Like I also pointed out, there are several scenarios that simply cannot be solved with the current design, and that can only be solved with either syscall callbacks, which we don't want, or a design similar to the one I've described above.

For instance, and as you said, the wmvcore read thread is a hack. I can write a wmvcore test showing that the reading thread isn't supposed to exist, and that stream seek / read calls are supposed to come from the same thread as the caller in the sync reader, as are alloc callbacks, and output callbacks in the case of the async reader without dedicated delivery threads.
That alone seems to me enough proof that the design is sane, and more flexible.
> Like I said above, having allocations done in the reading thread context (as opposed to the output threads context) doesn't work for several use cases: it works and is somehow required for mf, but quartz and wmvcore aren't supposed to do that. Quartz in particular quickly ends up with deadlocks, because its allocator callbacks block the read thread.
Why does quartz deadlock? Read requests shouldn't block, and while sample requests can block they should be resolved by earlier samples getting delivered, which shouldn't deadlock.
> Like I also pointed out, there are several scenarios that simply cannot be solved with the current design, and that can only be solved with either syscall callbacks, which we don't want, or a design similar to the one I've described above.
>
> For instance, and as you said, the wmvcore read thread is a hack. I can write a wmvcore test showing that the reading thread isn't supposed to exist, and that stream seek / read calls are supposed to come from the same thread as the caller in the sync reader, as are alloc callbacks, and output callbacks in the case of the async reader without dedicated delivery threads.
Sure, but there's also no known application that depends on that detail. I don't like the idea of making the parser interface that much more complicated if it's not necessary.
> Why does quartz deadlock? Read requests shouldn't block, and while sample requests can block they should be resolved by earlier samples getting delivered, which shouldn't deadlock.
Because it synchronizes streams together in a way that isn't supposed to happen. You don't know whether read requests are blocking or not; as soon as it goes back to application code it can be anything.

Sample allocators definitely are blocking, depending on the pool sizes, and synchronizing stream threads together is not a good idea. We should not assume anything about it, and we should instead make sure that the code is flexible and avoids unnecessary synchronization on our side.

We also have no control over GStreamer's decisions, or its queueing strategy, and whether it will deliver output buffers soon enough to unblock allocators is beyond our reach.
> Sure, but there's also no known application that depends on that detail. I don't like the idea of making the parser interface that much more complicated if it's not necessary.
I'd appreciate something more detailed than "much more complicated", because just asserting that it is isn't enough to make a case.
I could for instance say the opposite, then point out that:
1) It reduces the number of necessary entry points, ultimately:

   wait_request, push_data, read_data, done_alloc

   vs

   get_next_read_offset, push_data, stream_get_buffer, stream_copy_buffer, stream_release_buffer

2) It's not even required to have the allocation request / done_alloc; that part only exists to support zero-copy, and without it that would even be one less entry point.

3) The design doesn't force anything upon the clients; wait_request can be called concurrently, as long as the stream / request masks are disjoint.

4) Replying to requests is done in a single call, and doesn't require the client to correctly sequence the get_buffer / copy_buffer / release_buffer calls (illustrated below).

5) Replies can be done concurrently with other requests, as they are kept in a list and the client provides their token.
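Point 4, illustrated: the current three calls (the real entry points, as in the patches above) against a hypothetical single reply carrying the request token:

```c
/* Today: three calls the client must sequence correctly. */
wg_parser_stream_get_buffer(parser, stream, &buffer);
wg_parser_stream_copy_buffer(stream, data, 0, buffer.size);
wg_parser_stream_release_buffer(stream);

/* Proposed: one reply, matched to its request by the token; the function
 * name is hypothetical. */
wg_parser_done_read(parser, request.token, sample);
```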
> > Why does quartz deadlock? Read requests shouldn't block, and while sample requests can block they should be resolved by earlier samples getting delivered, which shouldn't deadlock.
>
> Because it synchronizes streams together in a way that isn't supposed to happen. You don't know whether read requests are blocking or not; as soon as it goes back to application code it can be anything.
Yes, sure, a quartz client can do anything in its COM callbacks, and of course the documentation drastically underspecifies what's allowed, but we can't afford to test and anticipate every possible deadlock, and I think it's a viable approach to anticipate the common and conceptually sensible usage, and then deal with deadlocks later if we do come across them.
Conceptually a read request might not be filled instantaneously, but it shouldn't *block* on anything else related to quartz.
> Sample allocators definitely are blocking, depending on the pool sizes, and synchronizing stream threads together is not a good idea. We should not assume anything about it, and we should instead make sure that the code is flexible and avoids unnecessary synchronization on our side.
Here too: the way a quartz allocator is supposed to work is that it sets up a pool of a fixed size, which the peer knows, and it's explicitly supposed to wait in IMemAllocator::GetBuffer() if all of the buffers are currently in use. I can see an application violating the first part, and we effectively can't depend on the number of buffers anyway, but I think the second part again holds conceptually; I don't see any good reason an allocator would block in GetBuffer() on something else related to quartz.
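For reference, the ordinary quartz usage being described, with the standard DirectShow allocator API (a minimal sketch):

```c
/* IMemAllocator::GetBuffer() blocks while every sample in the fixed-size
 * pool is in use; releasing a delivered sample returns it to the pool and
 * unblocks the next GetBuffer() call. */
IMediaSample *sample;
HRESULT hr;

if (SUCCEEDED(hr = IMemAllocator_GetBuffer(allocator, &sample, NULL, NULL, 0)))
{
    /* ... fill the sample, then deliver it downstream ... */
    hr = IMemInputPin_Receive(peer, sample);
    IMediaSample_Release(sample);
}
```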
> We also have no control over GStreamer's decisions, or its queueing strategy, and whether it will deliver output buffers soon enough to unblock allocators is beyond our reach.
I guess conceptually the problem here is that a GStreamer element might request and fill exactly as many buffers as we have in our quartz pool, but not actually chain any of them to us, and demand to read more before chaining (but not to allocate more samples)? That's quite pathological, does this actually happen?
Or is there a simpler deadlock I'm missing? I can't come up with anything that's not along the lines of "GStreamer demands more buffers than the quartz pool has", which would deadlock *anyway* regardless of whether it's synchronized with read requests.
> > Sure, but there's also no known application that depends on that detail. I don't like the idea of making the parser interface that much more complicated if it's not necessary.
>
> I'd appreciate something more detailed than "much more complicated", because just asserting that it is isn't enough to make a case.
>
> I could for instance say the opposite, then point out that:
>
> 1) It reduces the number of necessary entry points, ultimately:
>
>    wait_request, push_data, read_data, done_alloc
>
>    vs
>
>    get_next_read_offset, push_data, stream_get_buffer, stream_copy_buffer, stream_release_buffer
All else aside, I'd really rather have many simple and modular entry points than one complex one. In general, writing a lot of boilerplate is preferable to me over writing code that's hard to follow.
> 3) The design doesn't force anything upon the clients; wait_request can be called concurrently, as long as the stream / request masks are disjoint.
I'm less worried about the simplicity of the interface (although I do think there are ways we could make this design friendlier) and more worried about the back-end code. I think it's very hard to follow already, and this is going to make it worse.
> 4) Replying to requests is done in a single call, and doesn't require the client to correctly sequence the get_buffer / copy_buffer / release_buffer calls.
Yeah, although I want those to go away anyway. They're pretty much an artifact of either history or just bad design on my part (or both). If we didn't care about zero-copy, it should just be one API that returns ERROR_INSUFFICIENT_BUFFER, or partially fills the buffer, or something.
> I guess conceptually the problem here is that a GStreamer element might request and fill exactly as many buffers as we have in our quartz pool, but not actually chain any of them to us, and demand to read more before chaining (but not to allocate more samples)? That's quite pathological, does this actually happen?
>
> Or is there a simpler deadlock I'm missing? I can't come up with anything that's not along the lines of "GStreamer demands more buffers than the quartz pool has", which would deadlock *anyway* regardless of whether it's synchronized with read requests.
Not necessarily: it would block the demanding thread, and synchronize it with the client thread only. This is a much smaller constraint than synchronizing all the involved threads together. That constraint alone usually holds because, indeed, stream threads are consuming the samples they are producing, but I don't see any reason why different streams synchronized together would necessarily end up consuming the samples in an order that would always satisfy the same constraints.

I could spend time trying to find an example, but to be honest I don't understand why I need to, and I don't think it's even worth it: we can simply avoid the problem altogether by avoiding unnecessary synchronization.

Maybe I can point a gun at my feet and not get shot, but my usual strategy is to not even try, especially when debugging such deadlocks is notoriously difficult given the huge number of threads involved.
> All else aside, I'd really rather have many simple and modular entry points than one complex one. In general, writing a lot of boilerplate is preferable to me over writing code that's hard to follow.
I think multiplying the number of entry points makes things more complicated rather than less, and more error-prone. If we need boilerplate for that, I think we're doing it wrong.
> I'm less worried about the simplicity of the interface (although I do think there are ways we could make this design friendlier) and more worried about the back-end code. I think it's very hard to follow already, and this is going to make it worse.
Ah, thanks for pointing that out, because the design also reduces the complexity there: it merges the read_cond, read_done_cond, and all the per-stream event_cond, event_empty_cond variables together, using a single request list and request_cond, request_done_cond variables.
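Presumably the unified state then looks something like this (a hypothetical sketch, not the branch's actual fields):

```c
/* One request list guarded by a single pair of condition variables,
 * replacing read_cond / read_done_cond and the per-stream event_cond /
 * event_empty_cond pairs. */
struct wg_parser
{
    pthread_mutex_t mutex;
    pthread_cond_t request_cond, request_done_cond;
    struct list requests;  /* pending struct wg_request entries */
    /* ... */
};
```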
Could we move forward on this? Regardless of the discussion about zero-copy, this change is not completely tied to it, and I think it makes sense even if only for the sake of simplifying GetNextSample.
I no longer find it productive to block this patch.
This merge request was approved by Zebediah Figura.