Changes the server to use a POSIX semaphore instead of an int for the semaphore count. This patch alone would be pointless, but it creates a mechanism for client processes to interact with the same semaphores.
Signed-off-by: Daniel Santos daniel.santos@pobox.com --- server/Makefile.in | 2 +- server/object.h | 4 + server/semaphore.c | 272 +++++++++++++++++++++++++++++++++++++++++++++++------ server/thread.c | 36 ++++++- 4 files changed, 280 insertions(+), 34 deletions(-)
diff --git a/server/Makefile.in b/server/Makefile.in index 19a4fac..48a4522 100644 --- a/server/Makefile.in +++ b/server/Makefile.in @@ -1,4 +1,4 @@ -EXTRALIBS = $(POLL_LIBS) $(RT_LIBS) +EXTRALIBS = $(POLL_LIBS) $(RT_LIBS) $(PTHREAD_LIBS)
C_SRCS = \ async.c \ diff --git a/server/object.h b/server/object.h index b59811f..fc650ff 100644 --- a/server/object.h +++ b/server/object.h @@ -90,6 +90,10 @@ struct object_ops int (*close_handle)(struct object *,struct process *,obj_handle_t); /* destroy on refcount == 0 */ void (*destroy)(struct object *); + /* like signaled(), but tries to get the lock */ + int (*trylock)(struct object *,struct wait_queue_entry *); + /* undo a previously sucessful call to trylock */ + void (*trylock_undo)(struct object *,struct wait_queue_entry *); };
struct object diff --git a/server/semaphore.c b/server/semaphore.c index d87325c..3c8dd14 100644 --- a/server/semaphore.c +++ b/server/semaphore.c @@ -26,6 +26,21 @@ #include <stdlib.h> #include <stdarg.h>
+#if ENABLE_POSIX_SYNC +# include <fcntl.h> +# include <sys/stat.h> +# include <errno.h> +# ifdef HAVE_SEMAPHORE_H +# include <semaphore.h> +# endif +# ifdef HAVE_SYS_TYPES_H +# include <sys/types.h> +# endif +# ifdef HAVE_UNISTD_H +# include <unistd.h> +# endif +#endif + #include "ntstatus.h" #define WIN32_NO_STATUS #include "windef.h" @@ -36,10 +51,17 @@ #include "request.h" #include "security.h"
+#define NATIVE_SEMAPHORE_MAX_NAME (32) struct semaphore { struct object obj; /* object header */ +#if ENABLE_POSIX_SYNC + sem_t *p; + unsigned int key; + char name[NATIVE_SEMAPHORE_MAX_NAME]; +#else unsigned int count; /* current count */ +#endif /* ENABLE_POSIX_SYNC */ unsigned int max; /* maximum possible count */ };
@@ -49,6 +71,15 @@ static int semaphore_signaled( struct object *obj, struct wait_queue_entry *entr static void semaphore_satisfied( struct object *obj, struct wait_queue_entry *entry ); static unsigned int semaphore_map_access( struct object *obj, unsigned int access ); static int semaphore_signal( struct object *obj, unsigned int access ); +static void semaphore_new( struct semaphore *sem, unsigned int initial ); +static unsigned int semaphore_get_value( struct semaphore *sem ); +//static void semaphore_up( struct semaphore *sem, unsigned int count); + +#if ENABLE_POSIX_SYNC +static void semaphore_destroy( struct object *obj ); +static int semaphore_trylock( struct object *obj, struct wait_queue_entry *entry ); +static void semaphore_trylock_undo( struct object *obj, struct wait_queue_entry *entry ); +#endif /* ENABLE_POSIX_SYNC */
static const struct object_ops semaphore_ops = { @@ -67,35 +98,32 @@ static const struct object_ops semaphore_ops = no_lookup_name, /* lookup_name */ no_open_file, /* open_file */ no_close_handle, /* close_handle */ - no_destroy /* destroy */ +#if ENABLE_POSIX_SYNC + semaphore_destroy, /* destroy */ + semaphore_trylock, /* trylock */ + semaphore_trylock_undo /* trylock_undo */ +#else + no_destroy, /* destroy */ + NULL, /* trylock */ + NULL /* trylock_undo */ +#endif };
+#if !(ENABLE_POSIX_SYNC)
-static struct semaphore *create_semaphore( struct directory *root, const struct unicode_str *name, - unsigned int attr, unsigned int initial, unsigned int max, - const struct security_descriptor *sd ) +static inline void semaphore_new( struct semaphore *sem, unsigned int initial ) { - struct semaphore *sem; + sem->count = initial; +}
- if (!max || (initial > max)) - { - set_error( STATUS_INVALID_PARAMETER ); - return NULL; - } - if ((sem = create_named_object_dir( root, name, attr, &semaphore_ops ))) - { - if (get_error() != STATUS_OBJECT_NAME_EXISTS) - { - /* initialize it if it didn't already exist */ - sem->count = initial; - sem->max = max; - if (sd) default_set_sd( &sem->obj, sd, OWNER_SECURITY_INFORMATION| - GROUP_SECURITY_INFORMATION| - DACL_SECURITY_INFORMATION| - SACL_SECURITY_INFORMATION ); - } - } - return sem; +static inline unsigned int semaphore_get_value( struct semaphore *sem ) +{ + return sem->count; +} + +static inline void semaphore_down( struct semaphore *sem ) +{ + --sem->count; }
static int release_semaphore( struct semaphore *sem, unsigned int count, @@ -119,12 +147,187 @@ static int release_semaphore( struct semaphore *sem, unsigned int count, } return 1; } +#else /* !(ENABLE_POSIX_SYNC) */ + +static DWORD next_semaphore_id = 0; + +static void semaphore_new( struct semaphore *sem, unsigned int initial ) +{ + /* imperfect, but good enough for now */ + int tries; + for (tries = 0; tries < 0x10000; ++tries) + { + DWORD pid = (DWORD)getpid(); + DWORD reverse_pid = 0; + DWORD bit; + + for (bit = (1 << (sizeof(DWORD) * 8 - 1)); bit && pid; bit >>= 1, pid >>= 1) + { + if (pid & 1) + reverse_pid |= bit; + } + + sem->key = reverse_pid ^ next_semaphore_id++; + snprintf(sem->name, sizeof(sem->name) - 1, "/wine-sem-%08x", sem->key); + sem->p = sem_open(sem->name, O_CREAT | O_EXCL, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP, initial); + + if (sem->p != SEM_FAILED) + { + //fprintf(stderr, "semaphore_new: sem = %p, sem->p = %p, sem->name = %s\n", sem, sem->p, sem->name); + return; + } + + switch (errno) + { + case EACCES: + case EEXIST: + continue; + + case EINVAL: + set_error( STATUS_INVALID_PARAMETER ); + break; + case EMFILE: + case ENFILE: + set_error( STATUS_TOO_MANY_OPENED_FILES ); + break; + case ENOMEM: + set_error( STATUS_NO_MEMORY ); + break; + default: + break; + } + perror("sem_open"); + assert(0); + exit(1); + } + fprintf(stderr, "failed to create unique key for semaphore\n"); + assert(0); +} + +static unsigned int semaphore_get_value( struct semaphore *sem ) +{ + int ret; + + if (sem_getvalue(sem->p, &ret) == -1) + { + perror("sem_getvalue"); + exit(1); + + return STATUS_INVALID_HANDLE; + } + + return (unsigned int)ret; +} + +static void semaphore_destroy( struct object *obj ) +{ + struct semaphore *sem = (struct semaphore *)obj; + assert( obj->ops == &semaphore_ops ); + sem_destroy(sem->p); +} + +static int semaphore_trylock( struct object *obj, struct wait_queue_entry *entry ) +{ + struct semaphore *sem = (struct semaphore *)obj; + assert( 
obj->ops == &semaphore_ops ); + if (sem_trywait(sem->p) == -1) + { + switch(errno) { + case EAGAIN: + return 0; + + default: + perror("sem_trywait"); + exit(1); + } + } + return 1; +} + +static void semaphore_trylock_undo( struct object *obj, struct wait_queue_entry *entry ) +{ + struct semaphore *sem = (struct semaphore *)obj; + assert( obj->ops == &semaphore_ops ); + if (sem_post(sem->p) == -1) + { + perror("sem_post"); + exit(1); + } + +} + +static int release_semaphore( struct semaphore *sem, unsigned int count, + unsigned int *prev ) +{ + unsigned int cur_count; + + cur_count = semaphore_get_value(sem); + + if (prev) *prev = cur_count; + if (cur_count + count < cur_count || cur_count + count > sem->max) + { + set_error( STATUS_SEMAPHORE_LIMIT_EXCEEDED ); + return 0; + } + else + { + while(count--) + { + if (sem_post(sem->p) == -1) + { + /* FIXME: no atomic way to increase more than once */ + perror("sem_post"); + } + } + + /* there cannot be any thread to wake up if the count is != 0 */ + if (!cur_count) + wake_up( &sem->obj, count ); + } + return 1; +} + +#endif /* ENABLE_POSIX_SYNC */ + +static struct semaphore *create_semaphore( struct directory *root, const struct unicode_str *name, + unsigned int attr, unsigned int initial, unsigned int max, + const struct security_descriptor *sd ) +{ + struct semaphore *sem; + + if (!max || (initial > max)) + { + set_error( STATUS_INVALID_PARAMETER ); + return NULL; + } + if ((sem = create_named_object_dir( root, name, attr, &semaphore_ops ))) + { + if (get_error() != STATUS_OBJECT_NAME_EXISTS) + { + /* initialize it if it didn't already exist */ + semaphore_new(sem, initial); + sem->max = max; + if (sd) default_set_sd( &sem->obj, sd, OWNER_SECURITY_INFORMATION| + GROUP_SECURITY_INFORMATION| + DACL_SECURITY_INFORMATION| + SACL_SECURITY_INFORMATION ); + } +#if ENABLE_POSIX_SYNC + else + { + assert(sem->p); + assert(sem->name[0]); + } +#endif + } + return sem; +}
static void semaphore_dump( struct object *obj, int verbose ) { struct semaphore *sem = (struct semaphore *)obj; assert( obj->ops == &semaphore_ops ); - fprintf( stderr, "Semaphore count=%d max=%d ", sem->count, sem->max ); + fprintf( stderr, "Semaphore count=%d max=%d ", semaphore_get_value(sem), sem->max ); dump_object_name( &sem->obj ); fputc( '\n', stderr ); } @@ -140,15 +343,17 @@ static int semaphore_signaled( struct object *obj, struct wait_queue_entry *entr { struct semaphore *sem = (struct semaphore *)obj; assert( obj->ops == &semaphore_ops ); - return (sem->count > 0); + return (semaphore_get_value(sem) > 0); }
static void semaphore_satisfied( struct object *obj, struct wait_queue_entry *entry ) { +#ifndef ENABLE_POSIX_SYNC struct semaphore *sem = (struct semaphore *)obj; assert( obj->ops == &semaphore_ops ); - assert( sem->count ); - sem->count--; + assert( semaphore_get_value(sem) ); + semaphore_down(sem); +#endif }
static unsigned int semaphore_map_access( struct object *obj, unsigned int access ) @@ -199,6 +404,10 @@ DECL_HANDLER(create_semaphore) reply->handle = alloc_handle( current->process, sem, req->access, req->attributes ); else reply->handle = alloc_handle_no_access_check( current->process, sem, req->access, req->attributes ); +#ifdef ENABLE_POSIX_SYNC + reply->key = sem->key; +#endif + reply->server_ptr = (unsigned long)sem; release_object( sem ); }
@@ -219,6 +428,11 @@ DECL_HANDLER(open_semaphore) if ((sem = open_object_dir( root, &name, req->attributes, &semaphore_ops ))) { reply->handle = alloc_handle( current->process, &sem->obj, req->access, req->attributes ); +#ifdef ENABLE_POSIX_SYNC + reply->key = sem->key; +#endif + reply->max = sem->max; + reply->server_ptr = (unsigned long)sem; release_object( sem ); }
@@ -246,7 +460,7 @@ DECL_HANDLER(query_semaphore) if ((sem = (struct semaphore *)get_handle_obj( current->process, req->handle, SEMAPHORE_QUERY_STATE, &semaphore_ops ))) { - reply->current = sem->count; + reply->current = semaphore_get_value(sem); reply->max = sem->max; release_object( sem ); } diff --git a/server/thread.c b/server/thread.c index 6383000..4ed39d6 100644 --- a/server/thread.c +++ b/server/thread.c @@ -653,6 +653,12 @@ static int check_wait( struct thread *thread ) int i; struct thread_wait *wait = thread->wait; struct wait_queue_entry *entry; + const int posix_sync_enabled = +#ifdef ENABLE_POSIX_SYNC + 1; +#else + 0; +#endif
assert( wait );
@@ -664,12 +670,30 @@ static int check_wait( struct thread *thread )
if (wait->select == SELECT_WAIT_ALL) { - int not_ok = 0; + ULONGLONG ok = 0; + assert(MAXIMUM_WAIT_OBJECTS <= sizeof(ULONGLONG) * 8); /* Note: we must check them all anyway, as some objects may * want to do something when signaled, even if others are not */ for (i = 0, entry = wait->queues; i < wait->count; i++, entry++) - not_ok |= !entry->obj->ops->signaled( entry->obj, entry ); - if (not_ok) goto other_checks; + { + int signaled = posix_sync_enabled && entry->obj->ops->trylock + ? entry->obj->ops->trylock( entry->obj, entry ) + : entry->obj->ops->signaled( entry->obj, entry ); + ok |= (ULONGLONG)(!!signaled) << i; + } + if (!ok) + { + if (posix_sync_enabled) + { + /* reverse iterate and undo any sucessful trylocks */ + for (--i, --entry; i >= 0; --i, --entry) + { + if ((ok & (1LL << i)) && entry->obj->ops->trylock) + entry->obj->ops->trylock_undo( entry->obj, entry ); + } + } + goto other_checks; + } /* Wait satisfied: tell it to all objects */ for (i = 0, entry = wait->queues; i < wait->count; i++, entry++) entry->obj->ops->satisfied( entry->obj, entry ); @@ -679,7 +703,11 @@ static int check_wait( struct thread *thread ) { for (i = 0, entry = wait->queues; i < wait->count; i++, entry++) { - if (!entry->obj->ops->signaled( entry->obj, entry )) continue; + int signaled = posix_sync_enabled && entry->obj->ops->trylock + ? entry->obj->ops->trylock( entry->obj, entry ) + : entry->obj->ops->signaled( entry->obj, entry ); + + if (!signaled) continue; /* Wait satisfied: tell it to the object */ entry->obj->ops->satisfied( entry->obj, entry ); if (wait->abandoned) i += STATUS_ABANDONED_WAIT_0;