From: Rose Hellsing <rose@pinkro.se> Add server-side handlers for sending and receiving LPC messages: - request_lpc_reply: Send a request message through a port and track it for reply correlation - reply_wait_receive_lpc: Receive the next message from a port and optionally send a reply to a previous request Also adds tracking for pending requests to route replies back to the correct client port. --- server/lpc_port.c | 223 ++++++++++++++++++++++++++++++++++++++++++++ server/protocol.def | 29 ++++++ 2 files changed, 252 insertions(+) diff --git a/server/lpc_port.c b/server/lpc_port.c index 20e5336f7d7..73da2ab648a 100644 --- a/server/lpc_port.c +++ b/server/lpc_port.c @@ -51,6 +51,9 @@ #define PORT_FLAG_WAITABLE 0x0001 /* LPC message types */ +#define LPC_REQUEST 1 +#define LPC_REPLY 2 +#define LPC_DATAGRAM 3 #define LPC_CONNECTION_REQUEST 10 /* Maximum message size */ @@ -63,6 +66,17 @@ static unsigned int global_msg_id_counter = 0; /* Global list of pending connection requests */ static struct list global_pending_connects = LIST_INIT(global_pending_connects); +/* Global list of pending requests awaiting replies */ +static struct list global_pending_requests = LIST_INIT(global_pending_requests); + +/* Pending request tracking for request/reply correlation */ +struct pending_request +{ + struct list entry; /* entry in global_pending_requests */ + unsigned int msg_id; /* message ID waiting for reply */ + struct lpc_port *client_port; /* client port to deliver reply to */ +}; + static const WCHAR lpc_port_name[] = {'L','P','C',' ','P','o','r','t'}; struct type_descr lpc_port_type = @@ -89,6 +103,7 @@ struct lpc_message unsigned int msg_type; /* LPC message type */ process_id_t client_pid; /* sender's process ID */ thread_id_t client_tid; /* sender's thread ID */ + client_ptr_t port_context; /* port context for this message */ data_size_t data_size; /* size of message data */ char data[1]; /* variable-length message data */ }; @@ -195,6 +210,48 @@ static void signal_port_queue( struct lpc_port *port ) signal_sync( port->wait_event ); } +/* Reset the port's queue event after receiving a message */ +static void reset_port_queue( struct lpc_port *port ) +{ + if (list_empty( &port->msg_queue ) && list_empty( &port->pending_connects )) + { + if (port->queue_event) + reset_sync( port->queue_event ); + if (port->wait_event) + reset_sync( port->wait_event ); + } +} + +/* Track a pending request awaiting reply */ +static void track_pending_request( unsigned int msg_id, struct lpc_port *client_port ) +{ + struct pending_request *pr = mem_alloc( sizeof(*pr) ); + if (pr) + { + pr->msg_id = msg_id; + pr->client_port = (struct lpc_port *)grab_object( client_port ); + list_add_tail( &global_pending_requests, &pr->entry ); + } +} + +/* Find and remove a pending request by message ID */ +static struct lpc_port *find_pending_request_client( unsigned int msg_id ) +{ + struct pending_request *pr; + + LIST_FOR_EACH_ENTRY( pr, &global_pending_requests, struct pending_request, entry ) + { + if (pr->msg_id == msg_id) + { + struct lpc_port *client = pr->client_port; + list_remove( &pr->entry ); + free( pr ); + return client; + } + } + return NULL; +} + static struct lpc_port *create_lpc_port( struct object *root, const struct unicode_str *name, unsigned int attr, unsigned int flags, unsigned int max_msg_len, unsigned int max_connect_info, @@ -612,3 +669,169 @@ DECL_HANDLER(get_lpc_connect_status) reply->status = port->connect_status; release_object( port ); } + +/* Send a request message and get a message ID for tracking the reply */ +DECL_HANDLER(request_lpc_reply) +{ + struct lpc_port *port; + struct lpc_port *target_port; + struct lpc_message *msg; + data_size_t data_size = get_req_data_size(); + + port = (struct lpc_port *)get_handle_obj( current->process, req->handle, + PORT_CONNECT, &lpc_port_ops ); + if (!port) return; + + if (port->port_type == PORT_TYPE_CLIENT) + { + if (!port->connected_port || !port->connected_port->connection_port) + { + set_error( STATUS_PORT_DISCONNECTED ); + release_object( port ); + return; + } + target_port = port->connected_port->connection_port; + } + else if (port->port_type == PORT_TYPE_CHANNEL) + { + if (!port->connected_port) + { + set_error( STATUS_PORT_DISCONNECTED ); + release_object( port ); + return; + } + target_port = port->connected_port; + } + else + { + set_error( STATUS_INVALID_PORT_HANDLE ); + release_object( port ); + return; + } + + msg = alloc_lpc_message( data_size ); + if (!msg) + { + release_object( port ); + return; + } + + msg->sender_port = (struct lpc_port *)grab_object( port ); + msg->sender_thread = (struct thread *)grab_object( current ); + msg->msg_id = get_next_msg_id(); + msg->msg_type = req->msg_type ? req->msg_type : LPC_REQUEST; + msg->client_pid = current->process->id; + msg->client_tid = current->id; + msg->port_context = port->port_context; + if (data_size) + memcpy( msg->data, get_req_data(), data_size ); + + if (msg->msg_type == LPC_REQUEST && port->port_type == PORT_TYPE_CLIENT) + track_pending_request( msg->msg_id, port ); + + list_add_tail( &target_port->msg_queue, &msg->entry ); + signal_port_queue( target_port ); + + reply->msg_id = msg->msg_id; + + release_object( port ); +} + +/* Receive a message and optionally reply to a previous message */ +DECL_HANDLER(reply_wait_receive_lpc) +{ + struct lpc_port *port; + struct lpc_port *receive_port; + struct lpc_message *msg; + data_size_t reply_size = get_req_data_size(); + + port = (struct lpc_port *)get_handle_obj( current->process, req->handle, + PORT_CONNECT, &lpc_port_ops ); + if (!port) return; + + /* Handle reply to previous message */ + if (req->reply_msg_id) + { + struct lpc_port *client_port; + struct lpc_message *reply_msg; + + client_port = find_pending_request_client( req->reply_msg_id ); + if (client_port) + { + reply_msg = alloc_lpc_message( reply_size ); + if (reply_msg) + { + reply_msg->msg_type = LPC_REPLY; + reply_msg->msg_id = req->reply_msg_id; + reply_msg->client_pid = current->process->id; + reply_msg->client_tid = current->id; + reply_msg->data_size = reply_size; + if (reply_size) + memcpy( reply_msg->data, get_req_data(), reply_size ); + + list_add_tail( &client_port->msg_queue, &reply_msg->entry ); + signal_port_queue( client_port ); + } + release_object( client_port ); + } + } + + if (port->port_type == PORT_TYPE_CHANNEL || port->port_type == PORT_TYPE_CLIENT) + receive_port = port; + else if (port->port_type == PORT_TYPE_SERVER) + receive_port = port; + else + { + set_error( STATUS_INVALID_PORT_HANDLE ); + release_object( port ); + return; + } + + if (list_empty( &receive_port->msg_queue )) + { + if (port->port_type == PORT_TYPE_SERVER && !list_empty( &port->pending_connects )) + { + msg = LIST_ENTRY( list_head( &port->pending_connects ), struct lpc_message, entry ); + list_remove( &msg->entry ); + + reply->msg_id = msg->msg_id; + reply->msg_type = msg->msg_type; + reply->client_pid = msg->client_pid; + reply->client_tid = msg->client_tid; + reply->context = msg->port_context; + reply->data_size = msg->data_size; + + if (msg->data_size) + set_reply_data( msg->data, min( msg->data_size, get_reply_max_size() ) ); + + reset_port_queue( receive_port ); + release_object( port ); + return; + } + else + { + set_error( STATUS_PENDING ); + release_object( port ); + return; + } + } + else + { + msg = LIST_ENTRY( list_head( &receive_port->msg_queue ), struct lpc_message, entry ); + list_remove( &msg->entry ); + } + + reply->msg_id = msg->msg_id; + reply->msg_type = msg->msg_type; + reply->client_pid = msg->client_pid; + reply->client_tid = msg->client_tid; + reply->context = msg->port_context; + reply->data_size = msg->data_size; + + if (msg->data_size) + set_reply_data( msg->data, min( msg->data_size, get_reply_max_size() ) ); + + free_lpc_message( msg ); + reset_port_queue( receive_port ); + release_object( port ); +} diff --git a/server/protocol.def b/server/protocol.def index 20cdb456e1b..6db622cf03c 100644 --- a/server/protocol.def +++ b/server/protocol.def @@ -4338,3 +4338,32 @@ enum inproc_sync_type @REPLY unsigned int status; /* connection status */ @END + + +/* Send request and wait for reply */ +@REQ(request_lpc_reply) + obj_handle_t handle; /* port handle */ + data_size_t data_size; /* size of message data */ + unsigned int msg_type; /* message type */ + VARARG(data,bytes); /* message data */ +@REPLY + unsigned int msg_id; /* assigned message ID */ +@END + + +/* Wait for message and reply to previous */ +@REQ(reply_wait_receive_lpc) + obj_handle_t handle; /* port handle */ + unsigned int reply_msg_id; /* message ID to reply to (0 for no reply) */ + data_size_t reply_size; /* size of reply data */ + timeout_t timeout; /* timeout (unused currently) */ + VARARG(reply,bytes); /* reply data */ +@REPLY + unsigned int msg_id; /* received message ID */ + unsigned int msg_type; /* message type */ + process_id_t client_pid; /* sender's process ID */ + thread_id_t client_tid; /* sender's thread ID */ + client_ptr_t context; /* port context */ + data_size_t data_size; /* size of message data */ + VARARG(data,bytes); /* message data */ +@END -- GitLab https://gitlab.winehq.org/wine/wine/-/merge_requests/10611