Based on [a patch](https://www.winehq.org/mailman3/hyperkitty/list/wine-devel@winehq.org/messag...) by Jinoh Kang (@iamahuman) from February 2022.
I removed the need for the event object and implemented fast paths for Linux. On Linux 4.16+ `membarrier(MEMBARRIER_CMD_GLOBAL_EXPEDITED, ...)` is used. On x86 Linux <= 4.15 `madvise(..., MADV_DONTNEED)` is used, which sends IPIs to all cores causing them to do a memory barrier. On non-x86 Linux 4.3+ `membarrier(MEMBARRIER_CMD_SHARED, ...)` is used. On Linux <= 4.2 and on other platforms the fallback path using APCs is used.
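The fallback order described above can be sketched as a pure selection function. This is an illustration only: `flush_strategy`, `choose_flush_strategy` and the `is_x86_linux` parameter are hypothetical names that do not appear in the patch. The command bits are the ones the patch itself defines, and `available_cmds` stands for the result of `MEMBARRIER_CMD_QUERY` (negative when the syscall is unavailable).

```c
/* Command bits as defined in the patch (they mirror <linux/membarrier.h>). */
#define MEMBARRIER_CMD_SHARED                     0x01
#define MEMBARRIER_CMD_PRIVATE_EXPEDITED          0x08
#define MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED 0x10

/* Hypothetical names, for illustration only. */
enum flush_strategy
{
    FLUSH_EXPEDITED_MEMBARRIER,  /* membarrier() with an expedited command */
    FLUSH_MADVISE,               /* madvise(..., MADV_DONTNEED) on x86 Linux */
    FLUSH_SHARED_MEMBARRIER,     /* membarrier(MEMBARRIER_CMD_SHARED, ...) */
    FLUSH_APC_FALLBACK           /* server-side APC path */
};

static enum flush_strategy choose_flush_strategy( int available_cmds, int is_x86_linux )
{
    const int exp_required = MEMBARRIER_CMD_PRIVATE_EXPEDITED |
                             MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED;

    if (available_cmds < 0) available_cmds = 0;  /* membarrier() not supported */
    if ((available_cmds & exp_required) == exp_required) return FLUSH_EXPEDITED_MEMBARRIER;
    if (is_x86_linux) return FLUSH_MADVISE;
    if (available_cmds & MEMBARRIER_CMD_SHARED) return FLUSH_SHARED_MEMBARRIER;
    return FLUSH_APC_FALLBACK;
}
```

The sketch only models the static choice. In the patch each step can also fail at run time, in which case the next one is tried.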
From: Torge Matthies tmatthies@codeweavers.com
Based on a patch by Jinoh Kang from February 2022 [1]. The following description is copied from said patch:
NtFlushProcessWriteBuffers is the NT equivalent of Linux membarrier() system call. The .NET Framework garbage collector uses it to synchronize with other threads, and thus is required to avoid silent memory corruption.
[1] https://www.winehq.org/mailman3/hyperkitty/list/wine-devel@winehq.org/messag... --- dlls/ntdll/unix/server.c | 9 +++ dlls/ntdll/unix/unix_private.h | 3 + dlls/ntdll/unix/virtual.c | 114 +++++++++++++++++++++++++++- server/protocol.def | 19 ++++- server/thread.c | 135 +++++++++++++++++++++++++++++++++ server/thread.h | 2 + 6 files changed, 278 insertions(+), 4 deletions(-)
diff --git a/dlls/ntdll/unix/server.c b/dlls/ntdll/unix/server.c index 77e8d5c7566..9857ed81aaf 100644 --- a/dlls/ntdll/unix/server.c +++ b/dlls/ntdll/unix/server.c @@ -574,6 +574,12 @@ static void invoke_system_apc( const apc_call_t *call, apc_result_t *result, BOO if (!self) NtClose( wine_server_ptr_handle(call->dup_handle.dst_process) ); break; } + case APC_MEMORY_BARRIER: + { + MemoryBarrier(); + result->type = call->type; + break; + } default: server_protocol_error( "get_apc_request: bad type %d\n", call->type ); break; @@ -603,6 +609,9 @@ unsigned int server_select( const select_op_t *select_op, data_size_t size, UINT pthread_sigmask( SIG_BLOCK, &server_block_set, &old_set ); for (;;) { + /* ensure writes so far are visible to other threads */ + MemoryBarrier(); + SERVER_START_REQ( select ) { req->flags = flags; diff --git a/dlls/ntdll/unix/unix_private.h b/dlls/ntdll/unix/unix_private.h index 47f0f9c56a9..b5843c9ccfa 100644 --- a/dlls/ntdll/unix/unix_private.h +++ b/dlls/ntdll/unix/unix_private.h @@ -61,6 +61,9 @@ struct ntdll_thread_data PRTL_THREAD_START_ROUTINE start; /* thread entry point */ void *param; /* thread entry point parameter */ void *jmp_buf; /* setjmp buffer for exception handling */ +#if defined(__linux__) && (defined(__i386__) || defined(__x86_64__)) + char *dontneed_page; /* page used for NtFlushProcessWriteBuffers implementation */ +#endif };
C_ASSERT( sizeof(struct ntdll_thread_data) <= sizeof(((TEB *)0)->GdiTebBatch) ); diff --git a/dlls/ntdll/unix/virtual.c b/dlls/ntdll/unix/virtual.c index 96a5e095d16..14374f4c504 100644 --- a/dlls/ntdll/unix/virtual.c +++ b/dlls/ntdll/unix/virtual.c @@ -39,6 +39,9 @@ #ifdef HAVE_SYS_SYSINFO_H # include <sys/sysinfo.h> #endif +#ifdef HAVE_SYS_SYSCALL_H +# include <sys/syscall.h> +#endif #ifdef HAVE_SYS_SYSCTL_H # include <sys/sysctl.h> #endif @@ -214,6 +217,12 @@ struct range_entry static struct range_entry *free_ranges; static struct range_entry *free_ranges_end;
+#if defined(__linux__) && defined(__NR_membarrier) +static BOOL membarrier_shared_available; +static BOOL membarrier_exp_available; +static pthread_once_t membarrier_init_once = PTHREAD_ONCE_INIT; +#endif +
static inline BOOL is_beyond_limit( const void *addr, size_t size, const void *limit ) { @@ -3028,6 +3037,10 @@ void virtual_free_teb( TEB *teb ) size = 0; NtFreeVirtualMemory( GetCurrentProcess(), &thread_data->kernel_stack, &size, MEM_RELEASE ); } +#if defined(__linux__) && (defined(__i386__) || defined(__x86_64__)) + if (thread_data->dontneed_page) + munmap( thread_data->dontneed_page, page_size ); +#endif if (wow_teb && (ptr = ULongToPtr( wow_teb->DeallocationStack ))) { size = 0; @@ -5015,13 +5028,110 @@ NTSTATUS WINAPI NtFlushInstructionCache( HANDLE handle, const void *addr, SIZE_T }
+#if defined(__linux__) && defined(__NR_membarrier) +#define MEMBARRIER_CMD_QUERY 0x00 +#define MEMBARRIER_CMD_SHARED 0x01 +#define MEMBARRIER_CMD_PRIVATE_EXPEDITED 0x08 +#define MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED 0x10 + + +static int membarrier( int cmd, unsigned int flags, int cpu_id ) +{ + return syscall( __NR_membarrier, cmd, flags, cpu_id ); +} + + +static void membarrier_init( void ) +{ + static const int exp_required_cmds = + MEMBARRIER_CMD_PRIVATE_EXPEDITED | MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED; + int available_cmds = membarrier( MEMBARRIER_CMD_QUERY, 0, 0 ); + if (available_cmds == -1) + return; + membarrier_shared_available = !!(available_cmds & MEMBARRIER_CMD_SHARED); + if ((available_cmds & exp_required_cmds) == exp_required_cmds) + membarrier_exp_available = !membarrier( MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED, 0, 0 ); +} + + +static int try_exp_membarrier( void ) +{ + pthread_once(&membarrier_init_once, membarrier_init); + if (!membarrier_exp_available) + return 0; + return !membarrier( MEMBARRIER_CMD_PRIVATE_EXPEDITED, 0, 0 ); +} + + +static int try_shared_membarrier( void ) +{ + if (!membarrier_shared_available) + return 0; + return !membarrier( MEMBARRIER_CMD_SHARED, 0, 0 ); +} +#else +static int try_exp_membarrier( void ) { return 0; } +static int try_shared_membarrier( void ) { return 0; } +#endif + + +#if defined(__linux__) && defined(__i386__) || defined(__x86_64__) +static int try_madvise( void ) +{ + /* Credits to Avi Kivity (scylladb) and Aliaksei Kandratsenka (gperftools) for this trick, + * see https://github.com/scylladb/seastar/commit/77a58e4dc020233f66fccb8d9e8f7a8b7... 
*/ + char *mem = ntdll_get_thread_data()->dontneed_page; + if (!mem) + { + mem = anon_mmap_alloc( page_size, PROT_READ | PROT_WRITE ); + if (mem == MAP_FAILED) + return 0; + ntdll_get_thread_data()->dontneed_page = mem; + } + *mem = 3; + return !madvise( mem, page_size, MADV_DONTNEED ); +} +#else +static int try_madvise( void ) { return 0; } +#endif + + +static void do_apc_memorybarrier( void ) +{ + NTSTATUS status; + + MemoryBarrier(); + + do + { + select_op_t select_op; + + SERVER_START_REQ( flush_process_write_buffers ) + { + status = wine_server_call( req ); + } + SERVER_END_REQ; + if (status) continue; + + select_op.membarrier.op = SELECT_MEMBARRIER; + status = server_select( &select_op, sizeof(select_op.membarrier), SELECT_INTERRUPTIBLE, TIMEOUT_INFINITE, NULL, NULL ); + } + while (status); +} + + /********************************************************************** * NtFlushProcessWriteBuffers (NTDLL.@) */ void WINAPI NtFlushProcessWriteBuffers(void) { - static int once = 0; - if (!once++) FIXME( "stub\n" ); + if (try_exp_membarrier()) + return; + if (try_madvise()) + return; + if (try_shared_membarrier()) + return; + do_apc_memorybarrier(); }
diff --git a/server/protocol.def b/server/protocol.def index d828d41d1f7..eb062d3c7a3 100644 --- a/server/protocol.def +++ b/server/protocol.def @@ -462,7 +462,8 @@ enum select_op SELECT_WAIT_ALL, SELECT_SIGNAL_AND_WAIT, SELECT_KEYED_EVENT_WAIT, - SELECT_KEYED_EVENT_RELEASE + SELECT_KEYED_EVENT_RELEASE, + SELECT_MEMBARRIER };
typedef union @@ -485,6 +486,10 @@ typedef union obj_handle_t handle; client_ptr_t key; } keyed_event; + struct + { + enum select_op op; /* SELECT_MEMBARRIER */ + } membarrier; } select_op_t;
enum apc_type @@ -502,7 +507,8 @@ enum apc_type APC_MAP_VIEW, APC_UNMAP_VIEW, APC_CREATE_THREAD, - APC_DUP_HANDLE + APC_DUP_HANDLE, + APC_MEMORY_BARRIER };
typedef struct @@ -611,6 +617,10 @@ typedef union unsigned int attributes; /* object attributes */ unsigned int options; /* duplicate options */ } dup_handle; + struct + { + enum apc_type type; /* APC_MEMORY_BARRIER */ + } memory_barrier; } apc_call_t;
typedef union @@ -1610,6 +1620,11 @@ enum server_fd_type @END
+/* Issue a memory barrier on other threads in the same process */ +@REQ(flush_process_write_buffers) +@END + + struct thread_info { timeout_t start_time; diff --git a/server/thread.c b/server/thread.c index f49fbf40b78..4f159da357b 100644 --- a/server/thread.c +++ b/server/thread.c @@ -112,6 +112,42 @@ static const struct object_ops thread_apc_ops = thread_apc_destroy /* destroy */ };
+/* process-wide memory barriers */ + +struct thread_membarrier +{ + struct object obj; /* object header */ + int pending; +}; + +static void dump_thread_membarrier( struct object *obj, int verbose ); +static int thread_membarrier_signaled( struct object *obj, struct wait_queue_entry *entry ); +static struct thread_membarrier *create_membarrier( void ); + +static const struct object_ops thread_membarrier_ops = +{ + sizeof(struct thread_membarrier), /* size */ + &no_type, /* type */ + dump_thread_membarrier, /* dump */ + add_queue, /* add_queue */ + remove_queue, /* remove_queue */ + thread_membarrier_signaled, /* signaled */ + no_satisfied, /* satisfied */ + no_signal, /* signal */ + no_get_fd, /* get_fd */ + default_map_access, /* map_access */ + default_get_sd, /* get_sd */ + default_set_sd, /* set_sd */ + no_get_full_name, /* get_full_name */ + no_lookup_name, /* lookup_name */ + no_link_name, /* link_name */ + NULL, /* unlink_name */ + no_open_file, /* open_file */ + no_kernel_obj_list, /* get_kernel_obj_list */ + no_close_handle, /* close_handle */ + no_destroy /* destroy */ +}; +
/* thread CPU context */
@@ -232,6 +268,7 @@ static inline void init_thread_structure( struct thread *thread ) thread->system_regs = 0; thread->queue = NULL; thread->wait = NULL; + thread->membarrier = NULL; thread->error = 0; thread->req_data = NULL; thread->req_toread = 0; @@ -360,6 +397,12 @@ struct thread *create_thread( int fd, struct process *process, const struct secu release_object( thread ); return NULL; } + if (!(thread->membarrier = create_membarrier())) + { + close( fd ); + release_object( thread ); + return NULL; + } if (!(thread->request_fd = create_anonymous_fd( &thread_fd_ops, fd, &thread->obj, 0 ))) { release_object( thread ); @@ -415,6 +458,7 @@ static void cleanup_thread( struct thread *thread ) } clear_apc_queue( &thread->system_apc ); clear_apc_queue( &thread->user_apc ); + if (thread->membarrier) release_object( thread->membarrier ); free( thread->req_data ); free( thread->reply_data ); if (thread->request_fd) release_object( thread->request_fd ); @@ -433,6 +477,7 @@ static void cleanup_thread( struct thread *thread ) } } free( thread->desc ); + thread->membarrier = NULL; thread->req_data = NULL; thread->reply_data = NULL; thread->request_fd = NULL; @@ -526,6 +571,30 @@ static struct thread_apc *create_apc( struct object *owner, const apc_call_t *ca return apc; }
+static void dump_thread_membarrier( struct object *obj, int verbose ) +{ + struct thread_membarrier *mb = (struct thread_membarrier *)obj; + assert( obj->ops == &thread_membarrier_ops ); + + fprintf( stderr, "MB pending=%u\n", mb->pending ); +} + +static int thread_membarrier_signaled( struct object *obj, struct wait_queue_entry *entry ) +{ + struct thread_membarrier *mb = (struct thread_membarrier *)obj; + return !mb->pending; +} + +/* create a thread_membarrier object */ +static struct thread_membarrier *create_membarrier( void ) +{ + struct thread_membarrier *mb; + + if ((mb = alloc_object( &thread_membarrier_ops ))) + mb->pending = 0; + return mb; +} + /* get a thread pointer from a thread id (and increment the refcount) */ struct thread *get_thread_from_id( thread_id_t id ) { @@ -1029,6 +1098,13 @@ static int select_on( const select_op_t *select_op, data_size_t op_size, client_ current->wait->key = select_op->keyed_event.key; break;
+ case SELECT_MEMBARRIER: + object = &current->membarrier->obj; + if (!object) return 1; + ret = wait_on( select_op, 1, &object, flags, when ); + if (!ret) return 1; + break; + default: set_error( STATUS_INVALID_PARAMETER ); return 1; @@ -1165,6 +1241,20 @@ int thread_queue_apc( struct process *process, struct thread *thread, struct obj return ret; }
+static void finish_membarrier_apc( struct thread_apc *apc ) +{ + struct thread *caller = apc->caller; + + assert( caller ); + + if (caller->membarrier) + { + assert( caller->membarrier->pending > 0 ); + if (!--caller->membarrier->pending) + wake_up( &caller->membarrier->obj, 0 ); + } +} + /* cancel the async procedure call owned by a specific object */ void thread_cancel_apc( struct thread *thread, struct object *owner, enum apc_type type ) { @@ -1176,6 +1266,8 @@ void thread_cancel_apc( struct thread *thread, struct object *owner, enum apc_ty if (apc->owner != owner) continue; list_remove( &apc->entry ); apc->executed = 1; + if (apc->call.type == APC_MEMORY_BARRIER) + finish_membarrier_apc( apc ); wake_up( &apc->obj, 0 ); release_object( apc ); return; @@ -1206,6 +1298,8 @@ static void clear_apc_queue( struct list *queue ) struct thread_apc *apc = LIST_ENTRY( ptr, struct thread_apc, entry ); list_remove( &apc->entry ); apc->executed = 1; + if (apc->call.type == APC_MEMORY_BARRIER) + finish_membarrier_apc( apc ); wake_up( &apc->obj, 0 ); release_object( apc ); } @@ -1650,6 +1744,8 @@ DECL_HANDLER(select) apc->result.create_thread.handle = handle; clear_error(); /* ignore errors from the above calls */ } + if (apc->call.type == APC_MEMORY_BARRIER) /* wake up caller if membarriers done */ + finish_membarrier_apc( apc ); wake_up( &apc->obj, 0 ); close_handle( current->process, req->prev_apc ); release_object( apc ); @@ -1671,6 +1767,8 @@ DECL_HANDLER(select) else { apc->executed = 1; + if (apc->call.type == APC_MEMORY_BARRIER) + finish_membarrier_apc( apc ); wake_up( &apc->obj, 0 ); } release_object( apc ); @@ -2012,3 +2110,40 @@ DECL_HANDLER(get_next_thread) set_error( STATUS_NO_MORE_ENTRIES ); release_object( process ); } + +DECL_HANDLER(flush_process_write_buffers) +{ + struct process *process = current->process; + struct thread_membarrier *membarrier = current->membarrier; + struct thread *thread; + apc_call_t call; + + assert( membarrier ); + + memset( &call, 0, 
sizeof(call) ); + call.memory_barrier.type = APC_MEMORY_BARRIER; + + LIST_FOR_EACH_ENTRY( thread, &process->thread_list, struct thread, proc_entry ) + { + struct thread_apc *apc; + int success; + + if (thread == current || thread->state == TERMINATED) continue; + + if (!(apc = create_apc( &membarrier->obj, &call ))) break; + + if ((success = queue_apc( NULL, thread, apc ))) + { + apc->caller = (struct thread *)grab_object( current ); + membarrier->pending++; + } + + release_object( apc ); + + if (!success) + { + set_error( STATUS_UNSUCCESSFUL ); + break; + } + } +} diff --git a/server/thread.h b/server/thread.h index 8dcf966a90a..d7369134f42 100644 --- a/server/thread.h +++ b/server/thread.h @@ -28,6 +28,7 @@ struct process; struct thread_wait; struct thread_apc; +struct thread_membarrier; struct debug_obj; struct debug_event; struct msg_queue; @@ -59,6 +60,7 @@ struct thread struct thread_wait *wait; /* current wait condition if sleeping */ struct list system_apc; /* queue of system async procedure calls */ struct list user_apc; /* queue of user async procedure calls */ + struct thread_membarrier *membarrier; /* membarrier object for this thread */ struct inflight_fd inflight[MAX_INFLIGHT_FDS]; /* fds currently in flight */ unsigned int error; /* current error code */ union generic_request req; /* current request */
@iamahuman Do you remember why you added a memory barrier in `server_select`? Afaict it's not needed but I copied it into this mr for now.
Also if you want me to add a Co-Authored-By for you please let me know.
```diff
diff --git a/dlls/ntdll/unix/server.c b/dlls/ntdll/unix/server.c
index 48710cc0621..3eff200edef 100644
--- a/dlls/ntdll/unix/server.c
+++ b/dlls/ntdll/unix/server.c
@@ -662,6 +683,9 @@ unsigned int server_select( const select_op_t *select_op, data_size_t size, UINT
     pthread_sigmask( SIG_BLOCK, &server_block_set, &old_set );
     for (;;)
     {
+        /* ensure writes so far are visible to other threads */
+        MemoryBarrier();
+
         SERVER_START_REQ( select )
         {
             req->flags = flags;
```
Don't use `MEMBARRIER_CMD_SHARED` under any circumstances: its performance hit is potentially catastrophic:
https://lttng.org/blog/2018/01/15/membarrier-system-call-performance-and-use...
Also, dotnet core has code paths for a few more platforms so you might want to pick some of that too.
https://github.com/dotnet/runtime/blob/7be37908e5a1cbb83b1062768c1649827eeac...
> Do you remember why you added a memory barrier in `server_select`?
It was accidental. That change should be a distinct patch on its own.
> Afaict it's not needed
No, it's not needed for this MR.
The idea was that `server_select` should act as a strong memory barrier itself, so that the thread calling `FlushProcessWriteBuffers` would not need to call `MemoryBarrier()` explicitly.
This is based on my reading of https://docs.microsoft.com/en-us/windows/win32/sync/synchronization-and-mult...: it seems to imply that wait functions act as barriers themselves. That makes sense, since the waiter expects to read the latest values of shared variables when it wakes up.
> but I copied it into this mr for now.
>
> Also if you want me to add a Co-Authored-By for you please let me know.
How about splitting the patch instead? The first commit would introduce the fallback path, the second `membarrier()`, and so on. We're not introducing a regression by rolling out an inefficient `FlushProcessWriteBuffers` implementation, since it has never worked in the first place.
Jinoh Kang (@iamahuman) commented about dlls/ntdll/unix/unix_private.h:
     PRTL_THREAD_START_ROUTINE start; /* thread entry point */
     void *param; /* thread entry point parameter */
     void *jmp_buf; /* setjmp buffer for exception handling */
+#if defined(__linux__) && (defined(__i386__) || defined(__x86_64__))
+    char *dontneed_page; /* page used for NtFlushProcessWriteBuffers implementation */
+#endif

`#if defined(__linux__) && (defined(__i386__) || defined(__x86_64__))`
Although `madvise(MADV_DONTNEED)` is Linux-specific, we can extend the use of the dummy page to the `mprotect()` approach which should be portable across all Unices.
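A minimal sketch of what an `mprotect()`-based variant might look like. It assumes that revoking access to a resident page makes the kernel interrupt the other CPUs running the process; that behaviour is platform-dependent and is not confirmed anywhere in this thread. `flush_via_mprotect`, `helper_page` and `helper_mutex` are hypothetical names.

```c
#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE 1  /* for MAP_ANONYMOUS on glibc */
#endif
#include <pthread.h>
#include <stddef.h>
#include <sys/mman.h>
#include <unistd.h>

static pthread_mutex_t helper_mutex = PTHREAD_MUTEX_INITIALIZER;
static char *helper_page;  /* hypothetical process-wide dummy page */

/* Returns 1 if both protection changes succeeded, 0 otherwise. */
static int flush_via_mprotect( void )
{
    size_t size = (size_t)sysconf( _SC_PAGESIZE );
    int ok = 0;

    pthread_mutex_lock( &helper_mutex );
    if (!helper_page)
    {
        /* allocate the page lazily, initially inaccessible */
        void *mem = mmap( NULL, size, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0 );
        if (mem != MAP_FAILED) helper_page = mem;
    }
    if (helper_page && !mprotect( helper_page, size, PROT_READ | PROT_WRITE ))
    {
        *(volatile char *)helper_page = 1;  /* touch the page so it is resident and dirty */
        /* Revoking access is the step assumed to force the other CPUs to synchronize. */
        ok = !mprotect( helper_page, size, PROT_NONE );
    }
    pthread_mutex_unlock( &helper_mutex );
    return ok;
}
```

A return value of 1 only means the system calls succeeded. Whether every running thread actually executed a barrier has to be checked per platform.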
` char *dontneed_page; /* page used for NtFlushProcessWriteBuffers implementation */`
Allocating a page per thread may incur significant virtual memory overhead if the process is 32-bit and the application spawns too many threads. Unlike ScyllaDB, we don't have much control over them. How about making the page a singleton and guarding it with a mutex?
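The singleton suggestion could look roughly like the following Linux-only sketch. It is not code from the patch: `try_madvise_shared`, `shared_dontneed_page` and `dontneed_mutex` are hypothetical names, and plain `mmap()` stands in for the patch's `anon_mmap_alloc()` helper.

```c
#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE 1  /* for madvise() and MAP_ANONYMOUS on glibc */
#endif
#include <pthread.h>
#include <stddef.h>
#include <sys/mman.h>
#include <unistd.h>

static pthread_mutex_t dontneed_mutex = PTHREAD_MUTEX_INITIALIZER;
static char *shared_dontneed_page;  /* hypothetical singleton replacing the per-thread page */

/* Returns 1 if madvise() succeeded, 0 otherwise (the caller would then try the next path). */
static int try_madvise_shared( void )
{
    size_t size = (size_t)sysconf( _SC_PAGESIZE );
    int ok = 0;

    pthread_mutex_lock( &dontneed_mutex );
    if (!shared_dontneed_page)
    {
        void *mem = mmap( NULL, size, PROT_READ | PROT_WRITE,
                          MAP_PRIVATE | MAP_ANONYMOUS, -1, 0 );
        if (mem != MAP_FAILED) shared_dontneed_page = mem;
    }
    if (shared_dontneed_page)
    {
        /* The write and the madvise() must not interleave with another caller's:
         * if the page had already been discarded, this madvise() would have
         * nothing left to unmap. */
        *(volatile char *)shared_dontneed_page = 3;
        ok = !madvise( shared_dontneed_page, size, MADV_DONTNEED );
    }
    pthread_mutex_unlock( &dontneed_mutex );
    return ok;
}
```

The trade-off is one page per process instead of one per thread, at the cost of serializing concurrent callers on the mutex.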
How about making the membarrier object per-process instead?
Also, if we're introducing a new select type, we might as well make the select call itself the membarrier.
> How about making the membarrier object per-process instead?
I don't think I can/should do that: if thread 2 issues a memory barrier while thread 1 is already waiting on one, thread 1 would also wait for the second memory barrier to complete instead of just its own.
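The concern can be illustrated with a toy model of the `pending` counter. This is a simplified sketch, not the wineserver code: `membarrier_model` and the `model_*` helpers are hypothetical, and each barrier is assumed to queue APCs to two other threads.

```c
/* Hypothetical model of the server-side counter; not the actual wineserver structures. */
struct membarrier_model
{
    int pending;  /* APCs queued but not yet finished */
};

static void model_issue( struct membarrier_model *mb, int other_threads ) { mb->pending += other_threads; }
static void model_finish_apc( struct membarrier_model *mb ) { if (mb->pending > 0) mb->pending--; }
static int model_signaled( const struct membarrier_model *mb ) { return mb->pending == 0; }

/* Per-thread objects: thread 1 waits only for its own two APCs. */
static int thread1_done_per_thread( void )
{
    struct membarrier_model t1 = { 0 }, t2 = { 0 };

    model_issue( &t1, 2 );     /* thread 1 issues a barrier */
    model_issue( &t2, 2 );     /* thread 2 issues one while thread 1 is still waiting */
    model_finish_apc( &t1 );   /* both of thread 1's APCs complete */
    model_finish_apc( &t1 );
    return model_signaled( &t1 );
}

/* Per-process object: thread 2's APCs keep the shared counter non-zero. */
static int thread1_done_per_process( void )
{
    struct membarrier_model shared = { 0 };

    model_issue( &shared, 2 );
    model_issue( &shared, 2 );
    model_finish_apc( &shared );
    model_finish_apc( &shared );
    return model_signaled( &shared );
}
```

In this model thread 1's wait is satisfied with per-thread objects but not with a shared one, which is the extra waiting described above.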
> Although `madvise(MADV_DONTNEED)` is Linux-specific, we can extend the use of the dummy page to the `mprotect()` approach which should be portable across all Unices.
Are you sure that all Unices will issue a memory barrier on all running threads of the process though?