作者:路路

热爱技术、乐于分享的技术人,目前主要从事数据库相关技术的研究。

本文来源:原创投稿

*爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。

前言

上一篇文章中,我讲了网络 IO 的基础知识,本篇文章将从源码角度具体讲解 DBLE 的网络模块:包括 DBLE 是如何处理 MySQL 包的,多路复用在 DBLE 中是如何实现的,以及请求的异步化处理相关逻辑。

DBLE 是如何处理 MySQL 包的?

我们将以客户端连接 DBLE 为例,从源码角度讲解 DBLE 的相关处理流程。
客户端与 DBLE 建立连接的流程如下图所示(因为 DBLE 实现了 MySQL 协议,所以与客户端连接 MySQL 的流程一样):
主要包括以下四个步骤:

1、 客户端发起 connect 连接;

2、服务端发送握手包;

3、客户端回复握手包;

4、服务端返回 OK 包,表示连接建立完成,进入命令阶段。

我们直接看源码:

1、DBLE 处理客户端 connect
DBLE 处理客户端 connect 的代码在 NIOAcceptor#run 方法中:
public void run() {        //这里的selector即IO多路复用选择器,一个selector可以处理多个客户端连接请求        final Selector tSelector = this.selector;        for (; ; ) {            try {                tSelector.select(1000L);                Set<SelectionKey> keys = tSelector.selectedKeys();                try {                    for (SelectionKey key : keys) {                        if (key.isValid() && key.isAcceptable()) {                            //当连接有效且可接受时,处理客户端连接                            accept();                        } else {                            key.cancel();                        }                    }                } catch (final Throwable e) {                    LOGGER.warn("caught Throwable err: ", e);                } finally {                    keys.clear();                }            } catch (Exception e) {                LOGGER.info(getName(), e);            }        }    }
可以看出上述方法调用了 accept 来接受客户端发起的 TCP 连接,继续看该类的 accept 方法:
private void accept() {        SocketChannel channel = null;        try {            //与客户端建立TCP连接            channel = serverChannel.accept();            channel.configureBlocking(false);            NIOSocketWR socketWR = new NIOSocketWR();            FrontendConnection c = factory.make(channel, socketWR);            socketWR.initFromConnection(c);            c.setId(ID_GENERATOR.getId());            IOProcessor processor = DbleServer.getInstance().nextFrontProcessor();            c.setProcessor(processor);            NIOReactor reactor = reactorPool.getNextReactor();            //将已建立好的连接注册给NIOReactor            reactor.postRegister(c);        } catch (Exception e) {            LOGGER.info(getName(), e);            closeChannel(channel);        }    }
可以看出上述代码将客户端发起建立的 TCP 连接注册给了 NIOReactor 来管理。
到这里 DBLE 对客户端的 connect 已经处理完成了,他们之间已经完成了 TCP 连接的建立,同时 DBLE 将客户端请求的连接注册给了 NIOReactor

2、服务端发送握手包

接着上面我们继续看 NIOReactor#postRegister 方法:
//该方法将连接放入队列,并唤醒reactorR的selector,其中reactorR为NIOReactor的内部类RWvoid postRegister(AbstractConnection c) {        reactorR.registerQueue.offer(c);        reactorR.selector.wakeup();    }
我们直接看 RW 处理注册队列里客户端连接的代码,在 RW#register 方法中:
private void register(Selector finalSelector) {            ……            while ((c = registerQueue.poll()) != null) {                try {                     //下面这行代码需要注意,该代码将连接注册到了RW的selector多路复用选择器中,使得该选择器后续能够读取该连接发送过来的数据                    ((NIOSocketWR) c.getSocketWR()).register(finalSelector);                    //服务端发送握手包的逻辑在下面这个方法中                    c.register();                }             ……        }
AbstractConnection#register 方法中又调用了 AbstractService#register 方法,对于客户端连接请求来讲,这里最终调用的是 MySQLFrontAuthService#register 方法:
public void register() throws IOException {        //终于看到了greeting,该方法向客户端发送了握手包        greeting();        this.connection.getSocketWR().asyncRead();    }
MySQLFrontAuthService#greeting 方法实现了拼装握手包,并将握手包发送给客户端的逻辑:
private void greeting() {        // generate auth data        byte[] rand1 = RandomUtil.randomBytes(8);        byte[] rand2 = RandomUtil.randomBytes(12);
// save auth data byte[] rand = new byte[rand1.length + rand2.length]; System.arraycopy(rand1, 0, rand, 0, rand1.length); System.arraycopy(rand2, 0, rand, rand1.length, rand2.length); this.seed = rand;
HandshakeV10Packet hs = new HandshakeV10Packet(); hs.setPacketId(0); hs.setProtocolVersion(Versions.PROTOCOL_VERSION); // [0a] protocol version V10 hs.setServerVersion(Versions.getServerVersion()); hs.setThreadId(connection.getId()); hs.setSeed(rand1); hs.setServerCapabilities(getServerCapabilities()); int charsetIndex = CharsetUtil.getCharsetDefaultIndex(SystemConfig.getInstance().getCharset()); hs.setServerCharsetIndex((byte) (charsetIndex & 0xff)); hs.setServerStatus(2); hs.setRestOfScrambleBuff(rand2); hs.setAuthPluginName(pluginName.name().getBytes());
//这里的调用即发送握手包到客户端 hs.write(connection); }
到这里就完成了服务端发送握手包的逻辑。

3、DBLE 处理客户端的握手回复包

服务端发送了握手包给客户端,客户端收到后需要发送握手回复包过来了,一般该握手回复包中会包含用户相关信息。
那么 DBLE 如何读取并处理客户端发送过来的握手回复包呢?
相应的代码在 RW#run 方法中,那为什么在这个方法中能处理客户端发送过来的数据呢?因为之前在处理客户端连接的时候,已经把相应的连接注册给了 RW 的多路复用选择器,所以它当然能处理相应连接的数据了,记不得的同学可以看前面 RW#register 方法中的注释。
RW#run 方法中处理客户端发送数据的主要代码如下:
public void run() {            final Selector finalSelector = this.selector;            Set<SelectionKey> keys = null;            for (; ; ) {                    ……                                    //当连接中有数据的时候,这里会返回相应的selection keys                    keys = finalSelector.selectedKeys();                    if (keys.size() == 0) {                        continue;                    }                    //对有相应事件的连接进行处理                    executeKeys(keys);             }                    ……     }
我们继续看 RW#executeKeys 方法(对代码做了一些精简,但不影响理解):
private void executeKeys(Set<SelectionKey> keys) {            for (SelectionKey key : keys) {                AbstractConnection con = null;                Object att = key.attachment();                con = (AbstractConnection) att;                if (key.isValid() && key.isReadable()) {                    //这里即为读取客户端发送过来的数据                    con.asyncRead();                }            }        }
着代码走,相应的处理逻辑在 NIOSocketWR#asyncRead 方法中:
public void asyncRead() throws IOException {        ByteBuffer theBuffer = con.findReadBuffer();        //读取客户端发送过来的数据到缓存theBuffer中        int got = channel.read(theBuffer);        //处理相应的数据        con.onReadData(got);    }
AbstractConnection#onReadData 方法中又进一步调用了 AbstractService#handle 方法来处理数据,所以我们直接看 AbstractService#handle 方法:
public void handle(ByteBuffer dataBuffer) {        this.sessionStart();        boolean hasReming = true;        int offset = 0;        while (hasReming) {            //下面这行代码实际处理了客户端传过来的数据包,里面包含计算包总长度、判断读取的数据包是否完整等逻辑            ProtoHandlerResult result = proto.handle(dataBuffer, offset, isSupportCompress);            switch (result.getCode()) {                //客户端首次发来的握手包,所以是完整的数据包,进入这里的处理逻辑,这里将读取的数据封装成task任务,提交到队列中,然后通过线程异步处理                case REACH_END_BUFFER:                    connection.readReachEnd();                    byte[] packetData = result.getPacketData();                    if (packetData != null) {                        taskCreate(packetData);                    }                    dataBuffer.clear();                    hasReming = false;                    break;                case BUFFER_PACKET_UNCOMPLETE:                    connection.compactReadBuffer(dataBuffer, result.getOffset());                    hasReming = false;                    break;                case BUFFER_NOT_BIG_ENOUGH:                    connection.ensureFreeSpaceOfReadBuffer(dataBuffer, result.getOffset(), result.getPacketLength());                    hasReming = false;                    break;                case STLL_DATA_REMING:                    byte[] partData = result.getPacketData();                    if (partData != null) {                        taskCreate(partData);                    }                    offset = result.getOffset();                    continue;                default:                    throw new RuntimeException("unknown error when read data");            }        }    }
到这里 DBLE 完成了读取客户端发送过来的握手包,并将它封装成了异步任务以备下一步处理。

4、DBLE 异步处理任务并返回 OK 包

异步是高性能的秘诀之一。上面DBLE将读取到的数据封装成了任务,然后交由线程异步处理。
我们直接来看任务处理的相关代码,在 AbstractService#execute 方法中:
public void execute(ServiceTask task) {        task.increasePriority();        handleData(task);    }
对于客户端握手回复包的处理,最后调用的代码在 MySQLFrontAuthService#handleAuthPacket 方法中,所以对于该场景,我们直接看该方法的相关代码:
private void handleAuthPacket(byte[] data) {        //将读取到的数据转换为AuthPacket        AuthPacket auth = new AuthPacket();        auth.read(data);        this.authPacket = auth;        ……        //检查用户名和密码        auth();        ……    }
上面的 auth 方法里又调用了 MySQLFrontAuthService#checkForResult 方法,所以我们直接看该方法:
private void checkForResult(AuthResultInfo info) {            ……            AbstractService service = BusinessServiceFactory.getBusinessService(info, connection);            connection.setService(service);            //验证通过后,拼装并发送OK包给客户端            MySQLPacket packet = new OkPacket();            packet.setPacketId(needAuthSwitched ? 4 : 2);            packet.write(connection);            ……    }
到这里,DBLE 已经处理完了握手回复包,并返回 OK 包给客户端,整个客户端与 DBLE 的连接建立过程就结束了。
该过程结束后,将进入 MySQL 协议的 Command 阶段,如果你通过命令行连接 DBLE 的话,即进入了下面的界面:
mysql>
是不是没想到在进入这个命令界面前发生了这么多……

多路复用在 DBLE 中是如何实现的

其实这个问题的答案,如果仔细看前面代码章节的话就已经能够知道了。DBLE 的多路复用其实就是通过 JAVA 的多路复用选择器 Selector 来实现的,通过将连接注册给 Selector,这样只是在连接中有数据时候才进行读取,能够实现一个线程监听多个连接。

请求的异步化处理

DBLE 在读取完数据后,并没有在当前线程中处理这些数据,而是将数据封装成任务提交到队列中去,然后通过另外的线程来进行处理,这即是请求异步化处理,能够极大的提高性能,这在上面的源码解读章节里也进行了说明。

总结

今天从一个实例出发,从源码角度详细解读了 DBLE 对网络数据包的处理流程。通过 Selector 实现多路复用,将接收到的数据封装成任务提交到队列以进行异步处理。这是 DBLE 高性能网络 IO 处理的秘密。当然可能还有一些代码细节在文章中没有讲到,大家如果有疑问的地方可以进一步阅读源码,也可以评论区留言。