On 9/13/21 1:08 PM, Derek Lesho wrote:
On 9/13/21 1:57 PM, Zebediah Figura wrote:
@@ -541,15 +536,63 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,

 static void CDECL wg_parser_push_data(struct wg_parser *parser, const void *data, uint32_t size)
 {
+    GstBuffer *buffer;
+    GstFlowReturn ret;
+    GError *error;
+    GstMessage *message;
+    if (!data)
+    {
+        pthread_mutex_lock(&parser->mutex);
+        error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED,
+                "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->next_pull_offset);
+        message = gst_message_new_error(NULL, error, "");
+        if (!gst_bus_post(parser->bus, message))
+        {
+            GST_ERROR("Failed to post error message to bus!\n");
+            gst_message_unref(message);
+        }
Is there a point in posting a message to the bus?
The point would be to have a unified error path when something goes wrong, whether the error is GStreamer-internal or a read failure.
Oh, I guess we would probably need to unblock initialization here; I forgot that we did something other than just printing an error. Okay, that makes sense.
I don't think there's any point checking for failure from gst_bus_post(), though.
+        parser->read_request.pending_read = false;
This is fine temporarily. Ultimately I think we should be terminating the client-side thread on error, and if possible not calling wg_parser_push_data() at all.
It would be a little tricky to do so, given that this thread exists throughout initialization; and not signaling the error to GStreamer would mean adding yet another shutdown path that doesn't interfere with the others.
+        pthread_mutex_unlock(&parser->mutex);
+        return;
+    }
+
+    if (!size)
+    {
+        pthread_mutex_lock(&parser->mutex);
+        g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);
+        parser->read_request.pending_read = false;
+        pthread_mutex_unlock(&parser->mutex);
+        return;
+    }
+
+    /* We could avoid this extra copy using gst_buffer_new_wrapped.
+     * However, PE wouldn't know when to release the buffer allocations as the buffer
+     * objects are queued, so we'd have to create a ring buffer the size of the gstappsrc
+     * queue on the PE side and validate that we don't overrun on the unix side. I'm not
+     * yet convinced that trying to reduce copies of compressed data is worth the
+     * complexity. */
+    buffer = gst_buffer_new_and_alloc(size);
+    gst_buffer_fill(buffer, 0, data, size);
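For illustration, a minimal self-contained sketch of the PE-side ring buffer that comment alludes to. None of these names (`pe_ring`, `pe_ring_acquire`, `pe_ring_release`) exist in the Wine source, and the sizes are made up; the point is only that handing GStreamer wrapped memory instead of a copy would force the PE side to track slot lifetimes until the queued buffers are released, and the unix side to guard against overrunning the queue:

```c
/* Hypothetical sketch of the complexity the comment describes: a fixed ring
 * whose slots back wrapped GstBuffers, reusable only after GStreamer is done
 * with them. Not real Wine code. */
#include <assert.h>
#include <stdbool.h>

#define RING_SLOTS 8    /* would have to match the gstappsrc queue depth */
#define SLOT_SIZE  4096

struct pe_ring
{
    unsigned char data[RING_SLOTS][SLOT_SIZE];
    bool in_use[RING_SLOTS];  /* set on push, cleared when the buffer is released */
    unsigned int head;
};

/* Claim the next slot for an outgoing buffer. Returns -1 when the queue is
 * full; this is the overrun the unix side would have to validate against. */
static int pe_ring_acquire(struct pe_ring *ring)
{
    unsigned int slot = ring->head % RING_SLOTS;
    if (ring->in_use[slot])
        return -1;
    ring->in_use[slot] = true;
    ring->head++;
    return (int)slot;
}

/* Would be called from the wrapped buffer's destroy notify once GStreamer
 * unrefs it, making the slot reusable. */
static void pe_ring_release(struct pe_ring *ring, int slot)
{
    ring->in_use[slot] = false;
}
```

Keeping every queued buffer's backing slot alive and synchronizing releases across the PE/unix boundary is the bookkeeping the single `gst_buffer_fill()` copy avoids.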
     pthread_mutex_lock(&parser->mutex);
-    parser->read_request.size = size;
-    parser->read_request.done = true;
-    parser->read_request.ret = !!data;
-    if (data)
-        memcpy(parser->read_request.data, data, size);
-    parser->read_request.data = NULL;
+    assert(parser->read_request.pending_read);
+
+    GST_BUFFER_OFFSET(buffer) = parser->next_pull_offset;
+    g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret);
+
+    /* In random-access mode, GST_FLOW_EOS shouldn't be returned */
+    assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
+    if (ret == GST_FLOW_OK)
+        parser->next_pull_offset += size;
+    else
+        gst_buffer_unref(buffer);
According to my reading of the source code, GstAppSrc will take the reference even if it returns GST_FLOW_FLUSHING. That's a bit unintuitive and should probably be clearly documented on the GStreamer side, but anyway...
Good catch, thanks.
+    parser->read_request.pending_read = false;
     pthread_mutex_unlock(&parser->mutex);
-    pthread_cond_signal(&parser->read_done_cond);
 }

 static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser)