Signed-off-by: Zebediah Figura <z.figura12@gmail.com> --- dlls/winegstreamer/media_source.c | 139 ++++++++++++++++++++++++++---- 1 file changed, 121 insertions(+), 18 deletions(-)
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c index ffbf8041ed8..6fb2c7f6f48 100644 --- a/dlls/winegstreamer/media_source.c +++ b/dlls/winegstreamer/media_source.c @@ -113,6 +113,21 @@ struct media_source HANDLE no_more_pads_event;
uint64_t file_size, next_pull_offset; + + HANDLE read_thread; + bool read_thread_shutdown; + + pthread_mutex_t mutex; + pthread_cond_t read_cond, read_done_cond; + struct + { + void *data; + uint64_t offset; + uint32_t size; + bool done; + bool ret; + } read_request; + bool shutdown; };
static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface) @@ -442,42 +457,109 @@ GstFlowReturn bytestream_wrapper_pull(GstPad *pad, GstObject *parent, guint64 of GstBuffer **buf) { struct media_source *source = gst_pad_get_element_private(pad); - IMFByteStream *byte_stream = source->byte_stream; GstBuffer *new_buffer = NULL; - ULONG bytes_read; GstMapInfo info; - BOOL is_eof; - HRESULT hr; + bool ret;
TRACE("requesting %u bytes at %s from source %p into buffer %p\n", len, wine_dbgstr_longlong(ofs), source, *buf);
if (ofs == GST_BUFFER_OFFSET_NONE) ofs = source->next_pull_offset; source->next_pull_offset = ofs + len; - - if (FAILED(IMFByteStream_SetCurrentPosition(byte_stream, ofs))) - return GST_FLOW_ERROR; - - if (FAILED(IMFByteStream_IsEndOfStream(byte_stream, &is_eof))) - return GST_FLOW_ERROR; - if (is_eof) + if (ofs >= source->file_size) return GST_FLOW_EOS; + if (ofs + len >= source->file_size) + len = source->file_size - ofs;
if (!(*buf)) *buf = new_buffer = gst_buffer_new_and_alloc(len); gst_buffer_map(*buf, &info, GST_MAP_WRITE); - hr = IMFByteStream_Read(byte_stream, info.data, len, &bytes_read); + + pthread_mutex_lock(&source->mutex); + + assert(!source->read_request.data); + source->read_request.data = info.data; + source->read_request.offset = ofs; + source->read_request.size = len; + source->read_request.done = false; + pthread_cond_signal(&source->read_cond); + + /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect + * the upstream pin to flush if necessary. We should never be blocked on + * read_thread() not running. */ + + while (!source->read_request.done) + pthread_cond_wait(&source->read_done_cond, &source->mutex); + + ret = source->read_request.ret; + + pthread_mutex_unlock(&source->mutex); + gst_buffer_unmap(*buf, &info);
- gst_buffer_set_size(*buf, bytes_read); + if (!ret && new_buffer) + gst_buffer_unref(new_buffer); + return ret ? GST_FLOW_OK : GST_FLOW_ERROR; +} + +static bool get_read_request(struct media_source *source, void **data, uint64_t *offset, uint32_t *size) +{ + pthread_mutex_lock(&source->mutex); + + while (!source->shutdown && !source->read_request.data) + pthread_cond_wait(&source->read_cond, &source->mutex); + + if (source->shutdown) + { + pthread_mutex_unlock(&source->mutex); + return false; + } + + *data = source->read_request.data; + *offset = source->read_request.offset; + *size = source->read_request.size; + + pthread_mutex_unlock(&source->mutex); + return true; +} + +static void complete_read_request(struct media_source *source, bool ret) +{ + pthread_mutex_lock(&source->mutex); + source->read_request.done = true; + source->read_request.ret = ret; + source->read_request.data = NULL; + pthread_mutex_unlock(&source->mutex); + pthread_cond_signal(&source->read_done_cond); +}
- if (FAILED(hr)) +static DWORD CALLBACK read_thread(void *arg) +{ + struct media_source *source = arg; + IMFByteStream *byte_stream = source->byte_stream; + + TRACE("Starting read thread for media source %p.\n", source); + + while (!source->read_thread_shutdown) { - if (new_buffer) - gst_buffer_unref(new_buffer); - return GST_FLOW_ERROR; + uint64_t offset; + ULONG ret_size; + uint32_t size; + HRESULT hr; + void *data; + + if (!get_read_request(source, &data, &offset, &size)) + continue; + + if (SUCCEEDED(hr = IMFByteStream_SetCurrentPosition(byte_stream, offset))) + hr = IMFByteStream_Read(byte_stream, data, size, &ret_size); + if (SUCCEEDED(hr) && ret_size != size) + ERR("Unexpected short read: requested %u bytes, got %u.\n", size, ret_size); + complete_read_request(source, SUCCEEDED(hr)); } - return GST_FLOW_OK; + + TRACE("Media source is shutting down; exiting.\n"); + return 0; }
static gboolean bytestream_query(GstPad *pad, GstObject *parent, GstQuery *query) @@ -1164,6 +1246,21 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface) IMFMediaStream_Release(&stream->IMFMediaStream_iface); }
+ if (source->read_thread) + { + source->read_thread_shutdown = true; + pthread_mutex_lock(&source->mutex); + source->shutdown = true; + pthread_mutex_unlock(&source->mutex); + pthread_cond_signal(&source->read_cond); + WaitForSingleObject(source->read_thread, INFINITE); + CloseHandle(source->read_thread); + } + + pthread_mutex_destroy(&source->mutex); + pthread_cond_destroy(&source->read_cond); + pthread_cond_destroy(&source->read_done_cond); + if (source->stream_count) heap_free(source->streams);
@@ -1284,6 +1381,12 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_ if (FAILED(hr = MFAllocateWorkQueue(&object->async_commands_queue))) goto fail;
+ pthread_mutex_init(&object->mutex, NULL); + pthread_cond_init(&object->read_cond, NULL); + pthread_cond_init(&object->read_done_cond, NULL); + + object->read_thread = CreateThread(NULL, 0, read_thread, object, 0, NULL); + object->container = gst_bin_new(NULL); object->bus = gst_bus_new(); gst_bus_set_sync_handler(object->bus, mf_src_bus_watch_wrapper, object, NULL);