WSARecv call should not intercept message for the waiting overlapped WSARecv. To do that, We should know if waiting asyncs exist before trying ws2_recv/ws2_send.
Signed-off-by: Dongwan Kim kdw6485@gmail.com --- dlls/ws2_32/socket.c | 31 +++++++++++++++++++++++++++++-- include/wine/server_protocol.h | 20 +++++++++++++++++++- server/protocol.def | 8 ++++++++ server/request.h | 8 ++++++++ server/sock.c | 26 ++++++++++++++++++++++++++ server/trace.c | 12 ++++++++++++ 6 files changed, 102 insertions(+), 3 deletions(-)
diff --git a/dlls/ws2_32/socket.c b/dlls/ws2_32/socket.c index 2ba1982b1d9..b3d5b92c610 100644 --- a/dlls/ws2_32/socket.c +++ b/dlls/ws2_32/socket.c @@ -2016,6 +2016,23 @@ static void WINAPI ws2_async_apc( void *arg, IO_STATUS_BLOCK *iosb, ULONG reserv release_async_io( &wsa->io ); }
+/* query if the socket has pending asyncs */ +static int WS_QueryAsyncWaiting( SOCKET sock, int type ) +{ + int result=0; + + SERVER_START_REQ( query_async_waiting ) + { + req->handle = wine_server_obj_handle( SOCKET2HANDLE(sock) ); + req->type = type; + wine_server_call(req); + result = reply->state; + } + SERVER_END_REQ; + return result; + +} + /*********************************************************************** * WS2_recv (INTERNAL) * @@ -5048,7 +5065,12 @@ static int WS2_sendto( SOCKET s, LPWSABUF lpBuffers, DWORD dwBufferCount, }
flags = convert_flags(dwFlags); - n = WS2_send( fd, wsa, flags ); + if(WS_QueryAsyncWaiting(s , ASYNC_TYPE_WRITE)) + { + n = -1; errno = EAGAIN; + } + else + n = WS2_send( fd, wsa, flags ); if (n == -1 && errno != EAGAIN) { err = wsaErrno(); @@ -6128,7 +6150,12 @@ static int WS2_recv_base( SOCKET s, LPWSABUF lpBuffers, DWORD dwBufferCount, flags = convert_flags(wsa->flags); for (;;) { - n = WS2_recv( fd, wsa, flags ); + if(WS_QueryAsyncWaiting(s, ASYNC_TYPE_READ)) + { + n = -1; errno = EAGAIN; + } + else + n = WS2_recv( fd, wsa, flags ); if (n == -1) { /* Unix-like systems return EINVAL when attempting to read OOB data from diff --git a/include/wine/server_protocol.h b/include/wine/server_protocol.h index 27557f2c1d7..dd0b6f0ba65 100644 --- a/include/wine/server_protocol.h +++ b/include/wine/server_protocol.h @@ -5403,6 +5403,21 @@ struct get_next_thread_reply char __pad_12[4]; };
+struct query_async_waiting_request +{ + struct request_header __header; + obj_handle_t handle; + int type; + char __pad_20[4]; + +}; +struct query_async_waiting_reply +{ + struct reply_header __header; + int state; + char __pad_12[4]; +}; +
enum request { @@ -5680,6 +5695,7 @@ enum request REQ_suspend_process, REQ_resume_process, REQ_get_next_thread, + REQ_query_async_waiting, REQ_NB_REQUESTS };
@@ -5961,6 +5977,7 @@ union generic_request struct suspend_process_request suspend_process_request; struct resume_process_request resume_process_request; struct get_next_thread_request get_next_thread_request; + struct query_async_waiting_request query_async_waiting_request; }; union generic_reply { @@ -6240,11 +6257,12 @@ union generic_reply struct suspend_process_reply suspend_process_reply; struct resume_process_reply resume_process_reply; struct get_next_thread_reply get_next_thread_reply; + struct query_async_waiting_reply query_async_waiting_reply; };
/* ### protocol_version begin ### */
-#define SERVER_PROTOCOL_VERSION 701 +#define SERVER_PROTOCOL_VERSION 702
/* ### protocol_version end ### */
diff --git a/server/protocol.def b/server/protocol.def index 6d8208b128b..da754cc89b6 100644 --- a/server/protocol.def +++ b/server/protocol.def @@ -3711,3 +3711,11 @@ struct handle_info @REPLY obj_handle_t handle; /* next thread handle */ @END + +/* Query if there are waiting asyncs of the socket */ +@REQ(query_async_waiting) + obj_handle_t handle; /* socket handle */ + int type; /* async type */ +@REPLY + int state; /* waiting async exists */ +@END diff --git a/server/request.h b/server/request.h index 41810b89299..88eee241b9b 100644 --- a/server/request.h +++ b/server/request.h @@ -393,6 +393,7 @@ DECL_HANDLER(terminate_job); DECL_HANDLER(suspend_process); DECL_HANDLER(resume_process); DECL_HANDLER(get_next_thread); +DECL_HANDLER(query_async_waiting);
#ifdef WANT_REQUEST_HANDLERS
@@ -673,6 +674,7 @@ static const req_handler req_handlers[REQ_NB_REQUESTS] = (req_handler)req_suspend_process, (req_handler)req_resume_process, (req_handler)req_get_next_thread, + (req_handler)req_query_async_waiting, };
C_ASSERT( sizeof(abstime_t) == 8 ); @@ -2236,6 +2238,12 @@ C_ASSERT( FIELD_OFFSET(struct get_next_thread_request, flags) == 28 ); C_ASSERT( sizeof(struct get_next_thread_request) == 32 ); C_ASSERT( FIELD_OFFSET(struct get_next_thread_reply, handle) == 8 ); C_ASSERT( sizeof(struct get_next_thread_reply) == 16 ); +C_ASSERT( FIELD_OFFSET(struct query_async_waiting_request, handle) == 12 ); +C_ASSERT( FIELD_OFFSET(struct query_async_waiting_request, type) == 16 ); +C_ASSERT( sizeof(struct query_async_waiting_request) == 24 ); +C_ASSERT( FIELD_OFFSET(struct query_async_waiting_reply, state) == 8 ); +C_ASSERT( sizeof(struct query_async_waiting_reply) == 16 ); +
#endif /* WANT_REQUEST_HANDLERS */
diff --git a/server/sock.c b/server/sock.c index 565fb4c5a2c..c1142d75e2f 100644 --- a/server/sock.c +++ b/server/sock.c @@ -1923,3 +1923,29 @@ DECL_HANDLER(get_socket_info)
release_object( &sock->obj ); } + +DECL_HANDLER( query_async_waiting ) +{ + struct sock *sock; + struct async *async; + reply->state = 0; + + if (!(sock = (struct sock *)get_handle_obj( current->process, req->handle, + FILE_READ_ATTRIBUTES, &sock_ops))) return; + if (get_unix_fd( sock->fd ) == -1) return; + + if (is_fd_overlapped( sock->fd )) + { + if(req->type == ASYNC_TYPE_READ && ( async = find_pending_async(&sock->read_q) )) { + reply->state = 1; + release_object(async); + } + if(req->type == ASYNC_TYPE_WRITE && ( async = find_pending_async(&sock->write_q )) ) + { + reply->state = 1; + release_object(async); + } + } + release_object( &sock->obj ); + +} diff --git a/server/trace.c b/server/trace.c index ad7236dd393..63960a5fec4 100644 --- a/server/trace.c +++ b/server/trace.c @@ -4478,6 +4478,15 @@ static void dump_get_next_thread_reply( const struct get_next_thread_reply *req { fprintf( stderr, " handle=%04x", req->handle ); } +static void dump_query_async_waiting_request( const struct query_async_waiting_request *req ) +{ + fprintf( stderr, " handle=%04x", req->handle ); + fprintf( stderr, ", type=%d", req->type ); +} +static void dump_query_async_waiting_reply( const struct query_async_waiting_reply *req ) +{ + fprintf( stderr, " state=%d", req->state ); +}
static const dump_func req_dumpers[REQ_NB_REQUESTS] = { (dump_func)dump_new_process_request, @@ -4754,6 +4763,7 @@ static const dump_func req_dumpers[REQ_NB_REQUESTS] = { (dump_func)dump_suspend_process_request, (dump_func)dump_resume_process_request, (dump_func)dump_get_next_thread_request, + (dump_func)dump_query_async_waiting_request, };
static const dump_func reply_dumpers[REQ_NB_REQUESTS] = { @@ -5031,6 +5041,7 @@ static const dump_func reply_dumpers[REQ_NB_REQUESTS] = { NULL, NULL, (dump_func)dump_get_next_thread_reply, + (dump_func)dump_query_async_waiting_reply, };
static const char * const req_names[REQ_NB_REQUESTS] = { @@ -5308,6 +5319,7 @@ static const char * const req_names[REQ_NB_REQUESTS] = { "suspend_process", "resume_process", "get_next_thread", + "query_async_waiting", };
static const struct
Signed-off-by: Dongwan Kim kdw6485@gmail.com --- dlls/ws2_32/tests/sock.c | 124 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+)
diff --git a/dlls/ws2_32/tests/sock.c b/dlls/ws2_32/tests/sock.c index 33cdd5d6d15..bca3db9cbe6 100644 --- a/dlls/ws2_32/tests/sock.c +++ b/dlls/ws2_32/tests/sock.c @@ -9903,6 +9903,129 @@ static void test_empty_recv(void) CloseHandle(overlapped.hEvent); }
+/* io data for WSARecv */ +typedef struct +{ + OVERLAPPED overlapped; + CHAR buffer[8000]; + WSABUF wsaBuf; +}IO_DATA; +struct async_worker_params +{ + SOCKET socket; + DWORD ret; +}; + +static IO_DATA g_iodata[16]; +static HANDLE g_hIocp; +static DWORD check_ordering(char* buffer, int len , int cnt) +{ + int i; + for(i=0; i< len; i++) + if((unsigned char )buffer[i] != (cnt+i) % 256){ + ok((unsigned char)buffer[i] == (cnt+i)%256, "%d , expected %d\n",(unsigned char)buffer[i], (cnt+i)%256); + return 0; + } + return 1; +} +static DWORD CALLBACK async_recv_worker(void* arg) +{ + struct async_worker_params *params = arg; + int i ; + DWORD readn, arranged=1, cnt=0; + ULONG_PTR coKey; + DWORD flags = 0; + IO_DATA* io_data; + + while(1) + { + GetQueuedCompletionStatus(g_hIocp, &readn, &coKey, (LPOVERLAPPED*)&io_data, INFINITE); + if(readn == 0 ) + { + break; + } + arranged = arranged & check_ordering(io_data->buffer, readn, cnt); + cnt+= readn; + if(cnt == 16*4096) + { + cnt=0; + send(params->socket, "1", 1,0); + for(i=0;i < 16; i++) + { + WSARecv(params->socket, &g_iodata[i].wsaBuf, 1, &readn, &flags, (LPOVERLAPPED)&g_iodata[i], NULL); + } + } + } + params->ret = arranged; + return 0; +} +static DWORD CALLBACK async_recv_until_close(SOCKET server) +{ + int i; + DWORD flags=0; + + HANDLE worker; + + struct async_worker_params params; + params.socket = server; + worker = CreateThread(NULL,0,async_recv_worker, ¶ms, 0, NULL); + g_hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0,0 ); + + g_hIocp = CreateIoCompletionPort((HANDLE)server, g_hIocp, 0, 0); + + for(i=0; i< 16; i++) + { + g_iodata[i].wsaBuf.buf = g_iodata[i].buffer; + g_iodata[i].wsaBuf.len = sizeof(g_iodata[i].buffer); + WSARecv(server, &g_iodata[i].wsaBuf, 1, NULL, &flags, (LPOVERLAPPED)&g_iodata[i], NULL); + } + WaitForSingleObject(worker,INFINITE); + + return params.ret; + +} +static DWORD CALLBACK send_large_data( void* arg ) +{ + SOCKET *client = arg; + int i,j; + unsigned char sendbuffer[4096]; + char recvbuffer[5]; + for(i=0; i < sizeof(sendbuffer); i++) + { + sendbuffer[i] = i %256; + } + + for(j=0; j< 1000 ; j++) + { + for(i=0; i < 16; i++) + { + send(*client , (char*)sendbuffer, sizeof(sendbuffer), 0 ); + + } + recv(*client, recvbuffer, sizeof(recvbuffer), 0); + } + shutdown(*client, SD_SEND); + return 0; +} + +static void test_message_ordering(void) +{ + + SOCKET client, server; + DWORD ret; + HANDLE thread; + /* message ordering test */ + + tcp_socketpair(&client, &server); + thread = CreateThread(NULL, 0 , send_large_data, &client,0,NULL); + ret = async_recv_until_close(server); + WaitForSingleObject(thread,INFINITE); + ok(ret == 1, "message ordering failed\n"); + closesocket(client); + closesocket(server); + +} + START_TEST( sock ) { int i; @@ -9966,6 +10089,7 @@ START_TEST( sock ) test_empty_recv();
/* this is an io heavy test, do it at the end so the kernel doesn't start dropping packets */ + test_message_ordering(); test_send(); test_synchronous_WSAIoctl(); test_wsaioctl();
Hi,
While running your changed tests, I think I found new failures. Being a bot and all I'm not very good at pattern recognition, so I might be wrong, but could you please double-check?
Full results can be found at: https://testbot.winehq.org/JobDetails.pl?Key=90659
Your paranoid android.
=== w7u_adm (32 bit report) ===
ws2_32: sock: Timeout
=== w8 (32 bit report) ===
ws2_32: sock: Timeout
=== w8adm (32 bit report) ===
ws2_32: sock.c:9926: Test failed: 180 , expected 0 sock.c:9926: Test failed: 104 , expected 0 sock.c:9926: Test failed: 180 , expected 0 sock.c:9926: Test failed: 104 , expected 0 sock: Timeout
=== debiant2 (32 bit Chinese:China report) ===
ws2_32: sock.c:9926: Test failed: 0 , expected 128 sock.c:9926: Test failed: 128 , expected 64 sock.c:10023: Test failed: message ordering failed
=== debiant2 (32 bit WoW report) ===
ws2_32: sock.c:9926: Test failed: 0 , expected 64 sock.c:9926: Test failed: 128 , expected 64 sock: Timeout