Calling try_recv() can cause messages to be received out of order.
Suppose that a server program calls an overlapped WSARecv and the operation is left pending. Another program then sends a message while wineserver is busy, so the pending WSARecv is not completed yet. If the server program now calls another overlapped WSARecv, that call intercepts the message that was meant for the earlier, still-pending WSARecv.
This problem has already been discussed here: https://www.winehq.org/pipermail/wine-devel/2021-May/186612.html
To avoid this situation, the following approach can be applied.
The server program needs to know whether there are pending asyncs before calling try_recv. Wineserver reports this to the program whenever it invokes APC_ASYNC_IO. The server program then records that state and checks it before calling try_recv.
Signed-off-by: Dongwan Kim kdw6485@gmail.com --- dlls/ntdll/unix/server.c | 8 +++---- dlls/ntdll/unix/socket.c | 42 +++++++++++++++++++++++++++++++++- include/wine/server_protocol.h | 1 + server/async.c | 8 +++++++ 4 files changed, 54 insertions(+), 5 deletions(-)
diff --git a/dlls/ntdll/unix/server.c b/dlls/ntdll/unix/server.c index a388247beb2..047da6e0974 100644 --- a/dlls/ntdll/unix/server.c +++ b/dlls/ntdll/unix/server.c @@ -368,17 +368,17 @@ static void invoke_system_apc( const apc_call_t *call, apc_result_t *result, BOO case APC_ASYNC_IO: { struct async_fileio *user = wine_server_get_ptr( call->async_io.user ); - ULONG_PTR info = call->async_io.result; + ULONG_PTR info[2] = { call->async_io.result, call->async_io.async_wait }; NTSTATUS status;
result->type = call->type; status = call->async_io.status; - if (user->callback( user, &info, &status )) + if (user->callback( user, info, &status )) { result->async_io.status = status; - result->async_io.total = info; + result->async_io.total = *info; /* the server will pass us NULL if a call failed synchronously */ - set_async_iosb( call->async_io.sb, result->async_io.status, info ); + set_async_iosb( call->async_io.sb, result->async_io.status, *info ); } else result->async_io.status = STATUS_PENDING; /* restart it */ break; diff --git a/dlls/ntdll/unix/socket.c b/dlls/ntdll/unix/socket.c index 92374e39db7..73c92d26b5f 100644 --- a/dlls/ntdll/unix/socket.c +++ b/dlls/ntdll/unix/socket.c @@ -651,13 +651,45 @@ static NTSTATUS try_recv( int fd, struct async_recv_ioctl *async, ULONG_PTR *siz return status; }
+struct async_queue{ + struct list entry; + HANDLE sock; + char read_q; + char write_q; +}; +static struct list async_queue_list = LIST_INIT(async_queue_list); + + +static struct async_queue* alloc_async_queue(HANDLE handle){ + struct async_queue *queue = + (struct async_queue*)malloc(sizeof(struct async_queue)); + memset(queue, 0, sizeof(struct async_queue)); + + queue->sock = handle; + list_add_head(&async_queue_list, &queue->entry); + return queue; +} + +static struct async_queue* find_async_queue(HANDLE handle){ + struct async_queue *queue; + + LIST_FOR_EACH_ENTRY(queue, &async_queue_list, struct async_queue, entry){ + if(queue->sock == handle) + return queue; + } + return NULL; +} static BOOL async_recv_proc( void *user, ULONG_PTR *info, NTSTATUS *status ) { struct async_recv_ioctl *async = user; + struct async_queue *queue; int fd, needs_close;
TRACE( "%#x\n", *status );
+ queue = find_async_queue(async->io.handle); + if(queue) + queue->read_q = info[1]; //store if waiting asyncs exist if (*status == STATUS_ALERTED) { if ((*status = server_get_unix_fd( async->io.handle, 0, &fd, &needs_close, NULL, NULL ))) @@ -679,6 +711,7 @@ static NTSTATUS sock_recv( HANDLE handle, HANDLE event, PIO_APC_ROUTINE apc, voi struct WS_sockaddr *addr, int *addr_len, DWORD *ret_flags, int unix_flags, int force_async ) { struct async_recv_ioctl *async; + struct async_queue *queue; ULONG_PTR information; HANDLE wait_handle; DWORD async_size; @@ -735,7 +768,13 @@ static NTSTATUS sock_recv( HANDLE handle, HANDLE event, PIO_APC_ROUTINE apc, voi } }
- status = try_recv( fd, async, &information ); + queue = find_async_queue(handle); + if(!queue) + queue = alloc_async_queue(handle); + if(queue->read_q ) // if there are waiting asyncs, avoid cutting in line. + status = STATUS_DEVICE_NOT_READY; + else + status = try_recv( fd, async, &information );
if (status != STATUS_SUCCESS && status != STATUS_BUFFER_OVERFLOW && status != STATUS_DEVICE_NOT_READY) { @@ -764,6 +803,7 @@ static NTSTATUS sock_recv( HANDLE handle, HANDLE event, PIO_APC_ROUTINE apc, voi SERVER_END_REQ;
if (status != STATUS_PENDING) release_fileio( &async->io ); + else queue->read_q = 1;
if (wait_handle) status = wait_async( wait_handle, options & FILE_SYNCHRONOUS_IO_ALERT ); return status; diff --git a/include/wine/server_protocol.h b/include/wine/server_protocol.h index 1d17a40530f..ce26240bc9a 100644 --- a/include/wine/server_protocol.h +++ b/include/wine/server_protocol.h @@ -492,6 +492,7 @@ typedef union client_ptr_t user; client_ptr_t sb; data_size_t result; + unsigned int async_wait; } async_io; struct { diff --git a/server/async.c b/server/async.c index 1a564ff1a69..5d6cc9770c3 100644 --- a/server/async.c +++ b/server/async.c @@ -164,6 +164,7 @@ static void async_destroy( struct object *obj ) void async_terminate( struct async *async, unsigned int status ) { struct iosb *iosb = async->iosb; + struct async* async_waiting;
if (async->terminated) return;
@@ -202,6 +203,13 @@ void async_terminate( struct async *async, unsigned int status ) else data.async_io.status = status;
+ async_waiting = find_pending_async(async->queue); + if(async_waiting) + { + data.async_io.async_wait = 1; + release_object(async_waiting); + } + thread_queue_apc( async->thread->process, async->thread, &async->obj, &data ); }
Call async_reselect after list_remove so that the server can wait for the POLLIN event again if there are still waiting asyncs in the queue.
Signed-off-by: Dongwan Kim kdw6485@gmail.com --- server/async.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/server/async.c b/server/async.c index 5d6cc9770c3..4135e25b2d5 100644 --- a/server/async.c +++ b/server/async.c @@ -521,8 +521,9 @@ void async_set_result( struct object *obj, unsigned int status, apc_param_t tota
if (async->queue) { - async->fd = NULL; list_remove( &async->queue_entry ); + async_reselect( async ); + async->fd = NULL; async->queue = NULL; release_object( async ); }
Signed-off-by: Dongwan Kim kdw6485@gmail.com --- dlls/ws2_32/tests/sock.c | 162 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 159 insertions(+), 3 deletions(-)
diff --git a/dlls/ws2_32/tests/sock.c b/dlls/ws2_32/tests/sock.c index 02713a7c625..47ce08088a0 100644 --- a/dlls/ws2_32/tests/sock.c +++ b/dlls/ws2_32/tests/sock.c @@ -162,16 +162,16 @@ static GUID WSARecvMsg_GUID = WSAID_WSARECVMSG; static SOCKET setup_server_socket(struct sockaddr_in *addr, int *len); static SOCKET setup_connector_socket(const struct sockaddr_in *addr, int len, BOOL nonblock);
-static void tcp_socketpair_flags(SOCKET *src, SOCKET *dst, DWORD flags) +static void tcp_socketpair_flags2(SOCKET *src, SOCKET *dst, DWORD srcflags, DWORD dstflags) { SOCKET server = INVALID_SOCKET; struct sockaddr_in addr; int len, ret;
- *src = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, flags); + *src = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, srcflags); ok(*src != INVALID_SOCKET, "failed to create socket, error %u\n", WSAGetLastError());
- server = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, flags); + server = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, dstflags); ok(server != INVALID_SOCKET, "failed to create socket, error %u\n", WSAGetLastError());
memset(&addr, 0, sizeof(addr)); @@ -196,6 +196,10 @@ static void tcp_socketpair_flags(SOCKET *src, SOCKET *dst, DWORD flags)
closesocket(server); } +static void tcp_socketpair_flags(SOCKET *src, SOCKET *dst, DWORD flags) +{ + tcp_socketpair_flags2(src,dst,flags,flags); +}
static void tcp_socketpair(SOCKET *src, SOCKET *dst) { @@ -11219,6 +11223,155 @@ static DWORD CALLBACK nonblocking_async_recv_thread(void *arg) return 0; }
+#define BUFFERSIZE 4096 +typedef struct +{ + OVERLAPPED overlapped; + CHAR buffer[BUFFERSIZE]; + WSABUF wsaBuf; +} PER_IO_DATA, * LPPER_IO_DATA; + +struct SocketInfo +{ + SOCKET fd; + PER_IO_DATA perio[16]; +}; + +static HANDLE g_hIocp; +static unsigned char book[BUFFERSIZE<<10]; + +static DWORD CALLBACK async_send_file_client_thread(void* arg){ + int randlen[] = { 1476, 1886, 3887, + 1105, + 2460, + 3689, + 2942, + 3358, + 2221, + 3714, + 1739, + 2955, + 1769, + 1912, + 1401, + 1753 }; + + char sendbuf[BUFFERSIZE]; + char recvbuf[BUFFERSIZE]; + int iResult; + SOCKET client = *(SOCKET*)arg; + int offset=0; + int len=0; + + + for (int i = 0; ; i = (i+1)%16 ) + { + len = offset + randlen[i] < sizeof(book) ? randlen[i] : sizeof(book)-offset ; + memcpy(sendbuf, book+offset , len); + offset += len; + iResult = send(client, sendbuf, len, 0); + if(i==15) + recv(client, recvbuf, BUFFERSIZE, 0); + if(offset >= sizeof(book)) + break; + } + shutdown(client, SD_SEND); + + do { + iResult = recv(client, recvbuf, 4096, 0); + } while (iResult > 0); + + closesocket(client); + + + return 0; +} +static DWORD CALLBACK async_recv_file_iocp_thread(void* arg){ + DWORD readn; + DWORD coKey; + DWORD flags = 0; + LPPER_IO_DATA perIoData; + struct SocketInfo* sInfo; + DWORD sent; + DWORD cnt=0; + DWORD offset=0; + unsigned char total[38267]; + + + PER_IO_DATA sdata; + memcpy(sdata.buffer, "ok", 3); + sdata.wsaBuf.buf = sdata.buffer; + sdata.wsaBuf.len = 3; + while (1) + { + GetQueuedCompletionStatus(g_hIocp, &readn, (PULONG_PTR)&coKey , (LPOVERLAPPED*)&perIoData, INFINITE); + + sInfo = (struct SocketInfo*)coKey; + if (readn == 0) + { + closesocket(sInfo->fd); + return 0; + } + memcpy(total+ cnt, perIoData->buffer, readn); + cnt+= readn; + if (cnt == 38267) + { + cnt = 0; + ok(memcmp(book+offset, total, 38267) == 0 , "message misordered...\n"); + offset+= 38267; + WSASend(sInfo->fd , &(sdata.wsaBuf), 1, &sent, 0, 0, NULL); + for (int i = 0; i < 16; i++) { + readn = BUFFERSIZE; + 
WSARecv(sInfo->fd, &sInfo->perio[i].wsaBuf, 1, &readn, &flags, (LPOVERLAPPED)&sInfo->perio[i], NULL); + } + } + + } + + return 0; +} + + +static void test_message_ordering(void) +{ + SOCKET client, server; + HANDLE hIOCP; + DWORD readn; + struct SocketInfo sInfo; + DWORD flags=0; + + /*initialization*/ + tcp_socketpair_flags2(&client, &server, 0, WSA_FLAG_OVERLAPPED); + + set_blocking(client, TRUE); + set_blocking(server, FALSE); + for(int i=0; i< sizeof(book); i++) + book[i] = rand()%256; + + CreateThread(0,0, async_send_file_client_thread, &client,0, 0); + hIOCP = CreateThread(0,0, async_recv_file_iocp_thread, 0,0,0); + + g_hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + + sInfo.fd = server; + for (int i = 0; i < 16; i++) { + sInfo.perio[i].wsaBuf.buf = sInfo.perio[i].buffer; + sInfo.perio[i].wsaBuf.len = BUFFERSIZE; + } + + g_hIocp = CreateIoCompletionPort((HANDLE)server, g_hIocp, (unsigned long)&sInfo, 0); + ok(g_hIocp != NULL, "Failed to create Iocompletion"); + + + for (int i = 0; i < 16; i++) { + readn = BUFFERSIZE; + WSARecv(sInfo.fd, &sInfo.perio[i].wsaBuf, 1, &readn, &flags, (LPOVERLAPPED)&sInfo.perio[i], NULL) ; + } + + + WaitForSingleObject(hIOCP, INFINITE); + CloseHandle(g_hIocp); +} static void test_nonblocking_async_recv(void) { struct nonblocking_async_recv_params params; @@ -11882,6 +12035,8 @@ START_TEST( sock )
Init();
+ test_message_ordering(); + test_set_getsockopt(); test_so_reuseaddr(); test_ip_pktinfo(); @@ -11934,6 +12089,7 @@ START_TEST( sock ) test_shutdown(); test_DisconnectEx();
+ test_completion_port(); test_connect_completion_port(); test_shutdown_completion_port();
Hi,
While running your changed tests, I think I found new failures. Being a bot and all I'm not very good at pattern recognition, so I might be wrong, but could you please double-check?
Full results can be found at: https://testbot.winehq.org/JobDetails.pl?Key=104134
Your paranoid android.
=== w7u_2qxl (32 bit report) ===
ws2_32: sock: Timeout
=== w7u_adm (32 bit report) ===
ws2_32: sock: Timeout
=== w7u_el (32 bit report) ===
ws2_32: sock: Timeout
=== w8 (32 bit report) ===
ws2_32: sock: Timeout
=== w8adm (32 bit report) ===
ws2_32: sock: Timeout
=== w864 (32 bit report) ===
ws2_32: sock: Timeout
=== w1064v1507 (32 bit report) ===
ws2_32: sock: Timeout
=== w1064v1809 (32 bit report) ===
ws2_32: sock: Timeout
=== w1064 (32 bit report) ===
ws2_32: sock: Timeout
=== w1064_tsign (32 bit report) ===
ws2_32: sock: Timeout
=== w864 (64 bit report) ===
ws2_32: sock: Timeout
=== w1064v1507 (64 bit report) ===
ws2_32: sock: Timeout
=== w1064v1809 (64 bit report) ===
ws2_32: sock: Timeout
=== w1064 (64 bit report) ===
ws2_32: sock: Timeout
=== w1064_2qxl (64 bit report) ===
ws2_32: sock: Timeout
=== w1064_tsign (64 bit report) ===
ws2_32: sock: Timeout
=== w10pro64 (64 bit report) ===
ws2_32: sock: Timeout
=== w10pro64_ar (64 bit report) ===
ws2_32: sock: Timeout
=== w10pro64_he (64 bit report) ===
ws2_32: sock: Timeout
=== w10pro64_ja (64 bit report) ===
ws2_32: sock: Timeout
=== w10pro64_zh_CN (64 bit report) ===
ws2_32: sock: Timeout
=== debian11 (32 bit report) ===
ws2_32: sock.c:1139: Test failed: wait failed, error 258