Changes server to use a posix semaphore instead of an int. This patch alone would be pointless, but creates a mechanism for client processes to interact with the same semaphores. --- server/Makefile.in | 2 +- server/object.h | 4 + server/semaphore.c | 272 +++++++++++++++++++++++++++++++++++++++++++++++------ server/thread.c | 54 ++++++++++- 4 files changed, 297 insertions(+), 35 deletions(-)
diff --git a/server/Makefile.in b/server/Makefile.in index 19a4fac..48a4522 100644 --- a/server/Makefile.in +++ b/server/Makefile.in @@ -1,4 +1,4 @@ -EXTRALIBS = $(POLL_LIBS) $(RT_LIBS) +EXTRALIBS = $(POLL_LIBS) $(RT_LIBS) $(PTHREAD_LIBS)
C_SRCS = \ async.c \ diff --git a/server/object.h b/server/object.h index b59811f..ead4c50 100644 --- a/server/object.h +++ b/server/object.h @@ -90,6 +90,10 @@ struct object_ops int (*close_handle)(struct object *,struct process *,obj_handle_t); /* destroy on refcount == 0 */ void (*destroy)(struct object *); + /* like signaled(), but tries to get the lock, returns 1 upon success */ + int (*trylock)(struct object *,struct wait_queue_entry *); + /* undo a previously sucessful call to trylock */ + void (*trylock_undo)(struct object *,struct wait_queue_entry *); };
struct object diff --git a/server/semaphore.c b/server/semaphore.c index d87325c..6900804 100644 --- a/server/semaphore.c +++ b/server/semaphore.c @@ -26,6 +26,21 @@ #include <stdlib.h> #include <stdarg.h>
+#ifdef ENABLE_HYBRID_SYNC +# include <fcntl.h> +# include <sys/stat.h> +# include <errno.h> +# ifdef HAVE_SEMAPHORE_H +# include <semaphore.h> +# endif +# ifdef HAVE_SYS_TYPES_H +# include <sys/types.h> +# endif +# ifdef HAVE_UNISTD_H +# include <unistd.h> +# endif +#endif + #include "ntstatus.h" #define WIN32_NO_STATUS #include "windef.h" @@ -36,10 +51,17 @@ #include "request.h" #include "security.h"
+#define NATIVE_SEMAPHORE_MAX_NAME (32) struct semaphore { struct object obj; /* object header */ +#ifdef ENABLE_HYBRID_SYNC + sem_t *p; + unsigned int key; + char name[NATIVE_SEMAPHORE_MAX_NAME]; +#else unsigned int count; /* current count */ +#endif /* ENABLE_HYBRID_SYNC */ unsigned int max; /* maximum possible count */ };
@@ -49,6 +71,14 @@ static int semaphore_signaled( struct object *obj, struct wait_queue_entry *entr static void semaphore_satisfied( struct object *obj, struct wait_queue_entry *entry ); static unsigned int semaphore_map_access( struct object *obj, unsigned int access ); static int semaphore_signal( struct object *obj, unsigned int access ); +static void semaphore_new( struct semaphore *sem, unsigned int initial ); +static unsigned int semaphore_get_value( struct semaphore *sem ); + +#ifdef ENABLE_HYBRID_SYNC +static void semaphore_destroy( struct object *obj ); +static int semaphore_trylock( struct object *obj, struct wait_queue_entry *entry ); +static void semaphore_trylock_undo( struct object *obj, struct wait_queue_entry *entry ); +#endif /* ENABLE_HYBRID_SYNC */
static const struct object_ops semaphore_ops = { @@ -67,35 +97,32 @@ static const struct object_ops semaphore_ops = no_lookup_name, /* lookup_name */ no_open_file, /* open_file */ no_close_handle, /* close_handle */ - no_destroy /* destroy */ +#ifdef ENABLE_HYBRID_SYNC + semaphore_destroy, /* destroy */ + semaphore_trylock, /* trylock */ + semaphore_trylock_undo /* trylock_undo */ +#else + no_destroy, /* destroy */ + NULL, /* trylock */ + NULL /* trylock_undo */ +#endif };
+#ifndef ENABLE_HYBRID_SYNC
-static struct semaphore *create_semaphore( struct directory *root, const struct unicode_str *name, - unsigned int attr, unsigned int initial, unsigned int max, - const struct security_descriptor *sd ) +static inline void semaphore_new( struct semaphore *sem, unsigned int initial ) { - struct semaphore *sem; + sem->count = initial; +}
- if (!max || (initial > max)) - { - set_error( STATUS_INVALID_PARAMETER ); - return NULL; - } - if ((sem = create_named_object_dir( root, name, attr, &semaphore_ops ))) - { - if (get_error() != STATUS_OBJECT_NAME_EXISTS) - { - /* initialize it if it didn't already exist */ - sem->count = initial; - sem->max = max; - if (sd) default_set_sd( &sem->obj, sd, OWNER_SECURITY_INFORMATION| - GROUP_SECURITY_INFORMATION| - DACL_SECURITY_INFORMATION| - SACL_SECURITY_INFORMATION ); - } - } - return sem; +static inline unsigned int semaphore_get_value( struct semaphore *sem ) +{ + return sem->count; +} + +static inline void semaphore_down( struct semaphore *sem ) +{ + --sem->count; }
static int release_semaphore( struct semaphore *sem, unsigned int count, @@ -119,12 +146,188 @@ static int release_semaphore( struct semaphore *sem, unsigned int count, } return 1; } +#else /* ENABLE_HYBRID_SYNC */ + +static DWORD next_semaphore_id = 0; + +static void semaphore_new( struct semaphore *sem, unsigned int initial ) +{ + /* imperfect, but good enough for now */ + int tries; + for (tries = 0; tries < 0x10000; ++tries) + { + DWORD pid = (DWORD)getpid(); + DWORD reverse_pid = 0; + DWORD bit; + + for (bit = (1 << (sizeof(DWORD) * 8 - 1)); bit && pid; bit >>= 1, pid >>= 1) + { + if (pid & 1) + reverse_pid |= bit; + } + + sem->key = reverse_pid ^ next_semaphore_id++; + snprintf(sem->name, sizeof(sem->name) - 1, "/wine-sem-%08x", sem->key); + sem->p = sem_open(sem->name, O_CREAT | O_EXCL, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP, initial); + + if (sem->p != SEM_FAILED) + { + /* fprintf(stderr, "semaphore_new: initial = %u, sem = %p, sem->p = %p, sem->name = %s\n", + initial, sem, sem->p, sem->name); */ + return; + } + + switch (errno) + { + case EACCES: + case EEXIST: + continue; + + case EINVAL: + set_error( STATUS_INVALID_PARAMETER ); + break; + case EMFILE: + case ENFILE: + set_error( STATUS_TOO_MANY_OPENED_FILES ); + break; + case ENOMEM: + set_error( STATUS_NO_MEMORY ); + break; + default: + break; + } + perror("sem_open"); + assert(0); + exit(1); + } + fprintf(stderr, "failed to create unique key for semaphore\n"); + assert(0); +} + +static unsigned int semaphore_get_value( struct semaphore *sem ) +{ + int ret; + + if (sem_getvalue(sem->p, &ret) == -1) + { + perror("sem_getvalue"); + exit(1); + + return STATUS_INVALID_HANDLE; + } + + return (unsigned int)ret; +} + +static void semaphore_destroy( struct object *obj ) +{ + struct semaphore *sem = (struct semaphore *)obj; + assert( obj->ops == &semaphore_ops ); + sem_destroy(sem->p); +} + +static int semaphore_trylock( struct object *obj, struct wait_queue_entry *entry ) +{ + struct semaphore *sem = (struct semaphore *)obj; + assert( obj->ops == &semaphore_ops ); + if (sem_trywait(sem->p) == -1) + { + switch(errno) { + case EAGAIN: + return 0; + + default: + perror("sem_trywait"); + exit(1); + } + } + return 1; +} + +static void semaphore_trylock_undo( struct object *obj, struct wait_queue_entry *entry ) +{ + struct semaphore *sem = (struct semaphore *)obj; + assert( obj->ops == &semaphore_ops ); + if (sem_post(sem->p) == -1) + { + perror("sem_post"); + exit(1); + } + +} + +static int release_semaphore( struct semaphore *sem, unsigned int count, + unsigned int *prev ) +{ + unsigned int cur_count; + + cur_count = semaphore_get_value(sem); + + if (prev) *prev = cur_count; + if (cur_count + count < cur_count || cur_count + count > sem->max) + { + set_error( STATUS_SEMAPHORE_LIMIT_EXCEEDED ); + return 0; + } + else + { + while(count--) + { + if (sem_post(sem->p) == -1) + { + /* FIXME: no atomic way to increase more than once */ + perror("sem_post"); + } + } + + /* there cannot be any thread to wake up if the count is != 0 */ + if (!cur_count) + wake_up( &sem->obj, count ); + } + return 1; +} + +#endif /* ENABLE_HYBRID_SYNC */ + +static struct semaphore *create_semaphore( struct directory *root, const struct unicode_str *name, + unsigned int attr, unsigned int initial, unsigned int max, + const struct security_descriptor *sd ) +{ + struct semaphore *sem; + + if (!max || (initial > max)) + { + set_error( STATUS_INVALID_PARAMETER ); + return NULL; + } + if ((sem = create_named_object_dir( root, name, attr, &semaphore_ops ))) + { + if (get_error() != STATUS_OBJECT_NAME_EXISTS) + { + /* initialize it if it didn't already exist */ + semaphore_new(sem, initial); + sem->max = max; + if (sd) default_set_sd( &sem->obj, sd, OWNER_SECURITY_INFORMATION| + GROUP_SECURITY_INFORMATION| + DACL_SECURITY_INFORMATION| + SACL_SECURITY_INFORMATION ); + } +#ifdef ENABLE_HYBRID_SYNC + else + { + assert(sem->p); + assert(sem->name[0]); + } +#endif + } + return sem; +}
static void semaphore_dump( struct object *obj, int verbose ) { struct semaphore *sem = (struct semaphore *)obj; assert( obj->ops == &semaphore_ops ); - fprintf( stderr, "Semaphore count=%d max=%d ", sem->count, sem->max ); + fprintf( stderr, "Semaphore count=%d max=%d ", semaphore_get_value(sem), sem->max ); dump_object_name( &sem->obj ); fputc( '\n', stderr ); } @@ -140,15 +343,17 @@ static int semaphore_signaled( struct object *obj, struct wait_queue_entry *entr { struct semaphore *sem = (struct semaphore *)obj; assert( obj->ops == &semaphore_ops ); - return (sem->count > 0); + return (semaphore_get_value(sem) > 0); }
static void semaphore_satisfied( struct object *obj, struct wait_queue_entry *entry ) { +#ifndef ENABLE_HYBRID_SYNC struct semaphore *sem = (struct semaphore *)obj; assert( obj->ops == &semaphore_ops ); - assert( sem->count ); - sem->count--; + assert( semaphore_get_value(sem) ); + semaphore_down(sem); +#endif }
static unsigned int semaphore_map_access( struct object *obj, unsigned int access ) @@ -199,6 +404,10 @@ DECL_HANDLER(create_semaphore) reply->handle = alloc_handle( current->process, sem, req->access, req->attributes ); else reply->handle = alloc_handle_no_access_check( current->process, sem, req->access, req->attributes ); +#ifdef ENABLE_HYBRID_SYNC + reply->key = sem->key; +#endif + reply->server_ptr = (unsigned long)sem; release_object( sem ); }
@@ -219,6 +428,11 @@ DECL_HANDLER(open_semaphore) if ((sem = open_object_dir( root, &name, req->attributes, &semaphore_ops ))) { reply->handle = alloc_handle( current->process, &sem->obj, req->access, req->attributes ); +#ifdef ENABLE_HYBRID_SYNC + reply->key = sem->key; +#endif + reply->max = sem->max; + reply->server_ptr = (unsigned long)sem; release_object( sem ); }
@@ -246,7 +460,7 @@ DECL_HANDLER(query_semaphore) if ((sem = (struct semaphore *)get_handle_obj( current->process, req->handle, SEMAPHORE_QUERY_STATE, &semaphore_ops ))) { - reply->current = sem->count; + reply->current = semaphore_get_value(sem); reply->max = sem->max; release_object( sem ); } diff --git a/server/thread.c b/server/thread.c index 6383000..bb21b93 100644 --- a/server/thread.c +++ b/server/thread.c @@ -653,6 +653,12 @@ static int check_wait( struct thread *thread ) int i; struct thread_wait *wait = thread->wait; struct wait_queue_entry *entry; + const int hybrid_sync_enabled = +#ifdef ENABLE_HYBRID_SYNC + 1; +#else + 0; +#endif
assert( wait );
@@ -662,24 +668,62 @@ static int check_wait( struct thread *thread ) /* Suspended threads may not acquire locks, but they can run system APCs */ if (thread->process->suspend + thread->suspend > 0) return -1;
- if (wait->select == SELECT_WAIT_ALL) + if (wait->select == SELECT_WAIT_ALL) /* bWaitAll == TRUE */ { int not_ok = 0; - /* Note: we must check them all anyway, as some objects may - * want to do something when signaled, even if others are not */ + int have_hybrid_objects = 0; + ULONGLONG undo_list = 0; + + assert(MAXIMUM_WAIT_OBJECTS <= sizeof(ULONGLONG) * 8); + /* Note: we must check them all anyway, as some objects (msg queues) + * may want to do something when signaled, even if others are not */ for (i = 0, entry = wait->queues; i < wait->count; i++, entry++) + { not_ok |= !entry->obj->ops->signaled( entry->obj, entry ); + have_hybrid_objects |= hybrid_sync_enabled && entry->obj->ops->trylock; + } if (not_ok) goto other_checks; + + if (!have_hybrid_objects) + goto do_satisfied; + + /* all objects signaled, try to acquire locks on hybrid objects */ + for (i = 0, entry = wait->queues; i < wait->count; i++, entry++) + { + if (!entry->obj->ops->trylock) + continue; + + /* it's still possible for failure because one of the hybrid objects + * may have since been locked */ + if (entry->obj->ops->trylock( entry->obj, entry )) + undo_list |= 1ULL << i; + else + { + /* if a lock cannot be acquired, then undo previously acquired locks */ + for (--i, --entry; i >= 0; --i, --entry) + { + if (undo_list & (1ULL << i)) + entry->obj->ops->trylock_undo( entry->obj, entry ); + } + goto other_checks; + } + } + +do_satisfied: /* Wait satisfied: tell it to all objects */ for (i = 0, entry = wait->queues; i < wait->count; i++, entry++) entry->obj->ops->satisfied( entry->obj, entry ); return wait->abandoned ? STATUS_ABANDONED_WAIT_0 : STATUS_WAIT_0; } - else + else /* bWaitAll == FALSE */ { for (i = 0, entry = wait->queues; i < wait->count; i++, entry++) { - if (!entry->obj->ops->signaled( entry->obj, entry )) continue; + int signaled = hybrid_sync_enabled && entry->obj->ops->trylock + ? entry->obj->ops->trylock( entry->obj, entry ) + : entry->obj->ops->signaled( entry->obj, entry ); + + if (!signaled) continue; /* Wait satisfied: tell it to the object */ entry->obj->ops->satisfied( entry->obj, entry ); if (wait->abandoned) i += STATUS_ABANDONED_WAIT_0;