Calling try_recv() immediately can cause received messages to be delivered out of order.
Suppose that a server program calls an overlapped WSARecv and the operation is left pending. Another program then sends a message while wineserver is busy. If the server program now calls another overlapped WSARecv, that second call intercepts the message that was meant for the pending WSARecv.
This problem has already been discussed here: https://www.winehq.org/pipermail/wine-devel/2021-May/186612.html
To avoid this situation, the following approach can be applied.
The server program needs to know if there are pending asyncs before calling try_recv.
Signed-off-by: Dongwan Kim kdw6485@gmail.com --- dlls/ntdll/unix/socket.c | 20 +++++++++++++++++++- include/wine/server_protocol.h | 16 ++++++++++++++++ server/request.h | 7 +++++++ server/sock.c | 26 ++++++++++++++++++++++++++ server/trace.c | 13 +++++++++++++ 5 files changed, 81 insertions(+), 1 deletion(-)
diff --git a/dlls/ntdll/unix/socket.c b/dlls/ntdll/unix/socket.c index f82c7ae1ddf..8823e99901b 100644 --- a/dlls/ntdll/unix/socket.c +++ b/dlls/ntdll/unix/socket.c @@ -673,6 +673,21 @@ static BOOL async_recv_proc( void *user, ULONG_PTR *info, NTSTATUS *status ) release_fileio( &async->io ); return TRUE; } +static int query_asyncs_queued( HANDLE sock, int type ) +{ + int result=0; + + SERVER_START_REQ( query_async_waiting ) + { + req->handle = wine_server_obj_handle( sock ); + req->type = type; + wine_server_call(req); + result = reply->state; + } + SERVER_END_REQ; + return result; + +}
static NTSTATUS sock_recv( HANDLE handle, HANDLE event, PIO_APC_ROUTINE apc, void *apc_user, IO_STATUS_BLOCK *io, int fd, const void *buffers_ptr, unsigned int count, WSABUF *control, @@ -735,7 +750,10 @@ static NTSTATUS sock_recv( HANDLE handle, HANDLE event, PIO_APC_ROUTINE apc, voi } }
- status = try_recv( fd, async, &information ); + if( force_async && query_asyncs_queued( handle , ASYNC_TYPE_READ )) + status = STATUS_DEVICE_NOT_READY; + else + status = try_recv( fd, async, &information );
if (status != STATUS_SUCCESS && status != STATUS_BUFFER_OVERFLOW && status != STATUS_DEVICE_NOT_READY) { diff --git a/include/wine/server_protocol.h b/include/wine/server_protocol.h index 1d17a40530f..104e3b4cd3c 100644 --- a/include/wine/server_protocol.h +++ b/include/wine/server_protocol.h @@ -5404,7 +5404,20 @@ struct get_next_thread_reply obj_handle_t handle; char __pad_12[4]; }; +struct query_async_waiting_request +{ + struct request_header __header; + obj_handle_t handle; + int type; + char __pad_20[4];
+}; +struct query_async_waiting_reply +{ + struct reply_header __header; + int state; + char __pad_12[4]; +};
enum request { @@ -5682,6 +5695,7 @@ enum request REQ_suspend_process, REQ_resume_process, REQ_get_next_thread, + REQ_query_async_waiting, REQ_NB_REQUESTS };
@@ -5963,6 +5977,7 @@ union generic_request struct suspend_process_request suspend_process_request; struct resume_process_request resume_process_request; struct get_next_thread_request get_next_thread_request; + struct query_async_waiting_request query_async_waiting_request; }; union generic_reply { @@ -6242,6 +6257,7 @@ union generic_reply struct suspend_process_reply suspend_process_reply; struct resume_process_reply resume_process_reply; struct get_next_thread_reply get_next_thread_reply; + struct query_async_waiting_reply query_async_waiting_reply; };
/* ### protocol_version begin ### */ diff --git a/server/request.h b/server/request.h index 3c455799d54..e26dbaae0f3 100644 --- a/server/request.h +++ b/server/request.h @@ -393,6 +393,7 @@ DECL_HANDLER(terminate_job); DECL_HANDLER(suspend_process); DECL_HANDLER(resume_process); DECL_HANDLER(get_next_thread); +DECL_HANDLER(query_async_waiting);
#ifdef WANT_REQUEST_HANDLERS
@@ -673,6 +674,7 @@ static const req_handler req_handlers[REQ_NB_REQUESTS] = (req_handler)req_suspend_process, (req_handler)req_resume_process, (req_handler)req_get_next_thread, + (req_handler)req_query_async_waiting, };
C_ASSERT( sizeof(abstime_t) == 8 ); @@ -2244,6 +2246,11 @@ C_ASSERT( FIELD_OFFSET(struct get_next_thread_request, flags) == 28 ); C_ASSERT( sizeof(struct get_next_thread_request) == 32 ); C_ASSERT( FIELD_OFFSET(struct get_next_thread_reply, handle) == 8 ); C_ASSERT( sizeof(struct get_next_thread_reply) == 16 ); +C_ASSERT( FIELD_OFFSET(struct query_async_waiting_request, handle) == 12 ); +C_ASSERT( FIELD_OFFSET(struct query_async_waiting_request, type) == 16 ); +C_ASSERT( sizeof(struct query_async_waiting_request) == 24 ); +C_ASSERT( FIELD_OFFSET(struct query_async_waiting_reply, state) == 8 ); +C_ASSERT( sizeof(struct query_async_waiting_reply) == 16 );
#endif /* WANT_REQUEST_HANDLERS */
diff --git a/server/sock.c b/server/sock.c index 2df4f3d3056..d3e3b4cd847 100644 --- a/server/sock.c +++ b/server/sock.c @@ -3520,3 +3520,29 @@ DECL_HANDLER(send_socket) } release_object( sock ); } + +DECL_HANDLER( query_async_waiting ) +{ + struct sock *sock; + struct async *async; + reply->state = 0; + + if (!(sock = (struct sock *)get_handle_obj( current->process, req->handle, + FILE_READ_ATTRIBUTES, &sock_ops))) return; + if (get_unix_fd( sock->fd ) == -1) return; + + if (is_fd_overlapped( sock->fd )) + { + if(req->type == ASYNC_TYPE_READ && ( async = find_pending_async(&sock->read_q) )) { + reply->state = 1; + release_object(async); + } + if(req->type == ASYNC_TYPE_WRITE && ( async = find_pending_async(&sock->write_q )) ) + { + reply->state = 1; + release_object(async); + } + } + release_object( &sock->obj ); + +} diff --git a/server/trace.c b/server/trace.c index a48f00258fe..32f236b2b5a 100644 --- a/server/trace.c +++ b/server/trace.c @@ -4516,6 +4516,16 @@ static void dump_get_next_thread_reply( const struct get_next_thread_reply *req { fprintf( stderr, " handle=%04x", req->handle ); } +static void dump_query_async_waiting_request( const struct query_async_waiting_request *req ) +{ + fprintf( stderr, " handle=%04x", req->handle ); + fprintf( stderr, ", type=%d", req->type ); +} +static void dump_query_async_waiting_reply( const struct query_async_waiting_reply *req ) +{ + fprintf( stderr, " state=%d", req->state ); +} +
static const dump_func req_dumpers[REQ_NB_REQUESTS] = { (dump_func)dump_new_process_request, @@ -4792,6 +4802,7 @@ static const dump_func req_dumpers[REQ_NB_REQUESTS] = { (dump_func)dump_suspend_process_request, (dump_func)dump_resume_process_request, (dump_func)dump_get_next_thread_request, + (dump_func)dump_query_async_waiting_request, };
static const dump_func reply_dumpers[REQ_NB_REQUESTS] = { @@ -5069,6 +5080,7 @@ static const dump_func reply_dumpers[REQ_NB_REQUESTS] = { NULL, NULL, (dump_func)dump_get_next_thread_reply, + (dump_func)dump_query_async_waiting_reply, };
static const char * const req_names[REQ_NB_REQUESTS] = { @@ -5346,6 +5358,7 @@ static const char * const req_names[REQ_NB_REQUESTS] = { "suspend_process", "resume_process", "get_next_thread", + "query_async_waiting", };
static const struct
Call async_reselect() after list_remove() so that the server waits for the POLLIN event again if there are still asyncs waiting in the queue.
Signed-off-by: Dongwan Kim kdw6485@gmail.com --- server/async.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/server/async.c b/server/async.c index 1a564ff1a69..119e03440e4 100644 --- a/server/async.c +++ b/server/async.c @@ -513,8 +513,9 @@ void async_set_result( struct object *obj, unsigned int status, apc_param_t tota
if (async->queue) { - async->fd = NULL; list_remove( &async->queue_entry ); + async_reselect( async ); + async->fd = NULL; async->queue = NULL; release_object( async ); }
Signed-off-by: Dongwan Kim kdw6485@gmail.com --- dlls/ws2_32/tests/sock.c | 173 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 170 insertions(+), 3 deletions(-)
diff --git a/dlls/ws2_32/tests/sock.c b/dlls/ws2_32/tests/sock.c index 02713a7c625..5afdaa39ffe 100644 --- a/dlls/ws2_32/tests/sock.c +++ b/dlls/ws2_32/tests/sock.c @@ -162,16 +162,16 @@ static GUID WSARecvMsg_GUID = WSAID_WSARECVMSG; static SOCKET setup_server_socket(struct sockaddr_in *addr, int *len); static SOCKET setup_connector_socket(const struct sockaddr_in *addr, int len, BOOL nonblock);
-static void tcp_socketpair_flags(SOCKET *src, SOCKET *dst, DWORD flags) +static void tcp_socketpair_flags2(SOCKET *src, SOCKET *dst, DWORD srcflags, DWORD dstflags) { SOCKET server = INVALID_SOCKET; struct sockaddr_in addr; int len, ret;
- *src = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, flags); + *src = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, srcflags); ok(*src != INVALID_SOCKET, "failed to create socket, error %u\n", WSAGetLastError());
- server = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, flags); + server = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, dstflags); ok(server != INVALID_SOCKET, "failed to create socket, error %u\n", WSAGetLastError());
memset(&addr, 0, sizeof(addr)); @@ -196,6 +196,10 @@ static void tcp_socketpair_flags(SOCKET *src, SOCKET *dst, DWORD flags)
closesocket(server); } +static void tcp_socketpair_flags(SOCKET *src, SOCKET *dst, DWORD flags) +{ + tcp_socketpair_flags2(src,dst,flags,flags); +}
static void tcp_socketpair(SOCKET *src, SOCKET *dst) { @@ -11219,6 +11223,167 @@ static DWORD CALLBACK nonblocking_async_recv_thread(void *arg) return 0; }
+#define BUFFERSIZE 4096 +#define NUM_ASYNCS 32 +typedef struct +{ + OVERLAPPED overlapped; + CHAR buffer[BUFFERSIZE]; + WSABUF wsaBuf; +} PER_IO_DATA, * LPPER_IO_DATA; + +struct SocketInfo +{ + SOCKET fd; + PER_IO_DATA perio[NUM_ASYNCS]; +}; + +static HANDLE g_hIocp; +static unsigned char book[BUFFERSIZE<<10]; + +static DWORD CALLBACK async_send_file_client_thread(void* arg){ + int randlen[] = { 1476, 1886, 3887, + 1105, + 2460, + 3689, + 2942, + 3358, + 2221, + 3714, + 1739, + 2955, + 1769, + 1912, + 1401, + 1753 }; + + char sendbuf[BUFFERSIZE]; + char recvbuf[BUFFERSIZE]; + int iResult; + int offset=0; + int len=0; + int i=0; + SOCKET client = *(SOCKET*)arg; + + + for (i = 0; ; i = (i+1)%16 ) + { + len = offset + randlen[i] < sizeof(book) ? randlen[i] : sizeof(book)-offset ; + memcpy(sendbuf, book+offset , len); + offset += len; + iResult = send(client, sendbuf, len, 0); + if(i==15) + recv(client, recvbuf, BUFFERSIZE, 0); + if(offset >= sizeof(book)) + break; + } + shutdown(client, SD_SEND); + + do { + iResult = recv(client, recvbuf, 4096, 0); + } while (iResult > 0); + + closesocket(client); + + + return 0; +} +static DWORD CALLBACK async_recv_file_iocp_thread(void* arg){ + DWORD readn; + DWORD coKey; + DWORD flags = 0; + LPPER_IO_DATA perIoData; + struct SocketInfo* sInfo; + DWORD sent; + DWORD cnt=0; + DWORD freed_async=0; + DWORD offset=0; + int i=0; + int ret=0; + unsigned char total[38267]; + + + PER_IO_DATA sdata; + memcpy(sdata.buffer, "ok", 3); + sdata.wsaBuf.buf = sdata.buffer; + sdata.wsaBuf.len = 3; + while (1) + { + GetQueuedCompletionStatus(g_hIocp, &readn, (PULONG_PTR)&coKey , (LPOVERLAPPED*)&perIoData, INFINITE); + + sInfo = (struct SocketInfo*)coKey; + freed_async++; + if (readn == 0) + { + closesocket(sInfo->fd); + return 0; + } + memcpy(total+ cnt, perIoData->buffer, readn); + cnt+= readn; + if (cnt == 38267) + { + cnt = 0; + ok(memcmp(book+offset, total, 38267) == 0 , "message misordered...\n"); + offset+= 38267; + WSASend(sInfo->fd , 
&(sdata.wsaBuf), 1, &sent, 0, 0, NULL); + while(freed_async){ + readn = BUFFERSIZE; + ret = WSARecv(sInfo->fd, &sInfo->perio[i].wsaBuf, 1, &readn, &flags, (LPOVERLAPPED)&sInfo->perio[i], NULL); + if( (ret == SOCKET_ERROR && WSAGetLastError() == ERROR_IO_PENDING) || ret == ERROR_SUCCESS) + { + freed_async--; + i = (i+1)%NUM_ASYNCS; + } + } + } + + } + + return 0; +} + + +static void test_message_ordering(void) +{ + SOCKET client, server; + HANDLE hIOCP; + DWORD readn; + struct SocketInfo sInfo={0}; + DWORD flags=0; + int i=0; + + /*initialization*/ + tcp_socketpair_flags2(&client, &server, 0, WSA_FLAG_OVERLAPPED); + + set_blocking(client, TRUE); + set_blocking(server, FALSE); + for(i=0; i< sizeof(book); i++) + book[i] = rand()%256; + + CreateThread(0,0, async_send_file_client_thread, &client,0, 0); + hIOCP = CreateThread(0,0, async_recv_file_iocp_thread, 0,0,0); + + g_hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + + sInfo.fd = server; + for (i = 0; i < NUM_ASYNCS; i++) { + sInfo.perio[i].wsaBuf.buf = sInfo.perio[i].buffer; + sInfo.perio[i].wsaBuf.len = BUFFERSIZE; + } + + g_hIocp = CreateIoCompletionPort((HANDLE)server, g_hIocp, (unsigned long)&sInfo, 0); + ok(g_hIocp != NULL, "Failed to create Iocompletion"); + + + for (i = 0; i < NUM_ASYNCS; i++) { + readn = BUFFERSIZE; + WSARecv(sInfo.fd, &sInfo.perio[i].wsaBuf, 1, &readn, &flags, (LPOVERLAPPED)&sInfo.perio[i], NULL) ; + } + + + WaitForSingleObject(hIOCP, INFINITE); + CloseHandle(g_hIocp); +} static void test_nonblocking_async_recv(void) { struct nonblocking_async_recv_params params; @@ -11934,6 +12099,8 @@ START_TEST( sock ) test_shutdown(); test_DisconnectEx();
+ test_message_ordering(); + test_completion_port(); test_connect_completion_port(); test_shutdown_completion_port();
Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=52401
This has been replaced with another patch series:
https://www.winehq.org/pipermail/wine-devel/2022-January/205154.html
Could you take a look at it and leave some feedback?
I apologise that I forgot to CC you.