Module: wine Branch: master Commit: 5f077bab07ba5cd7eebe7aeba48c6a570f39da4a URL: http://source.winehq.org/git/wine.git/?a=commit;h=5f077bab07ba5cd7eebe7aeba48c6a570f39da4a
Author: Rob Shearman rob@codeweavers.com Date: Mon Jan 21 10:31:40 2008 +0000
rpcrt4: Implement asynchronous RPC support.
---
dlls/rpcrt4/Makefile.in | 2 +- dlls/rpcrt4/rpc_binding.h | 2 + dlls/rpcrt4/rpc_message.c | 63 +++++++++++++++++++++++++++++++++++++++++- dlls/rpcrt4/rpc_transport.c | 36 ++++++++++++++++++++++++ 4 files changed, 100 insertions(+), 3 deletions(-)
diff --git a/dlls/rpcrt4/Makefile.in b/dlls/rpcrt4/Makefile.in index 5cb090b..9fe0026 100644 --- a/dlls/rpcrt4/Makefile.in +++ b/dlls/rpcrt4/Makefile.in @@ -6,7 +6,7 @@ VPATH = @srcdir@ MODULE = rpcrt4.dll IMPORTLIB = librpcrt4.$(IMPLIBEXT) IMPORTS = iphlpapi advapi32 kernel32 ntdll -DELAYIMPORTS = secur32 +DELAYIMPORTS = secur32 user32 EXTRALIBS = -luuid
C_SRCS = \ diff --git a/dlls/rpcrt4/rpc_binding.h b/dlls/rpcrt4/rpc_binding.h index ed8d4ea..e3a3c95 100644 --- a/dlls/rpcrt4/rpc_binding.h +++ b/dlls/rpcrt4/rpc_binding.h @@ -73,6 +73,7 @@ typedef struct _RpcConnection /* client-only */ struct list conn_pool_entry; ULONG assoc_group_id; /* association group returned during binding */ + RPC_ASYNC_STATE *async_state;
/* server-only */ /* The active interface bound to server. */ @@ -92,6 +93,7 @@ struct connection_ops { int (*write)(RpcConnection *conn, const void *buffer, unsigned int len); int (*close)(RpcConnection *conn); void (*cancel_call)(RpcConnection *conn); + int (*wait_for_incoming_data)(RpcConnection *conn); size_t (*get_top_of_tower)(unsigned char *tower_data, const char *networkaddr, const char *endpoint); RPC_STATUS (*parse_top_of_tower)(const unsigned char *tower_data, size_t tower_size, char **networkaddr, char **endpoint); }; diff --git a/dlls/rpcrt4/rpc_message.c b/dlls/rpcrt4/rpc_message.c index 07af1fc..ff1a808 100644 --- a/dlls/rpcrt4/rpc_message.c +++ b/dlls/rpcrt4/rpc_message.c @@ -27,6 +27,7 @@ #include "windef.h" #include "winbase.h" #include "winerror.h" +#include "winuser.h"
#include "rpc.h" #include "rpcndr.h" @@ -1039,6 +1040,49 @@ RPC_STATUS WINAPI I_RpcFreeBuffer(PRPC_MESSAGE pMsg) return RPC_S_OK; }
+static void CALLBACK async_apc_notifier_proc(ULONG_PTR ulParam) +{ + RPC_ASYNC_STATE *state = (RPC_ASYNC_STATE *)ulParam; + state->u.APC.NotificationRoutine(state, NULL, state->Event); +} + +static DWORD WINAPI async_notifier_proc(LPVOID p) +{ + RpcConnection *conn = p; + RPC_ASYNC_STATE *state = conn->async_state; + + if (state && !conn->ops->wait_for_incoming_data(conn)) + { + state->Event = RpcCallComplete; + switch (state->NotificationType) + { + case RpcNotificationTypeEvent: + SetEvent(state->u.hEvent); + break; + case RpcNotificationTypeApc: + QueueUserAPC(async_apc_notifier_proc, state->u.APC.hThread, (ULONG_PTR)state); + break; + case RpcNotificationTypeIoc: + PostQueuedCompletionStatus(state->u.IOC.hIOPort, + state->u.IOC.dwNumberOfBytesTransferred, + state->u.IOC.dwCompletionKey, + state->u.IOC.lpOverlapped); + break; + case RpcNotificationTypeHwnd: + PostMessageW(state->u.HWND.hWnd, state->u.HWND.Msg, 0, 0); + break; + case RpcNotificationTypeCallback: + state->u.NotificationRoutine(state, NULL, state->Event); + break; + case RpcNotificationTypeNone: + default: + break; + } + } + + return 0; +} + /*********************************************************************** * I_RpcSend [RPCRT4.@] * @@ -1080,6 +1124,12 @@ RPC_STATUS WINAPI I_RpcSend(PRPC_MESSAGE pMsg)
RPCRT4_FreeHeader(hdr);
+ if (status == RPC_S_OK && pMsg->RpcFlags & RPC_BUFFER_ASYNC) + { + if (!QueueUserWorkItem(async_notifier_proc, conn, WT_EXECUTEDEFAULT | WT_EXECUTELONGFUNCTION)) + status = RPC_S_OUT_OF_RESOURCES; + } + return status; }
@@ -1198,8 +1248,17 @@ RPC_STATUS WINAPI I_RpcSendReceive(PRPC_MESSAGE pMsg) */ RPC_STATUS WINAPI I_RpcAsyncSetHandle(PRPC_MESSAGE pMsg, PRPC_ASYNC_STATE pAsync) { - FIXME("(%p, %p): stub\n", pMsg, pAsync); - return RPC_S_INVALID_BINDING; + RpcBinding* bind = (RpcBinding*)pMsg->Handle; + RpcConnection *conn; + + TRACE("(%p, %p)\n", pMsg, pAsync); + + if (!bind || bind->server || !pMsg->ReservedForRuntime) return RPC_S_INVALID_BINDING; + + conn = pMsg->ReservedForRuntime; + conn->async_state = pAsync; + + return RPC_S_OK; }
/*********************************************************************** diff --git a/dlls/rpcrt4/rpc_transport.c b/dlls/rpcrt4/rpc_transport.c index eb6bd8d..e3fd8c1 100644 --- a/dlls/rpcrt4/rpc_transport.c +++ b/dlls/rpcrt4/rpc_transport.c @@ -410,6 +410,12 @@ static void rpcrt4_conn_np_cancel_call(RpcConnection *Connection) /* FIXME: implement when named pipe writes use overlapped I/O */ }
+static int rpcrt4_conn_np_wait_for_incoming_data(RpcConnection *Connection) +{ + /* FIXME: implement when named pipe writes use overlapped I/O */ + return -1; +} + static size_t rpcrt4_ncacn_np_get_top_of_tower(unsigned char *tower_data, const char *networkaddr, const char *endpoint) @@ -1047,6 +1053,32 @@ static void rpcrt4_conn_tcp_cancel_call(RpcConnection *Connection) write(tcpc->cancel_fds[1], &dummy, 1); }
+static int rpcrt4_conn_tcp_wait_for_incoming_data(RpcConnection *Connection) +{ + RpcConnection_tcp *tcpc = (RpcConnection_tcp *) Connection; + struct pollfd pfds[2]; + + TRACE("%p\n", Connection); + + pfds[0].fd = tcpc->sock; + pfds[0].events = POLLIN; + pfds[1].fd = tcpc->cancel_fds[0]; + pfds[1].events = POLLIN; + if (poll(pfds, 2, -1 /* infinite */) == -1 && errno != EINTR) + { + ERR("poll() failed: %s\n", strerror(errno)); + return -1; + } + if (pfds[1].revents & POLLIN) /* canceled */ + { + char dummy; + read(pfds[1].fd, &dummy, sizeof(dummy)); + return -1; + } + + return 0; +} + static size_t rpcrt4_ncacn_ip_tcp_get_top_of_tower(unsigned char *tower_data, const char *networkaddr, const char *endpoint) @@ -1330,6 +1362,7 @@ static const struct connection_ops conn_protseq_list[] = { rpcrt4_conn_np_write, rpcrt4_conn_np_close, rpcrt4_conn_np_cancel_call, + rpcrt4_conn_np_wait_for_incoming_data, rpcrt4_ncacn_np_get_top_of_tower, rpcrt4_ncacn_np_parse_top_of_tower, }, @@ -1342,6 +1375,7 @@ static const struct connection_ops conn_protseq_list[] = { rpcrt4_conn_np_write, rpcrt4_conn_np_close, rpcrt4_conn_np_cancel_call, + rpcrt4_conn_np_wait_for_incoming_data, rpcrt4_ncalrpc_get_top_of_tower, rpcrt4_ncalrpc_parse_top_of_tower, }, @@ -1354,6 +1388,7 @@ static const struct connection_ops conn_protseq_list[] = { rpcrt4_conn_tcp_write, rpcrt4_conn_tcp_close, rpcrt4_conn_tcp_cancel_call, + rpcrt4_conn_tcp_wait_for_incoming_data, rpcrt4_ncacn_ip_tcp_get_top_of_tower, rpcrt4_ncacn_ip_tcp_parse_top_of_tower, } @@ -1470,6 +1505,7 @@ RPC_STATUS RPCRT4_CreateConnection(RpcConnection** Connection, BOOL server, NewConnection->QOS = QOS;
list_init(&NewConnection->conn_pool_entry); + NewConnection->async_state = NULL;
TRACE("connection: %p\n", NewConnection); *Connection = NewConnection;