[PATCH v5 0/2] MR8628: Draft: winepulse.drv: Synchronize audio streams with the same periods.
-- v5: winepulse.drv: Process streams timer updates from PA main loop. winepulse.drv: Move pulse_release_stream() below. https://gitlab.winehq.org/wine/wine/-/merge_requests/8628
From: Paul Gofman <pgofman@codeweavers.com> --- dlls/winepulse.drv/pulse.c | 72 +++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/dlls/winepulse.drv/pulse.c b/dlls/winepulse.drv/pulse.c index abc5e60821f..cd734d8c52e 100644 --- a/dlls/winepulse.drv/pulse.c +++ b/dlls/winepulse.drv/pulse.c @@ -1237,42 +1237,6 @@ exit: return STATUS_SUCCESS; } -static NTSTATUS pulse_release_stream(void *args) -{ - struct release_stream_params *params = args; - struct pulse_stream *stream = handle_get_stream(params->stream); - SIZE_T size; - - if(params->timer_thread) { - stream->please_quit = TRUE; - NtWaitForSingleObject(params->timer_thread, FALSE, NULL); - NtClose(params->timer_thread); - } - - pulse_lock(); - if (PA_STREAM_IS_GOOD(pa_stream_get_state(stream->stream))) { - pa_stream_disconnect(stream->stream); - while (pulse_ml && PA_STREAM_IS_GOOD(pa_stream_get_state(stream->stream))) - pulse_cond_wait(); - } - pa_stream_unref(stream->stream); - pulse_unlock(); - - if (stream->tmp_buffer) { - size = 0; - NtFreeVirtualMemory(GetCurrentProcess(), (void **)&stream->tmp_buffer, - &size, MEM_RELEASE); - } - if (stream->local_buffer) { - size = 0; - NtFreeVirtualMemory(GetCurrentProcess(), (void **)&stream->local_buffer, - &size, MEM_RELEASE); - } - free(stream->peek_buffer); - free(stream); - return STATUS_SUCCESS; -} - static int write_buffer(const struct pulse_stream *stream, BYTE *buffer, UINT32 bytes) { const float *vol = stream->vol; @@ -1659,6 +1623,42 @@ static NTSTATUS pulse_timer_loop(void *args) return STATUS_SUCCESS; } +static NTSTATUS pulse_release_stream(void *args) +{ + struct release_stream_params *params = args; + struct pulse_stream *stream = handle_get_stream(params->stream); + SIZE_T size; + + if(params->timer_thread) { + stream->please_quit = TRUE; + NtWaitForSingleObject(params->timer_thread, FALSE, NULL); + NtClose(params->timer_thread); + } + + pulse_lock(); + if 
(PA_STREAM_IS_GOOD(pa_stream_get_state(stream->stream))) { + pa_stream_disconnect(stream->stream); + while (pulse_ml && PA_STREAM_IS_GOOD(pa_stream_get_state(stream->stream))) + pulse_cond_wait(); + } + pa_stream_unref(stream->stream); + pulse_unlock(); + + if (stream->tmp_buffer) { + size = 0; + NtFreeVirtualMemory(GetCurrentProcess(), (void **)&stream->tmp_buffer, + &size, MEM_RELEASE); + } + if (stream->local_buffer) { + size = 0; + NtFreeVirtualMemory(GetCurrentProcess(), (void **)&stream->local_buffer, + &size, MEM_RELEASE); + } + free(stream->peek_buffer); + free(stream); + return STATUS_SUCCESS; +} + static NTSTATUS pulse_start(void *args) { struct start_params *params = args; -- GitLab https://gitlab.winehq.org/wine/wine/-/merge_requests/8628
From: Paul Gofman <pgofman@codeweavers.com> --- dlls/winepulse.drv/pulse.c | 256 +++++++++++++++++++++++++------------ 1 file changed, 171 insertions(+), 85 deletions(-) diff --git a/dlls/winepulse.drv/pulse.c b/dlls/winepulse.drv/pulse.c index cd734d8c52e..83cd00c9320 100644 --- a/dlls/winepulse.drv/pulse.c +++ b/dlls/winepulse.drv/pulse.c @@ -53,10 +53,23 @@ enum phys_device_bus_type { phys_device_bus_usb }; +struct pulse_period +{ + struct list entry; + char *device; + pa_usec_t period; + struct list streams; + pa_time_event *time_event; + struct pulse_stream *timer_stream; +}; + +static struct list active_periods = LIST_INIT(active_periods); + struct pulse_stream { EDataFlow dataflow; + char *device; pa_stream *stream; pa_sample_spec ss; pa_channel_map map; @@ -74,16 +87,19 @@ struct pulse_stream BOOL started; SIZE_T bufsize_frames, real_bufsize_bytes, period_bytes; SIZE_T peek_ofs, read_offs_bytes, lcl_offs_bytes, pa_offs_bytes; - SIZE_T tmp_buffer_bytes, held_bytes, peek_len, peek_buffer_len, pa_held_bytes; + SIZE_T tmp_buffer_bytes, held_bytes, peek_len, peek_buffer_len, pa_held_bytes, max_pa_held_bytes; BYTE *local_buffer, *tmp_buffer, *peek_buffer; void *locked_ptr; - BOOL please_quit, just_started, just_underran; + BOOL just_underran; pa_usec_t mmdev_period_usec; + pa_usec_t stream_time, last_time; INT64 clock_lastpos, clock_written; struct list packet_free_head; struct list packet_filled_head; + struct list period_entry; + struct pulse_period *period; }; typedef struct _ACPacket @@ -1040,7 +1056,8 @@ static HRESULT pulse_spec_from_waveformat(struct pulse_stream *stream, const WAV static HRESULT pulse_stream_connect(struct pulse_stream *stream, const char *pulse_name, UINT32 period_bytes) { - pa_stream_flags_t flags = PA_STREAM_START_CORKED | PA_STREAM_START_UNMUTED | PA_STREAM_ADJUST_LATENCY; + pa_stream_flags_t flags = PA_STREAM_START_CORKED | PA_STREAM_START_UNMUTED | PA_STREAM_ADJUST_LATENCY + | PA_STREAM_AUTO_TIMING_UPDATE | 
PA_STREAM_INTERPOLATE_TIMING; int ret; char buffer[64]; static LONG number; @@ -1063,7 +1080,7 @@ static HRESULT pulse_stream_connect(struct pulse_stream *stream, const char *pul attr.minreq = attr.fragsize = period_bytes; attr.tlength = period_bytes * 3; attr.maxlength = stream->bufsize_frames * pa_frame_size(&stream->ss); - attr.prebuf = pa_frame_size(&stream->ss); + attr.prebuf = 0; dump_attr(&attr); /* If specific device was requested, use it exactly */ @@ -1218,6 +1235,7 @@ static NTSTATUS pulse_create_stream(void *args) } } } + stream->device = strdup(params->device); } *params->channel_count = stream->ss.channels; @@ -1230,6 +1248,7 @@ exit: pa_stream_disconnect(stream->stream); pa_stream_unref(stream->stream); } + free(stream->device); free(stream); } @@ -1524,103 +1543,163 @@ static void pulse_read(struct pulse_stream *stream) static NTSTATUS pulse_timer_loop(void *args) { - struct timer_loop_params *params = args; - struct pulse_stream *stream = handle_get_stream(params->stream); - LARGE_INTEGER delay; - pa_usec_t last_time; + /* Stream's data are read and written from the main loop timer callback. 
*/ + return STATUS_SUCCESS; +} + +static void pa_streams_timer_cb(pa_mainloop_api *api, pa_time_event *e, const struct timeval *tv, void *userdata) +{ + pa_usec_t now = pa_rtclock_now(), next_timer, stream_time = 0; + struct pulse_period *period = userdata; + struct pulse_stream *stream; + int64_t adjust = 0; UINT32 adv_bytes; - int success; + int err; - pulse_lock(); - delay.QuadPart = -stream->mmdev_period_usec * 10; - pa_stream_get_time(stream->stream, &last_time); - pulse_unlock(); + if (period->timer_stream && !period->timer_stream->started) + period->timer_stream = NULL; - while (!stream->please_quit) + if (!period->timer_stream) { - pa_usec_t now, adv_usec = 0; - int err; - - NtDelayExecution(FALSE, &delay); - - pulse_lock(); + LIST_FOR_EACH_ENTRY(stream, &period->streams, struct pulse_stream, period_entry) + { + if (stream->started) + { + if ((err = pa_stream_get_time(stream->stream, &stream->stream_time))) + ERR("pa_stream_get_time error %d.\n", err); + stream->last_time = stream->stream_time; + TRACE("period %p, stream %p is now timer stream, stream_time %llu.\n", period, stream, (long long)stream->stream_time); + stream->stream_time -= period->period; + period->timer_stream = stream; + break; + } + } + } - delay.QuadPart = -stream->mmdev_period_usec * 10; + next_timer = now + period->period; + if (period->timer_stream) + { + if ((err = pa_stream_get_time(period->timer_stream->stream, &stream_time))) + ERR("pa_stream_get_time error %d.\n", err); + if (stream_time > period->timer_stream->last_time) + { + period->timer_stream->stream_time += period->period; + adjust = period->timer_stream->stream_time - stream_time; + TRACE("period %p, stream_time %lld, adjust %lld.\n", period, (long long)stream_time, (long long)adjust); + if (adjust > (int64_t)(period->period / 3)) + adjust = period->period / 3; + else if (adjust < -(int64_t)(period->period / 3)) + adjust = -(int64_t)period->period / 3; + next_timer += adjust; + } + else + { + WARN("stream time did not 
advance, last_time %lld, stream_time %lld.\n", + (long long)period->timer_stream->last_time, (long long)stream_time); + } + period->timer_stream->last_time = stream_time; + } - wait_pa_operation_complete(pa_stream_update_timing_info(stream->stream, pulse_op_cb, &success)); - err = pa_stream_get_time(stream->stream, &now); - if (err == 0) + LIST_FOR_EACH_ENTRY(stream, &period->streams, struct pulse_stream, period_entry) + { + if (stream->started) { - TRACE("got now: %s, last time: %s\n", wine_dbgstr_longlong(now), wine_dbgstr_longlong(last_time)); - if (stream->started && (stream->dataflow == eCapture || stream->held_bytes)) + if (stream->dataflow == eRender && stream->held_bytes) { - if(stream->just_underran) - { - last_time = now; - stream->just_started = TRUE; - } + pulse_write(stream); - if (stream->just_started) - { - /* let it play out a period to absorb some latency and get accurate timing */ - pa_usec_t diff = now - last_time; + /* regardless of what PA does, advance one period */ + adv_bytes = min(stream->period_bytes, stream->held_bytes); + stream->lcl_offs_bytes += adv_bytes; + stream->lcl_offs_bytes %= stream->real_bufsize_bytes; + stream->held_bytes -= adv_bytes; + } + else if (stream->dataflow == eCapture) + { + pulse_read(stream); + } + } + if (stream->event) + NtSetEvent(stream->event, NULL); + } - if (diff > stream->mmdev_period_usec) - { - stream->just_started = FALSE; - last_time = now; - } - } - else - { - INT32 adjust = last_time + stream->mmdev_period_usec - now; + TRACE("period %p, now %llu, next_timer %llu.\n", period, (long long)now, (long long)next_timer); + pa_context_rttime_restart(pulse_ctx, e, next_timer); +} - adv_usec = now - last_time; +static void pa_streams_timer_cb_destroy(pa_mainloop_api *api, pa_time_event *e, void *userdata) +{ + struct pulse_period *period = userdata; - if(adjust > ((INT32)(stream->mmdev_period_usec / 2))) - adjust = stream->mmdev_period_usec / 2; - else if(adjust < -((INT32)(stream->mmdev_period_usec / 2))) 
- adjust = -1 * stream->mmdev_period_usec / 2; + TRACE("period %p.\n", period); - delay.QuadPart = -(stream->mmdev_period_usec + adjust) * 10; + list_remove(&period->entry); + free(period->device); + free(period); +} - last_time += stream->mmdev_period_usec; - } +static void remove_stream_from_period(struct pulse_stream *stream) +{ + if (!stream->period) + return; - if (stream->dataflow == eRender) - { - pulse_write(stream); + if (stream->period->timer_stream == stream) + stream->period->timer_stream = NULL; - /* regardless of what PA does, advance one period */ - adv_bytes = min(stream->period_bytes, stream->held_bytes); - stream->lcl_offs_bytes += adv_bytes; - stream->lcl_offs_bytes %= stream->real_bufsize_bytes; - stream->held_bytes -= adv_bytes; - } - else if(stream->dataflow == eCapture) - { - pulse_read(stream); - } - } - else - { - last_time = now; - delay.QuadPart = -stream->mmdev_period_usec * 10; - } - } + list_remove(&stream->period_entry); + if (list_empty(&stream->period->streams) && pulse_ml) + { + pa_mainloop_api *api = pa_mainloop_get_api(pulse_ml); - if (stream->event) - NtSetEvent(stream->event, NULL); + TRACE("freeing time event for period %p.\n", stream->period); + api->time_free(stream->period->time_event); + stream->period->time_event = NULL; + } +} - TRACE("%p after update, adv usec: %d, held: %u, delay usec: %u\n", - stream, (int)adv_usec, - (int)(stream->held_bytes/ pa_frame_size(&stream->ss)), - (unsigned int)(-delay.QuadPart / 10)); +static void pulse_add_stream_to_period(struct pulse_stream *stream) +{ + struct pulse_period *period; + pa_mainloop_api *api; - pulse_unlock(); + if ((period = stream->period)) + { + assert(stream->mmdev_period_usec == period->period); + assert(!strcmp(stream->device, period->device)); + /* */ + list_remove(&stream->period_entry); + list_add_tail(&period->streams, &stream->period_entry); + return; } - return STATUS_SUCCESS; + LIST_FOR_EACH_ENTRY(period, &active_periods, struct pulse_period, entry) + { + if 
(!period->time_event) + { + /* Period is being removed but pa_streams_timer_cb_destroy was not called yet. */ + continue; + } + if (period->period == stream->mmdev_period_usec && !strcmp(period->device, stream->device)) + { + TRACE("Using period %p.\n", period); + stream->period = period; + list_add_tail(&period->streams, &stream->period_entry); + return; + } + } + + period = calloc(1, sizeof(*period)); + period->period = stream->mmdev_period_usec; + period->device = strdup(stream->device); + list_init(&period->streams); + stream->period = period; + list_add_tail(&period->streams, &stream->period_entry); + list_add_tail(&active_periods, &period->entry); + period->time_event = pa_context_rttime_new(pulse_ctx, pa_rtclock_now() + period->period, + pa_streams_timer_cb, period); + api = pa_mainloop_get_api(pulse_ml); + api->time_set_destroy(period->time_event, pa_streams_timer_cb_destroy); + TRACE("Created period %p, %s, %lld.\n", period, debugstr_a(period->device), (long long)period->period); } static NTSTATUS pulse_release_stream(void *args) @@ -1630,12 +1709,12 @@ static NTSTATUS pulse_release_stream(void *args) SIZE_T size; if(params->timer_thread) { - stream->please_quit = TRUE; NtWaitForSingleObject(params->timer_thread, FALSE, NULL); NtClose(params->timer_thread); } pulse_lock(); + remove_stream_from_period(stream); if (PA_STREAM_IS_GOOD(pa_stream_get_state(stream->stream))) { pa_stream_disconnect(stream->stream); while (pulse_ml && PA_STREAM_IS_GOOD(pa_stream_get_state(stream->stream))) @@ -1655,6 +1734,7 @@ static NTSTATUS pulse_release_stream(void *args) &size, MEM_RELEASE); } free(stream->peek_buffer); + free(stream->device); free(stream); return STATUS_SUCCESS; } @@ -1701,7 +1781,7 @@ static NTSTATUS pulse_start(void *args) if (SUCCEEDED(params->result)) { stream->started = TRUE; - stream->just_started = TRUE; + pulse_add_stream_to_period(stream); } pulse_unlock(); return STATUS_SUCCESS; @@ -1958,11 +2038,17 @@ static NTSTATUS 
pulse_release_render_buffer(void *args) stream->held_bytes += written_bytes; stream->pa_held_bytes += written_bytes; + if (stream->pa_held_bytes > stream->max_pa_held_bytes) + { + stream->max_pa_held_bytes = stream->pa_held_bytes; + TRACE("%p max_pa_held_bytes %lld.\n", stream, (long long)stream->max_pa_held_bytes); + } if (stream->pa_held_bytes > stream->real_bufsize_bytes) { - stream->pa_offs_bytes += stream->pa_held_bytes - stream->real_bufsize_bytes; - stream->pa_offs_bytes %= stream->real_bufsize_bytes; - stream->pa_held_bytes = stream->real_bufsize_bytes; + WARN("%p PA buffer overflow.\n", stream); + stream->max_pa_held_bytes = 0; + stream->pa_offs_bytes = stream->lcl_offs_bytes; + stream->pa_held_bytes = stream->held_bytes; } stream->clock_written += written_bytes; stream->locked = 0; -- GitLab https://gitlab.winehq.org/wine/wine/-/merge_requests/8628
Averaging the stream delays doesn't work very well: there is an offset if streams are stopped and started at arbitrary times, and the adjustment doesn't converge. So instead I've implemented an adjustment based on one of the currently running streams, which is supposed to account for regular device time drift. I tested this with an ad-hoc test that adds and removes the stream (forcing a switch of the timer stream) and simulates underruns on the streams; the timing recovers in each case. -- https://gitlab.winehq.org/wine/wine/-/merge_requests/8628#note_139500
participants (2)
-
Paul Gofman -
Paul Gofman (@gofman)