?在上篇介绍了重叠IO的基本知识并使用事件的方式实现了第一个版本,但大家知道使用事件的缺点,因为WaitForSingleObject函数最多只能等待64个事件,所以要想处理更多的客户端得通过多个工作者线程来同时监视Event对象,这样便使处理更加麻烦,而且和事件选择其实一样依旧存在着部分阻塞。
所以今天来看第二种实现重叠IO的方法,即使用Completion Routine。接着上篇,我们说这种方法是跟WSASend和WSARecv函数的最后一个参数有关的,所以再来看看其原型:
int WSARecv (
SOCKET s, // 套接字句柄
LPWSABUF lpBuffers, // 指向待接收数据缓冲区
DWORD dwBufferCount, // lpBuffers数组的长度
LPDWORD lpNumberOfBytesRecvd, // 保存实际接收的字节数
LPDWORD lpFlags, // 数据传输标志
LPWSAOVERLAPPED lpOverlapped, // 指向重叠结构
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionROUTINE // 指向Completion Routine函数
);
WSASend和WSARecv同理,所以只看一个便好了。可以看到,最后一个参数是一个函数指针,这个指向的函数原型必须是这样的:
void?CALLBACK?CompletionRoutine(
????DWORD?dwError,??//?错误信息
????DWORD?dwTransferred,??//?实际收发的字节数
????LPWSAOVERLAPPED?lpOverlapped,??//?就是WSARecv和WSASend中的lpOverlapped
????DWORD?dwFlags??//?标志
);
WSASend和WSARecv函数会保存最后一个参数中我们传入的函数,当系统处理完后再自动调用这个函数,那么系统是如何得知这些参数的呢?其实就是在WSARecv函数的参数中指定的,想想上篇说的WSAOVERLAPPED结构体中的Internal和InternalHigh就分别保存着错误码和实际收发的字节数。
当有接收到用户消息后,系统就通过这些参数来调用我们传进去的函数指针,我们在自己创建的函数中作处理便可以了。这就把如何确认发生了消息这部分变得非常简单了,而且这部分的阻塞也不存在了,这才真正实现了非阻塞处理。想想我们前面学习的各种方法都得去开个循环确认是否有消息要处理,处理不便且有阻塞,你就会发现这种方式真的是方便多了。
所以这种方法的基本操作都和第一种方法是一样的,主要是在这最后一个函数指针上。还是直接通过代码来说吧,毕竟千言不及一码清晰,首先还是来先定义一个结构体供我们使用:
typedef?struct?{
????SOCKET?hClntSock;
????char?buf[BUF_SIZE];
????WSABUF?wsaBuf;
}PER_IO_DATA,?*LPPER_IO_DATA;
这就是我们在这里用的小纸条了,其中包含着客户端套接字和接收数据的缓冲区,我们为其多加一个指针别名方便使用。
接着来看类的定义:
class?COverlappedServer
{
public:
????COverlappedServer(int?port);
????~COverlappedServer();
????void?Accept();
private:
????void?InitSock();
????void?RequestHandler();
????static?void?CALLBACK?ReadCmplRoutine(DWORD,?DWORD,?LPWSAOVERLAPPED,?DWORD);
????static?void?CALLBACK?WriteCmplRoutine(DWORD,?DWORD,?LPWSAOVERLAPPED,?DWORD);
private:
????SOCKET?m_hListenSock;??//?监听套接字
????SOCKADDR_IN?m_listenAddr;??//?监听套接字地址
????LPWSAOVERLAPPED?m_lpOverlapped;??//?重叠结构
????LPPER_IO_DATA?m_hIoInfo;??//?单IO数据
????int?m_nPort;??//?端口
};
在这里我们声明了两个Completion Routine函数,ReadCmplRoutine函数用于在接收数据后调用,WriteCmplRoutine函数用于在发送数据后调用。CALLBACK其实就是stdcall调用约定,函数调用约定的目的是指定栈平衡的方式,关于这个若是讲逆向时我会专门总结一篇文章的。
大家可能发现这里还有一个WSAOVERLAPPED指针,不是使用第二种方法了吗?怎么还需要这个参数。其实在这里它的作用不再是用于事件了,稍后你将看到它起的作用。
InitSock函数和第一种方法的操作是一样的,看看就好,若有哪里不懂,可回上篇参考:
void?COverlappedServer::InitSock()
{
????WSADATA?wsaData;
????int?ret?=?WSAStartup(MAKEWORD(2,?2),?&wsaData);
????assert(ret?==?0);
????//?创建重叠IO套接字
????m_hListenSock?=?WSASocketW(PF_INET,?SOCK_STREAM,?0,?NULL,?0,?WSA_FLAG_OVERLAPPED);
????//?更改socket?IO选项为非0,非0即为非阻塞套接字
????ULONG?ulMode?=?1;
????ioctlsocket(m_hListenSock,?FIONBIO,?&ulMode);
????memset(&m_listenAddr,?0,?sizeof(m_listenAddr));
????m_listenAddr.sin_family?=?AF_INET;
????m_listenAddr.sin_addr.s_addr?=?htonl(INADDR_ANY);
????m_listenAddr.sin_port?=?htons(m_nPort);
????ret?=?bind(m_hListenSock,?(SOCKADDR?*)&m_listenAddr,?sizeof(m_listenAddr));
????assert(ret?!=?SOCKET_ERROR);
????ret?=?listen(m_hListenSock,?5);
????assert(ret?!=?SOCKET_ERROR);
}
现在来看最主要的部分,RequestHandler函数:
1void?COverlappedServer::RequestHandler()
2{
3????SOCKET?hClntSock;
4????SOCKADDR_IN?clntAddr;
5????int?addrLen?=?sizeof(clntAddr);
6????while?(true)
7????{
8????????SleepEx(100,?TRUE);??//?设为alertable?wait以调用例行程序
9
10????????hClntSock?=?accept(m_hListenSock,?(SOCKADDR?*)&clntAddr,?&addrLen);
11????????if?(hClntSock?==?INVALID_SOCKET)
12????????{
13????????????if?(WSAGetLastError()?==?WSAEWOULDBLOCK)
14????????????????continue;
15????????????else
16????????????{
17????????????????std::cerr?<<?"accept()?error!"?<<?std::endl;
18????????????????break;
19????????????}
20????????}
21
22????????std::cout?<<?"connected?client...."?<<?hClntSock?<<??std::endl;
23
24????????m_lpOverlapped?=?new?WSAOVERLAPPED;
25????????ZeroMemory(m_lpOverlapped,?sizeof(m_lpOverlapped));
26
27????????m_hIoInfo?=?new?PER_IO_DATA;
28????????m_hIoInfo->hClntSock?=?hClntSock;
29????????m_hIoInfo->wsaBuf.buf?=?m_hIoInfo->buf;
30????????m_hIoInfo->wsaBuf.len?=?BUF_SIZE;
31
32????????m_lpOverlapped->hEvent?=?m_hIoInfo;
33????????DWORD?recvBytes?=?0;
34????????DWORD?flags?=?0;
35????????WSARecv(hClntSock,?&m_hIoInfo->wsaBuf,?1,
36????????????&recvBytes,?&flags,?m_lpOverlapped,?ReadCmplRoutine);
前面部分便不再说了,主要来说下部分,在24行创建了一个重叠结构,并用ZeroMemory函数初始化为0。在27行,我们创建了我们的“小纸条”,对于每位连接进来的用户我们都得为其分配一个,所以我们用堆分配。在这里,我们保存着客户端套接字和接收数据的缓冲区,别忘了,系统会在这里为我们填写我们需要的数据。
那么我们应该把“小纸条”保存到那里呢?这东西是要在Completion Routine函数中使用的,可再在上面观察其原型,发现只有一个lpOverlapped成员没有用到,事实上,当使用Completion Routine时根本就不会用到OVERLAPPED中的hEvent成员,所以我们便能将其利用起来传递我们的“小纸条”。所以有了32行的代码,事件本就是一个指针,所以当然可以指向我们的结构体了。
最后要说说系统是如何调用这个回调函数(ReadCmplRoutine)的,当程序运行时,系统会创建一个线程,同时会创建一个和该线程相关联的队列,这个队列称为APC队列。当IO请求完成时,系统会将IO完成通知添加到APC队列中,以执行处理,不过系统会以任意的顺序来处理队列中的IO请求。所以在回调函数调用前线程中的其它正在处理的东西必须处理完,然后系统才能检查APC队列,对队列中的每一项调用回调函数,并传入错误码,实际传输的字节数,和OVERLAPPED结构体的地址。
也就是说要给线程标记一个时间点,在这个点上线程的其它东西都处理完了,然后系统再检查APC队列中是否有待处理项。线程可以把自己设为alertable wait状态来设置这个点,这样系统就知道何时需要检查线程的APC队列了,然后便可为其中的每一项调用回调函数。
在第8行就通过SleepEx函数将线程设为了alertable wait状态,我们熟悉的具有此功能的函数有:
DWORD?SleepEx(
??DWORD?dwMilliseconds,??//?time-out?interval?in?milliseconds
??BOOL?bAlertable????????//?early?completion?flag
);
DWORD?WaitForSingleObjectEx(
??HANDLE?hHandle,????????//?handle?to?object?to?wait?for
??DWORD?dwMilliseconds,??//?time-out?interval,?in?milliseconds
??BOOL?bAlertable????????//?return?to?execute?I/O?completion?routine?if?TRUE
);
DWORD?WaitForMultipleObjectsEx(
??DWORD?nCount,?????????????//?number?of?handles?in?handle?array
??CONST?HANDLE?*lpHandles,??//?points?to?the?object-handle?array
??BOOL?fWaitAll,????????????//?wait?flag
??DWORD?dwMilliseconds,?????//?time-out?interval?in?milliseconds
??BOOL?bAlertable???????????//?alertable?wait?flag
);
DWORD?WSAWaitForMultipleEvents(
??DWORD?cEvents,??????????????????
??const?WSAEVENT?FAR?*lphEvents,??
??BOOL?fWaitAll,??????????????????
??DWORD?dwTimeOUT,????????????????
??BOOL?bAlertable?????????????????
);
可以发现这些函数皆有一个bAlertable成员,将其设为true就可设为alertable wait状态,我们还见过这些函数的非Ex版本,比如Sleep函数,其实在这些函数的内部就调用了Ex版本并将bAlertable成员设为false。
最后,来看在ReadCmplRoutie和WriteCmplRoutie回调函数中如何处理,先来看ReadCmplRoutie函数:
void?COverlappedServer::ReadCmplRoutine(DWORD?dwError,
????DWORD?dwRecvBytes,?LPWSAOVERLAPPED?lpOverlapped,?DWORD?dwFlags)
{
????LPPER_IO_DATA?hIoInfo?=?static_cast<LPPER_IO_DATA>(lpOverlapped->hEvent);
????if?(dwRecvBytes?==?0)
????{
????????std::cout?<<?"disconnected?client?"?<<?hIoInfo->hClntSock?<<?std::endl;
????????closesocket(hIoInfo->hClntSock);
????????delete?lpOverlapped;
????????delete?hIoInfo;
????}
????else
????{
????????std::cout?<<?"received:?"?<<?hIoInfo->buf?<<?std::endl;
????????hIoInfo->wsaBuf.len?=?dwRecvBytes;
????????DWORD?sendBytes;
????????WSASend(hIoInfo->hClntSock,?&hIoInfo->wsaBuf,
????????????1,?&sendBytes,?0,?lpOverlapped,?WriteCmplRoutine);
????}
}
可以看到,在第一行取到了传入的“小纸条”,也就是说我们获得了客户端的套接字和接收数据的缓冲区,很棒。
接着判断是否是退出消息,接收的字节长度为0便为退出消息,这些系统在调用回调函数时已经为我们填好了,所以直接判断dwRecvBytes就可以了。在这里,关闭断开连接的套接字句柄,并释放为其分配的内存,我们要接受上千个客户端,若是只分配不释放电脑会崩的,而且崩的很快。
接着,依旧将客户端发过来的数据再转发回去,这里使用的是Completion Routine,所以必须得使用WSASend函数。
现在在WriteCmplRoutine中需要做的处理就非常少了:
void?COverlappedServer::WriteCmplRoutine(DWORD?dwError,
????DWORD?dwRecvBytes,?LPWSAOVERLAPPED?lpOverlapped,?DWORD?dwFlags)
{
????LPPER_IO_DATA?hIoInfo?=?static_cast<LPPER_IO_DATA>(lpOverlapped->hEvent);
????DWORD?recvBytes?=?0;
????DWORD?flags?=?0;
????WSARecv(hIoInfo->hClntSock,?&hIoInfo->wsaBuf,
????????1,?&recvBytes,?&flags,?lpOverlapped,?ReadCmplRoutine);
}
在这里,依旧拿到我们的小纸条,继续调用WSARecv函数来接受下一个数据请求。
这次使用重叠IO实现的版本是目前为止我们实现过的最高效的,四五千个用户应该是没有问题的,我开了四千个客户端测试,未崩。
最后不得不说说使用alertable wait这种方式的缺点,首先上下文环境的问题,因为这个函数是static的,所以是不能访问类中成员的,在这里我们巧妙地通过借用OVERLAPPED的hEvent指向“小纸条”解决了这个问题,要不然就只能全放到全局了,这里算是解决了这个问题。
主要问题是在一个线程中既要发出IO请求,又要对完成通知进行处理。若是CPU有多个核,这就会使其它线程即使空闲也不会对完成通知做出响应,就造成了忙的忙死了,闲的闲死了,所以它并没能充分利用起CPU,这样的程序伸缩性并不太好。所以微软又开发了IOCP模型来解决了这个问题,下篇我们便来学习。