On 9/6/21 4:14 PM, Zebediah Figura wrote:
     pthread_cond_wait(&parser->read_cond, &parser->mutex);
     if (!parser->sink_connected)
@@ -541,15 +537,47 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
 static void CDECL wg_parser_push_data(struct wg_parser *parser, const void *data, uint32_t size)
 {
+    GstBuffer *buffer;
+    GstFlowReturn ret;
+    if (!data)
+    {
+        /* premature EOS should trigger an error path */
+        pthread_mutex_lock(&parser->mutex);
+        parser->read_request.offset = -1;
+        g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);
+        pthread_mutex_unlock(&parser->mutex);
+        return;
+    }
Hmm, this is as much a commentary on 2/5, but, I feel like EOS should be signaled in a consistent way, probably by passing a short (or even zero) size with a valid buffer.
The other way we can get here is an error condition, and I'm not even sure we want to call wg_parser_push_data() in that case. The GStreamer side can't exactly do anything useful with that information.
Well, we need some way to not block forever upon an error. Traditionally, we used GStreamer error propagation for this: if we returned an error in getrange_cb, any of the blocking initialization functions would unblock and we could clean up, and after initialization, we'd get an error in get_event. Absent this, we need another mechanism to invoke a cleanup. Calling wg_parser_disconnect (or, later, wg_parser_destroy) in read_thread could work if we made sure that no other threads use the objects after destruction and if, for patch 2, we unblock getrange_cb upon disconnection. Is this what you were going for?
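To make the "unblock upon disconnection" idea concrete, here is a minimal sketch of a waiter that stops blocking once a disconnect flag is set. It is not Wine code: struct demo_reader and the demo_* functions are hypothetical names, and the sketch only assumes the usual pattern of one mutex, one condition variable and a flag checked in the wait loop.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-in for the parser's read state; not struct wg_parser. */
struct demo_reader
{
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool done;          /* a read request has been answered */
    bool disconnected;  /* set once, on error or teardown */
};

static void demo_reader_init(struct demo_reader *r)
{
    int rc = pthread_mutex_init(&r->mutex, NULL);
    assert(rc == 0);
    rc = pthread_cond_init(&r->cond, NULL);
    assert(rc == 0);
    (void)rc;
    r->done = false;
    r->disconnected = false;
}

/* Blocks until data arrives or the reader is disconnected.
 * Returns true if data arrived, false if woken by disconnection. */
static bool demo_wait_for_data(struct demo_reader *r)
{
    bool ret;

    pthread_mutex_lock(&r->mutex);
    while (!r->done && !r->disconnected)
        pthread_cond_wait(&r->cond, &r->mutex);
    ret = r->done;
    r->done = false;
    pthread_mutex_unlock(&r->mutex);
    return ret;
}

/* Called by the side that answers a read request. */
static void demo_complete(struct demo_reader *r)
{
    pthread_mutex_lock(&r->mutex);
    r->done = true;
    pthread_cond_broadcast(&r->cond);
    pthread_mutex_unlock(&r->mutex);
}

/* Called on error: wakes every waiter, and later waits return at once. */
static void demo_disconnect(struct demo_reader *r)
{
    pthread_mutex_lock(&r->mutex);
    r->disconnected = true;
    pthread_cond_broadcast(&r->cond);
    pthread_mutex_unlock(&r->mutex);
}
```

The flag stays set after demo_disconnect(), so a thread that starts waiting only after the error still returns immediately instead of blocking forever.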
In any case we shouldn't have to call g_signal_emit_by_name("end-of-stream") more than once in this function.
+    /* We could avoid this extra copy using gst_buffer_new_wrapped.
+       However, PE wouldn't know when to release the buffer allocations as the buffer
+       objects are queued, so we'd have to create a ring buffer the size of the gstappsrc
+       queue on the PE side and validate that we don't overrun on the unix side. I'm not
+       yet convinced that trying to reduce copies of compressed data is worth the
+       complexity. */
+    buffer = gst_buffer_new_and_alloc(size);
+    gst_buffer_fill(buffer, 0, data, size);
     pthread_mutex_lock(&parser->mutex);
-    parser->read_request.size = size;
-    parser->read_request.done = true;
-    parser->read_request.ret = !!data;
-    if (data)
-        memcpy(parser->read_request.data, data, size);
-    parser->read_request.data = NULL;
+    assert(parser->read_request.offset != -1);
+    GST_BUFFER_OFFSET(buffer) = parser->read_request.offset;
+    g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret);
+    parser->read_request.offset = -1;
+    /* In random-access mode, GST_FLOW_EOS shouldn't be returned */
+    assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
+    if (ret == GST_FLOW_OK)
+        parser->next_pull_offset += size;
+    else
+        gst_buffer_unref(buffer);
+    assert(parser->next_pull_offset <= parser->file_size);
+    if (parser->next_pull_offset == parser->file_size)
+        g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);
The awkward thing about this is that either we might pass the wrong size to wg_parser_get_next_read_offset(), or appsrc guarantees that we don't but we're not validating it (i.e. we're only validating the offset). For what it's worth, I'm not sure the documentation guarantees either one :-/
Either way, this should probably live in wg_parser_get_next_read_offset() and not here.
My impression from "EOS should be signaled in a consistent way, probably by passing a short (or even zero) size with a valid buffer." was that we should be determining EOS on the client side of the interface, and push_data seems a more natural fit for something determined on the client side, not to mention that in a future push mode, we will have to be determining EOS on the client side.
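For reference, the "short (or even zero) size with a valid buffer" convention could look roughly like the sketch below. It is only an illustration and does not touch GStreamer: struct demo_stream, demo_push() and the result enum are hypothetical names, and the sketch assumes the requested size is remembered from the preceding read request.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum demo_push_result
{
    DEMO_PUSH_OK,   /* full read, more data may follow */
    DEMO_PUSH_EOS,  /* short or empty read, treated as end of stream */
};

/* Hypothetical state; next_offset and requested loosely mirror
 * next_pull_offset and read_request.size from the patch. */
struct demo_stream
{
    uint64_t next_offset;
    uint32_t requested;
};

/* The client always passes a valid buffer; a size smaller than the
 * requested size (including zero) is what signals EOS. */
static enum demo_push_result demo_push(struct demo_stream *s, const void *data, uint32_t size)
{
    assert(data != NULL);
    assert(size <= s->requested);

    s->next_offset += size;
    return size < s->requested ? DEMO_PUSH_EOS : DEMO_PUSH_OK;
}
```

With this shape a NULL buffer is left free to mean "error" only, which is the separation the review comment seems to be asking for.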
+static void src_need_data(GstElement *appsrc, guint length, gpointer user) {
+    struct wg_parser *parser = user;
     pthread_mutex_lock(&parser->mutex);
+    /* Sometimes GstAppSrc sends identical need-data requests when it is woken up,
+       we can rely on push-buffer not having completed (and hence offset being -1)
+       because they are blocked on the internal mutex held by the pulling thread
+       calling this callback. */
+    if (parser->read_request.offset != -1) {
Well, sure, but do we really need this if block at all?
Yes, we do. Without it, if there is a spurious wakeup of appsrc's wait loop in gst_app_src_create (get_range) [1], and the "push-buffer" signal is emitted just after the loop's g_cond_wait reacquires the mutex on return [1], "push-buffer" could block on acquisition of the mutex [2] held by gst_app_src_create, before the buffer is pushed to the internal queue. Then gst_app_src_create continues and, seeing that there are no buffers in the queue, unlocks the mutex and calls need_data. Here we have our race:
- If need_data's code runs first, read_request's offset and size are overwritten with the same values. Then the code waiting on the mutex pushes the buffer responding to our request, and everything continues as normal (as if another need_data never occurred).
(Prepare for the text wall 😁)
- If the push-buffer code waiting on the mutex continues first, the buffer is written to the internal buffer queue and the push-buffer signal returns. Afterwards, read_request.offset is set to the sentinel value, indicating that the request has been responded to and that a new need-data is awaited, and push_data returns. Then need_data runs, acquiring the mutex too late to catch the previous buffer send, and requests a read of the same size directly following the prior read request, since the offset had been updated by push_data. gst_app_src_create then sees the pushed buffer and returns it to the getrange-related function, only for another getrange to come in with an offset not directly following the last request's.

  While the client code is reading the invalid request's data, src_seek_data is called. Right now we have an assert to make sure that there is no active request when seek_data is called, but if we didn't, we'd just set next_pull_offset to the seek's value. Then need_data would be called again, but all it would do is update the size to the new request's size. After this, push_data (in response to the previous request) would be called and, assuming for simplicity's sake that the two requests were the same size, push-buffer would be emitted. src_seek_data would pick up and have no way of knowing that the data comes from after the previous request's location in the file, which could cause any number of errors.

  If we keep around both next_pull_offset and read_request.offset, we could compare them in push_data and discard the buffer if they don't match, but this seems a lot more complicated than catching this case earlier on and preventing all this confusion.
1: https://github.com/GStreamer/gst-plugins-base/blob/master/gst-libs/gst/app/g...
2: https://github.com/GStreamer/gst-plugins-base/blob/master/gst-libs/gst/app/g...
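As a rough illustration of the bookkeeping discussed above, here is a single-threaded sketch of the sentinel check. It does not model the race itself or any locking (callers are assumed to hold the parser mutex), and it assumes the guard simply ignores a need-data that arrives while a request is still outstanding. struct demo_parser, the demo_* functions and DEMO_NO_REQUEST are hypothetical names standing in for the parser fields and the -1 sentinel.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Plays the role of the -1 sentinel in read_request.offset. */
#define DEMO_NO_REQUEST UINT64_MAX

/* Hypothetical, lock-free model of the relevant parser fields. */
struct demo_parser
{
    uint64_t next_pull_offset;
    uint64_t request_offset;  /* DEMO_NO_REQUEST when no request is pending */
    uint32_t request_size;
};

/* Model of the need-data callback. Returns true if a new request was
 * recorded, false if it was treated as a duplicate and ignored. */
static bool demo_need_data(struct demo_parser *p, uint32_t length)
{
    if (p->request_offset != DEMO_NO_REQUEST)
        return false;  /* an earlier request is still outstanding */

    p->request_offset = p->next_pull_offset;
    p->request_size = length;
    return true;
}

/* Model of push_data: answers the pending request and resets the sentinel. */
static void demo_push_data(struct demo_parser *p, uint32_t size)
{
    assert(p->request_offset != DEMO_NO_REQUEST);
    p->next_pull_offset += size;
    p->request_offset = DEMO_NO_REQUEST;
}

/* Model of seek-data: only expected while no request is pending. */
static void demo_seek_data(struct demo_parser *p, uint64_t offset)
{
    assert(p->request_offset == DEMO_NO_REQUEST);
    p->next_pull_offset = offset;
}
```

In this model a repeated need-data leaves the pending offset and size untouched, so the buffer that eventually arrives is still matched against the request it was read for.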
+static gboolean src_seek_data(GstElement *appsrc, guint64 offset, gpointer user) {
+    struct wg_parser *parser = user;
+    pthread_mutex_lock(&parser->mutex);
+    assert(parser->read_request.offset == -1);
Is this guaranteed, though?
Do you mean by the publicly documented interface or the actual code? Looking at the code, in random-access mode, it seems that the answer is yes: getrange turns into a seek and need-data combo, and getrange would only be called after the previous buffer is sent. For push mode, I don't think we'll ever get a seek request (and if we did, there'd be no way to service it, as MFTs aren't seekable). Either way, the assert probably isn't strictly necessary and is only there to aid understanding; it can easily be left out if you'd like.