-- v2: ntdll/tests: Add tests for completion port signaling. kernelbase: Set the proper error code in GetQueuedCompletionStatus{Ex} when the handle is closed. server: Signal completion port waits on handle close. ntdll: Handle user APCs explicitly in NtRemoveIoCompletionEx(). ntdll: Assign completion to thread when wait for completion is satisfied. ntdll: Introduce a separate per-thread object for internal completion waits.
From: Paul Gofman pgofman@codeweavers.com
--- dlls/ntdll/unix/sync.c | 8 ++- server/completion.c | 118 ++++++++++++++++++++++++++++++++++++++++- server/file.h | 1 + server/protocol.def | 1 + server/thread.c | 2 + server/thread.h | 2 + 6 files changed, 129 insertions(+), 3 deletions(-)
diff --git a/dlls/ntdll/unix/sync.c b/dlls/ntdll/unix/sync.c index 4269873be6c..e4a659f93bb 100644 --- a/dlls/ntdll/unix/sync.c +++ b/dlls/ntdll/unix/sync.c @@ -1999,6 +1999,7 @@ NTSTATUS WINAPI NtSetIoCompletion( HANDLE handle, ULONG_PTR key, ULONG_PTR value NTSTATUS WINAPI NtRemoveIoCompletion( HANDLE handle, ULONG_PTR *key, ULONG_PTR *value, IO_STATUS_BLOCK *io, LARGE_INTEGER *timeout ) { + HANDLE wait_handle = NULL; unsigned int status;
TRACE( "(%p, %p, %p, %p, %p)\n", handle, key, value, io, timeout ); @@ -2015,10 +2016,11 @@ NTSTATUS WINAPI NtRemoveIoCompletion( HANDLE handle, ULONG_PTR *key, ULONG_PTR * io->Information = reply->information; io->Status = reply->status; } + else wait_handle = wine_server_ptr_handle( reply->wait_handle ); } SERVER_END_REQ; if (status != STATUS_PENDING) return status; - status = NtWaitForSingleObject( handle, FALSE, timeout ); + status = NtWaitForSingleObject( wait_handle, FALSE, timeout ); if (status != WAIT_OBJECT_0) return status; } } @@ -2030,6 +2032,7 @@ NTSTATUS WINAPI NtRemoveIoCompletion( HANDLE handle, ULONG_PTR *key, ULONG_PTR * NTSTATUS WINAPI NtRemoveIoCompletionEx( HANDLE handle, FILE_IO_COMPLETION_INFORMATION *info, ULONG count, ULONG *written, LARGE_INTEGER *timeout, BOOLEAN alertable ) { + HANDLE wait_handle = NULL; unsigned int status; ULONG i = 0;
@@ -2049,6 +2052,7 @@ NTSTATUS WINAPI NtRemoveIoCompletionEx( HANDLE handle, FILE_IO_COMPLETION_INFORM info[i].IoStatusBlock.Information = reply->information; info[i].IoStatusBlock.Status = reply->status; } + else wait_handle = wine_server_ptr_handle( reply->wait_handle ); } SERVER_END_REQ; if (status != STATUS_SUCCESS) break; @@ -2059,7 +2063,7 @@ NTSTATUS WINAPI NtRemoveIoCompletionEx( HANDLE handle, FILE_IO_COMPLETION_INFORM if (status == STATUS_PENDING) status = STATUS_SUCCESS; break; } - status = NtWaitForSingleObject( handle, alertable, timeout ); + status = NtWaitForSingleObject( wait_handle, alertable, timeout ); if (status != WAIT_OBJECT_0) break; } *written = i ? i : 1; diff --git a/server/completion.c b/server/completion.c index 6933195e72d..f00962a306d 100644 --- a/server/completion.c +++ b/server/completion.c @@ -56,13 +56,71 @@ struct type_descr completion_type = }, };
+struct completion_wait +{ + struct object obj; + obj_handle_t handle; + struct completion *completion; + struct thread *thread; + struct list wait_queue_entry; +}; + struct completion { struct object obj; struct list queue; + struct list wait_queue; unsigned int depth; };
+static void completion_wait_dump( struct object*, int ); +static int completion_wait_signaled( struct object *obj, struct wait_queue_entry *entry ); +static void completion_wait_destroy( struct object * ); + +static const struct object_ops completion_wait_ops = +{ + sizeof(struct completion_wait), /* size */ + &no_type, /* type */ + completion_wait_dump, /* dump */ + add_queue, /* add_queue */ + remove_queue, /* remove_queue */ + completion_wait_signaled, /* signaled */ + no_satisfied, /* satisfied */ + no_signal, /* signal */ + no_get_fd, /* get_fd */ + default_map_access, /* map_access */ + default_get_sd, /* get_sd */ + default_set_sd, /* set_sd */ + no_get_full_name, /* get_full_name */ + no_lookup_name, /* lookup_name */ + no_link_name, /* link_name */ + NULL, /* unlink_name */ + no_open_file, /* open_file */ + no_kernel_obj_list, /* get_kernel_obj_list */ + no_close_handle, /* close_handle */ + completion_wait_destroy /* destroy */ +}; + +static void completion_wait_destroy( struct object *obj ) +{ +} + +static void completion_wait_dump( struct object *obj, int verbose ) +{ + struct completion_wait *wait = (struct completion_wait *)obj; + + assert( obj->ops == &completion_wait_ops ); + fprintf( stderr, "Completion wait completion=%p\n", wait->completion ); +} + +static int completion_wait_signaled( struct object *obj, struct wait_queue_entry *entry ) +{ + struct completion_wait *wait = (struct completion_wait *)obj; + + assert( obj->ops == &completion_wait_ops ); + return wait->completion->depth; +} + static void completion_dump( struct object*, int ); static int completion_signaled( struct object *obj, struct wait_queue_entry *entry ); static void completion_destroy( struct object * ); @@ -103,12 +161,19 @@ struct comp_msg static void completion_destroy( struct object *obj) { struct completion *completion = (struct completion *) obj; + struct completion_wait *wait, *wait_next; struct comp_msg *tmp, *next;
LIST_FOR_EACH_ENTRY_SAFE( tmp, next, &completion->queue, struct comp_msg, queue_entry ) { free( tmp ); } + + LIST_FOR_EACH_ENTRY_SAFE( wait, wait_next, &completion->wait_queue, struct completion_wait, wait_queue_entry ) + { + assert( wait->completion ); + cleanup_thread_completion( wait->thread ); + } }
static void completion_dump( struct object *obj, int verbose ) @@ -126,6 +191,35 @@ static int completion_signaled( struct object *obj, struct wait_queue_entry *ent return !list_empty( &completion->queue ); }
+void cleanup_thread_completion( struct thread *thread ) +{ + if (!thread->completion_wait) return; + + if (thread->completion_wait->handle) + { + close_handle( thread->process, thread->completion_wait->handle ); + thread->completion_wait->handle = 0; + } + list_remove( &thread->completion_wait->wait_queue_entry ); + release_object( &thread->completion_wait->obj ); + thread->completion_wait = NULL; +} + +static struct completion_wait *create_completion_wait( struct thread *thread ) +{ + struct completion_wait *wait; + + if (!(wait = alloc_object( &completion_wait_ops ))) return NULL; + wait->completion = NULL; + wait->thread = thread; + if (!(wait->handle = alloc_handle( current->process, wait, SYNCHRONIZE, 0 ))) + { + release_object( &wait->obj ); + return NULL; + } + return wait; +} + static struct completion *create_completion( struct object *root, const struct unicode_str *name, unsigned int attr, unsigned int concurrent, const struct security_descriptor *sd ) @@ -137,6 +231,7 @@ static struct completion *create_completion( struct object *root, const struct u if (get_error() != STATUS_OBJECT_NAME_EXISTS) { list_init( &completion->queue ); + list_init( &completion->wait_queue ); completion->depth = 0; } } @@ -153,6 +248,7 @@ void add_completion( struct completion *completion, apc_param_t ckey, apc_param_ unsigned int status, apc_param_t information ) { struct comp_msg *msg = mem_alloc( sizeof( *msg ) ); + struct completion_wait *wait;
if (!msg) return; @@ -164,7 +260,12 @@ void add_completion( struct completion *completion, apc_param_t ckey, apc_param_
list_add_tail( &completion->queue, &msg->queue_entry ); completion->depth++; - wake_up( &completion->obj, 1 ); + LIST_FOR_EACH_ENTRY( wait, &completion->wait_queue, struct completion_wait, wait_queue_entry ) + { + wake_up( &wait->obj, 1 ); + if (list_empty( &completion->queue )) return; + } + if (!list_empty( &completion->queue )) wake_up( &completion->obj, 0 ); }
/* create a completion */ @@ -219,8 +320,22 @@ DECL_HANDLER(remove_completion) if (!completion) return;
entry = list_head( &completion->queue ); + if (current->completion_wait) + { + list_remove( &current->completion_wait->wait_queue_entry ); + } + else if (!(current->completion_wait = create_completion_wait( current ))) + { + release_object( completion ); + return; + } + current->completion_wait->completion = completion; + list_add_head( &completion->wait_queue, &current->completion_wait->wait_queue_entry ); if (!entry) + { + reply->wait_handle = current->completion_wait->handle; set_error( STATUS_PENDING ); + } else { list_remove( entry ); @@ -231,6 +346,7 @@ DECL_HANDLER(remove_completion) reply->status = msg->status; reply->information = msg->information; free( msg ); + reply->wait_handle = 0; }
release_object( completion ); diff --git a/server/file.h b/server/file.h index f486f823f25..3d7cdc460ff 100644 --- a/server/file.h +++ b/server/file.h @@ -238,6 +238,7 @@ extern struct dir *get_dir_obj( struct process *process, obj_handle_t handle, un extern struct completion *get_completion_obj( struct process *process, obj_handle_t handle, unsigned int access ); extern void add_completion( struct completion *completion, apc_param_t ckey, apc_param_t cvalue, unsigned int status, apc_param_t information ); +extern void cleanup_thread_completion( struct thread *thread );
/* serial port functions */
diff --git a/server/protocol.def b/server/protocol.def index a4f25e805f8..693a20e3437 100644 --- a/server/protocol.def +++ b/server/protocol.def @@ -3798,6 +3798,7 @@ typedef union apc_param_t cvalue; /* completion value */ apc_param_t information; /* IO_STATUS_BLOCK Information */ unsigned int status; /* completion result */ + obj_handle_t wait_handle; /* handle to completion wait internal object */ @END
diff --git a/server/thread.c b/server/thread.c index 6542e1584ab..f3880eebedb 100644 --- a/server/thread.c +++ b/server/thread.c @@ -249,6 +249,7 @@ static inline void init_thread_structure( struct thread *thread )
thread->creation_time = current_time; thread->exit_time = 0; + thread->completion_wait = NULL;
list_init( &thread->mutex_list ); list_init( &thread->system_apc ); @@ -402,6 +403,7 @@ static void cleanup_thread( struct thread *thread ) { int i;
+ cleanup_thread_completion( thread ); if (thread->context) { thread->context->status = STATUS_ACCESS_DENIED; diff --git a/server/thread.h b/server/thread.h index 766ed78a72f..3448f332b0b 100644 --- a/server/thread.h +++ b/server/thread.h @@ -31,6 +31,7 @@ struct thread_apc; struct debug_obj; struct debug_event; struct msg_queue; +struct completion_wait;
enum run_state { @@ -91,6 +92,7 @@ struct thread struct list kernel_object; /* list of kernel object pointers */ data_size_t desc_len; /* thread description length in bytes */ WCHAR *desc; /* thread description string */ + struct completion_wait *completion_wait; /* completion port wait object the thread is associated with */ };
extern struct thread *current;
From: Paul Gofman pgofman@codeweavers.com
--- dlls/ntdll/unix/sync.c | 95 ++++++++++++++++++++++++++---------------- server/completion.c | 62 ++++++++++++++++++++++----- server/protocol.def | 10 +++++ 3 files changed, 121 insertions(+), 46 deletions(-)
diff --git a/dlls/ntdll/unix/sync.c b/dlls/ntdll/unix/sync.c index e4a659f93bb..f5536e398b5 100644 --- a/dlls/ntdll/unix/sync.c +++ b/dlls/ntdll/unix/sync.c @@ -2004,25 +2004,37 @@ NTSTATUS WINAPI NtRemoveIoCompletion( HANDLE handle, ULONG_PTR *key, ULONG_PTR *
TRACE( "(%p, %p, %p, %p, %p)\n", handle, key, value, io, timeout );
- for (;;) + SERVER_START_REQ( remove_completion ) { - SERVER_START_REQ( remove_completion ) + req->handle = wine_server_obj_handle( handle ); + if (!(status = wine_server_call( req ))) { - req->handle = wine_server_obj_handle( handle ); - if (!(status = wine_server_call( req ))) - { - *key = reply->ckey; - *value = reply->cvalue; - io->Information = reply->information; - io->Status = reply->status; - } - else wait_handle = wine_server_ptr_handle( reply->wait_handle ); + *key = reply->ckey; + *value = reply->cvalue; + io->Information = reply->information; + io->Status = reply->status; + } + else wait_handle = wine_server_ptr_handle( reply->wait_handle ); + } + SERVER_END_REQ; + if (status != STATUS_PENDING) return status; + if (!timeout || timeout->QuadPart) status = NtWaitForSingleObject( wait_handle, FALSE, timeout ); + else status = STATUS_TIMEOUT; + if (status != WAIT_OBJECT_0) return status; + + SERVER_START_REQ( get_thread_completion ) + { + if (!(status = wine_server_call( req ))) + { + *key = reply->ckey; + *value = reply->cvalue; + io->Information = reply->information; + io->Status = reply->status; } - SERVER_END_REQ; - if (status != STATUS_PENDING) return status; - status = NtWaitForSingleObject( wait_handle, FALSE, timeout ); - if (status != WAIT_OBJECT_0) return status; } + SERVER_END_REQ; + + return status; }
@@ -2038,34 +2050,47 @@ NTSTATUS WINAPI NtRemoveIoCompletionEx( HANDLE handle, FILE_IO_COMPLETION_INFORM
TRACE( "%p %p %u %p %p %u\n", handle, info, (int)count, written, timeout, alertable );
- for (;;) + while (i < count) { - while (i < count) + SERVER_START_REQ( remove_completion ) { - SERVER_START_REQ( remove_completion ) + req->handle = wine_server_obj_handle( handle ); + if (!(status = wine_server_call( req ))) { - req->handle = wine_server_obj_handle( handle ); - if (!(status = wine_server_call( req ))) - { - info[i].CompletionKey = reply->ckey; - info[i].CompletionValue = reply->cvalue; - info[i].IoStatusBlock.Information = reply->information; - info[i].IoStatusBlock.Status = reply->status; - } - else wait_handle = wine_server_ptr_handle( reply->wait_handle ); + info[i].CompletionKey = reply->ckey; + info[i].CompletionValue = reply->cvalue; + info[i].IoStatusBlock.Information = reply->information; + info[i].IoStatusBlock.Status = reply->status; } - SERVER_END_REQ; - if (status != STATUS_SUCCESS) break; - ++i; + else wait_handle = wine_server_ptr_handle( reply->wait_handle ); } - if (i || status != STATUS_PENDING) + SERVER_END_REQ; + if (status != STATUS_SUCCESS) break; + ++i; + } + if (i || status != STATUS_PENDING) + { + if (i) status = STATUS_SUCCESS; + goto done; + } + if (!timeout || timeout->QuadPart || alertable) status = NtWaitForSingleObject( wait_handle, alertable, timeout ); + else status = STATUS_TIMEOUT; + if (status != WAIT_OBJECT_0) goto done; + + SERVER_START_REQ( get_thread_completion ) + { + if (!(status = wine_server_call( req ))) { - if (status == STATUS_PENDING) status = STATUS_SUCCESS; - break; + info[i].CompletionKey = reply->ckey; + info[i].CompletionValue = reply->cvalue; + info[i].IoStatusBlock.Information = reply->information; + info[i].IoStatusBlock.Status = reply->status; + ++i; } - status = NtWaitForSingleObject( wait_handle, alertable, timeout ); - if (status != WAIT_OBJECT_0) break; } + SERVER_END_REQ; + +done: *written = i ? i : 1; return status; } diff --git a/server/completion.c b/server/completion.c index f00962a306d..f56162aa22d 100644 --- a/server/completion.c +++ b/server/completion.c @@ -21,7 +21,6 @@
/* FIXMEs: * - built-in wait queues used which means: - * + threads are awaken FIFO and not LIFO as native does * + "max concurrent active threads" parameter not used * + completion handle is waitable, while native isn't */ @@ -56,12 +55,22 @@ struct type_descr completion_type = }, };
+struct comp_msg +{ + struct list queue_entry; + apc_param_t ckey; + apc_param_t cvalue; + apc_param_t information; + unsigned int status; +}; + struct completion_wait { struct object obj; obj_handle_t handle; struct completion *completion; struct thread *thread; + struct comp_msg *msg; struct list wait_queue_entry; };
@@ -75,6 +84,7 @@ struct completion
static void completion_wait_dump( struct object*, int ); static int completion_wait_signaled( struct object *obj, struct wait_queue_entry *entry ); +static void completion_wait_satisfied( struct object *obj, struct wait_queue_entry *entry ); static void completion_wait_destroy( struct object * );
static const struct object_ops completion_wait_ops = @@ -85,7 +95,7 @@ static const struct object_ops completion_wait_ops = add_queue, /* add_queue */ remove_queue, /* remove_queue */ completion_wait_signaled, /* signaled */ - no_satisfied, /* satisfied */ + completion_wait_satisfied, /* satisfied */ no_signal, /* signal */ no_get_fd, /* get_fd */ default_map_access, /* map_access */ @@ -103,6 +113,9 @@ static const struct object_ops completion_wait_ops =
static void completion_wait_destroy( struct object *obj ) { + struct completion_wait *wait = (struct completion_wait *)obj; + + free( wait->msg ); }
static void completion_wait_dump( struct object *obj, int verbose ) @@ -121,6 +134,22 @@ static int completion_wait_signaled( struct object *obj, struct wait_queue_entry return wait->completion->depth; }
+static void completion_wait_satisfied( struct object *obj, struct wait_queue_entry *entry ) +{ + struct completion_wait *wait = (struct completion_wait *)obj; + struct list *msg_entry; + struct comp_msg *msg; + + assert( obj->ops == &completion_wait_ops ); + msg_entry = list_head( &wait->completion->queue ); + assert( msg_entry ); + msg = LIST_ENTRY( msg_entry, struct comp_msg, queue_entry ); + --wait->completion->depth; + list_remove( &msg->queue_entry ); + if (wait->msg) free( wait->msg ); + wait->msg = msg; +} + static void completion_dump( struct object*, int ); static int completion_signaled( struct object *obj, struct wait_queue_entry *entry ); static void completion_destroy( struct object * ); @@ -149,15 +178,6 @@ static const struct object_ops completion_ops = completion_destroy /* destroy */ };
-struct comp_msg -{ - struct list queue_entry; - apc_param_t ckey; - apc_param_t cvalue; - apc_param_t information; - unsigned int status; -}; - static void completion_destroy( struct object *obj) { struct completion *completion = (struct completion *) obj; @@ -212,6 +232,7 @@ static struct completion_wait *create_completion_wait( struct thread *thread ) if (!(wait = alloc_object( &completion_wait_ops ))) return NULL; wait->completion = NULL; wait->thread = thread; + wait->msg = NULL; if (!(wait->handle = alloc_handle( current->process, wait, SYNCHRONIZE, 0 ))) { release_object( &wait->obj ); @@ -352,6 +373,25 @@ DECL_HANDLER(remove_completion) release_object( completion ); }
+/* get completion after successful waiting for it */ +DECL_HANDLER(get_thread_completion) +{ + struct comp_msg *msg; + + if (!current->completion_wait || !(msg = current->completion_wait->msg)) + { + set_error( STATUS_INVALID_HANDLE ); + return; + } + + reply->ckey = msg->ckey; + reply->cvalue = msg->cvalue; + reply->status = msg->status; + reply->information = msg->information; + free( msg ); + current->completion_wait->msg = NULL; +} + /* get queue depth for completion port */ DECL_HANDLER(query_completion) { diff --git a/server/protocol.def b/server/protocol.def index 693a20e3437..0ad102283d1 100644 --- a/server/protocol.def +++ b/server/protocol.def @@ -3802,6 +3802,16 @@ typedef union @END
+/* get completion after successful wait */ +@REQ(get_thread_completion) +@REPLY + apc_param_t ckey; /* completion key */ + apc_param_t cvalue; /* completion value */ + apc_param_t information; /* IO_STATUS_BLOCK Information */ + unsigned int status; /* completion result */ +@END + + /* get completion queue depth */ @REQ(query_completion) obj_handle_t handle; /* port handle */
From: Paul Gofman pgofman@codeweavers.com
--- dlls/ntdll/tests/file.c | 47 ++++++++++++++++++++++++++++++++++++++++- dlls/ntdll/unix/sync.c | 14 +++++++++--- server/completion.c | 7 ++++++ server/protocol.def | 1 + 4 files changed, 65 insertions(+), 4 deletions(-)
diff --git a/dlls/ntdll/tests/file.c b/dlls/ntdll/tests/file.c index 0adc0998ee2..b110335dba6 100644 --- a/dlls/ntdll/tests/file.c +++ b/dlls/ntdll/tests/file.c @@ -958,14 +958,59 @@ static void test_set_io_completion(void) NTSTATUS res; ULONG count; SIZE_T size = 3; - HANDLE h; + HANDLE h, h2;
if (sizeof(size) > 4) size |= (ULONGLONG)0x12345678 << 32;
+ res = pNtCreateIoCompletion( &h2, IO_COMPLETION_ALL_ACCESS, NULL, 0 ); + ok( res == STATUS_SUCCESS, "NtCreateIoCompletion failed: %#lx\n", res ); + ok( h2 && h2 != INVALID_HANDLE_VALUE, "got invalid handle %p\n", h ); + res = pNtSetIoCompletion( h2, 123, 456, 789, size ); + ok( res == STATUS_SUCCESS, "NtSetIoCompletion failed: %#lx\n", res ); + res = pNtRemoveIoCompletionEx( h2, info, 2, &count, &timeout, TRUE ); + ok( res == STATUS_SUCCESS, "NtRemoveIoCompletionEx failed: %#lx\n", res ); + ok( count == 1, "wrong count %lu\n", count ); + res = pNtCreateIoCompletion( &h, IO_COMPLETION_ALL_ACCESS, NULL, 0 ); ok( res == STATUS_SUCCESS, "NtCreateIoCompletion failed: %#lx\n", res ); ok( h && h != INVALID_HANDLE_VALUE, "got invalid handle %p\n", h );
+ apc_count = 0; + QueueUserAPC( user_apc_proc, GetCurrentThread(), (ULONG_PTR)&apc_count ); + res = pNtSetIoCompletion( h, 123, 456, 789, size ); + ok( res == STATUS_SUCCESS, "NtSetIoCompletion failed: %#lx\n", res ); + res = pNtRemoveIoCompletionEx( h, info, 2, &count, &timeout, TRUE ); + /* APC goes first associated with completion port APC takes priority over pending completion. + * Even if the thread is associated with some other completion port. */ + ok( res == STATUS_USER_APC, "NtRemoveIoCompletionEx unexpected status %#lx\n", res ); + ok( apc_count == 1, "wrong apc count %u\n", apc_count ); + + CloseHandle( h2 ); + + apc_count = 0; + QueueUserAPC( user_apc_proc, GetCurrentThread(), (ULONG_PTR)&apc_count ); + res = pNtRemoveIoCompletionEx( h, info, 2, &count, &timeout, TRUE ); + /* Previous call resulted in STATUS_USER_APC did not associate the thread with the port. */ + ok( res == STATUS_USER_APC, "NtRemoveIoCompletion unexpected status %#lx\n", res ); + ok( apc_count == 1, "wrong apc count %u\n", apc_count ); + + res = pNtRemoveIoCompletionEx( h, info, 2, &count, &timeout, TRUE ); + /* Now the thread is associated. */ + ok( res == STATUS_SUCCESS, "NtRemoveIoCompletion failed: %#lx\n", res ); + ok( count == 1, "wrong count %lu\n", count ); + + apc_count = 0; + QueueUserAPC( user_apc_proc, GetCurrentThread(), (ULONG_PTR)&apc_count ); + res = pNtSetIoCompletion( h, 123, 456, 789, size ); + ok( res == STATUS_SUCCESS, "NtSetIoCompletion failed: %#lx\n", res ); + res = pNtRemoveIoCompletionEx( h, info, 2, &count, &timeout, TRUE ); + /* After a thread is associated with completion port existing completion is returned if APC is pending. 
*/ + ok( res == STATUS_SUCCESS, "NtRemoveIoCompletionEx failed: %#lx\n", res ); + ok( count == 1, "wrong count %lu\n", count ); + ok( apc_count == 0, "wrong apc count %u\n", apc_count ); + SleepEx( 0, TRUE); + ok( apc_count == 1, "wrong apc count %u\n", apc_count ); + res = pNtRemoveIoCompletion( h, &key, &value, &iosb, &timeout ); ok( res == STATUS_TIMEOUT, "NtRemoveIoCompletion failed: %#lx\n", res );
diff --git a/dlls/ntdll/unix/sync.c b/dlls/ntdll/unix/sync.c index f5536e398b5..80f82c18730 100644 --- a/dlls/ntdll/unix/sync.c +++ b/dlls/ntdll/unix/sync.c @@ -2007,6 +2007,7 @@ NTSTATUS WINAPI NtRemoveIoCompletion( HANDLE handle, ULONG_PTR *key, ULONG_PTR * SERVER_START_REQ( remove_completion ) { req->handle = wine_server_obj_handle( handle ); + req->alertable = 0; if (!(status = wine_server_call( req ))) { *key = reply->ckey; @@ -2055,6 +2056,7 @@ NTSTATUS WINAPI NtRemoveIoCompletionEx( HANDLE handle, FILE_IO_COMPLETION_INFORM SERVER_START_REQ( remove_completion ) { req->handle = wine_server_obj_handle( handle ); + req->alertable = alertable; if (!(status = wine_server_call( req ))) { info[i].CompletionKey = reply->ckey; @@ -2068,13 +2070,19 @@ NTSTATUS WINAPI NtRemoveIoCompletionEx( HANDLE handle, FILE_IO_COMPLETION_INFORM if (status != STATUS_SUCCESS) break; ++i; } - if (i || status != STATUS_PENDING) + if (i || (status != STATUS_PENDING && status != STATUS_USER_APC)) { if (i) status = STATUS_SUCCESS; goto done; } - if (!timeout || timeout->QuadPart || alertable) status = NtWaitForSingleObject( wait_handle, alertable, timeout ); - else status = STATUS_TIMEOUT; + if (status == STATUS_USER_APC) + { + status = NtDelayExecution( TRUE, NULL ); + assert( status == STATUS_USER_APC ); + goto done; + } + if (!timeout || timeout->QuadPart) status = NtWaitForSingleObject( wait_handle, alertable, timeout ); + else status = STATUS_TIMEOUT; if (status != WAIT_OBJECT_0) goto done;
SERVER_START_REQ( get_thread_completion ) diff --git a/server/completion.c b/server/completion.c index f56162aa22d..108aff46818 100644 --- a/server/completion.c +++ b/server/completion.c @@ -341,6 +341,13 @@ DECL_HANDLER(remove_completion) if (!completion) return;
entry = list_head( &completion->queue ); + if (req->alertable && !list_empty( &current->user_apc ) + && !(entry && current->completion_wait && current->completion_wait->completion == completion)) + { + set_error( STATUS_USER_APC ); + release_object( completion ); + return; + } if (current->completion_wait) { list_remove( &current->completion_wait->wait_queue_entry ); diff --git a/server/protocol.def b/server/protocol.def index 0ad102283d1..ac22f9853e5 100644 --- a/server/protocol.def +++ b/server/protocol.def @@ -3793,6 +3793,7 @@ typedef union /* get completion from completion port queue */ @REQ(remove_completion) obj_handle_t handle; /* port handle */ + int alertable; /* completion wait is alertable */ @REPLY apc_param_t ckey; /* completion key */ apc_param_t cvalue; /* completion value */
From: Paul Gofman pgofman@codeweavers.com
--- server/completion.c | 52 ++++++++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 15 deletions(-)
diff --git a/server/completion.c b/server/completion.c index 108aff46818..f9e68c523f1 100644 --- a/server/completion.c +++ b/server/completion.c @@ -19,11 +19,7 @@ * */
-/* FIXMEs: - * - built-in wait queues used which means: - * + "max concurrent active threads" parameter not used - * + completion handle is waitable, while native isn't - */ +/* FIXME: "max concurrent active threads" parameter is not used */
#include "config.h"
@@ -80,6 +76,7 @@ struct completion struct list queue; struct list wait_queue; unsigned int depth; + int closed; };
static void completion_wait_dump( struct object*, int ); @@ -131,6 +128,7 @@ static int completion_wait_signaled( struct object *obj, struct wait_queue_entry struct completion_wait *wait = (struct completion_wait *)obj;
assert( obj->ops == &completion_wait_ops ); + if (!wait->completion) return 1; return wait->completion->depth; }
@@ -141,6 +139,11 @@ static void completion_wait_satisfied( struct object *obj, struct wait_queue_ent struct comp_msg *msg;
assert( obj->ops == &completion_wait_ops ); + if (!wait->completion) + { + make_wait_abandoned( entry ); + return; + } msg_entry = list_head( &wait->completion->queue ); assert( msg_entry ); msg = LIST_ENTRY( msg_entry, struct comp_msg, queue_entry ); @@ -152,6 +155,7 @@ static void completion_wait_satisfied( struct object *obj, struct wait_queue_ent
static void completion_dump( struct object*, int ); static int completion_signaled( struct object *obj, struct wait_queue_entry *entry ); +static int completion_close_handle( struct object *obj, struct process *process, obj_handle_t handle ); static void completion_destroy( struct object * );
static const struct object_ops completion_ops = @@ -174,26 +178,19 @@ static const struct object_ops completion_ops = default_unlink_name, /* unlink_name */ no_open_file, /* open_file */ no_kernel_obj_list, /* get_kernel_obj_list */ - no_close_handle, /* close_handle */ + completion_close_handle, /* close_handle */ completion_destroy /* destroy */ };
static void completion_destroy( struct object *obj) { struct completion *completion = (struct completion *) obj; - struct completion_wait *wait, *wait_next; struct comp_msg *tmp, *next;
LIST_FOR_EACH_ENTRY_SAFE( tmp, next, &completion->queue, struct comp_msg, queue_entry ) { free( tmp ); } - - LIST_FOR_EACH_ENTRY_SAFE( wait, wait_next, &completion->wait_queue, struct completion_wait, wait_queue_entry ) - { - assert( wait->completion ); - cleanup_thread_completion( wait->thread ); - } }
static void completion_dump( struct object *obj, int verbose ) @@ -208,7 +205,30 @@ static int completion_signaled( struct object *obj, struct wait_queue_entry *ent { struct completion *completion = (struct completion *)obj;
- return !list_empty( &completion->queue ); + return !list_empty( &completion->queue ) || completion->closed; +} + +static int completion_close_handle( struct object *obj, struct process *process, obj_handle_t handle ) +{ + struct completion *completion = (struct completion *)obj; + struct completion_wait *wait, *wait_next; + + if (completion->obj.handle_count != 1) return 1; + + LIST_FOR_EACH_ENTRY_SAFE( wait, wait_next, &completion->wait_queue, struct completion_wait, wait_queue_entry ) + { + assert( wait->completion ); + wait->completion = NULL; + list_remove( &wait->wait_queue_entry ); + if (!wait->msg) + { + wake_up( &wait->obj, 0 ); + cleanup_thread_completion( wait->thread ); + } + } + completion->closed = 1; + wake_up( obj, 0 ); + return 1; }
void cleanup_thread_completion( struct thread *thread ) @@ -220,7 +240,7 @@ void cleanup_thread_completion( struct thread *thread ) close_handle( thread->process, thread->completion_wait->handle ); thread->completion_wait->handle = 0; } - list_remove( &thread->completion_wait->wait_queue_entry ); + if (thread->completion_wait->completion) list_remove( &thread->completion_wait->wait_queue_entry ); release_object( &thread->completion_wait->obj ); thread->completion_wait = NULL; } @@ -254,6 +274,7 @@ static struct completion *create_completion( struct object *root, const struct u list_init( &completion->queue ); list_init( &completion->wait_queue ); completion->depth = 0; + completion->closed = 0; } }
@@ -397,6 +418,7 @@ DECL_HANDLER(get_thread_completion) reply->information = msg->information; free( msg ); current->completion_wait->msg = NULL; + if (!current->completion_wait->completion) cleanup_thread_completion( current ); }
/* get queue depth for completion port */
From: Alexey Prokhin alexey@prokhin.ru
--- dlls/kernelbase/sync.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/dlls/kernelbase/sync.c b/dlls/kernelbase/sync.c index 1d935b02ddf..f03a6c3150a 100644 --- a/dlls/kernelbase/sync.c +++ b/dlls/kernelbase/sync.c @@ -1197,7 +1197,8 @@ BOOL WINAPI DECLSPEC_HOTPATCH GetQueuedCompletionStatus( HANDLE port, LPDWORD co return FALSE; }
- if (status == STATUS_TIMEOUT) SetLastError( WAIT_TIMEOUT ); + if (status == STATUS_TIMEOUT) SetLastError( WAIT_TIMEOUT ); + else if (status == STATUS_ABANDONED) SetLastError( ERROR_ABANDONED_WAIT_0 ); else SetLastError( RtlNtStatusToDosError(status) ); return FALSE; } @@ -1217,8 +1218,9 @@ BOOL WINAPI DECLSPEC_HOTPATCH GetQueuedCompletionStatusEx( HANDLE port, OVERLAPP ret = NtRemoveIoCompletionEx( port, (FILE_IO_COMPLETION_INFORMATION *)entries, count, written, get_nt_timeout( &time, timeout ), alertable ); if (ret == STATUS_SUCCESS) return TRUE; - else if (ret == STATUS_TIMEOUT) SetLastError( WAIT_TIMEOUT ); - else if (ret == STATUS_USER_APC) SetLastError( WAIT_IO_COMPLETION ); + else if (ret == STATUS_TIMEOUT) SetLastError( WAIT_TIMEOUT ); + else if (ret == STATUS_USER_APC) SetLastError( WAIT_IO_COMPLETION ); + else if (ret == STATUS_ABANDONED) SetLastError( ERROR_ABANDONED_WAIT_0 ); else SetLastError( RtlNtStatusToDosError(ret) ); return FALSE; }
From: Paul Gofman pgofman@codeweavers.com
--- dlls/ntdll/tests/sync.c | 194 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 194 insertions(+)
diff --git a/dlls/ntdll/tests/sync.c b/dlls/ntdll/tests/sync.c index f356d3ec38f..3ab89bbf361 100644 --- a/dlls/ntdll/tests/sync.c +++ b/dlls/ntdll/tests/sync.c @@ -837,6 +837,199 @@ static void test_tid_alert( char **argv ) CloseHandle( pi.hThread ); }
+struct test_completion_port_scheduling_param +{ + HANDLE ready, test_ready; + HANDLE port; + int index; +}; + +static DWORD WINAPI test_completion_port_scheduling_thread(void *param) +{ + struct test_completion_port_scheduling_param *p = param; + FILE_IO_COMPLETION_INFORMATION info; + OVERLAPPED_ENTRY overlapped_entry; + OVERLAPPED *overlapped; + IO_STATUS_BLOCK iosb; + ULONG_PTR key, value; + NTSTATUS status; + DWORD ret, err; + ULONG count; + BOOL bret; + + /* both threads are woken when comleption added. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + ret = WaitForSingleObject( p->port, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + SetEvent( p->test_ready ); + + /* if a thread is waiting for completion which is added threads which wait on port handle are not woken. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + if (p->index) + { + bret = GetQueuedCompletionStatus( p->port, &count, &key, &overlapped, INFINITE ); + ok( bret, "got error %lu.\n", GetLastError() ); + } + else + { + ret = WaitForSingleObject( p->port, 100 ); + ok( ret == WAIT_TIMEOUT || broken( !ret ) /* before Win10 1607 */, "got %#lx.\n", ret ); + } + SetEvent( p->test_ready ); + + /* Two threads in GetQueuedCompletionStatus, the second is supposed to start first. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + bret = GetQueuedCompletionStatus( p->port, &count, &key, &overlapped, INFINITE ); + ok( bret, "got error %lu.\n", GetLastError() ); + ok( key == 3 + p->index || broken( p->index && key == 5 ) /* before Win10 */, "got %Iu, expected %u.\n", key, 3 + p->index ); + SetEvent( p->test_ready ); + + /* Port is being closed. 
*/ + ret = WaitForSingleObject( p->ready, INFINITE ); + ret = WaitForSingleObject( p->port, INFINITE ); + if (ret == WAIT_FAILED) + skip( "Handle closed before wait started.\n" ); + else + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + SetEvent( p->test_ready ); + + /* Port is being closed. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + SetEvent( p->test_ready ); + status = NtRemoveIoCompletion( p->port, &key, &value, &iosb, NULL ); + if (status == STATUS_INVALID_HANDLE) + skip( "Handle closed before wait started.\n" ); + else + ok( status == STATUS_ABANDONED_WAIT_0, "got %#lx.\n", status ); + + /* Port is being closed. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + SetEvent( p->test_ready ); + count = 0xdeadbeef; + status = NtRemoveIoCompletionEx( p->port, &info, 1, &count, NULL, FALSE ); + ok( count <= 1, "Got unexpected count %lu.\n", count ); + if (status == STATUS_INVALID_HANDLE) + skip( "Handle closed before wait started.\n" ); + else + ok( status == STATUS_ABANDONED_WAIT_0, "got %#lx.\n", status ); + + /* Port is being closed. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + SetEvent( p->test_ready ); + bret = GetQueuedCompletionStatus( p->port, &count, &key, &overlapped, INFINITE ); + err = GetLastError(); + ok( !bret, "got %d.\n", bret ); + if (err == ERROR_INVALID_HANDLE) + skip( "Handle closed before wait started.\n" ); + else + ok( err == ERROR_ABANDONED_WAIT_0, "got error %#lx.\n", err ); + + /* Port is being closed. 
*/ + ret = WaitForSingleObject( p->ready, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + SetEvent( p->test_ready ); + bret = GetQueuedCompletionStatusEx( p->port, &overlapped_entry, 1, &count, INFINITE, TRUE ); + err = GetLastError(); + ok( !bret, "got %d.\n", bret ); + if (err == ERROR_INVALID_HANDLE) + skip( "Handle closed before wait started.\n" ); + else + ok( err == ERROR_ABANDONED_WAIT_0, "got error %#lx.\n", err ); + + return 0; +} + +static void test_completion_port_scheduling(void) +{ + struct test_completion_port_scheduling_param p[2]; + HANDLE threads[2], port; + OVERLAPPED *overlapped; + unsigned int i, j; + DWORD ret, count; + NTSTATUS status; + ULONG_PTR key; + BOOL bret; + + for (i = 0; i < 2; ++i) + { + p[i].index = 0; + p[i].ready = CreateEventA(NULL, FALSE, FALSE, NULL); + p[i].test_ready = CreateEventA(NULL, FALSE, FALSE, NULL); + threads[i] = CreateThread( NULL, 0, test_completion_port_scheduling_thread, &p[i], 0, NULL ); + ok( !!threads[i], "got error %lu.\n", GetLastError() ); + } + + status = NtCreateIoCompletion( &port, IO_COMPLETION_ALL_ACCESS, NULL, 0 ); + ok( !status, "got %#lx.\n", status ); + /* Waking multiple threads directly waiting on port */ + for (i = 0; i < 2; ++i) + { + p[i].index = i; + p[i].port = port; + SetEvent( p[i].ready ); + } + PostQueuedCompletionStatus( port, 0, 1, NULL ); + for (i = 0; i < 2; ++i) WaitForSingleObject( p[i].test_ready, INFINITE ); + bret = GetQueuedCompletionStatus( port, &count, &key, &overlapped, INFINITE ); + ok( bret, "got error %lu.\n", GetLastError() ); + + /* One thread is waiting on port, another in GetQueuedCompletionStatus(). */ + SetEvent( p[1].ready ); + Sleep( 40 ); + SetEvent( p[0].ready ); + Sleep( 10 ); + PostQueuedCompletionStatus( port, 0, 2, NULL ); + for (i = 0; i < 2; ++i) WaitForSingleObject( p[i].test_ready, INFINITE ); + + /* Both threads are waiting in GetQueuedCompletionStatus, LIFO wake up order. 
*/ + SetEvent( p[1].ready ); + Sleep( 40 ); + SetEvent( p[0].ready ); + Sleep( 20 ); + PostQueuedCompletionStatus( port, 0, 3, NULL ); + PostQueuedCompletionStatus( port, 0, 4, NULL ); + PostQueuedCompletionStatus( port, 0, 5, NULL ); + bret = GetQueuedCompletionStatus( p->port, &count, &key, &overlapped, INFINITE ); + ok( bret, "got error %lu.\n", GetLastError() ); + ok( key == 5 || broken( key == 4 ) /* before Win10 */, "got %Iu, expected 5.\n", key ); + + /* Close port handle while threads are waiting on it directly. */ + for (i = 0; i < 2; ++i) SetEvent( p[i].ready ); + Sleep( 20 ); + NtClose( port ); + for (i = 0; i < 2; ++i) WaitForSingleObject( p[i].test_ready, INFINITE ); + + /* Test signaling on port close. */ + for (i = 0; i < 4; ++i) + { + status = NtCreateIoCompletion( &port, IO_COMPLETION_ALL_ACCESS, NULL, 0 ); + ok( !status, "got %#lx.\n", status ); + for (j = 0; j < 2; ++j) + { + p[j].port = port; + ret = SignalObjectAndWait( p[j].ready, p[j].test_ready, + INFINITE, FALSE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + } + Sleep( 20 ); + status = NtClose( port ); + ok( !status, "got %#lx.\n", status ); + } + + WaitForMultipleObjects( 2, threads, TRUE, INFINITE ); + for (i = 0; i < 2; ++i) + { + CloseHandle( threads[i] ); + CloseHandle( p[i].ready ); + CloseHandle( p[i].test_ready ); + } +} + START_TEST(sync) { HMODULE module = GetModuleHandleA("ntdll.dll"); @@ -884,4 +1077,5 @@ START_TEST(sync) test_keyed_events(); test_resource(); test_tid_alert( argv ); + test_completion_port_scheduling(); }
Well, I was thinking you only need to remove it in two cases: when returning a packet, and when timing out. But I was also forgetting that satisfied() isn't called in the timeout case, and anyway it's a moot point.
There is also at least one more case: suspended thread handling. If a thread gets suspended while in NtRemoveIoCompletion, it is essentially not in a wait queue, so other threads can get the completion. This works correctly with the generic server wait queue logic.
The one advantage of doing it this way is that we get rid of a server object when it's not needed. I don't know if that's worthwhile or not.
But we will remove the object once the completion port is closed or the thread is terminated. Until then, it is generally needed... and removing it would trade that object's existence for the need to recreate it, and it also seems considerably more complicated, as we would need to know when the client actually got off the wait?
But as of 2/6 the completion_wait is still destroyed when a completion is destroyed, and the associated packet with it. This is fixed by the "if (!wait->msg)" before calling cleanup_thread_completion(), but that's not done until 4/6. Or maybe I'm missing something here?
Anyway, in v2 I reshuffled things a bit and now introduce the possibility of a 'detached' completion wait (with wait->completion == NULL) only in 4/6. This way it surely doesn't fix either of those two issues (both end up fixed in 4/6) and is mostly an intermediate step. The patch message says "ntdll: Assign completion to thread when wait for completion is satisfied."; it doesn't promise any fixes related to handle close? The functional effect is that it introduces LIFO wake-up and prevents "newcomer" threads from stealing completions when the wait for this completion is already satisfied.
v2: - introduce 'detached' completion waits (with completion == NULL) only in patch 4 (as before that those NULL completion checks are essentially 'dangling' logic); - do not recreate completion waits in (remove_completion) if the existing one is for a different IOCP; - add more tests for APC vs existing completion priority, modify the handling so that removing a completion which resulted in an APC doesn't associate a thread with the completion; - drop a couple of confusing completion count checks with STATUS_USER_APC return.
Well, I was thinking you only need to remove it in two cases: when returning a packet, and when timing out. But I was also forgetting that satisfied() isn't called in the timeout case, and anyway it's a moot point.
There is also at least one more case: suspended thread handling. If a thread gets suspended while in NtRemoveIoCompletion, it is essentially not in a wait queue, so other threads can get the completion. This works correctly with the generic server wait queue logic.
Oh that's weird. Definitely would not have expected that one.
But as of 2/6 the completion_wait is still destroyed when a completion is destroyed, and the associated packet with it. This is fixed by the "if (!wait->msg)" before calling cleanup_thread_completion(), but that's not done until 4/6. Or maybe I'm missing something here?
Anyway, in v2 I reshuffled things a bit and now introduce the possibility of a 'detached' completion wait (with wait->completion == NULL) only in 4/6. This way it surely doesn't fix either of those two issues (both end up fixed in 4/6) and is mostly an intermediate step. The patch message says "ntdll: Assign completion to thread when wait for completion is satisfied."; it doesn't promise any fixes related to handle close? The functional effect is that it introduces LIFO wake-up and prevents "newcomer" threads from stealing completions when the wait for this completion is already satisfied.
I guess, but given the names of the commits I would have expected that bug to be fixed in 2/6 rather than 4/6. Maybe it's not worth bikeshedding.
There is also at least one more case: suspended thread handling. If a thread gets suspended while in NtRemoveIoCompletion, it is essentially not in a wait queue, so other threads can get the completion. This works correctly with the generic server wait queue logic.
Oh that's weird. Definitely would not have expected that one.
That's the same for, e.g., events: a wait can't be satisfied (and an auto event de-signaled) by a suspended thread which happens to also be present in the event's wait queue.