Split mutex and semaphore into dedicated sync objects, and prepare the message queue to use both server and inproc syncs.
-- v2: server: Use a signaled flag for message queue sync. server: Use a flag to keep track of message queue waits. server: Split semaphore to a dedicated sync object. server: Split mutex to a dedicated sync object. server: Move some checks inside of mutex do_release.
From: Rémi Bernon rbernon@codeweavers.com
--- server/mutex.c | 44 +++++++++++++++++++------------------------- 1 file changed, 19 insertions(+), 25 deletions(-)
diff --git a/server/mutex.c b/server/mutex.c index 4737b6f711b..90cc594c741 100644 --- a/server/mutex.c +++ b/server/mutex.c @@ -105,13 +105,21 @@ static void do_grab( struct mutex *mutex, struct thread *thread ) }
/* release a mutex once the recursion count is 0 */ -static void do_release( struct mutex *mutex ) +static int do_release( struct mutex *mutex, struct thread *thread, int count ) { - assert( !mutex->count ); - /* remove the mutex from the thread list of owned mutexes */ - list_remove( &mutex->entry ); - mutex->owner = NULL; - wake_up( &mutex->obj, 0 ); + if (!mutex->count || (mutex->owner != thread)) + { + set_error( STATUS_MUTANT_NOT_OWNED ); + return 0; + } + if (!(mutex->count -= count)) + { + /* remove the mutex from the thread list of owned mutexes */ + list_remove( &mutex->entry ); + mutex->owner = NULL; + wake_up( &mutex->obj, 0 ); + } + return 1; }
static struct mutex *create_mutex( struct object *root, const struct unicode_str *name, @@ -141,9 +149,8 @@ void abandon_mutexes( struct thread *thread ) { struct mutex *mutex = LIST_ENTRY( ptr, struct mutex, entry ); assert( mutex->owner == thread ); - mutex->count = 0; mutex->abandoned = 1; - do_release( mutex ); + do_release( mutex, thread, mutex->count ); } }
@@ -181,23 +188,14 @@ static int mutex_signal( struct object *obj, unsigned int access ) set_error( STATUS_ACCESS_DENIED ); return 0; } - if (!mutex->count || (mutex->owner != current)) - { - set_error( STATUS_MUTANT_NOT_OWNED ); - return 0; - } - if (!--mutex->count) do_release( mutex ); - return 1; + return do_release( mutex, current, 1 ); }
static void mutex_destroy( struct object *obj ) { struct mutex *mutex = (struct mutex *)obj; assert( obj->ops == &mutex_ops ); - - if (!mutex->count) return; - mutex->count = 0; - do_release( mutex ); + if (mutex->count) do_release( mutex, current, mutex->count ); }
/* create a mutex */ @@ -241,12 +239,8 @@ DECL_HANDLER(release_mutex) if ((mutex = (struct mutex *)get_handle_obj( current->process, req->handle, 0, &mutex_ops ))) { - if (!mutex->count || (mutex->owner != current)) set_error( STATUS_MUTANT_NOT_OWNED ); - else - { - reply->prev_count = mutex->count; - if (!--mutex->count) do_release( mutex ); - } + reply->prev_count = mutex->count; + do_release( mutex, current, 1 ); release_object( mutex ); } }
From: Rémi Bernon rbernon@codeweavers.com
--- server/mutex.c | 166 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 119 insertions(+), 47 deletions(-)
diff --git a/server/mutex.c b/server/mutex.c index 90cc594c741..573fcda1083 100644 --- a/server/mutex.c +++ b/server/mutex.c @@ -50,31 +50,30 @@ struct type_descr mutex_type = }, };
-struct mutex +struct mutex_sync { - struct object obj; /* object header */ - struct thread *owner; /* mutex owner */ - unsigned int count; /* recursion count */ - int abandoned; /* has it been abandoned? */ - struct list entry; /* entry in owner thread mutex list */ + struct object obj; /* object header */ + struct thread *owner; /* mutex owner */ + unsigned int count; /* recursion count */ + int abandoned; /* has it been abandoned? */ + struct list entry; /* entry in owner thread mutex list */ };
-static void mutex_dump( struct object *obj, int verbose ); -static int mutex_signaled( struct object *obj, struct wait_queue_entry *entry ); -static void mutex_satisfied( struct object *obj, struct wait_queue_entry *entry ); -static void mutex_destroy( struct object *obj ); -static int mutex_signal( struct object *obj, unsigned int access ); +static void mutex_sync_dump( struct object *obj, int verbose ); +static int mutex_sync_signaled( struct object *obj, struct wait_queue_entry *entry ); +static void mutex_sync_satisfied( struct object *obj, struct wait_queue_entry *entry ); +static void mutex_sync_destroy( struct object *obj );
-static const struct object_ops mutex_ops = +static const struct object_ops mutex_sync_ops = { - sizeof(struct mutex), /* size */ - &mutex_type, /* type */ - mutex_dump, /* dump */ + sizeof(struct mutex_sync), /* size */ + &no_type, /* type */ + mutex_sync_dump, /* dump */ add_queue, /* add_queue */ remove_queue, /* remove_queue */ - mutex_signaled, /* signaled */ - mutex_satisfied, /* satisfied */ - mutex_signal, /* signal */ + mutex_sync_signaled, /* signaled */ + mutex_sync_satisfied, /* satisfied */ + no_signal, /* signal */ no_get_fd, /* get_fd */ default_get_sync, /* get_sync */ default_map_access, /* map_access */ @@ -87,12 +86,11 @@ static const struct object_ops mutex_ops = no_open_file, /* open_file */ no_kernel_obj_list, /* get_kernel_obj_list */ no_close_handle, /* close_handle */ - mutex_destroy /* destroy */ + mutex_sync_destroy, /* destroy */ };
- /* grab a mutex for a given thread */ -static void do_grab( struct mutex *mutex, struct thread *thread ) +static void do_grab( struct mutex_sync *mutex, struct thread *thread ) { assert( !mutex->count || (mutex->owner == thread) );
@@ -105,7 +103,7 @@ static void do_grab( struct mutex *mutex, struct thread *thread ) }
/* release a mutex once the recursion count is 0 */ -static int do_release( struct mutex *mutex, struct thread *thread, int count ) +static int do_release( struct mutex_sync *mutex, struct thread *thread, int count ) { if (!mutex->count || (mutex->owner != thread)) { @@ -122,6 +120,87 @@ static int do_release( struct mutex *mutex, struct thread *thread, int count ) return 1; }
+static void mutex_sync_dump( struct object *obj, int verbose ) +{ + struct mutex_sync *mutex = (struct mutex_sync *)obj; + assert( obj->ops == &mutex_sync_ops ); + fprintf( stderr, "Mutex count=%u owner=%p\n", mutex->count, mutex->owner ); +} + +static void mutex_sync_destroy( struct object *obj ) +{ + struct mutex_sync *mutex = (struct mutex_sync *)obj; + assert( obj->ops == &mutex_sync_ops ); + + if (mutex->count) do_release( mutex, current, mutex->count ); +} + +static int mutex_sync_signaled( struct object *obj, struct wait_queue_entry *entry ) +{ + struct mutex_sync *mutex = (struct mutex_sync *)obj; + assert( obj->ops == &mutex_sync_ops ); + return (!mutex->count || (mutex->owner == get_wait_queue_thread( entry ))); +} + +static void mutex_sync_satisfied( struct object *obj, struct wait_queue_entry *entry ) +{ + struct mutex_sync *mutex = (struct mutex_sync *)obj; + assert( obj->ops == &mutex_sync_ops ); + + do_grab( mutex, get_wait_queue_thread( entry )); + if (mutex->abandoned) make_wait_abandoned( entry ); + mutex->abandoned = 0; +} + +static struct mutex_sync *create_mutex_sync( int owned ) +{ + struct mutex_sync *mutex; + + if (!(mutex = alloc_object( &mutex_sync_ops ))) return NULL; + mutex->count = 0; + mutex->owner = NULL; + mutex->abandoned = 0; + if (owned) do_grab( mutex, current ); + + return mutex; +} + +struct mutex +{ + struct object obj; /* object header */ + struct mutex_sync *sync; /* mutex sync object */ +}; + +static void mutex_dump( struct object *obj, int verbose ); +static struct object *mutex_get_sync( struct object *obj ); +static int mutex_signal( struct object *obj, unsigned int access ); +static void mutex_destroy( struct object *obj ); + +static const struct object_ops mutex_ops = +{ + sizeof(struct mutex), /* size */ + &mutex_type, /* type */ + mutex_dump, /* dump */ + NULL, /* add_queue */ + NULL, /* remove_queue */ + NULL, /* signaled */ + NULL, /* satisfied */ + mutex_signal, /* signal */ + no_get_fd, /* get_fd */ + 
mutex_get_sync, /* get_sync */ + default_map_access, /* map_access */ + default_get_sd, /* get_sd */ + default_set_sd, /* set_sd */ + default_get_full_name, /* get_full_name */ + no_lookup_name, /* lookup_name */ + directory_link_name, /* link_name */ + default_unlink_name, /* unlink_name */ + no_open_file, /* open_file */ + no_kernel_obj_list, /* get_kernel_obj_list */ + no_close_handle, /* close_handle */ + mutex_destroy, /* destroy */ +}; + static struct mutex *create_mutex( struct object *root, const struct unicode_str *name, unsigned int attr, int owned, const struct security_descriptor *sd ) { @@ -132,10 +211,13 @@ static struct mutex *create_mutex( struct object *root, const struct unicode_str if (get_error() != STATUS_OBJECT_NAME_EXISTS) { /* initialize it if it didn't already exist */ - mutex->count = 0; - mutex->owner = NULL; - mutex->abandoned = 0; - if (owned) do_grab( mutex, current ); + mutex->sync = NULL; + + if (!(mutex->sync = create_mutex_sync( owned ))) + { + release_object( mutex ); + return NULL; + } } } return mutex; @@ -147,7 +229,7 @@ void abandon_mutexes( struct thread *thread )
while ((ptr = list_head( &thread->mutex_list )) != NULL) { - struct mutex *mutex = LIST_ENTRY( ptr, struct mutex, entry ); + struct mutex_sync *mutex = LIST_ENTRY( ptr, struct mutex_sync, entry ); assert( mutex->owner == thread ); mutex->abandoned = 1; do_release( mutex, thread, mutex->count ); @@ -158,24 +240,14 @@ static void mutex_dump( struct object *obj, int verbose ) { struct mutex *mutex = (struct mutex *)obj; assert( obj->ops == &mutex_ops ); - fprintf( stderr, "Mutex count=%u owner=%p\n", mutex->count, mutex->owner ); + mutex->sync->obj.ops->dump( &mutex->sync->obj, verbose ); }
-static int mutex_signaled( struct object *obj, struct wait_queue_entry *entry ) +static struct object *mutex_get_sync( struct object *obj ) { struct mutex *mutex = (struct mutex *)obj; assert( obj->ops == &mutex_ops ); - return (!mutex->count || (mutex->owner == get_wait_queue_thread( entry ))); -} - -static void mutex_satisfied( struct object *obj, struct wait_queue_entry *entry ) -{ - struct mutex *mutex = (struct mutex *)obj; - assert( obj->ops == &mutex_ops ); - - do_grab( mutex, get_wait_queue_thread( entry )); - if (mutex->abandoned) make_wait_abandoned( entry ); - mutex->abandoned = 0; + return grab_object( mutex->sync ); }
static int mutex_signal( struct object *obj, unsigned int access ) @@ -188,14 +260,14 @@ static int mutex_signal( struct object *obj, unsigned int access ) set_error( STATUS_ACCESS_DENIED ); return 0; } - return do_release( mutex, current, 1 ); + return do_release( mutex->sync, current, 1 ); }
static void mutex_destroy( struct object *obj ) { struct mutex *mutex = (struct mutex *)obj; assert( obj->ops == &mutex_ops ); - if (mutex->count) do_release( mutex, current, mutex->count ); + if (mutex->sync) release_object( mutex->sync ); }
/* create a mutex */ @@ -239,8 +311,8 @@ DECL_HANDLER(release_mutex) if ((mutex = (struct mutex *)get_handle_obj( current->process, req->handle, 0, &mutex_ops ))) { - reply->prev_count = mutex->count; - do_release( mutex, current, 1 ); + reply->prev_count = mutex->sync->count; + do_release( mutex->sync, current, 1 ); release_object( mutex ); } } @@ -253,9 +325,9 @@ DECL_HANDLER(query_mutex) if ((mutex = (struct mutex *)get_handle_obj( current->process, req->handle, MUTANT_QUERY_STATE, &mutex_ops ))) { - reply->count = mutex->count; - reply->owned = (mutex->owner == current); - reply->abandoned = mutex->abandoned; + reply->count = mutex->sync->count; + reply->owned = (mutex->sync->owner == current); + reply->abandoned = mutex->sync->abandoned;
release_object( mutex ); }
From: Rémi Bernon rbernon@codeweavers.com
--- server/semaphore.c | 172 +++++++++++++++++++++++++++++++-------------- 1 file changed, 121 insertions(+), 51 deletions(-)
diff --git a/server/semaphore.c b/server/semaphore.c index 304a821bcec..4b31bfe806c 100644 --- a/server/semaphore.c +++ b/server/semaphore.c @@ -50,30 +50,119 @@ struct type_descr semaphore_type = }, };
+struct semaphore_sync +{ + struct object obj; /* object header */ + unsigned int count; /* current count */ + unsigned int max; /* maximum possible count */ +}; + +static void semaphore_sync_dump( struct object *obj, int verbose ); +static int semaphore_sync_signaled( struct object *obj, struct wait_queue_entry *entry ); +static void semaphore_sync_satisfied( struct object *obj, struct wait_queue_entry *entry ); + +static const struct object_ops semaphore_sync_ops = +{ + sizeof(struct semaphore_sync), /* size */ + &no_type, /* type */ + semaphore_sync_dump, /* dump */ + add_queue, /* add_queue */ + remove_queue, /* remove_queue */ + semaphore_sync_signaled, /* signaled */ + semaphore_sync_satisfied, /* satisfied */ + no_signal, /* signal */ + no_get_fd, /* get_fd */ + default_get_sync, /* get_sync */ + default_map_access, /* map_access */ + default_get_sd, /* get_sd */ + default_set_sd, /* set_sd */ + default_get_full_name, /* get_full_name */ + no_lookup_name, /* lookup_name */ + directory_link_name, /* link_name */ + default_unlink_name, /* unlink_name */ + no_open_file, /* open_file */ + no_kernel_obj_list, /* get_kernel_obj_list */ + no_close_handle, /* close_handle */ + no_destroy /* destroy */ +}; + +static int release_semaphore( struct semaphore_sync *sem, unsigned int count, + unsigned int *prev ) +{ + if (prev) *prev = sem->count; + if (sem->count + count < sem->count || sem->count + count > sem->max) + { + set_error( STATUS_SEMAPHORE_LIMIT_EXCEEDED ); + return 0; + } + else if (sem->count) + { + /* there cannot be any thread to wake up if the count is != 0 */ + sem->count += count; + } + else + { + sem->count = count; + wake_up( &sem->obj, count ); + } + return 1; +} + +static void semaphore_sync_dump( struct object *obj, int verbose ) +{ + struct semaphore_sync *sem = (struct semaphore_sync *)obj; + assert( obj->ops == &semaphore_sync_ops ); + fprintf( stderr, "Semaphore count=%d max=%d\n", sem->count, sem->max ); +} + +static int 
semaphore_sync_signaled( struct object *obj, struct wait_queue_entry *entry ) +{ + struct semaphore_sync *sem = (struct semaphore_sync *)obj; + assert( obj->ops == &semaphore_sync_ops ); + return (sem->count > 0); +} + +static void semaphore_sync_satisfied( struct object *obj, struct wait_queue_entry *entry ) +{ + struct semaphore_sync *sem = (struct semaphore_sync *)obj; + assert( obj->ops == &semaphore_sync_ops ); + assert( sem->count ); + sem->count--; +} + +static struct semaphore_sync *create_semaphore_sync( unsigned int initial, unsigned int max ) +{ + struct semaphore_sync *sem; + + if (!(sem = alloc_object( &semaphore_sync_ops ))) return NULL; + sem->count = initial; + sem->max = max; + return sem; +} + struct semaphore { - struct object obj; /* object header */ - unsigned int count; /* current count */ - unsigned int max; /* maximum possible count */ + struct object obj; /* object header */ + struct semaphore_sync *sync; /* semaphore sync object */ };
static void semaphore_dump( struct object *obj, int verbose ); -static int semaphore_signaled( struct object *obj, struct wait_queue_entry *entry ); -static void semaphore_satisfied( struct object *obj, struct wait_queue_entry *entry ); +static struct object *semaphore_get_sync( struct object *obj ); static int semaphore_signal( struct object *obj, unsigned int access ); +static void semaphore_destroy( struct object *obj );
static const struct object_ops semaphore_ops = { sizeof(struct semaphore), /* size */ &semaphore_type, /* type */ semaphore_dump, /* dump */ - add_queue, /* add_queue */ - remove_queue, /* remove_queue */ - semaphore_signaled, /* signaled */ - semaphore_satisfied, /* satisfied */ + NULL, /* add_queue */ + NULL, /* remove_queue */ + NULL, /* signaled */ + NULL, /* satisfied */ semaphore_signal, /* signal */ no_get_fd, /* get_fd */ - default_get_sync, /* get_sync */ + semaphore_get_sync, /* get_sync */ default_map_access, /* map_access */ default_get_sd, /* get_sd */ default_set_sd, /* set_sd */ @@ -84,10 +173,9 @@ static const struct object_ops semaphore_ops = no_open_file, /* open_file */ no_kernel_obj_list, /* get_kernel_obj_list */ no_close_handle, /* close_handle */ - no_destroy /* destroy */ + semaphore_destroy, /* destroy */ };
- static struct semaphore *create_semaphore( struct object *root, const struct unicode_str *name, unsigned int attr, unsigned int initial, unsigned int max, const struct security_descriptor *sd ) @@ -104,55 +192,30 @@ static struct semaphore *create_semaphore( struct object *root, const struct uni if (get_error() != STATUS_OBJECT_NAME_EXISTS) { /* initialize it if it didn't already exist */ - sem->count = initial; - sem->max = max; + sem->sync = NULL; + + if (!(sem->sync = create_semaphore_sync( initial, max ))) + { + release_object( sem ); + return NULL; + } } } return sem; }
-static int release_semaphore( struct semaphore *sem, unsigned int count, - unsigned int *prev ) -{ - if (prev) *prev = sem->count; - if (sem->count + count < sem->count || sem->count + count > sem->max) - { - set_error( STATUS_SEMAPHORE_LIMIT_EXCEEDED ); - return 0; - } - else if (sem->count) - { - /* there cannot be any thread to wake up if the count is != 0 */ - sem->count += count; - } - else - { - sem->count = count; - wake_up( &sem->obj, count ); - } - return 1; -} - static void semaphore_dump( struct object *obj, int verbose ) { struct semaphore *sem = (struct semaphore *)obj; assert( obj->ops == &semaphore_ops ); - fprintf( stderr, "Semaphore count=%d max=%d\n", sem->count, sem->max ); + sem->sync->obj.ops->dump( &sem->sync->obj, verbose ); }
-static int semaphore_signaled( struct object *obj, struct wait_queue_entry *entry ) +static struct object *semaphore_get_sync( struct object *obj ) { struct semaphore *sem = (struct semaphore *)obj; assert( obj->ops == &semaphore_ops ); - return (sem->count > 0); -} - -static void semaphore_satisfied( struct object *obj, struct wait_queue_entry *entry ) -{ - struct semaphore *sem = (struct semaphore *)obj; - assert( obj->ops == &semaphore_ops ); - assert( sem->count ); - sem->count--; + return grab_object( sem->sync ); }
static int semaphore_signal( struct object *obj, unsigned int access ) @@ -165,7 +228,14 @@ static int semaphore_signal( struct object *obj, unsigned int access ) set_error( STATUS_ACCESS_DENIED ); return 0; } - return release_semaphore( sem, 1, NULL ); + return release_semaphore( sem->sync, 1, NULL ); +} + +static void semaphore_destroy( struct object *obj ) +{ + struct semaphore *sem = (struct semaphore *)obj; + assert( obj->ops == &semaphore_ops ); + if (sem->sync) release_object( sem->sync ); }
/* create a semaphore */ @@ -209,7 +279,7 @@ DECL_HANDLER(release_semaphore) if ((sem = (struct semaphore *)get_handle_obj( current->process, req->handle, SEMAPHORE_MODIFY_STATE, &semaphore_ops ))) { - release_semaphore( sem, req->count, &reply->prev_count ); + release_semaphore( sem->sync, req->count, &reply->prev_count ); release_object( sem ); } } @@ -222,8 +292,8 @@ DECL_HANDLER(query_semaphore) if ((sem = (struct semaphore *)get_handle_obj( current->process, req->handle, SEMAPHORE_QUERY_STATE, &semaphore_ops ))) { - reply->current = sem->count; - reply->max = sem->max; + reply->current = sem->sync->count; + reply->max = sem->sync->max; release_object( sem ); } }
From: Rémi Bernon rbernon@codeweavers.com
--- server/queue.c | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-)
diff --git a/server/queue.c b/server/queue.c index 3b0bd1826ef..7d058454972 100644 --- a/server/queue.c +++ b/server/queue.c @@ -133,6 +133,7 @@ struct msg_queue struct hook_table *hooks; /* hook table */ timeout_t last_get_msg; /* time of last get message call */ int keystate_lock; /* owns an input keystate lock */ + int waiting; /* is thread waiting on queue */ queue_shm_t *shared; /* queue in session shared memory */ };
@@ -315,6 +316,7 @@ static struct msg_queue *create_msg_queue( struct thread *thread, struct thread_ queue->hooks = NULL; queue->last_get_msg = current_time; queue->keystate_lock = 0; + queue->waiting = 0; list_init( &queue->send_result ); list_init( &queue->callback_result ); list_init( &queue->pending_timers ); @@ -1260,16 +1262,21 @@ static void cleanup_results( struct msg_queue *queue ) /* check if the thread owning the queue is hung (not checking for messages) */ static int is_queue_hung( struct msg_queue *queue ) { - struct wait_queue_entry *entry; - if (current_time - queue->last_get_msg <= 5 * TICKS_PER_SEC) return 0; /* less than 5 seconds since last get message -> not hung */ + return !queue->waiting; +}
- LIST_FOR_EACH_ENTRY( entry, &queue->obj.wait_queue, struct wait_queue_entry, entry ) +static int msg_queue_select( struct msg_queue *queue, int events ) +{ + if (queue->waiting == !!events) { - if (get_wait_queue_thread(entry)->queue == queue) - return 0; /* thread is waiting on queue -> not hung */ + set_error( STATUS_ACCESS_DENIED ); + return 0; } + queue->waiting = !!events; + + if (queue->fd) set_fd_events( queue->fd, events ); return 1; }
@@ -1284,8 +1291,7 @@ static int msg_queue_add_queue( struct object *obj, struct wait_queue_entry *ent return 0; }
- if (queue->fd && list_empty( &obj->wait_queue )) /* first on the queue */ - set_fd_events( queue->fd, POLLIN ); + if (!msg_queue_select( queue, POLLIN )) return 0; add_queue( obj, entry ); return 1; } @@ -1295,8 +1301,7 @@ static void msg_queue_remove_queue(struct object *obj, struct wait_queue_entry * struct msg_queue *queue = (struct msg_queue *)obj;
remove_queue( obj, entry ); - if (queue->fd && list_empty( &obj->wait_queue )) /* last on the queue is gone */ - set_fd_events( queue->fd, 0 ); + msg_queue_select( queue, 0 ); }
static void msg_queue_dump( struct object *obj, int verbose ) @@ -1317,7 +1322,7 @@ static int msg_queue_signaled( struct object *obj, struct wait_queue_entry *entr if ((ret = check_fd_events( queue->fd, POLLIN ))) /* stop waiting on select() if we are signaled */ set_fd_events( queue->fd, 0 ); - else if (!list_empty( &obj->wait_queue )) + else if (queue->waiting) /* restart waiting on poll() if we are no longer signaled */ set_fd_events( queue->fd, POLLIN ); }
From: Rémi Bernon rbernon@codeweavers.com
--- server/queue.c | 62 +++++++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 29 deletions(-)
diff --git a/server/queue.c b/server/queue.c index 7d058454972..3845a86c962 100644 --- a/server/queue.c +++ b/server/queue.c @@ -116,6 +116,7 @@ struct msg_queue { struct object obj; /* object header */ struct fd *fd; /* optional file descriptor to poll */ + int signaled; /* queue is signaled from fd POLLIN or masks */ int paint_count; /* pending paint messages count */ int hotkey_count; /* pending hotkey messages count */ int quit_message; /* is there a pending quit message? */ @@ -305,6 +306,7 @@ static struct msg_queue *create_msg_queue( struct thread *thread, struct thread_ if ((queue = alloc_object( &msg_queue_ops ))) { queue->fd = NULL; + queue->signaled = 0; queue->paint_count = 0; queue->hotkey_count = 0; queue->quit_message = 0; @@ -707,6 +709,18 @@ void add_queue_hook_count( struct thread *thread, unsigned int index, int count assert( thread->queue->shared->hooks_count[index] >= 0 ); }
+static void signal_queue_sync( struct msg_queue *queue ) +{ + if (queue->signaled) return; + queue->signaled = 1; + wake_up( &queue->obj, 0 ); +} + +static void reset_queue_sync( struct msg_queue *queue ) +{ + queue->signaled = 0; +} + /* check the queue status */ static inline int is_signaled( struct msg_queue *queue ) { @@ -733,7 +747,7 @@ static inline void set_queue_bits( struct msg_queue *queue, unsigned int bits ) } SHARED_WRITE_END;
- if (is_signaled( queue )) wake_up( &queue->obj, 0 ); + if (is_signaled( queue )) signal_queue_sync( queue ); }
/* clear some queue bits */ @@ -753,6 +767,7 @@ static inline void clear_queue_bits( struct msg_queue *queue, unsigned int bits if (queue->keystate_lock) unlock_input_keystate( queue->input ); queue->keystate_lock = 0; } + if (!is_signaled( queue )) reset_queue_sync( queue ); }
/* check if message is matched by the filter */ @@ -1276,7 +1291,11 @@ static int msg_queue_select( struct msg_queue *queue, int events ) } queue->waiting = !!events;
- if (queue->fd) set_fd_events( queue->fd, events ); + if (queue->fd) + { + if (events && check_fd_events( queue->fd, POLLIN )) signal_queue_sync( queue ); + else set_fd_events( queue->fd, events ); + } return 1; }
@@ -1315,18 +1334,8 @@ static void msg_queue_dump( struct object *obj, int verbose ) static int msg_queue_signaled( struct object *obj, struct wait_queue_entry *entry ) { struct msg_queue *queue = (struct msg_queue *)obj; - int ret = 0; - - if (queue->fd) - { - if ((ret = check_fd_events( queue->fd, POLLIN ))) - /* stop waiting on select() if we are signaled */ - set_fd_events( queue->fd, 0 ); - else if (queue->waiting) - /* restart waiting on poll() if we are no longer signaled */ - set_fd_events( queue->fd, POLLIN ); - } - return ret || is_signaled( queue ); + assert( obj->ops == &msg_queue_ops ); + return queue->signaled; }
static void msg_queue_satisfied( struct object *obj, struct wait_queue_entry *entry ) @@ -1340,6 +1349,7 @@ static void msg_queue_satisfied( struct object *obj, struct wait_queue_entry *en shared->changed_mask = 0; } SHARED_WRITE_END; + reset_queue_sync( queue ); }
static void msg_queue_destroy( struct object *obj ) @@ -1394,7 +1404,7 @@ static void msg_queue_poll_event( struct fd *fd, int event )
if (event & (POLLERR | POLLHUP)) set_fd_events( fd, -1 ); else set_fd_events( queue->fd, 0 ); - wake_up( &queue->obj, 0 ); + signal_queue_sync( queue ); }
static void thread_input_dump( struct object *obj, int verbose ) @@ -3159,20 +3169,9 @@ DECL_HANDLER(set_queue_mask) reply->wake_bits = queue_shm->wake_bits; reply->changed_bits = queue_shm->changed_bits;
- if (is_signaled( queue )) - { - /* if skip wait is set, do what would have been done in the subsequent wait */ - if (req->skip_wait) - { - SHARED_WRITE_BEGIN( queue_shm, queue_shm_t ) - { - shared->wake_mask = 0; - shared->changed_mask = 0; - } - SHARED_WRITE_END; - } - else wake_up( &queue->obj, 0 ); - } + if (!is_signaled( queue )) reset_queue_sync( queue ); + else if (!req->skip_wait) signal_queue_sync( queue ); + else msg_queue_satisfied( &queue->obj, NULL ); } }
@@ -3193,6 +3192,8 @@ DECL_HANDLER(get_queue_status) shared->changed_bits &= ~req->clear_bits; } SHARED_WRITE_END; + + if (!is_signaled( queue )) reset_queue_sync( queue ); } else reply->wake_bits = reply->changed_bits = 0; } @@ -3390,6 +3391,8 @@ DECL_HANDLER(get_message) } SHARED_WRITE_END;
+ if (!is_signaled( queue )) reset_queue_sync( queue ); + /* then check for posted messages */ if ((filter & QS_POSTMESSAGE) && get_posted_message( queue, get_win, req->get_first, req->get_last, req->flags, reply )) @@ -3449,6 +3452,7 @@ DECL_HANDLER(get_message) } SHARED_WRITE_END;
+ reset_queue_sync( queue ); set_error( STATUS_PENDING ); /* FIXME */ }
```diff
 /* check if the thread owning the queue is hung (not checking for messages) */
 static int is_queue_hung( struct msg_queue *queue )
 {
-    struct wait_queue_entry *entry;
-
     if (current_time - queue->last_get_msg <= 5 * TICKS_PER_SEC)
         return 0;  /* less than 5 seconds since last get message -> not hung */
+    return !queue->waiting;
+}
```
That changes the definition in a rather non-obvious way, meaning this commit changes behaviour and refactors at the same time. That should be split, and this should be justified. It looks to me like it's going to cause a lot more spurious "hung" treatment.
What's the purpose of patches 1-3?
What's the purpose of patches 1-3?
To prepare the objects to later use inproc syncs instead of server syncs.
That changes the definition in a rather non-obvious way, meaning this commit changes behaviour and refactors at the same time. That should be split, and this should be justified. It looks to me like it's going to cause a lot more spurious "hung" treatment.
I don't see how this changes the definition? The `waiting` flag indicates whether a thread is currently waiting or not on its queue, this is the same as checking the wait queues for an entry that waits on the queue?
What's the purpose of patches 1-3?
To prepare the objects to later use inproc syncs instead of server syncs.
How?
That changes the definition in a rather non-obvious way, meaning this commit changes behaviour and refactors at the same time. That should be split, and this should be justified. It looks to me like it's going to cause a lot more spurious "hung" treatment.
I don't see how this changes the definition? The `waiting` flag indicates whether a thread is currently waiting or not on its queue, this is the same as checking the wait queues for an entry that waits on the queue?
Gah, I'm sorry, I cannot read. I somehow misread the diff as removing the 5-second logic.
How?
The same way as with the event / event_sync separation. Later, `inproc_sync` objects will be created in place of each of the event_sync / mutex_sync / semaphore_sync objects which are the server sync implementation.
Fwiw I pushed my current plan to https://gitlab.winehq.org/rbernon/wine/-/commits/wip/ntsync