IncServer最核心的部分,自然是网络库了。

本文旨在分析IncServer网络库(以下称IncNet)的实现,进以总结一个网络库应当具备的基本功能以及常见做法,为以后手撸一个全新的网络库打下基础。

IncNet的实现依赖:

  • Sockets API
  • I/O Multiplexing(select/poll)
  • epoll API
  • POSIX Threads API(Pthreads)
  • Reactor模式

阅读此文请先对以上知识有所了解。

在此推荐一本书,《The Linux Programming Interface——A Linux and UNIX System Programming Handbook》,简称TLPI,中文译名《Linux系统编程手册》,分上下两册,作者是目前Linux manpage的维护者Michael Kerrisk。
强烈建议阅读英文版,并结合官方勘误表。中译本上册翻译尚可,下册机翻痕迹明显,有多处意思完全相反。至少要中英结合看,感觉译文不对可以看下原文是怎么写的,还感觉不对就去看勘误表。

适用场景

为了说明IncNet的适用场景,先看一下以IncServer为服务器引擎的几款游戏的服务器架构(简化版):

以上只列出了loginserver、gameserver、dbserver三种服务器进程,略去了bfserver、logserver、gmserver等,这三种进程足以说明问题了。

三种服务器进程各司其职,loginserver进程负责玩家的登录,同时也负责管理所有gameserver的信息,有多少个gameserver,每个gameserver都是什么状态(运行中、维护中、测试中)等等。玩家首先连接到loginserver进行登录验证,并获取gameserver列表,然后再连接到选定的gameserver进程,gameserver负责大部分的游戏业务逻辑。每个gameserver都对应一个dbserver,负责将gameserver发送的玩家信息存储到数据库中。

要注意,每个进程都是由几个模块组成的,一个进程可能同时具有server模块和client模块。比如gameserver进程中的loginclient模块是loginserver进程中game_loginserver模块的client,而gameserver模块是玩家client的server,dbclient模块又是dbserver进程中dbserver模块的client。

IncNet的主要工作就是为这些server和client模块提供收发包的功能。

IncNet关键字

IncNet具有以下特性,或者说IncNet的关键字如下:

  • 跨平台
  • 面向对象
  • 非阻塞连接
  • I/O多路复用
  • epoll
  • 水平触发(Level-Triggered)
  • 多线程Reactor
  • 超时检测
  • 连接重试
  • 缓冲
  • 加密

跨平台

IncNet支持Linux和Windows,但在Windows上的功能受限,仅可用作构建客户端应用。

实际上,用IncNet所开发的几款游戏,有一个从IncNet派生出来的专门给游戏客户端(Windows、Android、IOS)使用的网络库版本。除本节外,本文讨论的IncNet仅考虑在Linux环境下的实现,暂不考虑客户端版本的网络库。

如何实现跨平台?

IncNet在平台相关的代码之上,提供了一层抽象的统一接口,屏蔽掉平台相关的细节,通过条件编译的形式选用不同的底层依赖。

以下为Linux跟Windows网络编程的主要区别之处,分为Socket、I/0模型、多线程三部分。

Socket

Windows的网络编程API Winsock(Windows Sockets API,WSA),虽然在函数名字上大都兼容Linux下的 Sockets API(最早是BSD Sockets API,后标准化为POSIX Sockets API,后文均使用POSIX Sockets API的叫法),但还是有几点不同:

1. 头文件

Windows下需要include windows.h和winsock2.h两个头文件。

Linux需要include的头文件根用到的函数和数据类型相关,主要是以下几个,其余可参考相关函数的man page。

#include <sys/types.h>  // socket, connect, bind, accept, send, recv, sendto, recvfrom, setsockopt, getsockopt
#include <sys/socket.h> // socket, connect, bind, accept, send, recv, sendto, recvfrom, setsockopt, getsockopt, listen, shutdown, getsockname, getpeername

#include <unistd.h>     // close
#include <errno.h>      // errno
#include <sys/ioctl.h>  // ioctl

2. 初始化和销毁

使用winsock时,在调用socket相关函数之前,需要调用WSAStartup()以加载winsock的dll(Ws2_32.dll)。

相应的,如果不再调用socket相关函数,则应调用WSACleanup()以终止对dll的使用。

而POSIX Sockets API则无需这一步,只需要include头文件即可。

3. 关闭socket

Winsock使用closesocket(),POSIX Sockets API使用close()

4. 获取socket系列函数的错误码

Winsock使用WSAGetLastError(),POSIX Sockets API使用errno

5. 控制socket的模式

Winsock使用ioctlsocket(),POSIX Sockets API使用ioctl()

主要用于将socket设置为非阻塞。

为什么不用POSIX标准化的fcntl()参见这里

socket相关系统调用比较多,在此不一一列举,IncNet封装版本的socket相关函数在原系统调用函数名前加了“ff_”前缀。有两个函数值得一提:

  • ff_netstart()
  • windows平台专用,内部调用WSAStartup,与ff_netclose配对使用。
  • 调用于第一个socket初始化时。
  • ff_netclose()
  • windows平台专用,内部调用WSACleanup,与ff_netstart配对使用。
  • 调用于最后一个socket销毁时。

另外,IncNet对socket地址的数据类型选用上不太严谨,导致其只支持IPv4而不支持IPv6(客户端版本的网络库其实已经做出修改,提供了对IPv6的支持),后文会专门讨论对socket地址类型的选用。

I/O模型

在Linux环境下,IncNet使用了Linux特有的epoll API,在socket数量较多时,epoll的效率相比select和poll会高很多。为什么?

在Windows环境下,epoll不可用,又考虑到在Windows下一般只会将IncNet用作client模块,只有一个socket用于跟服务器的连接与通信,因此IncNet在Windows下使用select。

(在Windows环境下是否可以考虑使用IOCP(I/O Completion Ports)呢?有待研究。)

Linux下使用epoll,Windows下使用select,为此,IncNet在此之上封装了7个平台无关的接口,屏蔽了这里的不一致性,接口的使用方式类似Linux的epoll API:

  • poll_create():创建epoll实例,对应epoll_create()
  • poll_add():向epoll实例中添加文件描述符,对应epoll_ctl()的EPOLL_CTL_ADD操作。
  • poll_mod():修改epoll实例中指定文件描述符的event,对应epoll_ctl()的EPOLL_CTL_MOD操作。
  • poll_del():将指定文件描述符从epoll实例中删除,对应epoll_ctl()的EPOLL_CTL_DEL操作。
  • poll_wait():阻塞,直到有socket就绪,或超时,或有错误发生,对应epoll_wait()
  • poll_event():取到epoll实例所检测到的就绪的socket和其对应的events,需循环调用以获取所有就绪的socket。
  • poll_destroy():销毁epoll实例,对应close()

多线程

IncNet是一个多线程的网络库。在Linux下依赖POSIX Threads API(Pthreads API),Windows下的线程API则与之不同,因此有必要提供一个统一的接口。

IncNet封装的接口名基本与Pthreads API的接口一一对应:

  • ff_thread_create():对应pthread_create()
  • ff_thread_exit():对应pthread_exit()
  • ff_thread_join():对应pthread_join()
  • ff_thread_detach():对应pthread_detach()
  • ff_thread_kill():对应pthread_cancel()

至此,经过跨平台的处理,IncNet的基础结构如下:

IncNet术语

在进行后续的讨论之前,有几个IncNet定义的术语需要提前了解一下。

DPID

IncNet使用DPID来唯一标识一个socket连接,其本质是一个32位无符号整数,高16位是从0开始的自增编号,低16位是socket的文件描述符值。

socket文件描述符本身就是唯一的了,为什么还要一个自增编号呢?高16位被用来将socket的I/O操作分配到每个I/O线程中,是为了负载均衡么?但感觉这样做非但多此一举,而且也并做不到均分。

不确定是不是还有其他用途,目前看代码是没有看到第二种用途的。

Packet

IncNet的server模块和client模块之间的通信协议是以packet为单位的,上层逻辑可以定义不同种类的packet,每种packet的内容格式各不相同,收发packet的两端都按照定好的协议格式读写数据。

上图为一个packet的格式,先是4字节的packet大小,即整个packet(包括packet_size自身)的字节数,之后是两字节的packet_type(包类型),最后是packet的实际内容,其长度和格式因packet_type而异,由上层逻辑自行定义。

高频包优化

从packet的格式可以看出,每个packet的头部实际占用了6个字节之多。为了优化类似于位置同步、战斗相关逻辑收发的高频包,IncServer采取了一种连包优化策略,以减少packet的头部所浪费的空间。

准确的说这属于上层逻辑的优化,并不属于IncNet的底层机制。

IncServer定义了一种特殊的packet_type(ST_SNAPSHOT),其内容又是n个packet,只不过这些packet的packet_type只占用一个字节,而且不再有packet_size部分,从而减少了包头所占用的空间,如下图:

不过这种做法有一定的限制:

  • packet_type只有1字节,因此最多支持256个高频包。
  • 打乱了跟其他正常packet之间的顺序。因此时序相关的逻辑要么都用单字节包,要么都用双字节包。
  • 因为不再有packet_size,所以前序packet的格式错误会导致后续所有packet的错误。

面向对象

上文所说的跨平台仅仅是将各系统调用或库函数做了一次简单的封装,仍然是简单的函数调用。而IncServer是C++实现的,因此,IncNet在一系列跨平台的函数调用之上,又做了一次抽象和封装,以使用C++面向对象的特性。

几个核心的类如下:

NetMng

NetMng类定义了IncNet与外界交互的接口,文章开头所说的各个进程的server模块和client模块都是通过继承这个类来实现的,通过观察其对外开放的主要接口,可以看出它扮演的角色:

start_server

int start_server( int _max_con,                 /* 允许的最大连接数 */
                  int _port,                    /* 监听的端口 */
                  bool _crypt = true,           /* 是否加密 */
                  const char* _ip = NULL,       /* 监听的IP */
                  int _timeval = 100,           /* epoll_wait()的timeout参数 */
                  int _proc_thread_num = 1 );   /* 读写线程数量 */

作为server模块使用时,调用start_server()以启动服务器。

可以看到,IncNet提供了以下几个功能,后面会一一介绍:

  • 控制最大连接数
  • 加密
  • 多线程读写

connect_server

int connect_server( const char *_addr,          /* 服务器地址 */
                    int _port,                  /* 服务器端口 */
                    bool _crypt = true,         /* 是否加密 */
                    int _timeval = 100 );       /* epoll_wait()的timeout参数 */

作为client模块使用时,调用connect_server()以连接服务器。这里的是否加密需要与所连接的服务器一致。

send_msg

int send_msg(const char *_buf, size_t _len, DPID _dpid = 0);

向_dpid指定的socket发送数据。其实是发送到NetMng的发送缓冲队列中,最终由读写线程统一发送。

receive_msg

void receive_msg();

处理NetMng的接收缓冲队列中的数据。IncNet的读写线程接收各socket的数据,存放于NetMng的接收缓冲队列中,而后由上层逻辑调用receive_msg()来对数据做具体的处理。

receive_msg()会将接收到的数据拆分为一个个的packet,然后调用msg_handle()

msg_handle

virtual void msg_handle(const char *_msg, int _size, int _packet_type, DPID _dpid) {};

虚函数,由NetMng的子类实现以决定如何针对不同的packet作出不同的处理。

以上接口展示了一个网络库的基本功能:在服务器和客户端之间建立连接,然后收发信息,针对不同的信息作出不同的处理。至于其内部如何实现,如何实现加密?如何实现缓冲?采用何种II/O模型?有多少个线程?各线程的职责是什么?这些对于网络库的使用者来说是透明的。

Buffer

Buffer是一块连续的内存空间,标记了起止范围,以及可读可写区域。

Buffer主要用作收发包的缓冲(一个Buffer存储多个packet),但也可以用于存储一些其他的数据,因为它无非就是一块标记了可读可写区域的内存块。

如图,一个刚创建的Buffer的head_tail_都是指向起始位置buf_start_的,随着数据的写入,tail_会后移,而随着数据的读取,head_也会随之后移。

即,head_tail_标识出了可读区域,tail_buf_max_标识出了可写区域。

另外,每个Buffer对象在其成员变量中还记录了是否加密、所属的socket(DPID)、包含多少个packet等信息。

Socket

IncNet将socket的文件描述符封装成一系列类,并形成一个继承体系,继承体系中的每一层都会给socket附加一系列特性和功能。

Socket

所有socket的父类,主要功能是将socket相关的系统调用和库函数封装成其成员函数,并提供以下特性,后文会详细讨论:

  • 非阻塞连接(non-blocking connect)
  • 开启SO_REUSEADDR选项

CommonSocket

CommonSocket继承自Socket,在其基础上又新增了几个功能:

  • 标识DPID
  • 读写缓冲
  • 加密解密
  • 声明处理I/O事件的虚函数process

这里介绍一下process,缓冲和加密功能待后文讨论。

virtual int process( int _events, NetMng *_parent ) = 0;

IncNet不会(也不能)创建作为基类的CommonSocket的对象,具体的逻辑都是在其子类中实现的。

根据对哪一类I/O事件感兴趣,以及对不同的I/O事件如何作出不同的处理,IncNet定义了两类socket,分别具体实现process()来对其感兴趣的I/O事件作出响应。

当一个socket上发生了其感兴趣的I/O事件时,其process()会被调用,来处理这些事件。

ServerSocket

继承自CommonSocket,server模块专用,用来表示server模块的监听socket,执行accept()操作以接受客户端的连接请求。

process()执行如下操作:

  1. accept()一个连接,创建一个新的socket。
  2. 检查连接数量是否超过上限,如果没超过上限,则:
  3. 将新socket的文件描述符加入epoll实例中
  4. 将新socket封装成ClientSocket对象(见下文)并添加到统一的socket管理器中
  5. 如需加密,通过这个socket向客户端发送加密种子
  6. 如果超过上限,则关闭这个socket
  7. 重复执行步骤1,直到没有新的连接

ClientSocket

继承自CommonSocket,并不是client模块专用的socket,刚提到的服务器accept()时创建的与客户端进行通信的新的socket也是ClientSocket。即,不管是server模块还是client模块,负责连接建立之后的I/O的socket都是ClientSocket。

process()针对EPOLLINEPOLLOUT分别执行如下操作:

  • EPOLLIN
  1. 接收对端发送的数据,填充接收Buffer,放至NetMng的接收缓冲队列中。
  2. 刷新socket的超时时间。
  • EPOLLOUT
  1. 发送自身发送缓冲队列中的Buffer。
  2. 如果所有Buffer都发送完毕,则修改epoll实例,不再检测这个socket的EPOLLOUT事件。

CommonSocket中的读写缓冲功能实际上是用在ClientSocket上的,后面会专门对IncNet的缓冲机制展开讨论。

Thread

另外,IncServer抽象了一个线程类Thread,IncNet使用了三种不同的Thread子类在不同的线程中负责不同的任务,稍后讨论多线程Reactor模式时会详细讨论每个线程的作用,这里简单概括一下:

  • SelectThread:select线程,负责调用epoll_wait()检测就绪的socket。
  • ConnectThread:连接线程,负责client模块发起对其他进程server模块的连接。
  • SockPorcThread:I/O线程,负责每个socket上具体I/O操作。

OOP小结

IncNet按照OOP的思想对相关的底层系统调用或库函数进行了一次封装,现在IncNet长这个样子:

基础的组件都准备好了,接下来讨论如何组合使用这些组件来实现NetMng提供的功能。

多线程Reactor

IncNet以多线程的方式实现了Reactor模式

Reactor设计模式用于处理一个或多个客户端同时传递给应用程序的服务请求。而游戏服务器,无非是接收游戏玩家客户端的连接请求,建立连接后接收客户端的发包,然后经过一系列处理,给客户端回包。所以,Reactor模式正符合游戏服务器的要求。

另外一种Proactor模式一般需要内核有异步I/O(POSIX asynchronous I/O, POSIX AIO)的支持,Linux在glibc中提供了基于线程的POSIX AIO实现,而POSIX AIO的内核实现正在进行中,应该会有更好的性能,在以后编写网络库的时候是一种可以考虑的方案,本文不多做讨论。

结合Schmidt, Douglas C., An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events一文对Reactor模式的介绍,来看下Reactor模式的主要参与者在IncNet中是如何实现的。

Handle

Handle标识由OS管理的资源。

在IncNet中,这个Handle就是socket的文件描述符了,IncNet将其封装成了Socket类,包括服务器用来监听连接请求的ServerSocket,以及服务器跟每一个客户端之间用来通信的CommonSocket。

Event

我们对socket相关的哪些事件感兴趣呢?

  • 可以无阻塞accept(): 服务器socket监听到一个新的客户端连接上来了
  • 可以无阻塞read(): 客户端socket上有数据可读了
  • 可以无阻塞write(): 可以向客户端socket上写入数据了

Synchronous Event Demultiplexer

同步事件分离器,阻塞地等待一组Handle上是否有event发生,当可以在某个或某些Handle上无阻塞的执行某些操作时返回。

这不就是epoll_wait()或者select()干的事情么,没错,就是它。

何为Synchronous?何为Demultiplexer

个人理解,多个客户端同时请求服务,而服务器只在一处epoll_wait(),多合一,称之为multiplex(多路复用),因为一直阻塞地等待,所以是synchronous的。而后又将每个Event分派给不同的Event Handler处理,称之为demultiplex(解多路复用,或称多路分用、分离)。

在IncNet中,使用epoll_wait()来执行Synchronous Event Demultiplexer的demultiplex操作,这个操作运行于select线程中。

Initiation Dispatcher

启动调度器(不知道怎么翻译好,直译了),定义了注册、删除和根据不同的event调用对应的Event Handler的接口。

Initiation Dispatcher调用Synchronous Event Demultiplexer等待event的发生,当Synchronous Event Demultiplexer检测到新的event时,就通知Initiation Dispatcher调用event对应的Event Handler。

在IncNet中,Initiation Dispatcher也对应于SelectThread。即SelectThread除负责调用epoll_wait()进行解多路操作之外,还定义了注册、删除EventHandler的接口(epoll_ctl())。

与通常的单线程Reactor模式不一样的是,SelectThread分发event的方式并不是通过函数调用,而是通过一个与I/O线程共享的blocking queue,后面详细讨论。

Event Handler/Concrete Event Handler

某个event发生时需要调度的操作称之为EventHandler。

在IncNet中CommonSocket扮演了Event Handler的角色,而event发生时需要具体执行的操作由继承自CommonSocket的ServerSocket和ClientSocket来负责,即Concrete Event Handler。

ServerSocket表示服务器监听连接请求的socket,因此处理“可以无阻塞的accept()”这个event。

ClientSocket表示服务器跟客户端进行I/O通信的socket,因此处理“可以无阻塞的read()”和“可以无阻塞的write()”这两个event。

各EventHandler执行的操作是分配到一组数量可配置的I/O线程上执行的(SockProcThread)。

以上Reactor模式说的都是针对server模块,当IncNe被用作client模块时,还有一个叫ConnectThread的线程,详见下文。

综上,IncNet一共有三类线程:

  • select线程(SelectThread)
  • I/O线程(SockProcThread)
  • 连接线程(ConnectThread)

以及两种socket(都继承自CommonSocket):

  • 监听socket(ServerSocket)
  • 客户端socket(ClientSocket)

下面来详细分析每一类线程。对于每类线程,需要回答以下几个问题:

  • 该线程的职责是什么?
  • 线程的生命周期,即何时创建?何时销毁?
  • 需要多少个这样的线程?
  • 是否需要与其他线程交互?怎么交互?
  • 线程执行函数的伪代码

select线程(SelectThread)

职责

负责执行Synchronous Event Demultiplexer和Initiation Dispatcher的操作。

因此,select线程需要创建epoll实例,循环执行epoll_wait()以检查各socket是否就绪,并通知I/O线程执行各socket的处理函数(通过blocking queue)。

此外,还需要提供向epoll实例监控的socket列表进行增删改的接口(epoll_ctl())。

何时创建

因为服务器的监听socket也是epoll监控列表中的一员,因此server模块在启动服务器开始监听之后就需要启动select线程。

client 模块在成功连接服务器之后启动select线程。

何时销毁

select线程是需要一直运行的,关服时才会销毁。

数量

select线程负责解多路复用,分派event,一个server或client模块只有一个select线程。

注意,正如本文开头所说,同一个服务器进程可能会有多个server模块或client模块,每个模块都有自己的select线程。比如loginserver进程,即是玩家客户端的loginserver,又是gameserver的loginserver,这两种身份是不一样的,每个模块都需要自己单独的select线程。

与其他线程的交互

epoll_wait()检测到ServerSocket可以无阻塞的accept()或者ClientSocket可以无阻塞的read()或者write()时,会通知I/O线程进行具体的accept()或读写操作,这里用到了一个线程安全的blocking queue,后文详细讨论。

伪代码

事实上,在IncNet中,并不是所有的I/O操作都是在I/O线程中执行的。有以下三种情况,IncNet直接在select线程中执行相关socket的I/O操作:

  • 没有专门的I/O线程,即I/O线程的数量配置为0。
  • 要处理的socket是server模块的监听socket。
  • 要处理的socket是client模块用于连接服务器的socket。

也就是说,只有server模块中与client进行通信的socket的I/O操作才在I/O线程中执行。

// 创建epoll实例
int epfd = epoll_create(size);

// 将服务器的监听socket或者客户端连接服务器的socket加入epoll的监控列表
epoll_ctl(epfd, EPOLL_CTL_ADD, server_socket_fd, in_event);

// 线程主循环
while (thread_is_active)
{
    // 检测哪些socket上可无阻塞的执行I/O操作(包括accept)
    if (epoll_wait(epfd, events, maxevents, timeout))
    {
        // 对每一个这样的socket
        for (int fd : read_socket_fd_list)
        {
            // 根据socket的类型以及是否有专门的I/O线程来决定在哪里处理event
            CommonSocket *socket = get_socket_by_fd(fd);
            if (io_thread_count == 0 || socket->is_server_socket() )
            {
                // 如果没有专门的I/O线程,
                // 或者当前socket是服务器的监听socket
                // 或者当前socket是客户端连接服务器的socket
                // 则在当前线程(即select线程)处理event即可
                socket->process(event);
            }
            else
            {
                // 如果有专门的I/O线程,且当前socket是服务器的client socket
                // 则根据socket的自增编号(不是fd)分派到具体的I/O线程
                // (通过blocking queue,后文详细说明)
                queue.enqueue( {fd, event } )
            }
        }

        // 将读写操作分发到各I/O线程
        for (SockProcThread proc_thread : proc_thread_list)
        {
            proc_thread.post_sock_with_events();
        }

        // 等待I/O线程处理完毕
        for (SockProcThread proc_thread : proc_thread_list)
        {
            proc_thread.wait();
        }

        // 检查是否有socket超时并删除(后文详解说明)
        check_timeout_sockets();

        // 将NetMng发送缓冲队列中的buffer发到对应的每个socket的发送队列中
        // 并将对应的socket加到epoll的监控列表中
        // (后文详细说明缓冲机制)
        dispense_send_buffer_to_sockets();
    }
}

// 销毁epoll实例
close(epfd);

I/O线程(SockProcThread)

职责

负责对每个socket执行具体的I/O操作,recv()send()等。

何时创建

server模块启动服务器时创建。

client模块不需要单独的I/O线程,其I/O操作是在SelectThread中执行的,因为client模块一般只有一个socket,没必要多线程I/O。

何时销毁

I/O线程也是一直存在的,关闭服务器时才会销毁。

数量

可配置,一般会配置多个,实现多线程读写。

与其他线程的交互

通过blocking queue,获取select线程检测到的就绪socket的具体事件,进而对socket执行具体的I/O操作。

伪代码

while (thread_is_active)
{
    // 获取当前线程负责的I/O事件列表
    event_list = get_event_list();

    // 遍历I/O事件列表
    for (event : event_list)
    {
        // 获取事件对应的socket
        CommonSocket *socket = get_socket(event);

        // 调用socket的process()函数执行具体的I/O
        socket.process();
    }

    // 通知select线程读写完毕
    select_thread.notify();
}

连接线程(ConnectThread)

职责

顾名思义,负责发起客户端向服务器的连接。

只有用IncNet构建client模块时,才会用到ConnectThread,因为server模块的socket一般都是被动打开(passive open)的,不会主动去连接某个client。

何时创建

自然是客户端想与服务器建立连接时创建。一般情况下,作为客户端的进程启动时就会创建ConnectThread,发起与服务器的连接。

何时销毁

两种情况:

  • 连接成功即销毁:适用于断开连接后不需要重连的场景。
  • 永不销毁:适用于断开连接后需要重连的场景。

也就是说,具体何时销毁由上层的业务逻辑来决定。

数量

一个足矣。

与其他线程的交互

不需要,维护is_connected可供查询当前的连接状态即可。

伪代码

while (thread_is_active)
{
    if (!is_connected)
    {
        if (try_connect(ip, port))
        {
            is_connected = true;
        }
    }

    sleep(a_while);
}

值得注意的是,这里实现了连接重试的功能,后文会单独讨论这样做的好处。

select线程与I/O线程之间的通信

select线程和I/O线程之间是生产者和消费者的关系,select线程不断的生产event,I/O线程根据不同的event,调用对应socket的处理函数process(),将这些event消费掉。

select线程和I/O线程之间使用了一个线程安全的生产者消费者队列来进行通信,每个I/O线程对应一个队列。

其实并没有使用队列的功能,只使用了其线程安全和blocking的特性。select线程将每个I/O线程需要处理的所有的event准备好,写入各I/O线程的足够大的buffer中,并将buffer塞入I/O线程各自的队列中,而后一直等待,直到所有的I/O线程将event处理完毕,才继续下一次epoll_wait(),因此,队列中最多只有一个buffer,而buffer中包含了epoll_wait()检测到的所有就绪的socket的DPID和对应的events(EPOLLINEPOLLOUTEPOLLERR)。

I/O线程则从队列中取出buffer,再将buffer中存储的event依次处理完毕,然后通知select线程。

每个I/O线程对应一个buffer,格式如下:

每个socket由哪个I/O线程处理,则由socket的DPID中的自增编号对I/O线程数量取模决定。

但由于客户端不断的连接和断开,而自增编号只增不减,因此每个I/O线程的负载情况其实是不确定的,不均衡的。极端情况下可能会出现只有一个I/O线程工作,其余I/O线程闲置的情况。当然,即便是平均分配每个socket到各个I/O线程,由于各个socket的I/O数据量各不相同,也做不到均衡。

这里可以考虑根据每个socket近期的I/O数据量来动态调配每个I/O线程的负载,尽量做到均衡,以减小I/O操作的总耗时。当然,前提是需要测试确定这里是一个性能上的热点。

具体的I/O操作

前文讨论了IncNet在跨平台、面向对象层面的封装,以及利用封装好的组件实现多线程的Reactor模式,下面讨论在Recator模式下,连接的建立、收发包的过程和数据流向,以及IncNet实现的缓冲机制。

建立连接

从前文select线程的伪代码中可以看到,server模块的监听socket的事件是在select线程中处理的,而不是在I/O线程,因此对于server模块来说,建立连接的过程如下:

  1. client模块发起连接
  2. select线程中执行epoll_wait(),检测到监听socket就绪(即,可以无阻塞的accept()),直接调用ServerSocket的process()
  3. ServerSocket在其process()中调用accept()建立与client的连接,并将新创建的socket加到epoll实例的监控列表中。

再来看连接建立之后,数据的发送和接收:

上图蓝色箭头表示发送数据的流向,红色箭头表示接收数据的流向。

以下涉及缓冲队列的操作都是需要加锁的。

发送数据

  1. 应用程序在逻辑线程组装好一个buffer,调用NetMng的send_msg()。这一步仅仅是将这个buffer拷贝到NetMng的发送缓冲队列中而已。
  2. select线程负责循环调用epoll_wait(),检测是否有读写就绪的socket。在每次epoll_wait()返回之后(socket就绪,或超时),select线程会将NetMng的发送缓冲队列中的所有buffer,分别加入到其所属socket的发送缓冲队列中。同时,将buffer所属socket加到epoll实例的监控列表中,以监控是否写就绪
  3. 还是select线程中,如果epoll_wait()检测到socket可以无阻塞的写入,则会将这个socket的可写事件加入到I/O线程的事件队列中。
  4. I/O线程从其事件队列中取出socket的可写事件,调用socket的process(),进而调用send(),进行实际的数据发送,直到发送完毕,或碰到EWOULDBLOCKEAGAIN错误。

这里有一点存疑的地方:NetMng中的发送缓冲队有必要存在吗?

为什么不直接将buffer加到其所属socket的发送缓冲队列中,同时将socket加到epoll实例的监控列表中呢?为什么还要经过NetMng的发送缓冲队列,再在select线程中转移一次?反正都是带锁操作缓冲队列,epoll API又是线程安全的。

接收数据

  1. 在select线程中,如果epoll_wait()检测到socket可以无阻塞的读取,则会将这个socket的可读事件加入到I/O线程的事件队列中。
  2. I/O线程从其事件队列中取出socket的可读事件,调用socket的process(),进而调用recv(),进行实际的数据接收,直到填满socket的接收缓冲buffer,或碰到EWOULDBLOCKEAGAIN错误。
  3. I/O线程将接收到的buffer标记好所含packet的个数,加到NetMng的接收缓冲队列中。
  4. 应用程序调用NetMng的receive_msg(),读取其接收缓冲队列中的所有buffer,进而触发event_handle(),对不同的packet作出相应的处理。

与发送数据不同的是,ClientSocket并没有接收缓冲队列的存在,而只是一个简单的buffer用来接收数据,收完就直接塞到NetMng的接收缓冲队列中(还必须保证塞进去的都是完整的packet,就不展开讨论了)。这些操作都是在I/O线程中完成的。

IncNet的大致结构讨论完了,接下来讨论一些零碎的点,以及前文提到但是没有展开讨论的内容。

非阻塞I/O

IncNet在Socket类中将socket文件描述符设置为非阻塞的,并提供了非阻塞连接的功能。

为什么要非阻塞I/O呢?

IncNet采用了I/O多路复用的I/O模型(select、poll、Linux特有的epoll),一般情况下,这种I/O模型要求socket是非阻塞的,原因如下(TLPI 63.1.2):

  1. 如果多进程(或多线程)在同一个socket上执行I/O操作,那么从一个特定进程的视角来看,一个socket的就绪状态,在从被通知就绪到真正开始执行I/O操作之间,是可能发生变化的。因此,一个阻塞的I/O调用会阻塞,从而阻止进程(或线程)继续监控其他socket的就绪状态。 (但是,最好不要多进程或多线程操作同一个socket)
  2. 假设一个socket被通知可以执行写操作而不会阻塞,如果此时我们在单次write()send()调用中写入一块足够大的数据,那么这次调用将永远不会阻塞。
  3. 在极少数情况下,使用水平触发(level-triggered)API可能会获取虚假的就绪通知,即socket没就绪,但却通知我们就绪了。可能是由于内核的bug,在某些情形下这也是预期的正常行为(UNP 16.6)。
  4. 使用边缘触发(edge-triggered)的API时,对被通知就绪的socket,程序一般需要在某个时间点用一个循环对其执行尽可能多的I/O操作,如果socket是阻塞的,当没有更多的I/O操作可执行时,I/O系统调用将会阻塞。 IncNet不会在多个线程中对同一个socket进行I/O操作,使用的是epoll的水平触发模式,因此1、4两点不必考虑。

非阻塞连接有什么好处?

非阻塞连接有以下优点:

  1. connect发起TCP三次握手期间,可以继续做其他工作。
  2. 可以同时建立多个连接。
  3. 可以指定connect的超时时间(通过select或poll)。许多实现的connect默认超时时间是75秒或数分钟。

在IncNet中,每个连接的建立都是在单独的一个线程中进行的(ConnectThread),所以1、2两点不必考虑,这里用非阻塞连接主要是有第3点的优势。

如何实现非阻塞连接?

IncNet对于非阻塞连接的实现如下,但是并不严谨

#define DSOCKERR        -1
#define DWOULDBLOCK     EWOULDBLOCK
#define DCONNECTERR     EINPROGRESS
#define CONNECT_TIMEOUT 10

int Socket::Connect( const sockaddr *_addr )
{
    if ((DSOCKERR == ff_connect( sock_, _addr )) && (DCONNECTERR == ff_errno()))
    {
        pollfd fds[1];
        fds[0].fd = sock_;
        fds[0].events = POLLOUT;

        int ret = poll( fds, 1, CONNECT_TIMEOUT * 1000 );
        if (ret > 0)
        {
            int val, len = sizeof(val);
            if (!GetOption( SOL_SOCKET, SO_ERROR, (char*)&val, &len ) && val)
            {
                errno = val;
                return -1;
            }
            return 0;
        }
    }
    return -1;
}

因为socket已经被标记为非阻塞的了,因此ff_connect()不会阻塞。阻塞发生在对poll()的调用上。

根据UNP16.3节,上述写法是有问题的,忽略了一种情况的存在:调用connect()时连接可能立即建立并返回。服务器和客户端处于同一台机器上时,这种情况有可能发生。所以还应判断connect()的返回值是否为0,为0则表示连接成功建立。

但一般情况下不会有问题,因为IncNet实现方式的原因(epoll_wait()之后再accept()),这种情况发生的概率应该很小(或者根本不可能发生?有待深入研究)。

EINPROGRESS

$man connect

EINPROGRESS
       The socket is nonblocking and the connection cannot be completed immediately.  It is possible to select(2) or poll(2) for completion by selecting the socket for  writing.
       After  select(2)  indicates writability, use getsockopt(2) to read the SO_ERROR option at level SOL_SOCKET to determine whether connect() completed successfully (SO_ERROR is zero) or unsuccessfully (SO_ERROR is one of the usual error codes listed here, explaining the reason for the failure).

为什么要检查SO_ERROR

ret大于0就表示socket处于可写状态了,为什么还要进一步判断socket上是否存在待处理的错误呢?

参考UNP 16.3节:

源自Berkeley的实现(和POSIX)有关于select和非阻塞connect有以下两个规则:

  1. 当连接成功建立时,描述符变为可写(TCPv2第531页);
  2. 当连接建立遇到错误时,描述符变为既可读又可写(TCPv2第530页)。

所以不能只根据socket是否可写来判断连接是否成功建立。

但也不能根据socket是否又可读又可写来判断连接是否失败,因为调用select或poll之前有可能连接已经建立并有来自对端的数据到达,此时即使socket上不发生错误,也是既可读又可写的,而此时连接是成功建立的。

因此,除了对返回值的判断以外,还需要对socket是否发生错误进行显式的检查。

开启SO_REUSEADDR选项

IncNet开启了socket的SO_REUSEADDR选项,为什么?

简单说,为了避免服务器重启时发生EADDRINUSE(Address already in use)错误。

什么情况下会碰到这种错误?

  1. 连接到客户端的服务器执行了一次主动关闭连接,要么通过close(),要么自己crash了。这将导致对应的TCP端点停留在TIME_WAIT状态,直到2MSL超时。
  2. 服务器创建了一个子进程来处理一个客户端的连接,之后服务器进程终止运行,子进程继续服务客户端,因此子进程依然维护了一个使用服务器的端口的TCP端点。

在以上两种情况中,残留的TCP端点其实都已经无法接受新的连接了。

但是,默认情况下,大多数TCP实现都会阻止新的监听socket绑定到上述两种情况的端口上。

为什么阻止新的绑定?

先来看一个已经连接的TCP socket是如何进行唯一标识的。

服务器在accept()一个新的连接请求时,会创建一个新的socket,所有这些socket都关联到监听socket所关联的同一个地址上。区分这些socket的唯一方法就是通过不同连接不同的对端socket了。即,通过以下四元组:

{ local-IP-address, local-port, foreign-IP-address, foreign-port }

TCP规范要求以上四元组必须是唯一的,也就是说只能有一个相对应的连接。

问题在于,大部分实现采用了更严格的约束:如果主机上存在任何一个TCP连接匹配到了local-port,那么这个端口就不能被重用,即不能用bind()绑定,即便是TCP无法再接受新的连接。

开启SO_REUSEADDR选项放开了这个限制,使其更接近TCP的要求。

开启SO_REUSEADDR选项意味着,即便是上述两种情况中的某一个TCP绑定到了一个端口上,我们依然可以将一个socket绑定到这个端口上。

大部分TCP服务器都应该开启这个选项。

以上原因参考TLPI 61.10,更多原因可参考UNP 7.5.11。

为什么选用epoll?

选用epoll自然是因为性能好,那么连接数多的情况下epoll为什么比select和poll的性能会好很多呢?

不详细写了,具体可参考TLPI相关章节(TLPI 62.3.4, 62.3.5, 63.4.5),有空再搬过来。

连接数控制

server模块可接受的client连接数是可配置的,但其所能容纳的socket连接数远高于(4倍于)这个配置,目的是防止在DDOS攻击下彻底瘫痪。实现起来简单,没必要展开讨论,但是是一个值得参考的设计。

连接重试

ConnectThread中有一个很不起眼的小功能,连接重试。

断开连接后,只要ConnectThread没有销毁,就会定期尝试重连,这其实是很有用处的。因为大部分后台服务都是分布式的,需要多台机器上的多个进程相互连接相互协作,才能保证整个服务的正常运行。

连接重试可以保证:

  • 各进程不需要以特定顺序启动。
  • 任何一个进程都可以单独重启。

超时检测

IncNet在NetMng中使用了一个链表来实现socket的超时检测:

  • 只要socket收到包,就将其从原来的位置移至超时链表的尾部,保证越久没收到包的socket排在链表的越前面。
  • 在select线程中,每次epoll_wait()之后,从超时链表的头开始,依次判断是否有socket超时:
  • 超时则触发超时逻辑,继续下一个socket的判断。
  • 没超时则整个超时检查结束,没有必要继续检查后续的socket。
  • 统一将超时的socket从管理器中删除

加密

IncNet支持传输加密,需要加密的连接一旦建立,server模块首先会向client模块同步一个加密种子,此后的数据传输都会在发包之前加密,收包之后解密。

IncNet具体采用的加密算法就不方便提了,也没深入研究加密算法的实现。

可改进之处

  • 提供对IPv6的支持 IncNet中使用了一些过时的系统调用或库函数,这类函数只支持IPv4,比如:
  • gethostbyname()
  • inet_ntoa() 另外,在IncNet中用来表示地址+端口的类EndPoint中,错误的使用了sockaddr作为其内部存储IP地址的数据结构,此类型不足以存储IPv6的地址。
  • 支持Windows环境下作为服务器启动,实现真正的跨平台
  • I/O线程的负载均衡
  • 考虑边缘触发模式的epoll,对比性能