Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/winegstreamer/gstdemux.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index db1825e34a1..40c5e7ae017 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -1070,14 +1070,13 @@ static gboolean activate_mode(GstPad *pad, GstObject *parent, GstPadMode mode, g return FALSE; }
-static GstBusSyncReply watch_bus(GstBus *bus, GstMessage *msg, gpointer data) +static GstBusSyncReply watch_bus(GstBus *bus, GstMessage *msg, gpointer user) { - struct parser *filter = data; - struct wg_parser *parser = filter->wg_parser; + struct wg_parser *parser = user; GError *err = NULL; gchar *dbg_info = NULL;
- GST_DEBUG("filter %p, message type %s.", filter, GST_MESSAGE_TYPE_NAME(msg)); + GST_DEBUG("parser %p, message type %s.", parser, GST_MESSAGE_TYPE_NAME(msg));
switch (msg->type) { @@ -1157,7 +1156,7 @@ static HRESULT GST_Connect(struct parser *This, IPin *pConnectPin) if (!parser->bus) { parser->bus = gst_bus_new(); - gst_bus_set_sync_handler(parser->bus, watch_bus, This, NULL); + gst_bus_set_sync_handler(parser->bus, watch_bus, parser, NULL); }
parser->container = gst_bin_new(NULL);
Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/winegstreamer/gstdemux.c | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-)
diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index 40c5e7ae017..8b63aa57702 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -1134,25 +1134,18 @@ static LONGLONG query_duration(GstPad *pad) return 0; }
-static HRESULT GST_Connect(struct parser *This, IPin *pConnectPin) +static HRESULT GST_Connect(struct wg_parser *parser, LONGLONG file_size) { - struct wg_parser *parser = This->wg_parser; unsigned int i; - LONGLONG avail; GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE( "quartz_src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
- IAsyncReader_Length(This->reader, &This->file_size, &avail); - parser->file_size = This->file_size; - - This->sink_connected = true; + parser->file_size = file_size; parser->sink_connected = true;
- This->read_thread = CreateThread(NULL, 0, read_thread, This, 0, NULL); - if (!parser->bus) { parser->bus = gst_bus_new(); @@ -1170,7 +1163,6 @@ static HRESULT GST_Connect(struct parser *This, IPin *pConnectPin) gst_pad_set_element_private(parser->my_src, parser);
parser->start_offset = parser->next_offset = parser->stop_offset = 0; - This->next_pull_offset = 0;
if (!parser->init_gst(parser)) return E_FAIL; @@ -1194,7 +1186,6 @@ static HRESULT GST_Connect(struct parser *This, IPin *pConnectPin) pthread_mutex_unlock(&parser->mutex);
parser->next_offset = 0; - This->next_pull_offset = 0; return S_OK; }
@@ -1357,6 +1348,7 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons { struct parser *filter = impl_from_strmbase_sink(iface); HRESULT hr = S_OK; + LONGLONG unused; unsigned int i;
mark_wine_thread(); @@ -1365,7 +1357,13 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons if (FAILED(hr = IPin_QueryInterface(peer, &IID_IAsyncReader, (void **)&filter->reader))) return hr;
- if (FAILED(hr = GST_Connect(filter, peer))) + IAsyncReader_Length(filter->reader, &filter->file_size, &unused); + + filter->sink_connected = true; + filter->read_thread = CreateThread(NULL, 0, read_thread, filter, 0, NULL); + filter->next_pull_offset = 0; + + if (FAILED(hr = GST_Connect(filter->wg_parser, filter->file_size))) goto err;
if (!filter->init_gst(filter))
Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/winegstreamer/gst_private.h | 2 + dlls/winegstreamer/gstdemux.c | 372 +------------------------------ dlls/winegstreamer/wg_parser.c | 372 +++++++++++++++++++++++++++++++ 3 files changed, 375 insertions(+), 371 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 15a20749786..ce500a92356 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -211,6 +211,8 @@ struct unix_funcs struct wg_parser *(CDECL *wg_mpeg_audio_parser_create)(void); struct wg_parser *(CDECL *wg_wave_parser_create)(void); void (CDECL *wg_parser_destroy)(struct wg_parser *parser); + + HRESULT (CDECL *wg_parser_connect)(struct wg_parser *parser, uint64_t file_size); };
extern const struct unix_funcs *unix_funcs; diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index 8b63aa57702..9a74e09f5dd 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -42,9 +42,6 @@
WINE_DEFAULT_DEBUG_CHANNEL(gstreamer);
-GST_DEBUG_CATEGORY_STATIC(wine); -#define GST_CAT_DEFAULT wine - static const GUID MEDIASUBTYPE_CVID = {mmioFOURCC('c','v','i','d'), 0x0000, 0x0010, {0x80, 0x00, 0x00, 0xaa, 0x00, 0x38, 0x9b, 0x71}}; static const GUID MEDIASUBTYPE_MP3 = {WAVE_FORMAT_MPEGLAYER3, 0x0000, 0x0010, {0x80, 0x00, 0x00, 0xaa, 0x00, 0x38, 0x9b, 0x71}};
@@ -569,133 +566,6 @@ static bool amt_to_wg_format(const AM_MEDIA_TYPE *mt, struct wg_format *format) return false; }
-static gboolean gst_base_src_perform_seek(struct wg_parser *parser, GstEvent *event) -{ - gboolean res = TRUE; - gdouble rate; - GstFormat seek_format; - GstSeekFlags flags; - GstSeekType cur_type, stop_type; - gint64 cur, stop; - gboolean flush; - guint32 seqnum; - GstEvent *tevent; - BOOL thread = !!parser->push_thread; - - gst_event_parse_seek(event, &rate, &seek_format, &flags, - &cur_type, &cur, &stop_type, &stop); - - if (seek_format != GST_FORMAT_BYTES) - { - GST_FIXME("Unhandled format "%s".", gst_format_get_name(seek_format)); - return FALSE; - } - - flush = flags & GST_SEEK_FLAG_FLUSH; - seqnum = gst_event_get_seqnum(event); - - /* send flush start */ - if (flush) { - tevent = gst_event_new_flush_start(); - gst_event_set_seqnum(tevent, seqnum); - gst_pad_push_event(parser->my_src, tevent); - if (thread) - gst_pad_set_active(parser->my_src, 1); - } - - parser->next_offset = parser->start_offset = cur; - - /* and prepare to continue streaming */ - if (flush) { - tevent = gst_event_new_flush_stop(TRUE); - gst_event_set_seqnum(tevent, seqnum); - gst_pad_push_event(parser->my_src, tevent); - if (thread) - gst_pad_set_active(parser->my_src, 1); - } - - return res; -} - -static gboolean event_src(GstPad *pad, GstObject *parent, GstEvent *event) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - gboolean ret = TRUE; - - GST_LOG("parser %p, type "%s".", parser, GST_EVENT_TYPE_NAME(event)); - - switch (event->type) - { - case GST_EVENT_SEEK: - ret = gst_base_src_perform_seek(parser, event); - break; - - case GST_EVENT_FLUSH_START: - case GST_EVENT_FLUSH_STOP: - case GST_EVENT_QOS: - case GST_EVENT_RECONFIGURE: - break; - - default: - GST_WARNING("Ignoring "%s" event.", GST_EVENT_TYPE_NAME(event)); - ret = FALSE; - break; - } - gst_event_unref(event); - return ret; -} - -static GstFlowReturn request_buffer_src(GstPad *pad, GstObject *parent, guint64 offset, guint size, GstBuffer **buffer); - -static void *push_data(void *arg) -{ - struct wg_parser *parser = arg; - GstBuffer *buffer; - LONGLONG maxlen; - - GST_DEBUG("Starting push thread."); - - if (!(buffer = gst_buffer_new_allocate(NULL, 16384, NULL))) - { - GST_ERROR("Failed to allocate memory."); - return NULL; - } - - maxlen = parser->stop_offset ? parser->stop_offset : parser->file_size; - - for (;;) { - ULONG len; - int ret; - - if (parser->next_offset >= maxlen) - break; - len = min(16384, maxlen - parser->next_offset); - - if ((ret = request_buffer_src(parser->my_src, NULL, parser->next_offset, len, &buffer)) < 0) - { - GST_ERROR("Failed to read data, ret %s.", gst_flow_get_name(ret)); - break; - } - - parser->next_offset += len; - - buffer->duration = buffer->pts = -1; - if ((ret = gst_pad_push(parser->my_src, buffer)) < 0) - { - GST_ERROR("Failed to push data, ret %s.", gst_flow_get_name(ret)); - break; - } - } - - gst_buffer_unref(buffer); - - gst_pad_push_event(parser->my_src, gst_event_new_eos()); - - GST_DEBUG("Stopping push thread."); - - return NULL; -} - /* Fill and send a single IMediaSample. */ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample, GstBuffer *buf, GstMapInfo *info, gsize offset, gsize size, DWORD bytes_per_second) @@ -890,45 +760,6 @@ static DWORD CALLBACK stream_thread(void *arg) return 0; }
-static GstFlowReturn request_buffer_src(GstPad *pad, GstObject *parent, guint64 offset, guint size, GstBuffer **buffer) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - GstBuffer *new_buffer = NULL; - GstFlowReturn ret; - - GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, length %u, buffer %p.", pad, offset, size, *buffer); - - if (!*buffer) - *buffer = new_buffer = gst_buffer_new_and_alloc(size); - - pthread_mutex_lock(&parser->mutex); - - assert(!parser->read_request.buffer); - parser->read_request.buffer = *buffer; - parser->read_request.offset = offset; - parser->read_request.size = size; - parser->read_request.done = false; - pthread_cond_signal(&parser->read_cond); - - /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect - * the upstream pin to flush if necessary. We should never be blocked on - * read_thread() not running. */ - - while (!parser->read_request.done) - pthread_cond_wait(&parser->read_done_cond, &parser->mutex); - - ret = parser->read_request.ret; - - pthread_mutex_unlock(&parser->mutex); - - GST_LOG("Request returned %s.", gst_flow_get_name(ret)); - - if (ret != GST_FLOW_OK && new_buffer) - gst_buffer_unref(new_buffer); - - return ret; -} - static GstFlowReturn read_buffer(struct parser *This, guint64 ofs, guint len, GstBuffer *buffer) { HRESULT hr; @@ -989,206 +820,6 @@ static DWORD CALLBACK read_thread(void *arg) return 0; }
-static gboolean query_function(GstPad *pad, GstObject *parent, GstQuery *query) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - GstFormat format; - - GST_LOG("parser %p, type %s.", parser, GST_QUERY_TYPE_NAME(query)); - - switch (GST_QUERY_TYPE(query)) { - case GST_QUERY_DURATION: - gst_query_parse_duration(query, &format, NULL); - if (format == GST_FORMAT_PERCENT) - { - gst_query_set_duration(query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX); - return TRUE; - } - else if (format == GST_FORMAT_BYTES) - { - gst_query_set_duration(query, GST_FORMAT_BYTES, parser->file_size); - return TRUE; - } - return FALSE; - case GST_QUERY_SEEKING: - gst_query_parse_seeking (query, &format, NULL, NULL, NULL); - if (format != GST_FORMAT_BYTES) - { - GST_WARNING("Cannot seek using format "%s".", gst_format_get_name(format)); - return FALSE; - } - gst_query_set_seeking(query, GST_FORMAT_BYTES, 1, 0, parser->file_size); - return TRUE; - case GST_QUERY_SCHEDULING: - gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0); - gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH); - gst_query_add_scheduling_mode(query, GST_PAD_MODE_PULL); - return TRUE; - default: - GST_WARNING("Unhandled query type %s.", GST_QUERY_TYPE_NAME(query)); - return FALSE; - } -} - -static gboolean activate_push(GstPad *pad, gboolean activate) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - - if (!activate) { - if (parser->push_thread) { - pthread_join(parser->push_thread, NULL); - parser->push_thread = 0; - } - } else if (!parser->push_thread) { - int ret; - - if ((ret = pthread_create(&parser->push_thread, NULL, push_data, parser))) - { - GST_ERROR("Failed to create push thread: %s", strerror(errno)); - parser->push_thread = 0; - return FALSE; - } - } - return TRUE; -} - -static gboolean activate_mode(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) -{ - struct wg_parser *parser = gst_pad_get_element_private(pad); - - GST_DEBUG("%s source pad for parser %p in %s mode.", - activate ? "Activating" : "Deactivating", parser, gst_pad_mode_get_name(mode)); - - switch (mode) { - case GST_PAD_MODE_PULL: - return TRUE; - case GST_PAD_MODE_PUSH: - return activate_push(pad, activate); - default: - return FALSE; - } - return FALSE; -} - -static GstBusSyncReply watch_bus(GstBus *bus, GstMessage *msg, gpointer user) -{ - struct wg_parser *parser = user; - GError *err = NULL; - gchar *dbg_info = NULL; - - GST_DEBUG("parser %p, message type %s.", parser, GST_MESSAGE_TYPE_NAME(msg)); - - switch (msg->type) - { - case GST_MESSAGE_ERROR: - gst_message_parse_error(msg, &err, &dbg_info); - fprintf(stderr, "winegstreamer: error: %s: %s\n", GST_OBJECT_NAME(msg->src), err->message); - fprintf(stderr, "winegstreamer: error: %s: %s\n", GST_OBJECT_NAME(msg->src), dbg_info); - g_error_free(err); - g_free(dbg_info); - pthread_mutex_lock(&parser->mutex); - parser->error = true; - pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); - break; - - case GST_MESSAGE_WARNING: - gst_message_parse_warning(msg, &err, &dbg_info); - fprintf(stderr, "winegstreamer: warning: %s: %s\n", GST_OBJECT_NAME(msg->src), err->message); - fprintf(stderr, "winegstreamer: warning: %s: %s\n", GST_OBJECT_NAME(msg->src), dbg_info); - g_error_free(err); - g_free(dbg_info); - break; - - case GST_MESSAGE_DURATION_CHANGED: - pthread_mutex_lock(&parser->mutex); - parser->has_duration = true; - pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->init_cond); - break; - - default: - break; - } - gst_message_unref(msg); - return GST_BUS_DROP; -} - -static LONGLONG query_duration(GstPad *pad) -{ - gint64 duration, byte_length; - - if (gst_pad_query_duration(pad, GST_FORMAT_TIME, &duration)) - return duration / 100; - - WARN("Failed to query time duration; trying to convert from byte length.\n"); - - /* To accurately get a duration for the stream, we want to only consider the - * length of that stream. Hence, query for the pad duration, instead of - * using the file duration. */ - if (gst_pad_query_duration(pad, GST_FORMAT_BYTES, &byte_length) - && gst_pad_query_convert(pad, GST_FORMAT_BYTES, byte_length, GST_FORMAT_TIME, &duration)) - return duration / 100; - - ERR("Failed to query duration.\n"); - return 0; -} - -static HRESULT GST_Connect(struct wg_parser *parser, LONGLONG file_size) -{ - unsigned int i; - GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE( - "quartz_src", - GST_PAD_SRC, - GST_PAD_ALWAYS, - GST_STATIC_CAPS_ANY); - - parser->file_size = file_size; - parser->sink_connected = true; - - if (!parser->bus) - { - parser->bus = gst_bus_new(); - gst_bus_set_sync_handler(parser->bus, watch_bus, parser, NULL); - } - - parser->container = gst_bin_new(NULL); - gst_element_set_bus(parser->container, parser->bus); - - parser->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src"); - gst_pad_set_getrange_function(parser->my_src, request_buffer_src); - gst_pad_set_query_function(parser->my_src, query_function); - gst_pad_set_activatemode_function(parser->my_src, activate_mode); - gst_pad_set_event_function(parser->my_src, event_src); - gst_pad_set_element_private(parser->my_src, parser); - - parser->start_offset = parser->next_offset = parser->stop_offset = 0; - - if (!parser->init_gst(parser)) - return E_FAIL; - - pthread_mutex_lock(&parser->mutex); - - for (i = 0; i < parser->stream_count; ++i) - { - struct wg_parser_stream *stream = parser->streams[i]; - - stream->duration = query_duration(stream->their_src); - while (!stream->has_caps && !parser->error) - pthread_cond_wait(&parser->init_cond, &parser->mutex); - if (parser->error) - { - pthread_mutex_unlock(&parser->mutex); - return E_FAIL; - } - } - - pthread_mutex_unlock(&parser->mutex); - - parser->next_offset = 0; - return S_OK; -} - static inline struct parser_source *impl_from_IMediaSeeking(IMediaSeeking *iface) { return CONTAINING_RECORD(iface, struct parser_source, seek.IMediaSeeking_iface); @@ -1363,7 +994,7 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons filter->read_thread = CreateThread(NULL, 0, read_thread, filter, 0, NULL); filter->next_pull_offset = 0;
- if (FAILED(hr = GST_Connect(filter->wg_parser, filter->file_size))) + if (FAILED(hr = unix_funcs->wg_parser_connect(filter->wg_parser, filter->file_size))) goto err;
if (!filter->init_gst(filter)) @@ -1485,7 +1116,6 @@ static BOOL parser_init_gstreamer(void) { if (!init_gstreamer()) return FALSE; - GST_DEBUG_CATEGORY_INIT(wine, "WINE", GST_DEBUG_FG_RED, "Wine GStreamer support"); return TRUE; }
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 071694f77cc..3e7c41d644d 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -757,6 +757,376 @@ static void removed_decoded_pad(GstElement *element, GstPad *pad, gpointer user) g_free(name); }
+static GstFlowReturn request_buffer_src(GstPad *pad, GstObject *parent, guint64 offset, guint size, GstBuffer **buffer) +{ + struct wg_parser *parser = gst_pad_get_element_private(pad); + GstBuffer *new_buffer = NULL; + GstFlowReturn ret; + + GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, length %u, buffer %p.", pad, offset, size, *buffer); + + if (!*buffer) + *buffer = new_buffer = gst_buffer_new_and_alloc(size); + + pthread_mutex_lock(&parser->mutex); + + assert(!parser->read_request.buffer); + parser->read_request.buffer = *buffer; + parser->read_request.offset = offset; + parser->read_request.size = size; + parser->read_request.done = false; + pthread_cond_signal(&parser->read_cond); + + /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect + * the upstream pin to flush if necessary. We should never be blocked on + * read_thread() not running. */ + + while (!parser->read_request.done) + pthread_cond_wait(&parser->read_done_cond, &parser->mutex); + + ret = parser->read_request.ret; + + pthread_mutex_unlock(&parser->mutex); + + GST_LOG("Request returned %s.", gst_flow_get_name(ret)); + + if (ret != GST_FLOW_OK && new_buffer) + gst_buffer_unref(new_buffer); + + return ret; +} + +static gboolean query_function(GstPad *pad, GstObject *parent, GstQuery *query) +{ + struct wg_parser *parser = gst_pad_get_element_private(pad); + GstFormat format; + + GST_LOG("parser %p, type %s.", parser, GST_QUERY_TYPE_NAME(query)); + + switch (GST_QUERY_TYPE(query)) + { + case GST_QUERY_DURATION: + gst_query_parse_duration(query, &format, NULL); + if (format == GST_FORMAT_PERCENT) + { + gst_query_set_duration(query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX); + return TRUE; + } + else if (format == GST_FORMAT_BYTES) + { + gst_query_set_duration(query, GST_FORMAT_BYTES, parser->file_size); + return TRUE; + } + return FALSE; + + case GST_QUERY_SEEKING: + gst_query_parse_seeking (query, &format, NULL, NULL, NULL); + if (format != GST_FORMAT_BYTES) + { + GST_WARNING("Cannot seek using format "%s".", gst_format_get_name(format)); + return FALSE; + } + gst_query_set_seeking(query, GST_FORMAT_BYTES, 1, 0, parser->file_size); + return TRUE; + + case GST_QUERY_SCHEDULING: + gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0); + gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH); + gst_query_add_scheduling_mode(query, GST_PAD_MODE_PULL); + return TRUE; + + default: + GST_WARNING("Unhandled query type %s.", GST_QUERY_TYPE_NAME(query)); + return FALSE; + } +} + +static void *push_data(void *arg) +{ + struct wg_parser *parser = arg; + GstBuffer *buffer; + guint max_size; + + GST_DEBUG("Starting push thread."); + + if (!(buffer = gst_buffer_new_allocate(NULL, 16384, NULL))) + { + GST_ERROR("Failed to allocate memory."); + return NULL; + } + + max_size = parser->stop_offset ? parser->stop_offset : parser->file_size; + + for (;;) + { + ULONG size; + int ret; + + if (parser->next_offset >= max_size) + break; + size = min(16384, max_size - parser->next_offset); + + if ((ret = request_buffer_src(parser->my_src, NULL, parser->next_offset, size, &buffer)) < 0) + { + GST_ERROR("Failed to read data, ret %s.", gst_flow_get_name(ret)); + break; + } + + parser->next_offset += size; + + buffer->duration = buffer->pts = -1; + if ((ret = gst_pad_push(parser->my_src, buffer)) < 0) + { + GST_ERROR("Failed to push data, ret %s.", gst_flow_get_name(ret)); + break; + } + } + + gst_buffer_unref(buffer); + + gst_pad_push_event(parser->my_src, gst_event_new_eos()); + + GST_DEBUG("Stopping push thread."); + + return NULL; +} + +static gboolean activate_push(GstPad *pad, gboolean activate) +{ + struct wg_parser *parser = gst_pad_get_element_private(pad); + + if (!activate) + { + if (parser->push_thread) + { + pthread_join(parser->push_thread, NULL); + parser->push_thread = 0; + } + } + else if (!parser->push_thread) + { + int ret; + + if ((ret = pthread_create(&parser->push_thread, NULL, push_data, parser))) + { + GST_ERROR("Failed to create push thread: %s", strerror(errno)); + parser->push_thread = 0; + return FALSE; + } + } + return TRUE; +} + +static gboolean activate_mode(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) +{ + struct wg_parser *parser = gst_pad_get_element_private(pad); + + GST_DEBUG("%s source pad for parser %p in %s mode.", + activate ? "Activating" : "Deactivating", parser, gst_pad_mode_get_name(mode)); + + switch (mode) + { + case GST_PAD_MODE_PULL: + return TRUE; + case GST_PAD_MODE_PUSH: + return activate_push(pad, activate); + case GST_PAD_MODE_NONE: + break; + } + return FALSE; +} + +static GstBusSyncReply watch_bus(GstBus *bus, GstMessage *msg, gpointer user) +{ + struct wg_parser *parser = user; + gchar *dbg_info = NULL; + GError *err = NULL; + + GST_DEBUG("parser %p, message type %s.", parser, GST_MESSAGE_TYPE_NAME(msg)); + + switch (msg->type) + { + case GST_MESSAGE_ERROR: + gst_message_parse_error(msg, &err, &dbg_info); + fprintf(stderr, "winegstreamer: error: %s: %s\n", GST_OBJECT_NAME(msg->src), err->message); + fprintf(stderr, "winegstreamer: error: %s: %s\n", GST_OBJECT_NAME(msg->src), dbg_info); + g_error_free(err); + g_free(dbg_info); + pthread_mutex_lock(&parser->mutex); + parser->error = true; + pthread_mutex_unlock(&parser->mutex); + pthread_cond_signal(&parser->init_cond); + break; + + case GST_MESSAGE_WARNING: + gst_message_parse_warning(msg, &err, &dbg_info); + fprintf(stderr, "winegstreamer: warning: %s: %s\n", GST_OBJECT_NAME(msg->src), err->message); + fprintf(stderr, "winegstreamer: warning: %s: %s\n", GST_OBJECT_NAME(msg->src), dbg_info); + g_error_free(err); + g_free(dbg_info); + break; + + case GST_MESSAGE_DURATION_CHANGED: + pthread_mutex_lock(&parser->mutex); + parser->has_duration = true; + pthread_mutex_unlock(&parser->mutex); + pthread_cond_signal(&parser->init_cond); + break; + + default: + break; + } + gst_message_unref(msg); + return GST_BUS_DROP; +} + +static gboolean gst_base_src_perform_seek(struct wg_parser *parser, GstEvent *event) +{ + BOOL thread = !!parser->push_thread; + GstSeekType cur_type, stop_type; + GstFormat seek_format; + GstEvent *flush_event; + GstSeekFlags flags; + gint64 cur, stop; + guint32 seqnum; + gdouble rate; + + gst_event_parse_seek(event, &rate, &seek_format, &flags, + &cur_type, &cur, &stop_type, &stop); + + if (seek_format != GST_FORMAT_BYTES) + { + GST_FIXME("Unhandled format "%s".", gst_format_get_name(seek_format)); + return FALSE; + } + + seqnum = gst_event_get_seqnum(event); + + /* send flush start */ + if (flags & GST_SEEK_FLAG_FLUSH) + { + flush_event = gst_event_new_flush_start(); + gst_event_set_seqnum(flush_event, seqnum); + gst_pad_push_event(parser->my_src, flush_event); + if (thread) + gst_pad_set_active(parser->my_src, 1); + } + + parser->next_offset = parser->start_offset = cur; + + /* and prepare to continue streaming */ + if (flags & GST_SEEK_FLAG_FLUSH) + { + flush_event = gst_event_new_flush_stop(TRUE); + gst_event_set_seqnum(flush_event, seqnum); + gst_pad_push_event(parser->my_src, flush_event); + if (thread) + gst_pad_set_active(parser->my_src, 1); + } + + return TRUE; +} + +static gboolean event_src(GstPad *pad, GstObject *parent, GstEvent *event) +{ + struct wg_parser *parser = gst_pad_get_element_private(pad); + gboolean ret = TRUE; + + GST_LOG("parser %p, type "%s".", parser, GST_EVENT_TYPE_NAME(event)); + + switch (event->type) + { + case GST_EVENT_SEEK: + ret = gst_base_src_perform_seek(parser, event); + break; + + case GST_EVENT_FLUSH_START: + case GST_EVENT_FLUSH_STOP: + case GST_EVENT_QOS: + case GST_EVENT_RECONFIGURE: + break; + + default: + GST_WARNING("Ignoring "%s" event.", GST_EVENT_TYPE_NAME(event)); + ret = FALSE; + break; + } + gst_event_unref(event); + return ret; +} + +static LONGLONG query_duration(GstPad *pad) +{ + gint64 duration, byte_length; + + if (gst_pad_query_duration(pad, GST_FORMAT_TIME, &duration)) + return duration / 100; + + WARN("Failed to query time duration; trying to convert from byte length.\n"); + + /* To accurately get a duration for the stream, we want to only consider the + * length of that stream. Hence, query for the pad duration, instead of + * using the file duration. */ + if (gst_pad_query_duration(pad, GST_FORMAT_BYTES, &byte_length) + && gst_pad_query_convert(pad, GST_FORMAT_BYTES, byte_length, GST_FORMAT_TIME, &duration)) + return duration / 100; + + ERR("Failed to query duration.\n"); + return 0; +} + +static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_size) +{ + GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE("quartz_src", + GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); + unsigned int i; + + parser->file_size = file_size; + parser->sink_connected = true; + + if (!parser->bus) + { + parser->bus = gst_bus_new(); + gst_bus_set_sync_handler(parser->bus, watch_bus, parser, NULL); + } + + parser->container = gst_bin_new(NULL); + gst_element_set_bus(parser->container, parser->bus); + + parser->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src"); + gst_pad_set_getrange_function(parser->my_src, request_buffer_src); + gst_pad_set_query_function(parser->my_src, query_function); + gst_pad_set_activatemode_function(parser->my_src, activate_mode); + gst_pad_set_event_function(parser->my_src, event_src); + gst_pad_set_element_private(parser->my_src, parser); + + parser->start_offset = parser->next_offset = parser->stop_offset = 0; + + if (!parser->init_gst(parser)) + return E_FAIL; + + pthread_mutex_lock(&parser->mutex); + + for (i = 0; i < parser->stream_count; ++i) + { + struct wg_parser_stream *stream = parser->streams[i]; + + stream->duration = query_duration(stream->their_src); + while (!stream->has_caps && !parser->error) + pthread_cond_wait(&parser->init_cond, &parser->mutex); + if (parser->error) + { + pthread_mutex_unlock(&parser->mutex); + return E_FAIL; + } + } + + pthread_mutex_unlock(&parser->mutex); + + parser->next_offset = 0; + return S_OK; +} + static BOOL decodebin_parser_init_gst(struct wg_parser *parser) { GstElement *element = gst_element_factory_make("decodebin", NULL); @@ -1034,6 +1404,8 @@ static const struct unix_funcs funcs = wg_mpeg_audio_parser_create, wg_wave_parser_create, wg_parser_destroy, + + wg_parser_connect, };
NTSTATUS CDECL __wine_init_unix_lib(HMODULE module, DWORD reason, const void *ptr_in, void *ptr_out)
Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/winegstreamer/gst_private.h | 1 + dlls/winegstreamer/gstdemux.c | 55 ++-------------------------- dlls/winegstreamer/wg_parser.c | 61 ++++++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 52 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index ce500a92356..0ddd9ea1da8 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -213,6 +213,7 @@ struct unix_funcs void (CDECL *wg_parser_destroy)(struct wg_parser *parser);
HRESULT (CDECL *wg_parser_connect)(struct wg_parser *parser, uint64_t file_size); + void (CDECL *wg_parser_disconnect)(struct wg_parser *parser); };
extern const struct unix_funcs *unix_funcs; diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index 9a74e09f5dd..2093608c443 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -1546,30 +1546,6 @@ static void source_disconnect(struct strmbase_source *iface) stream->enabled = false; }
-static void free_stream(struct wg_parser_stream *stream) -{ - if (stream->their_src) - { - if (stream->post_sink) - { - gst_pad_unlink(stream->their_src, stream->post_sink); - gst_pad_unlink(stream->post_src, stream->my_sink); - gst_object_unref(stream->post_src); - gst_object_unref(stream->post_sink); - stream->post_src = stream->post_sink = NULL; - } - else - gst_pad_unlink(stream->their_src, stream->my_sink); - gst_object_unref(stream->their_src); - } - gst_object_unref(stream->my_sink); - - pthread_cond_destroy(&stream->event_cond); - pthread_cond_destroy(&stream->event_empty_cond); - - free(stream); -} - static void free_source_pin(struct parser_source *pin) { if (pin->pin.pin.peer) @@ -1579,8 +1555,6 @@ static void free_source_pin(struct parser_source *pin) IPin_Disconnect(&pin->pin.pin.IPin_iface); }
- free_stream(pin->wg_stream); - pin->flushing_cs.DebugInfo->Spare[0] = 0; DeleteCriticalSection(&pin->flushing_cs);
@@ -1628,37 +1602,19 @@ static struct parser_source *create_pin(struct parser *filter,
static HRESULT GST_RemoveOutputPins(struct parser *This) { - struct wg_parser *parser = This->wg_parser; unsigned int i;
TRACE("(%p)\n", This); mark_wine_thread();
- if (!parser->container) + if (!This->sink_connected) return S_OK;
- /* Unblock all of our streams. */ - pthread_mutex_lock(&parser->mutex); - for (i = 0; i < parser->stream_count; ++i) - { - parser->streams[i]->flushing = true; - pthread_cond_signal(&parser->streams[i]->event_empty_cond); - } - pthread_mutex_unlock(&parser->mutex); - - gst_element_set_state(parser->container, GST_STATE_NULL); - gst_pad_unlink(parser->my_src, parser->their_sink); - gst_object_unref(parser->my_src); - gst_object_unref(parser->their_sink); - parser->my_src = parser->their_sink = NULL; + unix_funcs->wg_parser_disconnect(This->wg_parser);
/* read_thread() needs to stay alive to service any read requests GStreamer * sends, so we can only shut it down after GStreamer stops. */ This->sink_connected = false; - pthread_mutex_lock(&parser->mutex); - parser->sink_connected = false; - pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&parser->read_cond); WaitForSingleObject(This->read_thread, INFINITE); CloseHandle(This->read_thread);
@@ -1671,12 +1627,7 @@ static HRESULT GST_RemoveOutputPins(struct parser *This) This->source_count = 0; heap_free(This->sources); This->sources = NULL; - parser->stream_count = 0; - free(parser->streams); - parser->streams = NULL; - gst_element_set_bus(parser->container, NULL); - gst_object_unref(parser->container); - parser->container = NULL; + BaseFilterImpl_IncrementPinVersion(&This->filter); return S_OK; } diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 3e7c41d644d..31b5a67032a 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -1127,6 +1127,66 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s return S_OK; }
+static void free_stream(struct wg_parser_stream *stream) +{ + if (stream->their_src) + { + if (stream->post_sink) + { + gst_pad_unlink(stream->their_src, stream->post_sink); + gst_pad_unlink(stream->post_src, stream->my_sink); + gst_object_unref(stream->post_src); + gst_object_unref(stream->post_sink); + stream->post_src = stream->post_sink = NULL; + } + else + gst_pad_unlink(stream->their_src, stream->my_sink); + gst_object_unref(stream->their_src); + } + gst_object_unref(stream->my_sink); + + pthread_cond_destroy(&stream->event_cond); + pthread_cond_destroy(&stream->event_empty_cond); + + free(stream); +} + +static void CDECL wg_parser_disconnect(struct wg_parser *parser) +{ + unsigned int i; + + /* Unblock all of our streams. */ + pthread_mutex_lock(&parser->mutex); + for (i = 0; i < parser->stream_count; ++i) + { + parser->streams[i]->flushing = true; + pthread_cond_signal(&parser->streams[i]->event_empty_cond); + } + pthread_mutex_unlock(&parser->mutex); + + gst_element_set_state(parser->container, GST_STATE_NULL); + gst_pad_unlink(parser->my_src, parser->their_sink); + gst_object_unref(parser->my_src); + gst_object_unref(parser->their_sink); + parser->my_src = parser->their_sink = NULL; + + pthread_mutex_lock(&parser->mutex); + parser->sink_connected = false; + pthread_mutex_unlock(&parser->mutex); + pthread_cond_signal(&parser->read_cond); + + for (i = 0; i < parser->stream_count; ++i) + free_stream(parser->streams[i]); + + parser->stream_count = 0; + free(parser->streams); + parser->streams = NULL; + + gst_element_set_bus(parser->container, NULL); + gst_object_unref(parser->container); + parser->container = NULL; +} + static BOOL decodebin_parser_init_gst(struct wg_parser *parser) { GstElement *element = gst_element_factory_make("decodebin", NULL); @@ -1406,6 +1466,7 @@ static const struct unix_funcs funcs = wg_parser_destroy,
wg_parser_connect, + wg_parser_disconnect, };
NTSTATUS CDECL __wine_init_unix_lib(HMODULE module, DWORD reason, const void *ptr_in, void *ptr_out)
Signed-off-by: Zebediah Figura z.figura12@gmail.com --- dlls/winegstreamer/gst_private.h | 3 +++ dlls/winegstreamer/gstdemux.c | 18 ++++++++++-------- dlls/winegstreamer/wg_parser.c | 13 +++++++++++++ 3 files changed, 26 insertions(+), 8 deletions(-)
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h index 0ddd9ea1da8..0a37e10c840 100644 --- a/dlls/winegstreamer/gst_private.h +++ b/dlls/winegstreamer/gst_private.h @@ -214,6 +214,9 @@ struct unix_funcs
HRESULT (CDECL *wg_parser_connect)(struct wg_parser *parser, uint64_t file_size); void (CDECL *wg_parser_disconnect)(struct wg_parser *parser); + + uint32_t (CDECL *wg_parser_get_stream_count)(struct wg_parser *parser); + struct wg_parser_stream *(CDECL *wg_parser_get_stream)(struct wg_parser *parser, uint32_t index); };
extern const struct unix_funcs *unix_funcs; diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index 2093608c443..be1c59453d5 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -1039,13 +1039,14 @@ static BOOL decodebin_parser_filter_init_gst(struct parser *filter) { static const WCHAR formatW[] = {'S','t','r','e','a','m',' ','%','0','2','u',0}; struct wg_parser *parser = filter->wg_parser; + unsigned int i, stream_count; WCHAR source_name[20]; - unsigned int i;
- for (i = 0; i < parser->stream_count; ++i) + stream_count = unix_funcs->wg_parser_get_stream_count(parser); + for (i = 0; i < stream_count; ++i) { sprintfW(source_name, formatW, i); - if (!create_pin(filter, parser->streams[i], source_name)) + if (!create_pin(filter, unix_funcs->wg_parser_get_stream(parser, i), source_name)) return FALSE; }
@@ -1664,7 +1665,7 @@ static BOOL wave_parser_filter_init_gst(struct parser *filter) static const WCHAR source_name[] = {'o','u','t','p','u','t',0}; struct wg_parser *parser = filter->wg_parser;
- if (!create_pin(filter, parser->streams[0], source_name)) + if (!create_pin(filter, unix_funcs->wg_parser_get_stream(parser, 0), source_name)) return FALSE;
return TRUE; @@ -1744,13 +1745,14 @@ static BOOL avi_splitter_filter_init_gst(struct parser *filter) { static const WCHAR formatW[] = {'S','t','r','e','a','m',' ','%','0','2','u',0}; struct wg_parser *parser = filter->wg_parser; + uint32_t i, stream_count; WCHAR source_name[20]; - unsigned int i;
- for (i = 0; i < parser->stream_count; ++i) + stream_count = unix_funcs->wg_parser_get_stream_count(parser); + for (i = 0; i < stream_count; ++i) { sprintfW(source_name, formatW, i); - if (!create_pin(filter, parser->streams[i], source_name)) + if (!create_pin(filter, unix_funcs->wg_parser_get_stream(parser, i), source_name)) return FALSE; }
@@ -1837,7 +1839,7 @@ static BOOL mpeg_splitter_filter_init_gst(struct parser *filter) static const WCHAR source_name[] = {'A','u','d','i','o',0}; struct wg_parser *parser = filter->wg_parser;
- if (!create_pin(filter, parser->streams[0], source_name)) + if (!create_pin(filter, unix_funcs->wg_parser_get_stream(parser, 0), source_name)) return FALSE;
return TRUE; diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 31b5a67032a..8468bde4894 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -317,6 +317,16 @@ static bool wg_format_compare(const struct wg_format *a, const struct wg_format return false; }
+static uint32_t CDECL wg_parser_get_stream_count(struct wg_parser *parser) +{ + return parser->stream_count; +} + +static struct wg_parser_stream * CDECL wg_parser_get_stream(struct wg_parser *parser, uint32_t index) +{ + return parser->streams[index]; +} + static GstAutoplugSelectResult autoplug_blacklist(GstElement *bin, GstPad *pad, GstCaps *caps, GstElementFactory *fact, gpointer user) { const char *name = gst_element_factory_get_longname(fact); @@ -1467,6 +1477,9 @@ static const struct unix_funcs funcs =
wg_parser_connect, wg_parser_disconnect, + + wg_parser_get_stream_count, + wg_parser_get_stream, };
NTSTATUS CDECL __wine_init_unix_lib(HMODULE module, DWORD reason, const void *ptr_in, void *ptr_out)