[PATCH v4 0/2] MR8628: winepulse.drv: Synchronize audio streams with the same periods.
-- v4: winepulse.drv: Process streams timer updates from PA main loop. winepulse.drv: Move pulse_release_stream() below. https://gitlab.winehq.org/wine/wine/-/merge_requests/8628
From: Paul Gofman <pgofman@codeweavers.com> --- dlls/winepulse.drv/pulse.c | 72 +++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/dlls/winepulse.drv/pulse.c b/dlls/winepulse.drv/pulse.c index abc5e60821f..cd734d8c52e 100644 --- a/dlls/winepulse.drv/pulse.c +++ b/dlls/winepulse.drv/pulse.c @@ -1237,42 +1237,6 @@ exit: return STATUS_SUCCESS; } -static NTSTATUS pulse_release_stream(void *args) -{ - struct release_stream_params *params = args; - struct pulse_stream *stream = handle_get_stream(params->stream); - SIZE_T size; - - if(params->timer_thread) { - stream->please_quit = TRUE; - NtWaitForSingleObject(params->timer_thread, FALSE, NULL); - NtClose(params->timer_thread); - } - - pulse_lock(); - if (PA_STREAM_IS_GOOD(pa_stream_get_state(stream->stream))) { - pa_stream_disconnect(stream->stream); - while (pulse_ml && PA_STREAM_IS_GOOD(pa_stream_get_state(stream->stream))) - pulse_cond_wait(); - } - pa_stream_unref(stream->stream); - pulse_unlock(); - - if (stream->tmp_buffer) { - size = 0; - NtFreeVirtualMemory(GetCurrentProcess(), (void **)&stream->tmp_buffer, - &size, MEM_RELEASE); - } - if (stream->local_buffer) { - size = 0; - NtFreeVirtualMemory(GetCurrentProcess(), (void **)&stream->local_buffer, - &size, MEM_RELEASE); - } - free(stream->peek_buffer); - free(stream); - return STATUS_SUCCESS; -} - static int write_buffer(const struct pulse_stream *stream, BYTE *buffer, UINT32 bytes) { const float *vol = stream->vol; @@ -1659,6 +1623,42 @@ static NTSTATUS pulse_timer_loop(void *args) return STATUS_SUCCESS; } +static NTSTATUS pulse_release_stream(void *args) +{ + struct release_stream_params *params = args; + struct pulse_stream *stream = handle_get_stream(params->stream); + SIZE_T size; + + if(params->timer_thread) { + stream->please_quit = TRUE; + NtWaitForSingleObject(params->timer_thread, FALSE, NULL); + NtClose(params->timer_thread); + } + + pulse_lock(); + if 
(PA_STREAM_IS_GOOD(pa_stream_get_state(stream->stream))) { + pa_stream_disconnect(stream->stream); + while (pulse_ml && PA_STREAM_IS_GOOD(pa_stream_get_state(stream->stream))) + pulse_cond_wait(); + } + pa_stream_unref(stream->stream); + pulse_unlock(); + + if (stream->tmp_buffer) { + size = 0; + NtFreeVirtualMemory(GetCurrentProcess(), (void **)&stream->tmp_buffer, + &size, MEM_RELEASE); + } + if (stream->local_buffer) { + size = 0; + NtFreeVirtualMemory(GetCurrentProcess(), (void **)&stream->local_buffer, + &size, MEM_RELEASE); + } + free(stream->peek_buffer); + free(stream); + return STATUS_SUCCESS; +} + static NTSTATUS pulse_start(void *args) { struct start_params *params = args; -- GitLab https://gitlab.winehq.org/wine/wine/-/merge_requests/8628
From: Paul Gofman <pgofman@codeweavers.com> --- dlls/winepulse.drv/pulse.c | 218 ++++++++++++++++++++++++------------- 1 file changed, 144 insertions(+), 74 deletions(-) diff --git a/dlls/winepulse.drv/pulse.c b/dlls/winepulse.drv/pulse.c index cd734d8c52e..d58d7deaa6f 100644 --- a/dlls/winepulse.drv/pulse.c +++ b/dlls/winepulse.drv/pulse.c @@ -53,10 +53,22 @@ enum phys_device_bus_type { phys_device_bus_usb }; +struct pulse_period +{ + struct list entry; + char *device; + pa_usec_t period; + struct list streams; + pa_time_event *time_event; +}; + +static struct list active_periods = LIST_INIT(active_periods); + struct pulse_stream { EDataFlow dataflow; + char *device; pa_stream *stream; pa_sample_spec ss; pa_channel_map map; @@ -77,13 +89,16 @@ struct pulse_stream SIZE_T tmp_buffer_bytes, held_bytes, peek_len, peek_buffer_len, pa_held_bytes; BYTE *local_buffer, *tmp_buffer, *peek_buffer; void *locked_ptr; - BOOL please_quit, just_started, just_underran; + BOOL just_started, just_underran; pa_usec_t mmdev_period_usec; + pa_usec_t stream_time; INT64 clock_lastpos, clock_written; struct list packet_free_head; struct list packet_filled_head; + struct list period_entry; + struct pulse_period *period; }; typedef struct _ACPacket @@ -1040,7 +1055,8 @@ static HRESULT pulse_spec_from_waveformat(struct pulse_stream *stream, const WAV static HRESULT pulse_stream_connect(struct pulse_stream *stream, const char *pulse_name, UINT32 period_bytes) { - pa_stream_flags_t flags = PA_STREAM_START_CORKED | PA_STREAM_START_UNMUTED | PA_STREAM_ADJUST_LATENCY; + pa_stream_flags_t flags = PA_STREAM_START_CORKED | PA_STREAM_START_UNMUTED | PA_STREAM_ADJUST_LATENCY + | PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING; int ret; char buffer[64]; static LONG number; @@ -1218,6 +1234,7 @@ static NTSTATUS pulse_create_stream(void *args) } } } + stream->device = strdup(params->device); } *params->channel_count = stream->ss.channels; @@ -1230,6 +1247,7 @@ exit: 
pa_stream_disconnect(stream->stream); pa_stream_unref(stream->stream); } + free(stream->device); free(stream); } @@ -1524,103 +1542,153 @@ static void pulse_read(struct pulse_stream *stream) static NTSTATUS pulse_timer_loop(void *args) { - struct timer_loop_params *params = args; - struct pulse_stream *stream = handle_get_stream(params->stream); - LARGE_INTEGER delay; - pa_usec_t last_time; + /* Stream's data are read and written from the main loop timer callback. */ + return STATUS_SUCCESS; +} + +static void pa_streams_timer_cb(pa_mainloop_api *api, pa_time_event *e, const struct timeval *tv, void *userdata) +{ + struct pulse_period *period = userdata; + pa_usec_t stream_time, next_timer; + struct pulse_stream *stream; + int64_t avg_adjust = 0; + int adj_count = 0; UINT32 adv_bytes; - int success; - pulse_lock(); - delay.QuadPart = -stream->mmdev_period_usec * 10; - pa_stream_get_time(stream->stream, &last_time); - pulse_unlock(); + next_timer = pa_rtclock_now() + period->period; + TRACE("period %p, next_timer %llu.\n", period, (long long)next_timer); - while (!stream->please_quit) + LIST_FOR_EACH_ENTRY(stream, &period->streams, struct pulse_stream, period_entry) { - pa_usec_t now, adv_usec = 0; - int err; - - NtDelayExecution(FALSE, &delay); - - pulse_lock(); - - delay.QuadPart = -stream->mmdev_period_usec * 10; - - wait_pa_operation_complete(pa_stream_update_timing_info(stream->stream, pulse_op_cb, &success)); - err = pa_stream_get_time(stream->stream, &now); - if (err == 0) + if (stream->started) { - TRACE("got now: %s, last time: %s\n", wine_dbgstr_longlong(now), wine_dbgstr_longlong(last_time)); - if (stream->started && (stream->dataflow == eCapture || stream->held_bytes)) + pa_stream_get_time(stream->stream, &stream_time); + if (stream->dataflow == eCapture || stream->held_bytes) { - if(stream->just_underran) + if (stream->just_underran) { - last_time = now; stream->just_started = TRUE; + stream->stream_time = stream_time; } - if (stream->just_started) { - 
/* let it play out a period to absorb some latency and get accurate timing */ - pa_usec_t diff = now - last_time; - - if (diff > stream->mmdev_period_usec) + if (stream_time - stream->stream_time > period->period) { stream->just_started = FALSE; - last_time = now; + stream->stream_time = stream_time; } } else { - INT32 adjust = last_time + stream->mmdev_period_usec - now; - - adv_usec = now - last_time; - - if(adjust > ((INT32)(stream->mmdev_period_usec / 2))) - adjust = stream->mmdev_period_usec / 2; - else if(adjust < -((INT32)(stream->mmdev_period_usec / 2))) - adjust = -1 * stream->mmdev_period_usec / 2; - - delay.QuadPart = -(stream->mmdev_period_usec + adjust) * 10; - - last_time += stream->mmdev_period_usec; - } - - if (stream->dataflow == eRender) - { - pulse_write(stream); - - /* regardless of what PA does, advance one period */ - adv_bytes = min(stream->period_bytes, stream->held_bytes); - stream->lcl_offs_bytes += adv_bytes; - stream->lcl_offs_bytes %= stream->real_bufsize_bytes; - stream->held_bytes -= adv_bytes; - } - else if(stream->dataflow == eCapture) - { - pulse_read(stream); + avg_adjust += stream->stream_time + period->period - stream_time; + ++adj_count; + stream->stream_time += period->period; } } else { - last_time = now; - delay.QuadPart = -stream->mmdev_period_usec * 10; + stream->stream_time = stream_time; } - } + if (stream->dataflow == eRender) + { + pulse_write(stream); + /* regardless of what PA does, advance one period */ + adv_bytes = min(stream->period_bytes, stream->held_bytes); + stream->lcl_offs_bytes += adv_bytes; + stream->lcl_offs_bytes %= stream->real_bufsize_bytes; + stream->held_bytes -= adv_bytes; + } + else if(stream->dataflow == eCapture) + { + pulse_read(stream); + } + } + } + LIST_FOR_EACH_ENTRY(stream, &period->streams, struct pulse_stream, period_entry) + { if (stream->event) NtSetEvent(stream->event, NULL); + } + if (adj_count) + { + avg_adjust /= adj_count; + if(avg_adjust > ((INT32)(period->period / 3))) + 
avg_adjust = period->period / 3; + else if(avg_adjust < -((INT32)(period->period / 3))) + avg_adjust = -(INT64)(period->period / 3); /* period is unsigned: negate only after the cast. */ + next_timer += avg_adjust; + } + pa_context_rttime_restart(pulse_ctx, e, next_timer); +} + +static void pa_streams_timer_cb_destroy(pa_mainloop_api *api, pa_time_event *e, void *userdata) +{ + struct pulse_period *period = userdata; - TRACE("%p after update, adv usec: %d, held: %u, delay usec: %u\n", - stream, (int)adv_usec, - (int)(stream->held_bytes/ pa_frame_size(&stream->ss)), - (unsigned int)(-delay.QuadPart / 10)); + TRACE("period %p.\n", period); - pulse_unlock(); + list_remove(&period->entry); + free(period->device); + free(period); +} + +static void remove_stream_from_period(struct pulse_stream *stream) +{ + if (!stream->period) + return; + + list_remove(&stream->period_entry); + if (list_empty(&stream->period->streams) && pulse_ml) + { + pa_mainloop_api *api = pa_mainloop_get_api(pulse_ml); + + TRACE("freeing time event for period %p.\n", stream->period); + api->time_free(stream->period->time_event); + stream->period->time_event = NULL; } +} - return STATUS_SUCCESS; +static void pulse_add_stream_to_period(struct pulse_stream *stream) +{ + struct pulse_period *period; + pa_mainloop_api *api; + + if ((period = stream->period)) + { + assert(stream->mmdev_period_usec == period->period); + assert(!strcmp(stream->device, period->device)); + return; + } + + LIST_FOR_EACH_ENTRY(period, &active_periods, struct pulse_period, entry) + { + if (!period->time_event) + { + /* Period is being removed but pa_streams_timer_cb_destroy was not called yet.
*/ + continue; + } + if (period->period == stream->mmdev_period_usec && !strcmp(period->device, stream->device)) + { + TRACE("Using period %p.\n", period); + stream->period = period; + list_add_tail(&period->streams, &stream->period_entry); + return; + } + } + + period = calloc(1, sizeof(*period)); + period->period = stream->mmdev_period_usec; + period->device = strdup(stream->device); + list_init(&period->streams); + stream->period = period; + list_add_tail(&period->streams, &stream->period_entry); + list_add_tail(&active_periods, &period->entry); + period->time_event = pa_context_rttime_new(pulse_ctx, pa_rtclock_now() + period->period, + pa_streams_timer_cb, period); + api = pa_mainloop_get_api(pulse_ml); + api->time_set_destroy(period->time_event, pa_streams_timer_cb_destroy); + TRACE("Created period %p, %s, %lld.\n", period, debugstr_a(period->device), (long long)period->period); } static NTSTATUS pulse_release_stream(void *args) @@ -1630,12 +1698,12 @@ static NTSTATUS pulse_release_stream(void *args) SIZE_T size; if(params->timer_thread) { - stream->please_quit = TRUE; NtWaitForSingleObject(params->timer_thread, FALSE, NULL); NtClose(params->timer_thread); } pulse_lock(); + remove_stream_from_period(stream); if (PA_STREAM_IS_GOOD(pa_stream_get_state(stream->stream))) { pa_stream_disconnect(stream->stream); while (pulse_ml && PA_STREAM_IS_GOOD(pa_stream_get_state(stream->stream))) @@ -1655,6 +1723,7 @@ static NTSTATUS pulse_release_stream(void *args) &size, MEM_RELEASE); } free(stream->peek_buffer); + free(stream->device); free(stream); return STATUS_SUCCESS; } @@ -1702,6 +1771,8 @@ static NTSTATUS pulse_start(void *args) { stream->started = TRUE; stream->just_started = TRUE; + pa_stream_get_time(stream->stream, &stream->stream_time); + pulse_add_stream_to_period(stream); } pulse_unlock(); return STATUS_SUCCESS; @@ -1960,9 +2031,8 @@ static NTSTATUS pulse_release_render_buffer(void *args) stream->pa_held_bytes += written_bytes; if (stream->pa_held_bytes > 
stream->real_bufsize_bytes) { - stream->pa_offs_bytes += stream->pa_held_bytes - stream->real_bufsize_bytes; - stream->pa_offs_bytes %= stream->real_bufsize_bytes; - stream->pa_held_bytes = stream->real_bufsize_bytes; + stream->pa_offs_bytes = stream->lcl_offs_bytes; + stream->pa_held_bytes = stream->held_bytes; } stream->clock_written += written_bytes; stream->locked = 0; -- GitLab https://gitlab.winehq.org/wine/wine/-/merge_requests/8628
Of course, it is possible that there is a systematic difference between stream time and the global PulseAudio clock.
The analysis and discussion in https://gitlab.winehq.org/wine/wine/-/merge_requests/10792 suggest that this is an actual issue. I could never fully reproduce it myself, but the time may indeed drift. At least with a real PulseAudio server (as opposed to pipewire-pulse), buffer usage can grow. This is not necessarily due to a different time source: it looks like occasional delays in PA (e.g., due to heavy load) may push the stream time back a bit, and the timeline never catches up. If that leads to a buffer overflow, pa_held_bytes is not reset, the buffer stays full, and audio clicks constantly. PA can synchronize streams on the same device (by passing a master stream to pa_stream_connect_playback()), which could in theory help here. I explored that a bit, but in the end it doesn't look feasible to use:
* I am not sure it does anything at all on pipewire-pulse: none of the documented constraints seem to be enforced (e.g., I can connect streams from different devices using the same master stream);
* it is supported for render streams only;
* on a PulseAudio server, when streams are on the default device (empty name), it prevents audio from following the default device when that is switched in settings;
* solvable, but synchronized streams can only be corked and resumed together, so if a single stream is paused it would have to keep running with silence.
So I went a different way. Assuming there is no reason for streams on the same PA device to have a systematic time difference, I now group only streams from the same device (or the default device) into the same period, and I reintroduced the timeline adjustment logic that the current code already has. I also changed what happens in pulse_release_render_buffer() on 'pa_held_bytes' overflow: it now resets pa_held_bytes to held_bytes (and pa_offs_bytes to lcl_offs_bytes). That way, if a stream overflows for whatever reason over a long run, it can recover instead of getting stuck in a bad state with no spare output space.
-- https://gitlab.winehq.org/wine/wine/-/merge_requests/8628#note_139156
participants (2)
-
Paul Gofman -
Paul Gofman (@gofman)