From: Torge Matthies <tmatthies@codeweavers.com>
Based on a patch by Jinoh Kang from February 2022 [1]. The following description is copied from said patch:
NtFlushProcessWriteBuffers is the NT equivalent of the Linux membarrier() system call. The .NET Framework garbage collector uses it to synchronize with other threads, so a working implementation is required to avoid silent memory corruption.
[1] https://www.winehq.org/mailman3/hyperkitty/list/wine-devel@winehq.org/messag... --- dlls/ntdll/unix/server.c | 9 +++ dlls/ntdll/unix/unix_private.h | 3 + dlls/ntdll/unix/virtual.c | 114 +++++++++++++++++++++++++++- server/protocol.def | 19 ++++- server/thread.c | 135 +++++++++++++++++++++++++++++++++ server/thread.h | 2 + 6 files changed, 278 insertions(+), 4 deletions(-)
diff --git a/dlls/ntdll/unix/server.c b/dlls/ntdll/unix/server.c index 77e8d5c7566..9857ed81aaf 100644 --- a/dlls/ntdll/unix/server.c +++ b/dlls/ntdll/unix/server.c @@ -574,6 +574,12 @@ static void invoke_system_apc( const apc_call_t *call, apc_result_t *result, BOO if (!self) NtClose( wine_server_ptr_handle(call->dup_handle.dst_process) ); break; } + case APC_MEMORY_BARRIER: + { + MemoryBarrier(); /* fence requested by NtFlushProcessWriteBuffers in another thread */ + result->type = call->type; + break; + } default: server_protocol_error( "get_apc_request: bad type %d\n", call->type ); break; @@ -603,6 +609,9 @@ unsigned int server_select( const select_op_t *select_op, data_size_t size, UINT pthread_sigmask( SIG_BLOCK, &server_block_set, &old_set ); for (;;) { + /* ensure writes so far are visible to other threads */ + MemoryBarrier(); + SERVER_START_REQ( select ) { req->flags = flags; diff --git a/dlls/ntdll/unix/unix_private.h b/dlls/ntdll/unix/unix_private.h index 47f0f9c56a9..b5843c9ccfa 100644 --- a/dlls/ntdll/unix/unix_private.h +++ b/dlls/ntdll/unix/unix_private.h @@ -61,6 +61,9 @@ struct ntdll_thread_data PRTL_THREAD_START_ROUTINE start; /* thread entry point */ void *param; /* thread entry point parameter */ void *jmp_buf; /* setjmp buffer for exception handling */ +#if defined(__linux__) && (defined(__i386__) || defined(__x86_64__)) + char *dontneed_page; /* lazily mapped page that try_madvise() dirties and discards with MADV_DONTNEED for NtFlushProcessWriteBuffers */ +#endif };
C_ASSERT( sizeof(struct ntdll_thread_data) <= sizeof(((TEB *)0)->GdiTebBatch) ); diff --git a/dlls/ntdll/unix/virtual.c b/dlls/ntdll/unix/virtual.c index 96a5e095d16..14374f4c504 100644 --- a/dlls/ntdll/unix/virtual.c +++ b/dlls/ntdll/unix/virtual.c @@ -39,6 +39,9 @@ #ifdef HAVE_SYS_SYSINFO_H # include <sys/sysinfo.h> #endif +#ifdef HAVE_SYS_SYSCALL_H +# include <sys/syscall.h> +#endif #ifdef HAVE_SYS_SYSCTL_H # include <sys/sysctl.h> #endif @@ -214,6 +217,12 @@ struct range_entry static struct range_entry *free_ranges; static struct range_entry *free_ranges_end;
+#if defined(__linux__) && defined(__NR_membarrier) +static BOOL membarrier_shared_available; /* MEMBARRIER_CMD_SHARED usable; set by membarrier_init() */ +static BOOL membarrier_exp_available; /* private expedited membarrier registered; set by membarrier_init() */ +static pthread_once_t membarrier_init_once = PTHREAD_ONCE_INIT; /* guards the one-time membarrier_init() */ +#endif +
static inline BOOL is_beyond_limit( const void *addr, size_t size, const void *limit ) { @@ -3028,6 +3037,10 @@ void virtual_free_teb( TEB *teb ) size = 0; NtFreeVirtualMemory( GetCurrentProcess(), &thread_data->kernel_stack, &size, MEM_RELEASE ); } +#if defined(__linux__) && (defined(__i386__) || defined(__x86_64__)) + if (thread_data->dontneed_page) /* mapped lazily by try_madvise() */ + munmap( thread_data->dontneed_page, page_size ); +#endif if (wow_teb && (ptr = ULongToPtr( wow_teb->DeallocationStack ))) { size = 0; @@ -5015,13 +5028,110 @@ NTSTATUS WINAPI NtFlushInstructionCache( HANDLE handle, const void *addr, SIZE_T }
+#if defined(__linux__) && defined(__NR_membarrier) +#define MEMBARRIER_CMD_QUERY 0x00 +#define MEMBARRIER_CMD_SHARED 0x01 +#define MEMBARRIER_CMD_PRIVATE_EXPEDITED 0x08 +#define MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED 0x10 + + +static int membarrier( int cmd, unsigned int flags, int cpu_id ) +{ + return syscall( __NR_membarrier, cmd, flags, cpu_id ); +} + + +static void membarrier_init( void ) +{ + static const int exp_required_cmds = + MEMBARRIER_CMD_PRIVATE_EXPEDITED | MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED; + int available_cmds = membarrier( MEMBARRIER_CMD_QUERY, 0, 0 ); /* bitmask of supported commands, or -1 */ + if (available_cmds == -1) + return; + membarrier_shared_available = !!(available_cmds & MEMBARRIER_CMD_SHARED); + if ((available_cmds & exp_required_cmds) == exp_required_cmds) + membarrier_exp_available = !membarrier( MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED, 0, 0 ); +} + + +static int try_exp_membarrier( void ) +{ + pthread_once(&membarrier_init_once, membarrier_init); + if (!membarrier_exp_available) + return 0; + return !membarrier( MEMBARRIER_CMD_PRIVATE_EXPEDITED, 0, 0 ); +} + + +static int try_shared_membarrier( void ) +{ + if (!membarrier_shared_available) /* set by membarrier_init(), which runs via try_exp_membarrier(); NtFlushProcessWriteBuffers calls that first */ + return 0; + return !membarrier( MEMBARRIER_CMD_SHARED, 0, 0 ); +} +#else +static int try_exp_membarrier( void ) { return 0; } +static int try_shared_membarrier( void ) { return 0; } +#endif + + +#if defined(__linux__) && (defined(__i386__) || defined(__x86_64__)) /* must match the guard on ntdll_thread_data.dontneed_page */ +static int try_madvise( void ) +{ + /* Credits to Avi Kivity (scylladb) and Aliaksei Kandratsenka (gperftools) for this trick, + * see https://github.com/scylladb/seastar/commit/77a58e4dc020233f66fccb8d9e8f7a8b7... 
*/ + char *mem = ntdll_get_thread_data()->dontneed_page; + if (!mem) + { + mem = anon_mmap_alloc( page_size, PROT_READ | PROT_WRITE ); + if (mem == MAP_FAILED) + return 0; + ntdll_get_thread_data()->dontneed_page = mem; + } + *mem = 3; /* dirty the page so MADV_DONTNEED has a populated mapping to discard */ + return !madvise( mem, page_size, MADV_DONTNEED ); +} +#else +static int try_madvise( void ) { return 0; } +#endif + + +static void do_apc_memorybarrier( void ) +{ + NTSTATUS status; + + MemoryBarrier(); + + do + { + select_op_t select_op; + + SERVER_START_REQ( flush_process_write_buffers ) + { + status = wine_server_call( req ); + } + SERVER_END_REQ; + if (status) continue; /* jumps to the while (status) test, i.e. retries the request */ + + select_op.membarrier.op = SELECT_MEMBARRIER; + status = server_select( &select_op, sizeof(select_op.membarrier), SELECT_INTERRUPTIBLE, TIMEOUT_INFINITE, NULL, NULL ); /* wait until the queued barrier APCs have completed */ + } + while (status); +} + + /********************************************************************** * NtFlushProcessWriteBuffers (NTDLL.@) */ void WINAPI NtFlushProcessWriteBuffers(void) { - static int once = 0; - if (!once++) FIXME( "stub\n" ); + if (try_exp_membarrier()) + return; + if (try_madvise()) + return; + if (try_shared_membarrier()) + return; + do_apc_memorybarrier(); }
diff --git a/server/protocol.def b/server/protocol.def index d828d41d1f7..eb062d3c7a3 100644 --- a/server/protocol.def +++ b/server/protocol.def @@ -462,7 +462,8 @@ enum select_op SELECT_WAIT_ALL, SELECT_SIGNAL_AND_WAIT, SELECT_KEYED_EVENT_WAIT, - SELECT_KEYED_EVENT_RELEASE + SELECT_KEYED_EVENT_RELEASE, + SELECT_MEMBARRIER /* wait until the memory-barrier APCs queued by this thread have completed */ };
typedef union @@ -485,6 +486,10 @@ typedef union obj_handle_t handle; client_ptr_t key; } keyed_event; + struct + { + enum select_op op; /* SELECT_MEMBARRIER */ + } membarrier; } select_op_t;
enum apc_type @@ -502,7 +507,8 @@ enum apc_type APC_MAP_VIEW, APC_UNMAP_VIEW, APC_CREATE_THREAD, - APC_DUP_HANDLE + APC_DUP_HANDLE, + APC_MEMORY_BARRIER /* run MemoryBarrier() in the target thread */ };
typedef struct @@ -611,6 +617,10 @@ typedef union unsigned int attributes; /* object attributes */ unsigned int options; /* duplicate options */ } dup_handle; + struct + { + enum apc_type type; /* APC_MEMORY_BARRIER */ + } memory_barrier; } apc_call_t;
typedef union @@ -1610,6 +1620,11 @@ enum server_fd_type @END
+/* Queue a memory-barrier APC to every other non-terminated thread of the calling process */ +@REQ(flush_process_write_buffers) +@END + + struct thread_info { timeout_t start_time; diff --git a/server/thread.c b/server/thread.c index f49fbf40b78..4f159da357b 100644 --- a/server/thread.c +++ b/server/thread.c @@ -112,6 +112,42 @@ static const struct object_ops thread_apc_ops = thread_apc_destroy /* destroy */ };
+/* process-wide memory barriers */ + +struct thread_membarrier +{ + struct object obj; /* object header */ + int pending; /* memory-barrier APCs queued by the owning thread that have not finished yet */ +}; + +static void dump_thread_membarrier( struct object *obj, int verbose ); +static int thread_membarrier_signaled( struct object *obj, struct wait_queue_entry *entry ); +static struct thread_membarrier *create_membarrier( void ); + +static const struct object_ops thread_membarrier_ops = +{ + sizeof(struct thread_membarrier), /* size */ + &no_type, /* type */ + dump_thread_membarrier, /* dump */ + add_queue, /* add_queue */ + remove_queue, /* remove_queue */ + thread_membarrier_signaled, /* signaled */ + no_satisfied, /* satisfied */ + no_signal, /* signal */ + no_get_fd, /* get_fd */ + default_map_access, /* map_access */ + default_get_sd, /* get_sd */ + default_set_sd, /* set_sd */ + no_get_full_name, /* get_full_name */ + no_lookup_name, /* lookup_name */ + no_link_name, /* link_name */ + NULL, /* unlink_name */ + no_open_file, /* open_file */ + no_kernel_obj_list, /* get_kernel_obj_list */ + no_close_handle, /* close_handle */ + no_destroy /* destroy */ +}; +
/* thread CPU context */
@@ -232,6 +268,7 @@ static inline void init_thread_structure( struct thread *thread ) thread->system_regs = 0; thread->queue = NULL; thread->wait = NULL; + thread->membarrier = NULL; thread->error = 0; thread->req_data = NULL; thread->req_toread = 0; @@ -360,6 +397,12 @@ struct thread *create_thread( int fd, struct process *process, const struct secu release_object( thread ); return NULL; } + if (!(thread->membarrier = create_membarrier())) + { + close( fd ); + release_object( thread ); + return NULL; + } if (!(thread->request_fd = create_anonymous_fd( &thread_fd_ops, fd, &thread->obj, 0 ))) { release_object( thread ); @@ -415,6 +458,7 @@ static void cleanup_thread( struct thread *thread ) } clear_apc_queue( &thread->system_apc ); clear_apc_queue( &thread->user_apc ); + if (thread->membarrier) release_object( thread->membarrier ); free( thread->req_data ); free( thread->reply_data ); if (thread->request_fd) release_object( thread->request_fd ); @@ -433,6 +477,7 @@ static void cleanup_thread( struct thread *thread ) } } free( thread->desc ); + thread->membarrier = NULL; /* finish_membarrier_apc() skips callers whose membarrier is gone */ thread->req_data = NULL; thread->reply_data = NULL; thread->request_fd = NULL; @@ -526,6 +571,30 @@ static struct thread_apc *create_apc( struct object *owner, const apc_call_t *ca return apc; }
+static void dump_thread_membarrier( struct object *obj, int verbose ) +{ + struct thread_membarrier *mb = (struct thread_membarrier *)obj; + assert( obj->ops == &thread_membarrier_ops ); + + fprintf( stderr, "MB pending=%d\n", mb->pending ); +} + +static int thread_membarrier_signaled( struct object *obj, struct wait_queue_entry *entry ) +{ + struct thread_membarrier *mb = (struct thread_membarrier *)obj; + return !mb->pending; /* signaled once every queued memory-barrier APC has finished */ +} + +/* create a thread_membarrier object */ +static struct thread_membarrier *create_membarrier( void ) +{ + struct thread_membarrier *mb; + + if ((mb = alloc_object( &thread_membarrier_ops ))) + mb->pending = 0; + return mb; +} + /* get a thread pointer from a thread id (and increment the refcount) */ struct thread *get_thread_from_id( thread_id_t id ) { @@ -1029,6 +1098,13 @@ static int select_on( const select_op_t *select_op, data_size_t op_size, client_ current->wait->key = select_op->keyed_event.key; break;
+ case SELECT_MEMBARRIER: + if (!current->membarrier) return 1; /* only NULL after cleanup_thread(); test the pointer before taking a member address */ + object = &current->membarrier->obj; + ret = wait_on( select_op, 1, &object, flags, when ); + if (!ret) return 1; + break; + default: set_error( STATUS_INVALID_PARAMETER ); return 1; @@ -1165,6 +1241,20 @@ int thread_queue_apc( struct process *process, struct thread *thread, struct obj return ret; }
+static void finish_membarrier_apc( struct thread_apc *apc ) +{ + struct thread *caller = apc->caller; + + assert( caller ); + + if (caller->membarrier) /* NULL once cleanup_thread() has run for the caller */ + { + assert( caller->membarrier->pending > 0 ); + if (!--caller->membarrier->pending) /* last one: signal the caller's SELECT_MEMBARRIER wait */ + wake_up( &caller->membarrier->obj, 0 ); + } +} + /* cancel the async procedure call owned by a specific object */ void thread_cancel_apc( struct thread *thread, struct object *owner, enum apc_type type ) { @@ -1176,6 +1266,8 @@ void thread_cancel_apc( struct thread *thread, struct object *owner, enum apc_ty if (apc->owner != owner) continue; list_remove( &apc->entry ); apc->executed = 1; + if (apc->call.type == APC_MEMORY_BARRIER) + finish_membarrier_apc( apc ); wake_up( &apc->obj, 0 ); release_object( apc ); return; @@ -1206,6 +1298,8 @@ static void clear_apc_queue( struct list *queue ) struct thread_apc *apc = LIST_ENTRY( ptr, struct thread_apc, entry ); list_remove( &apc->entry ); apc->executed = 1; + if (apc->call.type == APC_MEMORY_BARRIER) + finish_membarrier_apc( apc ); wake_up( &apc->obj, 0 ); release_object( apc ); } @@ -1650,6 +1744,8 @@ DECL_HANDLER(select) apc->result.create_thread.handle = handle; clear_error(); /* ignore errors from the above calls */ } + if (apc->call.type == APC_MEMORY_BARRIER) /* wake up caller if membarriers done */ + finish_membarrier_apc( apc ); wake_up( &apc->obj, 0 ); close_handle( current->process, req->prev_apc ); release_object( apc ); @@ -1671,6 +1767,8 @@ DECL_HANDLER(select) else { apc->executed = 1; + if (apc->call.type == APC_MEMORY_BARRIER) + finish_membarrier_apc( apc ); wake_up( &apc->obj, 0 ); } release_object( apc ); @@ -2012,3 +2110,40 @@ DECL_HANDLER(get_next_thread) set_error( STATUS_NO_MORE_ENTRIES ); release_object( process ); } + +DECL_HANDLER(flush_process_write_buffers) +{ + struct process *process = current->process; + struct thread_membarrier *membarrier = current->membarrier; + struct thread *thread; + apc_call_t call; + + assert( membarrier ); + + memset( &call, 0,
sizeof(call) ); + call.memory_barrier.type = APC_MEMORY_BARRIER; + + LIST_FOR_EACH_ENTRY( thread, &process->thread_list, struct thread, proc_entry ) + { + struct thread_apc *apc; + int success; + + if (thread == current || thread->state == TERMINATED) continue; /* the requesting thread fences itself in do_apc_memorybarrier() */ + + if (!(apc = create_apc( &membarrier->obj, &call ))) break; + + if ((success = queue_apc( NULL, thread, apc ))) + { + apc->caller = (struct thread *)grab_object( current ); + membarrier->pending++; /* decremented by finish_membarrier_apc() */ + } + + release_object( apc ); + + if (!success) + { + set_error( STATUS_UNSUCCESSFUL ); + break; + } + } +} diff --git a/server/thread.h b/server/thread.h index 8dcf966a90a..d7369134f42 100644 --- a/server/thread.h +++ b/server/thread.h @@ -28,6 +28,7 @@ struct process; struct thread_wait; struct thread_apc; +struct thread_membarrier; struct debug_obj; struct debug_event; struct msg_queue; @@ -59,6 +60,7 @@ struct thread struct thread_wait *wait; /* current wait condition if sleeping */ struct list system_apc; /* queue of system async procedure calls */ struct list user_apc; /* queue of user async procedure calls */ + struct thread_membarrier *membarrier; /* waited on via SELECT_MEMBARRIER; counts this thread's outstanding memory-barrier APCs */ struct inflight_fd inflight[MAX_INFLIGHT_FDS]; /* fds currently in flight */ unsigned int error; /* current error code */ union generic_request req; /* current request */