Split each mutex and semaphore into a named object and a separate internal sync object, and prepare the message queue to use both server-side and in-process (inproc) syncs.
From: Rémi Bernon rbernon@codeweavers.com
--- server/mutex.c | 43 ++++++++++++++++++++----------------------- 1 file changed, 20 insertions(+), 23 deletions(-)
diff --git a/server/mutex.c b/server/mutex.c index 4737b6f711b..7ba79181dde 100644 --- a/server/mutex.c +++ b/server/mutex.c @@ -105,13 +105,22 @@ static void do_grab( struct mutex *mutex, struct thread *thread ) }
/* release a mutex once the recursion count is 0 */ -static void do_release( struct mutex *mutex ) +static int do_release( struct mutex *mutex, struct thread *thread, int count ) { - assert( !mutex->count ); - /* remove the mutex from the thread list of owned mutexes */ - list_remove( &mutex->entry ); - mutex->owner = NULL; - wake_up( &mutex->obj, 0 ); + if (!mutex->count || (mutex->owner != thread)) + { + set_error( STATUS_MUTANT_NOT_OWNED ); + return 0; + } + mutex->count -= count; + if (!mutex->count) + { + /* remove the mutex from the thread list of owned mutexes */ + list_remove( &mutex->entry ); + mutex->owner = NULL; + wake_up( &mutex->obj, 0 ); + } + return 1; }
static struct mutex *create_mutex( struct object *root, const struct unicode_str *name, @@ -141,9 +150,8 @@ void abandon_mutexes( struct thread *thread ) { struct mutex *mutex = LIST_ENTRY( ptr, struct mutex, entry ); assert( mutex->owner == thread ); - mutex->count = 0; mutex->abandoned = 1; - do_release( mutex ); + do_release( mutex, thread, mutex->count ); } }
@@ -181,13 +189,7 @@ static int mutex_signal( struct object *obj, unsigned int access ) set_error( STATUS_ACCESS_DENIED ); return 0; } - if (!mutex->count || (mutex->owner != current)) - { - set_error( STATUS_MUTANT_NOT_OWNED ); - return 0; - } - if (!--mutex->count) do_release( mutex ); - return 1; + return do_release( mutex, current, 1 ); }
static void mutex_destroy( struct object *obj ) @@ -196,8 +198,7 @@ static void mutex_destroy( struct object *obj ) assert( obj->ops == &mutex_ops );
if (!mutex->count) return; - mutex->count = 0; - do_release( mutex ); + do_release( mutex, mutex->owner, mutex->count ); }
/* create a mutex */ @@ -241,12 +242,8 @@ DECL_HANDLER(release_mutex) if ((mutex = (struct mutex *)get_handle_obj( current->process, req->handle, 0, &mutex_ops ))) { - if (!mutex->count || (mutex->owner != current)) set_error( STATUS_MUTANT_NOT_OWNED ); - else - { - reply->prev_count = mutex->count; - if (!--mutex->count) do_release( mutex ); - } + reply->prev_count = mutex->count; + do_release( mutex, current, 1 ); release_object( mutex ); } }
From: Rémi Bernon rbernon@codeweavers.com
--- server/mutex.c | 168 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 119 insertions(+), 49 deletions(-)
diff --git a/server/mutex.c b/server/mutex.c index 7ba79181dde..f56cc6fab75 100644 --- a/server/mutex.c +++ b/server/mutex.c @@ -50,31 +50,30 @@ struct type_descr mutex_type = }, };
-struct mutex +struct mutex_sync { - struct object obj; /* object header */ - struct thread *owner; /* mutex owner */ - unsigned int count; /* recursion count */ - int abandoned; /* has it been abandoned? */ - struct list entry; /* entry in owner thread mutex list */ + struct object obj; /* object header */ + struct thread *owner; /* mutex owner */ + unsigned int count; /* recursion count */ + int abandoned; /* has it been abandoned? */ + struct list entry; /* entry in owner thread mutex list */ };
-static void mutex_dump( struct object *obj, int verbose ); -static int mutex_signaled( struct object *obj, struct wait_queue_entry *entry ); -static void mutex_satisfied( struct object *obj, struct wait_queue_entry *entry ); -static void mutex_destroy( struct object *obj ); -static int mutex_signal( struct object *obj, unsigned int access ); +static void mutex_sync_dump( struct object *obj, int verbose ); +static int mutex_sync_signaled( struct object *obj, struct wait_queue_entry *entry ); +static void mutex_sync_satisfied( struct object *obj, struct wait_queue_entry *entry ); +static void mutex_sync_destroy( struct object *obj );
-static const struct object_ops mutex_ops = +static const struct object_ops mutex_sync_ops = { - sizeof(struct mutex), /* size */ - &mutex_type, /* type */ - mutex_dump, /* dump */ + sizeof(struct mutex_sync), /* size */ + &no_type, /* type */ + mutex_sync_dump, /* dump */ add_queue, /* add_queue */ remove_queue, /* remove_queue */ - mutex_signaled, /* signaled */ - mutex_satisfied, /* satisfied */ - mutex_signal, /* signal */ + mutex_sync_signaled, /* signaled */ + mutex_sync_satisfied, /* satisfied */ + no_signal, /* signal */ no_get_fd, /* get_fd */ default_get_sync, /* get_sync */ default_map_access, /* map_access */ @@ -87,12 +86,11 @@ static const struct object_ops mutex_ops = no_open_file, /* open_file */ no_kernel_obj_list, /* get_kernel_obj_list */ no_close_handle, /* close_handle */ - mutex_destroy /* destroy */ + mutex_sync_destroy, /* destroy */ };
- /* grab a mutex for a given thread */ -static void do_grab( struct mutex *mutex, struct thread *thread ) +static void do_grab( struct mutex_sync *mutex, struct thread *thread ) { assert( !mutex->count || (mutex->owner == thread) );
@@ -105,7 +103,7 @@ static void do_grab( struct mutex *mutex, struct thread *thread ) }
/* release a mutex once the recursion count is 0 */ -static int do_release( struct mutex *mutex, struct thread *thread, int count ) +static int do_release( struct mutex_sync *mutex, struct thread *thread, int count ) { if (!mutex->count || (mutex->owner != thread)) { @@ -123,6 +121,87 @@ static int do_release( struct mutex *mutex, struct thread *thread, int count ) return 1; }
+static void mutex_sync_dump( struct object *obj, int verbose ) +{ + struct mutex_sync *mutex = (struct mutex_sync *)obj; + assert( obj->ops == &mutex_sync_ops ); + fprintf( stderr, "Mutex count=%u owner=%p\n", mutex->count, mutex->owner ); +} + +static void mutex_sync_destroy( struct object *obj ) +{ + struct mutex_sync *mutex = (struct mutex_sync *)obj; + assert( obj->ops == &mutex_sync_ops ); + + if (mutex->count) do_release( mutex, mutex->owner, mutex->count ); +} + +static int mutex_sync_signaled( struct object *obj, struct wait_queue_entry *entry ) +{ + struct mutex_sync *mutex = (struct mutex_sync *)obj; + assert( obj->ops == &mutex_sync_ops ); + return (!mutex->count || (mutex->owner == get_wait_queue_thread( entry ))); +} + +static void mutex_sync_satisfied( struct object *obj, struct wait_queue_entry *entry ) +{ + struct mutex_sync *mutex = (struct mutex_sync *)obj; + assert( obj->ops == &mutex_sync_ops ); + + do_grab( mutex, get_wait_queue_thread( entry )); + if (mutex->abandoned) make_wait_abandoned( entry ); + mutex->abandoned = 0; +} + +static struct mutex_sync *create_mutex_sync( int owned ) +{ + struct mutex_sync *mutex; + + if (!(mutex = alloc_object( &mutex_sync_ops ))) return NULL; + mutex->count = 0; + mutex->owner = NULL; + mutex->abandoned = 0; + if (owned) do_grab( mutex, current ); + + return mutex; +} + +struct mutex +{ + struct object obj; /* object header */ + struct mutex_sync *sync; /* mutex sync object */ +}; + +static void mutex_dump( struct object *obj, int verbose ); +static struct object *mutex_get_sync( struct object *obj ); +static int mutex_signal( struct object *obj, unsigned int access ); +static void mutex_destroy( struct object *obj ); + +static const struct object_ops mutex_ops = +{ + sizeof(struct mutex), /* size */ + &mutex_type, /* type */ + mutex_dump, /* dump */ + NULL, /* add_queue */ + NULL, /* remove_queue */ + NULL, /* signaled */ + NULL, /* satisfied */ + mutex_signal, /* signal */ + no_get_fd, /* get_fd */ + mutex_get_sync, /* 
get_sync */ + default_map_access, /* map_access */ + default_get_sd, /* get_sd */ + default_set_sd, /* set_sd */ + default_get_full_name, /* get_full_name */ + no_lookup_name, /* lookup_name */ + directory_link_name, /* link_name */ + default_unlink_name, /* unlink_name */ + no_open_file, /* open_file */ + no_kernel_obj_list, /* get_kernel_obj_list */ + no_close_handle, /* close_handle */ + mutex_destroy, /* destroy */ +}; + static struct mutex *create_mutex( struct object *root, const struct unicode_str *name, unsigned int attr, int owned, const struct security_descriptor *sd ) { @@ -133,10 +212,13 @@ static struct mutex *create_mutex( struct object *root, const struct unicode_str if (get_error() != STATUS_OBJECT_NAME_EXISTS) { /* initialize it if it didn't already exist */ - mutex->count = 0; - mutex->owner = NULL; - mutex->abandoned = 0; - if (owned) do_grab( mutex, current ); + mutex->sync = NULL; + + if (!(mutex->sync = create_mutex_sync( owned ))) + { + release_object( mutex ); + return NULL; + } } } return mutex; @@ -148,7 +230,7 @@ void abandon_mutexes( struct thread *thread )
while ((ptr = list_head( &thread->mutex_list )) != NULL) { - struct mutex *mutex = LIST_ENTRY( ptr, struct mutex, entry ); + struct mutex_sync *mutex = LIST_ENTRY( ptr, struct mutex_sync, entry ); assert( mutex->owner == thread ); mutex->abandoned = 1; do_release( mutex, thread, mutex->count ); @@ -159,24 +241,14 @@ static void mutex_dump( struct object *obj, int verbose ) { struct mutex *mutex = (struct mutex *)obj; assert( obj->ops == &mutex_ops ); - fprintf( stderr, "Mutex count=%u owner=%p\n", mutex->count, mutex->owner ); + mutex->sync->obj.ops->dump( &mutex->sync->obj, verbose ); }
-static int mutex_signaled( struct object *obj, struct wait_queue_entry *entry ) +static struct object *mutex_get_sync( struct object *obj ) { struct mutex *mutex = (struct mutex *)obj; assert( obj->ops == &mutex_ops ); - return (!mutex->count || (mutex->owner == get_wait_queue_thread( entry ))); -} - -static void mutex_satisfied( struct object *obj, struct wait_queue_entry *entry ) -{ - struct mutex *mutex = (struct mutex *)obj; - assert( obj->ops == &mutex_ops ); - - do_grab( mutex, get_wait_queue_thread( entry )); - if (mutex->abandoned) make_wait_abandoned( entry ); - mutex->abandoned = 0; + return grab_object( mutex->sync ); }
static int mutex_signal( struct object *obj, unsigned int access ) @@ -189,16 +261,14 @@ static int mutex_signal( struct object *obj, unsigned int access ) set_error( STATUS_ACCESS_DENIED ); return 0; } - return do_release( mutex, current, 1 ); + return do_release( mutex->sync, current, 1 ); }
static void mutex_destroy( struct object *obj ) { struct mutex *mutex = (struct mutex *)obj; assert( obj->ops == &mutex_ops ); - - if (!mutex->count) return; - do_release( mutex, mutex->owner, mutex->count ); + if (mutex->sync) release_object( mutex->sync ); }
/* create a mutex */ @@ -242,8 +312,8 @@ DECL_HANDLER(release_mutex) if ((mutex = (struct mutex *)get_handle_obj( current->process, req->handle, 0, &mutex_ops ))) { - reply->prev_count = mutex->count; - do_release( mutex, current, 1 ); + reply->prev_count = mutex->sync->count; + do_release( mutex->sync, current, 1 ); release_object( mutex ); } } @@ -256,9 +326,9 @@ DECL_HANDLER(query_mutex) if ((mutex = (struct mutex *)get_handle_obj( current->process, req->handle, MUTANT_QUERY_STATE, &mutex_ops ))) { - reply->count = mutex->count; - reply->owned = (mutex->owner == current); - reply->abandoned = mutex->abandoned; + reply->count = mutex->sync->count; + reply->owned = (mutex->sync->owner == current); + reply->abandoned = mutex->sync->abandoned;
release_object( mutex ); }
From: Rémi Bernon rbernon@codeweavers.com
--- server/semaphore.c | 172 +++++++++++++++++++++++++++++++-------------- 1 file changed, 121 insertions(+), 51 deletions(-)
diff --git a/server/semaphore.c b/server/semaphore.c index 304a821bcec..4b31bfe806c 100644 --- a/server/semaphore.c +++ b/server/semaphore.c @@ -50,30 +50,119 @@ struct type_descr semaphore_type = }, };
+struct semaphore_sync +{ + struct object obj; /* object header */ + unsigned int count; /* current count */ + unsigned int max; /* maximum possible count */ +}; + +static void semaphore_sync_dump( struct object *obj, int verbose ); +static int semaphore_sync_signaled( struct object *obj, struct wait_queue_entry *entry ); +static void semaphore_sync_satisfied( struct object *obj, struct wait_queue_entry *entry ); + +static const struct object_ops semaphore_sync_ops = +{ + sizeof(struct semaphore_sync), /* size */ + &no_type, /* type */ + semaphore_sync_dump, /* dump */ + add_queue, /* add_queue */ + remove_queue, /* remove_queue */ + semaphore_sync_signaled, /* signaled */ + semaphore_sync_satisfied, /* satisfied */ + no_signal, /* signal */ + no_get_fd, /* get_fd */ + default_get_sync, /* get_sync */ + default_map_access, /* map_access */ + default_get_sd, /* get_sd */ + default_set_sd, /* set_sd */ + default_get_full_name, /* get_full_name */ + no_lookup_name, /* lookup_name */ + directory_link_name, /* link_name */ + default_unlink_name, /* unlink_name */ + no_open_file, /* open_file */ + no_kernel_obj_list, /* get_kernel_obj_list */ + no_close_handle, /* close_handle */ + no_destroy /* destroy */ +}; + +static int release_semaphore( struct semaphore_sync *sem, unsigned int count, + unsigned int *prev ) +{ + if (prev) *prev = sem->count; + if (sem->count + count < sem->count || sem->count + count > sem->max) + { + set_error( STATUS_SEMAPHORE_LIMIT_EXCEEDED ); + return 0; + } + else if (sem->count) + { + /* there cannot be any thread to wake up if the count is != 0 */ + sem->count += count; + } + else + { + sem->count = count; + wake_up( &sem->obj, count ); + } + return 1; +} + +static void semaphore_sync_dump( struct object *obj, int verbose ) +{ + struct semaphore_sync *sem = (struct semaphore_sync *)obj; + assert( obj->ops == &semaphore_sync_ops ); + fprintf( stderr, "Semaphore count=%u max=%u\n", sem->count, sem->max ); +} + +static int 
semaphore_sync_signaled( struct object *obj, struct wait_queue_entry *entry ) +{ + struct semaphore_sync *sem = (struct semaphore_sync *)obj; + assert( obj->ops == &semaphore_sync_ops ); + return (sem->count > 0); +} + +static void semaphore_sync_satisfied( struct object *obj, struct wait_queue_entry *entry ) +{ + struct semaphore_sync *sem = (struct semaphore_sync *)obj; + assert( obj->ops == &semaphore_sync_ops ); + assert( sem->count ); + sem->count--; +} + +static struct semaphore_sync *create_semaphore_sync( unsigned int initial, unsigned int max ) +{ + struct semaphore_sync *sem; + + if (!(sem = alloc_object( &semaphore_sync_ops ))) return NULL; + sem->count = initial; + sem->max = max; + return sem; +} + struct semaphore { - struct object obj; /* object header */ - unsigned int count; /* current count */ - unsigned int max; /* maximum possible count */ + struct object obj; /* object header */ + struct semaphore_sync *sync; /* semaphore sync object */ };
static void semaphore_dump( struct object *obj, int verbose ); -static int semaphore_signaled( struct object *obj, struct wait_queue_entry *entry ); -static void semaphore_satisfied( struct object *obj, struct wait_queue_entry *entry ); +static struct object *semaphore_get_sync( struct object *obj ); static int semaphore_signal( struct object *obj, unsigned int access ); +static void semaphore_destroy( struct object *obj );
static const struct object_ops semaphore_ops = { sizeof(struct semaphore), /* size */ &semaphore_type, /* type */ semaphore_dump, /* dump */ - add_queue, /* add_queue */ - remove_queue, /* remove_queue */ - semaphore_signaled, /* signaled */ - semaphore_satisfied, /* satisfied */ + NULL, /* add_queue */ + NULL, /* remove_queue */ + NULL, /* signaled */ + NULL, /* satisfied */ semaphore_signal, /* signal */ no_get_fd, /* get_fd */ - default_get_sync, /* get_sync */ + semaphore_get_sync, /* get_sync */ default_map_access, /* map_access */ default_get_sd, /* get_sd */ default_set_sd, /* set_sd */ @@ -84,10 +173,9 @@ static const struct object_ops semaphore_ops = no_open_file, /* open_file */ no_kernel_obj_list, /* get_kernel_obj_list */ no_close_handle, /* close_handle */ - no_destroy /* destroy */ + semaphore_destroy, /* destroy */ };
- static struct semaphore *create_semaphore( struct object *root, const struct unicode_str *name, unsigned int attr, unsigned int initial, unsigned int max, const struct security_descriptor *sd ) @@ -104,55 +192,30 @@ static struct semaphore *create_semaphore( struct object *root, const struct uni if (get_error() != STATUS_OBJECT_NAME_EXISTS) { /* initialize it if it didn't already exist */ - sem->count = initial; - sem->max = max; + sem->sync = NULL; + + if (!(sem->sync = create_semaphore_sync( initial, max ))) + { + release_object( sem ); + return NULL; + } } } return sem; }
-static int release_semaphore( struct semaphore *sem, unsigned int count, - unsigned int *prev ) -{ - if (prev) *prev = sem->count; - if (sem->count + count < sem->count || sem->count + count > sem->max) - { - set_error( STATUS_SEMAPHORE_LIMIT_EXCEEDED ); - return 0; - } - else if (sem->count) - { - /* there cannot be any thread to wake up if the count is != 0 */ - sem->count += count; - } - else - { - sem->count = count; - wake_up( &sem->obj, count ); - } - return 1; -} - static void semaphore_dump( struct object *obj, int verbose ) { struct semaphore *sem = (struct semaphore *)obj; assert( obj->ops == &semaphore_ops ); - fprintf( stderr, "Semaphore count=%d max=%d\n", sem->count, sem->max ); + sem->sync->obj.ops->dump( &sem->sync->obj, verbose ); }
-static int semaphore_signaled( struct object *obj, struct wait_queue_entry *entry ) +static struct object *semaphore_get_sync( struct object *obj ) { struct semaphore *sem = (struct semaphore *)obj; assert( obj->ops == &semaphore_ops ); - return (sem->count > 0); -} - -static void semaphore_satisfied( struct object *obj, struct wait_queue_entry *entry ) -{ - struct semaphore *sem = (struct semaphore *)obj; - assert( obj->ops == &semaphore_ops ); - assert( sem->count ); - sem->count--; + return grab_object( sem->sync ); }
static int semaphore_signal( struct object *obj, unsigned int access ) @@ -165,7 +228,14 @@ static int semaphore_signal( struct object *obj, unsigned int access ) set_error( STATUS_ACCESS_DENIED ); return 0; } - return release_semaphore( sem, 1, NULL ); + return release_semaphore( sem->sync, 1, NULL ); +} + +static void semaphore_destroy( struct object *obj ) +{ + struct semaphore *sem = (struct semaphore *)obj; + assert( obj->ops == &semaphore_ops ); + if (sem->sync) release_object( sem->sync ); }
/* create a semaphore */ @@ -209,7 +279,7 @@ DECL_HANDLER(release_semaphore) if ((sem = (struct semaphore *)get_handle_obj( current->process, req->handle, SEMAPHORE_MODIFY_STATE, &semaphore_ops ))) { - release_semaphore( sem, req->count, &reply->prev_count ); + release_semaphore( sem->sync, req->count, &reply->prev_count ); release_object( sem ); } } @@ -222,8 +292,8 @@ DECL_HANDLER(query_semaphore) if ((sem = (struct semaphore *)get_handle_obj( current->process, req->handle, SEMAPHORE_QUERY_STATE, &semaphore_ops ))) { - reply->current = sem->count; - reply->max = sem->max; + reply->current = sem->sync->count; + reply->max = sem->sync->max; release_object( sem ); } }
From: Rémi Bernon rbernon@codeweavers.com
--- server/queue.c | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-)
diff --git a/server/queue.c b/server/queue.c index 3b0bd1826ef..7d058454972 100644 --- a/server/queue.c +++ b/server/queue.c @@ -133,6 +133,7 @@ struct msg_queue struct hook_table *hooks; /* hook table */ timeout_t last_get_msg; /* time of last get message call */ int keystate_lock; /* owns an input keystate lock */ + int waiting; /* is thread waiting on queue */ queue_shm_t *shared; /* queue in session shared memory */ };
@@ -315,6 +316,7 @@ static struct msg_queue *create_msg_queue( struct thread *thread, struct thread_ queue->hooks = NULL; queue->last_get_msg = current_time; queue->keystate_lock = 0; + queue->waiting = 0; list_init( &queue->send_result ); list_init( &queue->callback_result ); list_init( &queue->pending_timers ); @@ -1260,16 +1262,21 @@ static void cleanup_results( struct msg_queue *queue ) /* check if the thread owning the queue is hung (not checking for messages) */ static int is_queue_hung( struct msg_queue *queue ) { - struct wait_queue_entry *entry; - if (current_time - queue->last_get_msg <= 5 * TICKS_PER_SEC) return 0; /* less than 5 seconds since last get message -> not hung */ + return !queue->waiting; +}
- LIST_FOR_EACH_ENTRY( entry, &queue->obj.wait_queue, struct wait_queue_entry, entry ) +static int msg_queue_select( struct msg_queue *queue, int events ) +{ + if (queue->waiting == !!events) { - if (get_wait_queue_thread(entry)->queue == queue) - return 0; /* thread is waiting on queue -> not hung */ + set_error( STATUS_ACCESS_DENIED ); + return 0; } + queue->waiting = !!events; + + if (queue->fd) set_fd_events( queue->fd, events ); return 1; }
@@ -1284,8 +1291,7 @@ static int msg_queue_add_queue( struct object *obj, struct wait_queue_entry *ent return 0; }
- if (queue->fd && list_empty( &obj->wait_queue )) /* first on the queue */ - set_fd_events( queue->fd, POLLIN ); + if (!msg_queue_select( queue, POLLIN )) return 0; add_queue( obj, entry ); return 1; } @@ -1295,8 +1301,7 @@ static void msg_queue_remove_queue(struct object *obj, struct wait_queue_entry * struct msg_queue *queue = (struct msg_queue *)obj;
remove_queue( obj, entry ); - if (queue->fd && list_empty( &obj->wait_queue )) /* last on the queue is gone */ - set_fd_events( queue->fd, 0 ); + msg_queue_select( queue, 0 ); }
static void msg_queue_dump( struct object *obj, int verbose ) @@ -1317,7 +1322,7 @@ static int msg_queue_signaled( struct object *obj, struct wait_queue_entry *entr if ((ret = check_fd_events( queue->fd, POLLIN ))) /* stop waiting on select() if we are signaled */ set_fd_events( queue->fd, 0 ); - else if (!list_empty( &obj->wait_queue )) + else if (queue->waiting) /* restart waiting on poll() if we are no longer signaled */ set_fd_events( queue->fd, POLLIN ); }
From: Rémi Bernon rbernon@codeweavers.com
--- server/queue.c | 62 +++++++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 29 deletions(-)
diff --git a/server/queue.c b/server/queue.c index 7d058454972..3845a86c962 100644 --- a/server/queue.c +++ b/server/queue.c @@ -116,6 +116,7 @@ struct msg_queue { struct object obj; /* object header */ struct fd *fd; /* optional file descriptor to poll */ + int signaled; /* queue is signaled from fd POLLIN or masks */ int paint_count; /* pending paint messages count */ int hotkey_count; /* pending hotkey messages count */ int quit_message; /* is there a pending quit message? */ @@ -305,6 +306,7 @@ static struct msg_queue *create_msg_queue( struct thread *thread, struct thread_ if ((queue = alloc_object( &msg_queue_ops ))) { queue->fd = NULL; + queue->signaled = 0; queue->paint_count = 0; queue->hotkey_count = 0; queue->quit_message = 0; @@ -707,6 +709,18 @@ void add_queue_hook_count( struct thread *thread, unsigned int index, int count assert( thread->queue->shared->hooks_count[index] >= 0 ); }
+static void signal_queue_sync( struct msg_queue *queue ) +{ + if (queue->signaled) return; + queue->signaled = 1; + wake_up( &queue->obj, 0 ); +} + +static void reset_queue_sync( struct msg_queue *queue ) +{ + queue->signaled = 0; +} + /* check the queue status */ static inline int is_signaled( struct msg_queue *queue ) { @@ -733,7 +747,7 @@ static inline void set_queue_bits( struct msg_queue *queue, unsigned int bits ) } SHARED_WRITE_END;
- if (is_signaled( queue )) wake_up( &queue->obj, 0 ); + if (is_signaled( queue )) signal_queue_sync( queue ); }
/* clear some queue bits */ @@ -753,6 +767,7 @@ static inline void clear_queue_bits( struct msg_queue *queue, unsigned int bits if (queue->keystate_lock) unlock_input_keystate( queue->input ); queue->keystate_lock = 0; } + if (!is_signaled( queue )) reset_queue_sync( queue ); }
/* check if message is matched by the filter */ @@ -1276,7 +1291,11 @@ static int msg_queue_select( struct msg_queue *queue, int events ) } queue->waiting = !!events;
- if (queue->fd) set_fd_events( queue->fd, events ); + if (queue->fd) + { + if (events && check_fd_events( queue->fd, POLLIN )) signal_queue_sync( queue ); + else set_fd_events( queue->fd, events ); + } return 1; }
@@ -1315,18 +1334,8 @@ static void msg_queue_dump( struct object *obj, int verbose ) static int msg_queue_signaled( struct object *obj, struct wait_queue_entry *entry ) { struct msg_queue *queue = (struct msg_queue *)obj; - int ret = 0; - - if (queue->fd) - { - if ((ret = check_fd_events( queue->fd, POLLIN ))) - /* stop waiting on select() if we are signaled */ - set_fd_events( queue->fd, 0 ); - else if (queue->waiting) - /* restart waiting on poll() if we are no longer signaled */ - set_fd_events( queue->fd, POLLIN ); - } - return ret || is_signaled( queue ); + assert( obj->ops == &msg_queue_ops ); + return queue->signaled; }
static void msg_queue_satisfied( struct object *obj, struct wait_queue_entry *entry ) @@ -1340,6 +1349,7 @@ static void msg_queue_satisfied( struct object *obj, struct wait_queue_entry *en shared->changed_mask = 0; } SHARED_WRITE_END; + reset_queue_sync( queue ); }
static void msg_queue_destroy( struct object *obj ) @@ -1394,7 +1404,7 @@ static void msg_queue_poll_event( struct fd *fd, int event )
if (event & (POLLERR | POLLHUP)) set_fd_events( fd, -1 ); else set_fd_events( queue->fd, 0 ); - wake_up( &queue->obj, 0 ); + signal_queue_sync( queue ); }
static void thread_input_dump( struct object *obj, int verbose ) @@ -3159,20 +3169,9 @@ DECL_HANDLER(set_queue_mask) reply->wake_bits = queue_shm->wake_bits; reply->changed_bits = queue_shm->changed_bits;
- if (is_signaled( queue )) - { - /* if skip wait is set, do what would have been done in the subsequent wait */ - if (req->skip_wait) - { - SHARED_WRITE_BEGIN( queue_shm, queue_shm_t ) - { - shared->wake_mask = 0; - shared->changed_mask = 0; - } - SHARED_WRITE_END; - } - else wake_up( &queue->obj, 0 ); - } + if (!is_signaled( queue )) reset_queue_sync( queue ); + else if (!req->skip_wait) signal_queue_sync( queue ); + else msg_queue_satisfied( &queue->obj, NULL ); } }
@@ -3193,6 +3192,8 @@ DECL_HANDLER(get_queue_status) shared->changed_bits &= ~req->clear_bits; } SHARED_WRITE_END; + + if (!is_signaled( queue )) reset_queue_sync( queue ); } else reply->wake_bits = reply->changed_bits = 0; } @@ -3390,6 +3391,8 @@ DECL_HANDLER(get_message) } SHARED_WRITE_END;
+ if (!is_signaled( queue )) reset_queue_sync( queue ); + /* then check for posted messages */ if ((filter & QS_POSTMESSAGE) && get_posted_message( queue, get_win, req->get_first, req->get_last, req->flags, reply )) @@ -3449,6 +3452,7 @@ DECL_HANDLER(get_message) } SHARED_WRITE_END;
+ reset_queue_sync( queue ); set_error( STATUS_PENDING ); /* FIXME */ }