NtFlushProcessWriteBuffers is the NT equivalent of Linux membarrier() system call. The .NET Framework garbage collector uses it to synchronize with other threads, and thus is required to avoid silent memory corruption.
Signed-off-by: Jinoh Kang jinoh.kang.kr@gmail.com ---
Notes: This is a naive implementation of the barrier system call. The memory overhead for each NtFlushProcessWriteBuffers call is: - 1 event object - N APC objects (where N = <number of threads in process> - 1)
Possible directions for improvements include:
- Don't allocate any memory, so that it won't fail on out-of-memory conditions. - This also means we don't allocate APC objects. Currently - Use membarrier() on Linux. - Use TLB shootdown IPI instead of APC. (Is this a correct method?)
dlls/ntdll/unix/server.c | 63 +++++++++++++++++++++++++++++++++++++++ dlls/ntdll/unix/virtual.c | 10 ------- server/protocol.def | 16 +++++++++- server/thread.c | 59 ++++++++++++++++++++++++++++++++---- 4 files changed, 131 insertions(+), 17 deletions(-)
diff --git a/dlls/ntdll/unix/server.c b/dlls/ntdll/unix/server.c index 9d0594d3374..26e2c288440 100644 --- a/dlls/ntdll/unix/server.c +++ b/dlls/ntdll/unix/server.c @@ -108,6 +108,15 @@ static int initial_cwd = -1; static pid_t server_pid; static pthread_mutex_t fd_cache_mutex = PTHREAD_MUTEX_INITIALIZER;
+#define PROC_MEM_BARRIER_MAGIC 0x626d654dUL + +struct proc_mem_barrier_cb +{ + ULONG magic; + volatile LONG count; + HANDLE event; +}; + /* atomically exchange a 64-bit value */ static inline LONG64 interlocked_xchg64( LONG64 *dest, LONG64 val ) { @@ -574,6 +583,18 @@ static void invoke_system_apc( const apc_call_t *call, apc_result_t *result, BOO if (!self) NtClose( wine_server_ptr_handle(call->dup_handle.dst_process) ); break; } + case APC_MEMORY_BARRIER: + { + struct proc_mem_barrier_cb *cb = wine_server_get_ptr( call->memory_barrier.cb ); + + assert( cb->magic == PROC_MEM_BARRIER_MAGIC ); + + MemoryBarrier(); + result->type = call->type; + if (!InterlockedDecrement( &cb->count )) + NtSetEvent( cb->event, NULL ); + break; + } default: server_protocol_error( "get_apc_request: bad type %d\n", call->type ); break; @@ -603,6 +624,9 @@ unsigned int server_select( const select_op_t *select_op, data_size_t size, UINT pthread_sigmask( SIG_BLOCK, &server_block_set, &old_set ); for (;;) { + /* ensure writes so far are visible to other threads */ + MemoryBarrier(); + SERVER_START_REQ( select ) { req->flags = flags; @@ -1729,3 +1753,42 @@ NTSTATUS WINAPI NtClose( HANDLE handle ) } return ret; } + + +/********************************************************************** + * NtFlushProcessWriteBuffers (NTDLL.@) + */ +void WINAPI NtFlushProcessWriteBuffers(void) +{ + for (;;) + { + NTSTATUS ret; + LARGE_INTEGER timeout; + struct proc_mem_barrier_cb cb = { PROC_MEM_BARRIER_MAGIC }; + + ret = NtCreateEvent( &cb.event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE ); + if (ret == STATUS_SUCCESS) + { + MemoryBarrier(); + SERVER_START_REQ( flush_process_write_buffers ) + { + req->cb = wine_server_client_ptr( &cb ); + ret = wine_server_call( req ); + InterlockedAdd( &cb.count, reply->count ); + } + SERVER_END_REQ; + + while (cb.count) + NtWaitForSingleObject( cb.event, FALSE, NULL ); + MemoryBarrier(); /* read cb.count before end of barrier */ + + NtClose( cb.event ); + memset( &cb, 0, sizeof(cb) ); + if (ret == STATUS_SUCCESS) break; + } + + /* NtFlushProcessWriteBuffers cannot fail */ + timeout.QuadPart = -100 * (ULONGLONG)10000; + NtDelayExecution( FALSE, &timeout ); + } +} diff --git a/dlls/ntdll/unix/virtual.c b/dlls/ntdll/unix/virtual.c index 94b300c5057..3e2d985f8d0 100644 --- a/dlls/ntdll/unix/virtual.c +++ b/dlls/ntdll/unix/virtual.c @@ -4935,16 +4935,6 @@ NTSTATUS WINAPI NtFlushInstructionCache( HANDLE handle, const void *addr, SIZE_T }
-/********************************************************************** - * NtFlushProcessWriteBuffers (NTDLL.@) - */ -void WINAPI NtFlushProcessWriteBuffers(void) -{ - static int once = 0; - if (!once++) FIXME( "stub\n" ); -} - - /********************************************************************** * NtCreatePagingFile (NTDLL.@) */ diff --git a/server/protocol.def b/server/protocol.def index 02e73047f9b..907cb2f5298 100644 --- a/server/protocol.def +++ b/server/protocol.def @@ -502,7 +502,8 @@ enum apc_type APC_MAP_VIEW, APC_UNMAP_VIEW, APC_CREATE_THREAD, - APC_DUP_HANDLE + APC_DUP_HANDLE, + APC_MEMORY_BARRIER };
typedef struct @@ -611,6 +612,11 @@ typedef union unsigned int attributes; /* object attributes */ unsigned int options; /* duplicate options */ } dup_handle; + struct + { + enum apc_type type; /* APC_MEMORY_BARRIER */ + client_ptr_t cb; /* address of control block */ + } memory_barrier; } apc_call_t;
typedef union @@ -1592,6 +1598,14 @@ enum server_fd_type @END
+/* Issue APC_MEMORY_BARRIER on other threads in the same process */ +@REQ(flush_process_write_buffers) + client_ptr_t cb; /* address of control block */ +@REPLY + data_size_t count; /* number of threads signaled */ +@END + + struct thread_info { timeout_t start_time; diff --git a/server/thread.c b/server/thread.c index 467ccd1f0db..4f02f6a5118 100644 --- a/server/thread.c +++ b/server/thread.c @@ -1151,6 +1151,21 @@ static int queue_apc( struct process *process, struct thread *thread, struct thr return 1; }
+static void withdraw_apc( struct thread_apc *apc ) +{ + if (apc->call.type == APC_MEMORY_BARRIER && apc->result.type != apc->call.type && apc->owner) + { + struct process *process = (struct process *)apc->owner; + + /* Re-queue APC to another thread to balance the proc_mem_barrier_cb counter */ + if (queue_apc( process, NULL, apc )) + return; + } + + apc->executed = 1; + wake_up( &apc->obj, 0 ); +} + /* queue an async procedure call */ int thread_queue_apc( struct process *process, struct thread *thread, struct object *owner, const apc_call_t *call_data ) { @@ -1175,8 +1190,7 @@ void thread_cancel_apc( struct thread *thread, struct object *owner, enum apc_ty { if (apc->owner != owner) continue; list_remove( &apc->entry ); - apc->executed = 1; - wake_up( &apc->obj, 0 ); + withdraw_apc( apc ); release_object( apc ); return; } @@ -1205,8 +1219,7 @@ static void clear_apc_queue( struct list *queue ) { struct thread_apc *apc = LIST_ENTRY( ptr, struct thread_apc, entry ); list_remove( &apc->entry ); - apc->executed = 1; - wake_up( &apc->obj, 0 ); + withdraw_apc( apc ); release_object( apc ); } } @@ -1669,8 +1682,7 @@ DECL_HANDLER(select) reply->call = apc->call; else { - apc->executed = 1; - wake_up( &apc->obj, 0 ); + withdraw_apc( apc ); } release_object( apc ); } @@ -2011,3 +2023,38 @@ DECL_HANDLER(get_next_thread) set_error( STATUS_NO_MORE_ENTRIES ); release_object( process ); } + +/* issue APC_MEMORY_BARRIER on other threads in the same process */ +DECL_HANDLER(flush_process_write_buffers) +{ + struct process *process = current->process; + struct thread *thread; + apc_call_t call; + + memset( &call, 0, sizeof(call) ); + call.memory_barrier.type = APC_MEMORY_BARRIER; + call.memory_barrier.cb = req->cb; + + reply->count = 0; + + LIST_FOR_EACH_ENTRY( thread, &process->thread_list, struct thread, proc_entry ) + { + struct thread_apc *apc; + int success; + + if (thread == current || thread->state == TERMINATED) continue; + + if (!(apc = create_apc( &process->obj, &call ))) break; + + if ((success = queue_apc( NULL, thread, apc ))) + reply->count++; + + release_object( apc ); + + if (!success) + { + set_error( STATUS_UNSUCCESSFUL ); + break; + } + } +}