-- v3: ntdll/tests: Add tests for completion port signaling. kernelbase: Set the proper error code in GetQueuedCompletionStatus{Ex} when the handle is closed. server: Signal completion port waits on handle close. ntdll: Handle user APCs explicitly in NtRemoveIoCompletionEx().
From: Paul Gofman pgofman@codeweavers.com
--- dlls/ntdll/unix/sync.c | 8 ++- server/completion.c | 118 ++++++++++++++++++++++++++++++++++++++++- server/file.h | 1 + server/protocol.def | 1 + server/thread.c | 2 + server/thread.h | 2 + 6 files changed, 129 insertions(+), 3 deletions(-)
diff --git a/dlls/ntdll/unix/sync.c b/dlls/ntdll/unix/sync.c index 4269873be6c..e4a659f93bb 100644 --- a/dlls/ntdll/unix/sync.c +++ b/dlls/ntdll/unix/sync.c @@ -1999,6 +1999,7 @@ NTSTATUS WINAPI NtSetIoCompletion( HANDLE handle, ULONG_PTR key, ULONG_PTR value NTSTATUS WINAPI NtRemoveIoCompletion( HANDLE handle, ULONG_PTR *key, ULONG_PTR *value, IO_STATUS_BLOCK *io, LARGE_INTEGER *timeout ) { + HANDLE wait_handle = NULL; unsigned int status;
TRACE( "(%p, %p, %p, %p, %p)\n", handle, key, value, io, timeout ); @@ -2015,10 +2016,11 @@ NTSTATUS WINAPI NtRemoveIoCompletion( HANDLE handle, ULONG_PTR *key, ULONG_PTR * io->Information = reply->information; io->Status = reply->status; } + else wait_handle = wine_server_ptr_handle( reply->wait_handle ); } SERVER_END_REQ; if (status != STATUS_PENDING) return status; - status = NtWaitForSingleObject( handle, FALSE, timeout ); + status = NtWaitForSingleObject( wait_handle, FALSE, timeout ); if (status != WAIT_OBJECT_0) return status; } } @@ -2030,6 +2032,7 @@ NTSTATUS WINAPI NtRemoveIoCompletion( HANDLE handle, ULONG_PTR *key, ULONG_PTR * NTSTATUS WINAPI NtRemoveIoCompletionEx( HANDLE handle, FILE_IO_COMPLETION_INFORMATION *info, ULONG count, ULONG *written, LARGE_INTEGER *timeout, BOOLEAN alertable ) { + HANDLE wait_handle = NULL; unsigned int status; ULONG i = 0;
@@ -2049,6 +2052,7 @@ NTSTATUS WINAPI NtRemoveIoCompletionEx( HANDLE handle, FILE_IO_COMPLETION_INFORM info[i].IoStatusBlock.Information = reply->information; info[i].IoStatusBlock.Status = reply->status; } + else wait_handle = wine_server_ptr_handle( reply->wait_handle ); } SERVER_END_REQ; if (status != STATUS_SUCCESS) break; @@ -2059,7 +2063,7 @@ NTSTATUS WINAPI NtRemoveIoCompletionEx( HANDLE handle, FILE_IO_COMPLETION_INFORM if (status == STATUS_PENDING) status = STATUS_SUCCESS; break; } - status = NtWaitForSingleObject( handle, alertable, timeout ); + status = NtWaitForSingleObject( wait_handle, alertable, timeout ); if (status != WAIT_OBJECT_0) break; } *written = i ? i : 1; diff --git a/server/completion.c b/server/completion.c index 6933195e72d..f00962a306d 100644 --- a/server/completion.c +++ b/server/completion.c @@ -56,13 +56,71 @@ struct type_descr completion_type = }, };
+struct completion_wait +{ + struct object obj; + obj_handle_t handle; + struct completion *completion; + struct thread *thread; + struct list wait_queue_entry; +}; + struct completion { struct object obj; struct list queue; + struct list wait_queue; unsigned int depth; };
+static void completion_wait_dump( struct object*, int ); +static int completion_wait_signaled( struct object *obj, struct wait_queue_entry *entry ); +static void completion_wait_destroy( struct object * ); + +static const struct object_ops completion_wait_ops = +{ + sizeof(struct completion_wait), /* size */ + &no_type, /* type */ + completion_wait_dump, /* dump */ + add_queue, /* add_queue */ + remove_queue, /* remove_queue */ + completion_wait_signaled, /* signaled */ + no_satisfied, /* satisfied */ + no_signal, /* signal */ + no_get_fd, /* get_fd */ + default_map_access, /* map_access */ + default_get_sd, /* get_sd */ + default_set_sd, /* set_sd */ + no_get_full_name, /* get_full_name */ + no_lookup_name, /* lookup_name */ + no_link_name, /* link_name */ + NULL, /* unlink_name */ + no_open_file, /* open_file */ + no_kernel_obj_list, /* get_kernel_obj_list */ + no_close_handle, /* close_handle */ + completion_wait_destroy /* destroy */ +}; + +static void completion_wait_destroy( struct object *obj ) +{ +} + +static void completion_wait_dump( struct object *obj, int verbose ) +{ + struct completion_wait *wait = (struct completion_wait *)obj; + + assert( obj->ops == &completion_wait_ops ); + fprintf( stderr, "Completion wait completion=%p\n", wait->completion ); +} + +static int completion_wait_signaled( struct object *obj, struct wait_queue_entry *entry ) +{ + struct completion_wait *wait = (struct completion_wait *)obj; + + assert( obj->ops == &completion_wait_ops ); + return wait->completion->depth; +} + static void completion_dump( struct object*, int ); static int completion_signaled( struct object *obj, struct wait_queue_entry *entry ); static void completion_destroy( struct object * ); @@ -103,12 +161,19 @@ struct comp_msg static void completion_destroy( struct object *obj) { struct completion *completion = (struct completion *) obj; + struct completion_wait *wait, *wait_next; struct comp_msg *tmp, *next;
LIST_FOR_EACH_ENTRY_SAFE( tmp, next, &completion->queue, struct comp_msg, queue_entry ) { free( tmp ); } + + LIST_FOR_EACH_ENTRY_SAFE( wait, wait_next, &completion->wait_queue, struct completion_wait, wait_queue_entry ) + { + assert( wait->completion ); + cleanup_thread_completion( wait->thread ); + } }
static void completion_dump( struct object *obj, int verbose ) @@ -126,6 +191,35 @@ static int completion_signaled( struct object *obj, struct wait_queue_entry *ent return !list_empty( &completion->queue ); }
+void cleanup_thread_completion( struct thread *thread ) +{ + if (!thread->completion_wait) return; + + if (thread->completion_wait->handle) + { + close_handle( thread->process, thread->completion_wait->handle ); + thread->completion_wait->handle = 0; + } + list_remove( &thread->completion_wait->wait_queue_entry ); + release_object( &thread->completion_wait->obj ); + thread->completion_wait = NULL; +} + +static struct completion_wait *create_completion_wait( struct thread *thread ) +{ + struct completion_wait *wait; + + if (!(wait = alloc_object( &completion_wait_ops ))) return NULL; + wait->completion = NULL; + wait->thread = thread; + if (!(wait->handle = alloc_handle( current->process, wait, SYNCHRONIZE, 0 ))) + { + release_object( &wait->obj ); + return NULL; + } + return wait; +} + static struct completion *create_completion( struct object *root, const struct unicode_str *name, unsigned int attr, unsigned int concurrent, const struct security_descriptor *sd ) @@ -137,6 +231,7 @@ static struct completion *create_completion( struct object *root, const struct u if (get_error() != STATUS_OBJECT_NAME_EXISTS) { list_init( &completion->queue ); + list_init( &completion->wait_queue ); completion->depth = 0; } } @@ -153,6 +248,7 @@ void add_completion( struct completion *completion, apc_param_t ckey, apc_param_ unsigned int status, apc_param_t information ) { struct comp_msg *msg = mem_alloc( sizeof( *msg ) ); + struct completion_wait *wait;
if (!msg) return; @@ -164,7 +260,12 @@ void add_completion( struct completion *completion, apc_param_t ckey, apc_param_
list_add_tail( &completion->queue, &msg->queue_entry ); completion->depth++; - wake_up( &completion->obj, 1 ); + LIST_FOR_EACH_ENTRY( wait, &completion->wait_queue, struct completion_wait, wait_queue_entry ) + { + wake_up( &wait->obj, 1 ); + if (list_empty( &completion->queue )) return; + } + if (!list_empty( &completion->queue )) wake_up( &completion->obj, 0 ); }
/* create a completion */ @@ -219,8 +320,22 @@ DECL_HANDLER(remove_completion) if (!completion) return;
entry = list_head( &completion->queue ); + if (current->completion_wait) + { + list_remove( ¤t->completion_wait->wait_queue_entry ); + } + else if (!(current->completion_wait = create_completion_wait( current ))) + { + release_object( completion ); + return; + } + current->completion_wait->completion = completion; + list_add_head( &completion->wait_queue, ¤t->completion_wait->wait_queue_entry ); if (!entry) + { + reply->wait_handle = current->completion_wait->handle; set_error( STATUS_PENDING ); + } else { list_remove( entry ); @@ -231,6 +346,7 @@ DECL_HANDLER(remove_completion) reply->status = msg->status; reply->information = msg->information; free( msg ); + reply->wait_handle = 0; }
release_object( completion ); diff --git a/server/file.h b/server/file.h index f486f823f25..3d7cdc460ff 100644 --- a/server/file.h +++ b/server/file.h @@ -238,6 +238,7 @@ extern struct dir *get_dir_obj( struct process *process, obj_handle_t handle, un extern struct completion *get_completion_obj( struct process *process, obj_handle_t handle, unsigned int access ); extern void add_completion( struct completion *completion, apc_param_t ckey, apc_param_t cvalue, unsigned int status, apc_param_t information ); +extern void cleanup_thread_completion( struct thread *thread );
/* serial port functions */
diff --git a/server/protocol.def b/server/protocol.def index a4f25e805f8..693a20e3437 100644 --- a/server/protocol.def +++ b/server/protocol.def @@ -3798,6 +3798,7 @@ typedef union apc_param_t cvalue; /* completion value */ apc_param_t information; /* IO_STATUS_BLOCK Information */ unsigned int status; /* completion result */ + obj_handle_t wait_handle; /* handle to completion wait internal object */ @END
diff --git a/server/thread.c b/server/thread.c index 6542e1584ab..f3880eebedb 100644 --- a/server/thread.c +++ b/server/thread.c @@ -249,6 +249,7 @@ static inline void init_thread_structure( struct thread *thread )
thread->creation_time = current_time; thread->exit_time = 0; + thread->completion_wait = NULL;
list_init( &thread->mutex_list ); list_init( &thread->system_apc ); @@ -402,6 +403,7 @@ static void cleanup_thread( struct thread *thread ) { int i;
+ cleanup_thread_completion( thread ); if (thread->context) { thread->context->status = STATUS_ACCESS_DENIED; diff --git a/server/thread.h b/server/thread.h index 766ed78a72f..3448f332b0b 100644 --- a/server/thread.h +++ b/server/thread.h @@ -31,6 +31,7 @@ struct thread_apc; struct debug_obj; struct debug_event; struct msg_queue; +struct completion_wait;
enum run_state { @@ -91,6 +92,7 @@ struct thread struct list kernel_object; /* list of kernel object pointers */ data_size_t desc_len; /* thread description length in bytes */ WCHAR *desc; /* thread description string */ + struct completion_wait *completion_wait; /* completion port wait object the thread is associated with */ };
extern struct thread *current;
From: Paul Gofman pgofman@codeweavers.com
--- dlls/ntdll/unix/sync.c | 95 ++++++++++++++++++++++++++---------------- server/completion.c | 62 ++++++++++++++++++++++----- server/protocol.def | 10 +++++ 3 files changed, 121 insertions(+), 46 deletions(-)
diff --git a/dlls/ntdll/unix/sync.c b/dlls/ntdll/unix/sync.c index e4a659f93bb..f5536e398b5 100644 --- a/dlls/ntdll/unix/sync.c +++ b/dlls/ntdll/unix/sync.c @@ -2004,25 +2004,37 @@ NTSTATUS WINAPI NtRemoveIoCompletion( HANDLE handle, ULONG_PTR *key, ULONG_PTR *
TRACE( "(%p, %p, %p, %p, %p)\n", handle, key, value, io, timeout );
- for (;;) + SERVER_START_REQ( remove_completion ) { - SERVER_START_REQ( remove_completion ) + req->handle = wine_server_obj_handle( handle ); + if (!(status = wine_server_call( req ))) { - req->handle = wine_server_obj_handle( handle ); - if (!(status = wine_server_call( req ))) - { - *key = reply->ckey; - *value = reply->cvalue; - io->Information = reply->information; - io->Status = reply->status; - } - else wait_handle = wine_server_ptr_handle( reply->wait_handle ); + *key = reply->ckey; + *value = reply->cvalue; + io->Information = reply->information; + io->Status = reply->status; + } + else wait_handle = wine_server_ptr_handle( reply->wait_handle ); + } + SERVER_END_REQ; + if (status != STATUS_PENDING) return status; + if (!timeout || timeout->QuadPart) status = NtWaitForSingleObject( wait_handle, FALSE, timeout ); + else status = STATUS_TIMEOUT; + if (status != WAIT_OBJECT_0) return status; + + SERVER_START_REQ( get_thread_completion ) + { + if (!(status = wine_server_call( req ))) + { + *key = reply->ckey; + *value = reply->cvalue; + io->Information = reply->information; + io->Status = reply->status; } - SERVER_END_REQ; - if (status != STATUS_PENDING) return status; - status = NtWaitForSingleObject( wait_handle, FALSE, timeout ); - if (status != WAIT_OBJECT_0) return status; } + SERVER_END_REQ; + + return status; }
@@ -2038,34 +2050,47 @@ NTSTATUS WINAPI NtRemoveIoCompletionEx( HANDLE handle, FILE_IO_COMPLETION_INFORM
TRACE( "%p %p %u %p %p %u\n", handle, info, (int)count, written, timeout, alertable );
- for (;;) + while (i < count) { - while (i < count) + SERVER_START_REQ( remove_completion ) { - SERVER_START_REQ( remove_completion ) + req->handle = wine_server_obj_handle( handle ); + if (!(status = wine_server_call( req ))) { - req->handle = wine_server_obj_handle( handle ); - if (!(status = wine_server_call( req ))) - { - info[i].CompletionKey = reply->ckey; - info[i].CompletionValue = reply->cvalue; - info[i].IoStatusBlock.Information = reply->information; - info[i].IoStatusBlock.Status = reply->status; - } - else wait_handle = wine_server_ptr_handle( reply->wait_handle ); + info[i].CompletionKey = reply->ckey; + info[i].CompletionValue = reply->cvalue; + info[i].IoStatusBlock.Information = reply->information; + info[i].IoStatusBlock.Status = reply->status; } - SERVER_END_REQ; - if (status != STATUS_SUCCESS) break; - ++i; + else wait_handle = wine_server_ptr_handle( reply->wait_handle ); } - if (i || status != STATUS_PENDING) + SERVER_END_REQ; + if (status != STATUS_SUCCESS) break; + ++i; + } + if (i || status != STATUS_PENDING) + { + if (i) status = STATUS_SUCCESS; + goto done; + } + if (!timeout || timeout->QuadPart || alertable) status = NtWaitForSingleObject( wait_handle, alertable, timeout ); + else status = STATUS_TIMEOUT; + if (status != WAIT_OBJECT_0) goto done; + + SERVER_START_REQ( get_thread_completion ) + { + if (!(status = wine_server_call( req ))) { - if (status == STATUS_PENDING) status = STATUS_SUCCESS; - break; + info[i].CompletionKey = reply->ckey; + info[i].CompletionValue = reply->cvalue; + info[i].IoStatusBlock.Information = reply->information; + info[i].IoStatusBlock.Status = reply->status; + ++i; } - status = NtWaitForSingleObject( wait_handle, alertable, timeout ); - if (status != WAIT_OBJECT_0) break; } + SERVER_END_REQ; + +done: *written = i ? i : 1; return status; } diff --git a/server/completion.c b/server/completion.c index f00962a306d..f56162aa22d 100644 --- a/server/completion.c +++ b/server/completion.c @@ -21,7 +21,6 @@
/* FIXMEs: * - built-in wait queues used which means: - * + threads are awaken FIFO and not LIFO as native does * + "max concurrent active threads" parameter not used * + completion handle is waitable, while native isn't */ @@ -56,12 +55,22 @@ struct type_descr completion_type = }, };
+struct comp_msg +{ + struct list queue_entry; + apc_param_t ckey; + apc_param_t cvalue; + apc_param_t information; + unsigned int status; +}; + struct completion_wait { struct object obj; obj_handle_t handle; struct completion *completion; struct thread *thread; + struct comp_msg *msg; struct list wait_queue_entry; };
@@ -75,6 +84,7 @@ struct completion
static void completion_wait_dump( struct object*, int ); static int completion_wait_signaled( struct object *obj, struct wait_queue_entry *entry ); +static void completion_wait_satisfied( struct object *obj, struct wait_queue_entry *entry ); static void completion_wait_destroy( struct object * );
static const struct object_ops completion_wait_ops = @@ -85,7 +95,7 @@ static const struct object_ops completion_wait_ops = add_queue, /* add_queue */ remove_queue, /* remove_queue */ completion_wait_signaled, /* signaled */ - no_satisfied, /* satisfied */ + completion_wait_satisfied, /* satisfied */ no_signal, /* signal */ no_get_fd, /* get_fd */ default_map_access, /* map_access */ @@ -103,6 +113,9 @@ static const struct object_ops completion_wait_ops =
static void completion_wait_destroy( struct object *obj ) { + struct completion_wait *wait = (struct completion_wait *)obj; + + free( wait->msg ); }
static void completion_wait_dump( struct object *obj, int verbose ) @@ -121,6 +134,22 @@ static int completion_wait_signaled( struct object *obj, struct wait_queue_entry return wait->completion->depth; }
+static void completion_wait_satisfied( struct object *obj, struct wait_queue_entry *entry ) +{ + struct completion_wait *wait = (struct completion_wait *)obj; + struct list *msg_entry; + struct comp_msg *msg; + + assert( obj->ops == &completion_wait_ops ); + msg_entry = list_head( &wait->completion->queue ); + assert( msg_entry ); + msg = LIST_ENTRY( msg_entry, struct comp_msg, queue_entry ); + --wait->completion->depth; + list_remove( &msg->queue_entry ); + if (wait->msg) free( wait->msg ); + wait->msg = msg; +} + static void completion_dump( struct object*, int ); static int completion_signaled( struct object *obj, struct wait_queue_entry *entry ); static void completion_destroy( struct object * ); @@ -149,15 +178,6 @@ static const struct object_ops completion_ops = completion_destroy /* destroy */ };
-struct comp_msg -{ - struct list queue_entry; - apc_param_t ckey; - apc_param_t cvalue; - apc_param_t information; - unsigned int status; -}; - static void completion_destroy( struct object *obj) { struct completion *completion = (struct completion *) obj; @@ -212,6 +232,7 @@ static struct completion_wait *create_completion_wait( struct thread *thread ) if (!(wait = alloc_object( &completion_wait_ops ))) return NULL; wait->completion = NULL; wait->thread = thread; + wait->msg = NULL; if (!(wait->handle = alloc_handle( current->process, wait, SYNCHRONIZE, 0 ))) { release_object( &wait->obj ); @@ -352,6 +373,25 @@ DECL_HANDLER(remove_completion) release_object( completion ); }
+/* get completion after successful waiting for it */ +DECL_HANDLER(get_thread_completion) +{ + struct comp_msg *msg; + + if (!current->completion_wait || !(msg = current->completion_wait->msg)) + { + set_error( STATUS_INVALID_HANDLE ); + return; + } + + reply->ckey = msg->ckey; + reply->cvalue = msg->cvalue; + reply->status = msg->status; + reply->information = msg->information; + free( msg ); + current->completion_wait->msg = NULL; +} + /* get queue depth for completion port */ DECL_HANDLER(query_completion) { diff --git a/server/protocol.def b/server/protocol.def index 693a20e3437..0ad102283d1 100644 --- a/server/protocol.def +++ b/server/protocol.def @@ -3802,6 +3802,16 @@ typedef union @END
+/* get completion after successful wait */ +@REQ(get_thread_completion) +@REPLY + apc_param_t ckey; /* completion key */ + apc_param_t cvalue; /* completion value */ + apc_param_t information; /* IO_STATUS_BLOCK Information */ + unsigned int status; /* completion result */ +@END + + /* get completion queue depth */ @REQ(query_completion) obj_handle_t handle; /* port handle */
From: Paul Gofman pgofman@codeweavers.com
--- dlls/ntdll/tests/file.c | 47 ++++++++++++++++++++++++++++++++++++++++- dlls/ntdll/unix/sync.c | 14 +++++++++--- server/completion.c | 7 ++++++ server/protocol.def | 1 + 4 files changed, 65 insertions(+), 4 deletions(-)
diff --git a/dlls/ntdll/tests/file.c b/dlls/ntdll/tests/file.c index 0adc0998ee2..e60d392346b 100644 --- a/dlls/ntdll/tests/file.c +++ b/dlls/ntdll/tests/file.c @@ -958,14 +958,59 @@ static void test_set_io_completion(void) NTSTATUS res; ULONG count; SIZE_T size = 3; - HANDLE h; + HANDLE h, h2;
if (sizeof(size) > 4) size |= (ULONGLONG)0x12345678 << 32;
+ res = pNtCreateIoCompletion( &h2, IO_COMPLETION_ALL_ACCESS, NULL, 0 ); + ok( res == STATUS_SUCCESS, "NtCreateIoCompletion failed: %#lx\n", res ); + ok( h2 && h2 != INVALID_HANDLE_VALUE, "got invalid handle %p\n", h2 ); + res = pNtSetIoCompletion( h2, 123, 456, 789, size ); + ok( res == STATUS_SUCCESS, "NtSetIoCompletion failed: %#lx\n", res ); + res = pNtRemoveIoCompletionEx( h2, info, 2, &count, &timeout, TRUE ); + ok( res == STATUS_SUCCESS, "NtRemoveIoCompletionEx failed: %#lx\n", res ); + ok( count == 1, "wrong count %lu\n", count ); + res = pNtCreateIoCompletion( &h, IO_COMPLETION_ALL_ACCESS, NULL, 0 ); ok( res == STATUS_SUCCESS, "NtCreateIoCompletion failed: %#lx\n", res ); ok( h && h != INVALID_HANDLE_VALUE, "got invalid handle %p\n", h );
+ apc_count = 0; + QueueUserAPC( user_apc_proc, GetCurrentThread(), (ULONG_PTR)&apc_count ); + res = pNtSetIoCompletion( h, 123, 456, 789, size ); + ok( res == STATUS_SUCCESS, "NtSetIoCompletion failed: %#lx\n", res ); + res = pNtRemoveIoCompletionEx( h, info, 2, &count, &timeout, TRUE ); + /* APC goes first associated with completion port APC takes priority over pending completion. + * Even if the thread is associated with some other completion port. */ + ok( res == STATUS_USER_APC, "NtRemoveIoCompletionEx unexpected status %#lx\n", res ); + ok( apc_count == 1, "wrong apc count %u\n", apc_count ); + + CloseHandle( h2 ); + + apc_count = 0; + QueueUserAPC( user_apc_proc, GetCurrentThread(), (ULONG_PTR)&apc_count ); + res = pNtRemoveIoCompletionEx( h, info, 2, &count, &timeout, TRUE ); + /* Previous call resulted in STATUS_USER_APC did not associate the thread with the port. */ + ok( res == STATUS_USER_APC, "NtRemoveIoCompletion unexpected status %#lx\n", res ); + ok( apc_count == 1, "wrong apc count %u\n", apc_count ); + + res = pNtRemoveIoCompletionEx( h, info, 2, &count, &timeout, TRUE ); + /* Now the thread is associated. */ + ok( res == STATUS_SUCCESS, "NtRemoveIoCompletion failed: %#lx\n", res ); + ok( count == 1, "wrong count %lu\n", count ); + + apc_count = 0; + QueueUserAPC( user_apc_proc, GetCurrentThread(), (ULONG_PTR)&apc_count ); + res = pNtSetIoCompletion( h, 123, 456, 789, size ); + ok( res == STATUS_SUCCESS, "NtSetIoCompletion failed: %#lx\n", res ); + res = pNtRemoveIoCompletionEx( h, info, 2, &count, &timeout, TRUE ); + /* After a thread is associated with completion port existing completion is returned if APC is pending. */ + ok( res == STATUS_SUCCESS, "NtRemoveIoCompletionEx failed: %#lx\n", res ); + ok( count == 1, "wrong count %lu\n", count ); + ok( apc_count == 0, "wrong apc count %u\n", apc_count ); + SleepEx( 0, TRUE); + ok( apc_count == 1, "wrong apc count %u\n", apc_count ); + res = pNtRemoveIoCompletion( h, &key, &value, &iosb, &timeout ); ok( res == STATUS_TIMEOUT, "NtRemoveIoCompletion failed: %#lx\n", res );
diff --git a/dlls/ntdll/unix/sync.c b/dlls/ntdll/unix/sync.c index f5536e398b5..80f82c18730 100644 --- a/dlls/ntdll/unix/sync.c +++ b/dlls/ntdll/unix/sync.c @@ -2007,6 +2007,7 @@ NTSTATUS WINAPI NtRemoveIoCompletion( HANDLE handle, ULONG_PTR *key, ULONG_PTR * SERVER_START_REQ( remove_completion ) { req->handle = wine_server_obj_handle( handle ); + req->alertable = 0; if (!(status = wine_server_call( req ))) { *key = reply->ckey; @@ -2055,6 +2056,7 @@ NTSTATUS WINAPI NtRemoveIoCompletionEx( HANDLE handle, FILE_IO_COMPLETION_INFORM SERVER_START_REQ( remove_completion ) { req->handle = wine_server_obj_handle( handle ); + req->alertable = alertable; if (!(status = wine_server_call( req ))) { info[i].CompletionKey = reply->ckey; @@ -2068,13 +2070,19 @@ NTSTATUS WINAPI NtRemoveIoCompletionEx( HANDLE handle, FILE_IO_COMPLETION_INFORM if (status != STATUS_SUCCESS) break; ++i; } - if (i || status != STATUS_PENDING) + if (i || (status != STATUS_PENDING && status != STATUS_USER_APC)) { if (i) status = STATUS_SUCCESS; goto done; } - if (!timeout || timeout->QuadPart || alertable) status = NtWaitForSingleObject( wait_handle, alertable, timeout ); - else status = STATUS_TIMEOUT; + if (status == STATUS_USER_APC) + { + status = NtDelayExecution( TRUE, NULL ); + assert( status == STATUS_USER_APC ); + goto done; + } + if (!timeout || timeout->QuadPart) status = NtWaitForSingleObject( wait_handle, alertable, timeout ); + else status = STATUS_TIMEOUT; if (status != WAIT_OBJECT_0) goto done;
SERVER_START_REQ( get_thread_completion ) diff --git a/server/completion.c b/server/completion.c index f56162aa22d..108aff46818 100644 --- a/server/completion.c +++ b/server/completion.c @@ -341,6 +341,13 @@ DECL_HANDLER(remove_completion) if (!completion) return;
entry = list_head( &completion->queue ); + if (req->alertable && !list_empty( ¤t->user_apc ) + && !(entry && current->completion_wait && current->completion_wait->completion == completion)) + { + set_error( STATUS_USER_APC ); + release_object( completion ); + return; + } if (current->completion_wait) { list_remove( ¤t->completion_wait->wait_queue_entry ); diff --git a/server/protocol.def b/server/protocol.def index 0ad102283d1..ac22f9853e5 100644 --- a/server/protocol.def +++ b/server/protocol.def @@ -3793,6 +3793,7 @@ typedef union /* get completion from completion port queue */ @REQ(remove_completion) obj_handle_t handle; /* port handle */ + int alertable; /* completion wait is alertable */ @REPLY apc_param_t ckey; /* completion key */ apc_param_t cvalue; /* completion value */
From: Paul Gofman pgofman@codeweavers.com
--- server/completion.c | 52 ++++++++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 15 deletions(-)
diff --git a/server/completion.c b/server/completion.c index 108aff46818..f9e68c523f1 100644 --- a/server/completion.c +++ b/server/completion.c @@ -19,11 +19,7 @@ * */
-/* FIXMEs: - * - built-in wait queues used which means: - * + "max concurrent active threads" parameter not used - * + completion handle is waitable, while native isn't - */ +/* FIXME: "max concurrent active threads" parameter is not used */
#include "config.h"
@@ -80,6 +76,7 @@ struct completion struct list queue; struct list wait_queue; unsigned int depth; + int closed; };
static void completion_wait_dump( struct object*, int ); @@ -131,6 +128,7 @@ static int completion_wait_signaled( struct object *obj, struct wait_queue_entry struct completion_wait *wait = (struct completion_wait *)obj;
assert( obj->ops == &completion_wait_ops ); + if (!wait->completion) return 1; return wait->completion->depth; }
@@ -141,6 +139,11 @@ static void completion_wait_satisfied( struct object *obj, struct wait_queue_ent struct comp_msg *msg;
assert( obj->ops == &completion_wait_ops ); + if (!wait->completion) + { + make_wait_abandoned( entry ); + return; + } msg_entry = list_head( &wait->completion->queue ); assert( msg_entry ); msg = LIST_ENTRY( msg_entry, struct comp_msg, queue_entry ); @@ -152,6 +155,7 @@ static void completion_wait_satisfied( struct object *obj, struct wait_queue_ent
static void completion_dump( struct object*, int ); static int completion_signaled( struct object *obj, struct wait_queue_entry *entry ); +static int completion_close_handle( struct object *obj, struct process *process, obj_handle_t handle ); static void completion_destroy( struct object * );
static const struct object_ops completion_ops = @@ -174,26 +178,19 @@ static const struct object_ops completion_ops = default_unlink_name, /* unlink_name */ no_open_file, /* open_file */ no_kernel_obj_list, /* get_kernel_obj_list */ - no_close_handle, /* close_handle */ + completion_close_handle, /* close_handle */ completion_destroy /* destroy */ };
static void completion_destroy( struct object *obj) { struct completion *completion = (struct completion *) obj; - struct completion_wait *wait, *wait_next; struct comp_msg *tmp, *next;
LIST_FOR_EACH_ENTRY_SAFE( tmp, next, &completion->queue, struct comp_msg, queue_entry ) { free( tmp ); } - - LIST_FOR_EACH_ENTRY_SAFE( wait, wait_next, &completion->wait_queue, struct completion_wait, wait_queue_entry ) - { - assert( wait->completion ); - cleanup_thread_completion( wait->thread ); - } }
static void completion_dump( struct object *obj, int verbose ) @@ -208,7 +205,30 @@ static int completion_signaled( struct object *obj, struct wait_queue_entry *ent { struct completion *completion = (struct completion *)obj;
- return !list_empty( &completion->queue ); + return !list_empty( &completion->queue ) || completion->closed; +} + +static int completion_close_handle( struct object *obj, struct process *process, obj_handle_t handle ) +{ + struct completion *completion = (struct completion *)obj; + struct completion_wait *wait, *wait_next; + + if (completion->obj.handle_count != 1) return 1; + + LIST_FOR_EACH_ENTRY_SAFE( wait, wait_next, &completion->wait_queue, struct completion_wait, wait_queue_entry ) + { + assert( wait->completion ); + wait->completion = NULL; + list_remove( &wait->wait_queue_entry ); + if (!wait->msg) + { + wake_up( &wait->obj, 0 ); + cleanup_thread_completion( wait->thread ); + } + } + completion->closed = 1; + wake_up( obj, 0 ); + return 1; }
void cleanup_thread_completion( struct thread *thread ) @@ -220,7 +240,7 @@ void cleanup_thread_completion( struct thread *thread ) close_handle( thread->process, thread->completion_wait->handle ); thread->completion_wait->handle = 0; } - list_remove( &thread->completion_wait->wait_queue_entry ); + if (thread->completion_wait->completion) list_remove( &thread->completion_wait->wait_queue_entry ); release_object( &thread->completion_wait->obj ); thread->completion_wait = NULL; } @@ -254,6 +274,7 @@ static struct completion *create_completion( struct object *root, const struct u list_init( &completion->queue ); list_init( &completion->wait_queue ); completion->depth = 0; + completion->closed = 0; } }
@@ -397,6 +418,7 @@ DECL_HANDLER(get_thread_completion) reply->information = msg->information; free( msg ); current->completion_wait->msg = NULL; + if (!current->completion_wait->completion) cleanup_thread_completion( current ); }
/* get queue depth for completion port */
From: Alexey Prokhin alexey@prokhin.ru
--- dlls/kernelbase/sync.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/dlls/kernelbase/sync.c b/dlls/kernelbase/sync.c index 1d935b02ddf..f03a6c3150a 100644 --- a/dlls/kernelbase/sync.c +++ b/dlls/kernelbase/sync.c @@ -1197,7 +1197,8 @@ BOOL WINAPI DECLSPEC_HOTPATCH GetQueuedCompletionStatus( HANDLE port, LPDWORD co return FALSE; }
- if (status == STATUS_TIMEOUT) SetLastError( WAIT_TIMEOUT ); + if (status == STATUS_TIMEOUT) SetLastError( WAIT_TIMEOUT ); + else if (status == STATUS_ABANDONED) SetLastError( ERROR_ABANDONED_WAIT_0 ); else SetLastError( RtlNtStatusToDosError(status) ); return FALSE; } @@ -1217,8 +1218,9 @@ BOOL WINAPI DECLSPEC_HOTPATCH GetQueuedCompletionStatusEx( HANDLE port, OVERLAPP ret = NtRemoveIoCompletionEx( port, (FILE_IO_COMPLETION_INFORMATION *)entries, count, written, get_nt_timeout( &time, timeout ), alertable ); if (ret == STATUS_SUCCESS) return TRUE; - else if (ret == STATUS_TIMEOUT) SetLastError( WAIT_TIMEOUT ); - else if (ret == STATUS_USER_APC) SetLastError( WAIT_IO_COMPLETION ); + else if (ret == STATUS_TIMEOUT) SetLastError( WAIT_TIMEOUT ); + else if (ret == STATUS_USER_APC) SetLastError( WAIT_IO_COMPLETION ); + else if (ret == STATUS_ABANDONED) SetLastError( ERROR_ABANDONED_WAIT_0 ); else SetLastError( RtlNtStatusToDosError(ret) ); return FALSE; }
From: Paul Gofman pgofman@codeweavers.com
--- dlls/ntdll/tests/sync.c | 194 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 194 insertions(+)
diff --git a/dlls/ntdll/tests/sync.c b/dlls/ntdll/tests/sync.c index f356d3ec38f..3ab89bbf361 100644 --- a/dlls/ntdll/tests/sync.c +++ b/dlls/ntdll/tests/sync.c @@ -837,6 +837,199 @@ static void test_tid_alert( char **argv ) CloseHandle( pi.hThread ); }
+struct test_completion_port_scheduling_param +{ + HANDLE ready, test_ready; + HANDLE port; + int index; +}; + +static DWORD WINAPI test_completion_port_scheduling_thread(void *param) +{ + struct test_completion_port_scheduling_param *p = param; + FILE_IO_COMPLETION_INFORMATION info; + OVERLAPPED_ENTRY overlapped_entry; + OVERLAPPED *overlapped; + IO_STATUS_BLOCK iosb; + ULONG_PTR key, value; + NTSTATUS status; + DWORD ret, err; + ULONG count; + BOOL bret; + + /* both threads are woken when comleption added. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + ret = WaitForSingleObject( p->port, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + SetEvent( p->test_ready ); + + /* if a thread is waiting for completion which is added threads which wait on port handle are not woken. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + if (p->index) + { + bret = GetQueuedCompletionStatus( p->port, &count, &key, &overlapped, INFINITE ); + ok( bret, "got error %lu.\n", GetLastError() ); + } + else + { + ret = WaitForSingleObject( p->port, 100 ); + ok( ret == WAIT_TIMEOUT || broken( !ret ) /* before Win10 1607 */, "got %#lx.\n", ret ); + } + SetEvent( p->test_ready ); + + /* Two threads in GetQueuedCompletionStatus, the second is supposed to start first. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + bret = GetQueuedCompletionStatus( p->port, &count, &key, &overlapped, INFINITE ); + ok( bret, "got error %lu.\n", GetLastError() ); + ok( key == 3 + p->index || broken( p->index && key == 5 ) /* before Win10 */, "got %Iu, expected %u.\n", key, 3 + p->index ); + SetEvent( p->test_ready ); + + /* Port is being closed. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + ret = WaitForSingleObject( p->port, INFINITE ); + if (ret == WAIT_FAILED) + skip( "Handle closed before wait started.\n" ); + else + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + SetEvent( p->test_ready ); + + /* Port is being closed. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + SetEvent( p->test_ready ); + status = NtRemoveIoCompletion( p->port, &key, &value, &iosb, NULL ); + if (status == STATUS_INVALID_HANDLE) + skip( "Handle closed before wait started.\n" ); + else + ok( status == STATUS_ABANDONED_WAIT_0, "got %#lx.\n", status ); + + /* Port is being closed. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + SetEvent( p->test_ready ); + count = 0xdeadbeef; + status = NtRemoveIoCompletionEx( p->port, &info, 1, &count, NULL, FALSE ); + ok( count <= 1, "Got unexpected count %lu.\n", count ); + if (status == STATUS_INVALID_HANDLE) + skip( "Handle closed before wait started.\n" ); + else + ok( status == STATUS_ABANDONED_WAIT_0, "got %#lx.\n", status ); + + /* Port is being closed. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + SetEvent( p->test_ready ); + bret = GetQueuedCompletionStatus( p->port, &count, &key, &overlapped, INFINITE ); + err = GetLastError(); + ok( !bret, "got %d.\n", bret ); + if (err == ERROR_INVALID_HANDLE) + skip( "Handle closed before wait started.\n" ); + else + ok( err == ERROR_ABANDONED_WAIT_0, "got error %#lx.\n", err ); + + /* Port is being closed. */ + ret = WaitForSingleObject( p->ready, INFINITE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + SetEvent( p->test_ready ); + bret = GetQueuedCompletionStatusEx( p->port, &overlapped_entry, 1, &count, INFINITE, TRUE ); + err = GetLastError(); + ok( !bret, "got %d.\n", bret ); + if (err == ERROR_INVALID_HANDLE) + skip( "Handle closed before wait started.\n" ); + else + ok( err == ERROR_ABANDONED_WAIT_0, "got error %#lx.\n", err ); + + return 0; +} + +static void test_completion_port_scheduling(void) +{ + struct test_completion_port_scheduling_param p[2]; + HANDLE threads[2], port; + OVERLAPPED *overlapped; + unsigned int i, j; + DWORD ret, count; + NTSTATUS status; + ULONG_PTR key; + BOOL bret; + + for (i = 0; i < 2; ++i) + { + p[i].index = 0; + p[i].ready = CreateEventA(NULL, FALSE, FALSE, NULL); + p[i].test_ready = CreateEventA(NULL, FALSE, FALSE, NULL); + threads[i] = CreateThread( NULL, 0, test_completion_port_scheduling_thread, &p[i], 0, NULL ); + ok( !!threads[i], "got error %lu.\n", GetLastError() ); + } + + status = NtCreateIoCompletion( &port, IO_COMPLETION_ALL_ACCESS, NULL, 0 ); + ok( !status, "got %#lx.\n", status ); + /* Waking multiple threads directly waiting on port */ + for (i = 0; i < 2; ++i) + { + p[i].index = i; + p[i].port = port; + SetEvent( p[i].ready ); + } + PostQueuedCompletionStatus( port, 0, 1, NULL ); + for (i = 0; i < 2; ++i) WaitForSingleObject( p[i].test_ready, INFINITE ); + bret = GetQueuedCompletionStatus( port, &count, &key, &overlapped, INFINITE ); + ok( bret, "got error %lu.\n", GetLastError() ); + + /* One thread is waiting on port, another in GetQueuedCompletionStatus(). */ + SetEvent( p[1].ready ); + Sleep( 40 ); + SetEvent( p[0].ready ); + Sleep( 10 ); + PostQueuedCompletionStatus( port, 0, 2, NULL ); + for (i = 0; i < 2; ++i) WaitForSingleObject( p[i].test_ready, INFINITE ); + + /* Both threads are waiting in GetQueuedCompletionStatus, LIFO wake up order. */ + SetEvent( p[1].ready ); + Sleep( 40 ); + SetEvent( p[0].ready ); + Sleep( 20 ); + PostQueuedCompletionStatus( port, 0, 3, NULL ); + PostQueuedCompletionStatus( port, 0, 4, NULL ); + PostQueuedCompletionStatus( port, 0, 5, NULL ); + bret = GetQueuedCompletionStatus( p->port, &count, &key, &overlapped, INFINITE ); + ok( bret, "got error %lu.\n", GetLastError() ); + ok( key == 5 || broken( key == 4 ) /* before Win10 */, "got %Iu, expected 5.\n", key ); + + /* Close port handle while threads are waiting on it directly. */ + for (i = 0; i < 2; ++i) SetEvent( p[i].ready ); + Sleep( 20 ); + NtClose( port ); + for (i = 0; i < 2; ++i) WaitForSingleObject( p[i].test_ready, INFINITE ); + + /* Test signaling on port close. */ + for (i = 0; i < 4; ++i) + { + status = NtCreateIoCompletion( &port, IO_COMPLETION_ALL_ACCESS, NULL, 0 ); + ok( !status, "got %#lx.\n", status ); + for (j = 0; j < 2; ++j) + { + p[j].port = port; + ret = SignalObjectAndWait( p[j].ready, p[j].test_ready, + INFINITE, FALSE ); + ok( ret == WAIT_OBJECT_0, "got %#lx.\n", ret ); + } + Sleep( 20 ); + status = NtClose( port ); + ok( !status, "got %#lx.\n", status ); + } + + WaitForMultipleObjects( 2, threads, TRUE, INFINITE ); + for (i = 0; i < 2; ++i) + { + CloseHandle( threads[i] ); + CloseHandle( p[i].ready ); + CloseHandle( p[i].test_ready ); + } +} + START_TEST(sync) { HMODULE module = GetModuleHandleA("ntdll.dll"); @@ -884,4 +1077,5 @@ START_TEST(sync) test_keyed_events(); test_resource(); test_tid_alert( argv ); + test_completion_port_scheduling(); }
v3: - fix clang warning ('h' instead of 'h2' in test's message parameters).
This merge request was approved by Elizabeth Figura.