作者:路路
热爱技术、乐于分享的技术人,目前主要从事数据库相关技术的研究。
本文来源:原创投稿
*爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。
前言
在上一篇文章中,我讲了网络 IO 的基础知识,本篇文章将从源码角度具体讲解 DBLE 的网络模块:包括 DBLE 是如何处理 MySQL 包的,多路复用在 DBLE 中是如何实现的,以及请求的异步化处理相关逻辑。
DBLE 是如何处理 MySQL 包的?
1、 客户端发起 connect 连接;
2、服务端发送握手包;
3、客户端回复握手包;
4、服务端返回 OK 包,表示连接建立完成,进入命令阶段。
我们直接看源码:
public void run() {//这里的selector即IO多路复用选择器,一个selector可以处理多个客户端连接请求final Selector tSelector = this.selector;for (; ; ) {try {tSelector.select(1000L);Set<SelectionKey> keys = tSelector.selectedKeys();try {for (SelectionKey key : keys) {if (key.isValid() && key.isAcceptable()) {//当连接有效且可接受时,处理客户端连接accept();} else {key.cancel();}}} catch (final Throwable e) {LOGGER.warn("caught Throwable err: ", e);} finally {keys.clear();}} catch (Exception e) {LOGGER.info(getName(), e);}}}
private void accept() {SocketChannel channel = null;try {//与客户端建立TCP连接channel = serverChannel.accept();channel.configureBlocking(false);NIOSocketWR socketWR = new NIOSocketWR();FrontendConnection c = factory.make(channel, socketWR);socketWR.initFromConnection(c);c.setId(ID_GENERATOR.getId());IOProcessor processor = DbleServer.getInstance().nextFrontProcessor();c.setProcessor(processor);NIOReactor reactor = reactorPool.getNextReactor();//将已建立好的连接注册给NIOReactorreactor.postRegister(c);} catch (Exception e) {LOGGER.info(getName(), e);closeChannel(channel);}}
2、服务端发送握手包
//该方法将连接放入队列,并唤醒reactorR的selector,其中reactorR为NIOReactor的内部类RWvoid postRegister(AbstractConnection c) {reactorR.registerQueue.offer(c);reactorR.selector.wakeup();}
private void register(Selector finalSelector) {……while ((c = registerQueue.poll()) != null) {try {//下面这行代码需要注意,该代码将连接注册到了RW的selector多路复用选择器中,使得该选择器后续能够读取该连接发送过来的数据((NIOSocketWR) c.getSocketWR()).register(finalSelector);//服务端发送握手包的逻辑在下面这个方法中c.register();}……}
public void register() throws IOException {//终于看到了greeting,该方法向客户端发送了握手包greeting();this.connection.getSocketWR().asyncRead();}
private void greeting() {// generate auth databyte[] rand1 = RandomUtil.randomBytes(8);byte[] rand2 = RandomUtil.randomBytes(12);// save auth databyte[] rand = new byte[rand1.length + rand2.length];System.arraycopy(rand1, 0, rand, 0, rand1.length);System.arraycopy(rand2, 0, rand, rand1.length, rand2.length);this.seed = rand;HandshakeV10Packet hs = new HandshakeV10Packet();hs.setPacketId(0);hs.setProtocolVersion(Versions.PROTOCOL_VERSION); // [0a] protocol version V10hs.setServerVersion(Versions.getServerVersion());hs.setThreadId(connection.getId());hs.setSeed(rand1);hs.setServerCapabilities(getServerCapabilities());int charsetIndex = CharsetUtil.getCharsetDefaultIndex(SystemConfig.getInstance().getCharset());hs.setServerCharsetIndex((byte) (charsetIndex & 0xff));hs.setServerStatus(2);hs.setRestOfScrambleBuff(rand2);hs.setAuthPluginName(pluginName.name().getBytes());//这里的调用即发送握手包到客户端hs.write(connection);}
3、DBLE 处理客户端的握手回复包
public void run() {final Selector finalSelector = this.selector;Set<SelectionKey> keys = null;for (; ; ) {……//当连接中有数据的时候,这里会返回相应的selection keyskeys = finalSelector.selectedKeys();if (keys.size() == 0) {continue;}//对有相应事件的连接进行处理executeKeys(keys);}……}
private void executeKeys(Set<SelectionKey> keys) {for (SelectionKey key : keys) {AbstractConnection con = null;Object att = key.attachment();con = (AbstractConnection) att;if (key.isValid() && key.isReadable()) {//这里即为读取客户端发送过来的数据con.asyncRead();}}}
public void asyncRead() throws IOException {ByteBuffer theBuffer = con.findReadBuffer();//读取客户端发送过来的数据到缓存theBuffer中int got = channel.read(theBuffer);//处理相应的数据con.onReadData(got);}
public void handle(ByteBuffer dataBuffer) {this.sessionStart();boolean hasReming = true;int offset = 0;while (hasReming) {//下面这行代码实际处理了客户端传过来的数据包,里面包含计算包总长度、判断读取的数据包是否完整等逻辑ProtoHandlerResult result = proto.handle(dataBuffer, offset, isSupportCompress);switch (result.getCode()) {//客户端首次发来的握手包,所以是完整的数据包,进入这里的处理逻辑,这里将读取的数据封装成task任务,提交到队列中,然后通过线程异步处理case REACH_END_BUFFER:connection.readReachEnd();byte[] packetData = result.getPacketData();if (packetData != null) {taskCreate(packetData);}dataBuffer.clear();hasReming = false;break;case BUFFER_PACKET_UNCOMPLETE:connection.compactReadBuffer(dataBuffer, result.getOffset());hasReming = false;break;case BUFFER_NOT_BIG_ENOUGH:connection.ensureFreeSpaceOfReadBuffer(dataBuffer, result.getOffset(), result.getPacketLength());hasReming = false;break;case STLL_DATA_REMING:byte[] partData = result.getPacketData();if (partData != null) {taskCreate(partData);}offset = result.getOffset();continue;default:throw new RuntimeException("unknown error when read data");}}}
4、DBLE 异步处理任务并返回 OK 包
public void execute(ServiceTask task) {task.increasePriority();handleData(task);}
private void handleAuthPacket(byte[] data) {//将读取到的数据转换为AuthPacketAuthPacket auth = new AuthPacket();auth.read(data);this.authPacket = auth;……//检查用户名和密码auth();……}
private void checkForResult(AuthResultInfo info) {……AbstractService service = BusinessServiceFactory.getBusinessService(info, connection);connection.setService(service);//验证通过后,拼装并发送OK包给客户端MySQLPacket packet = new OkPacket();packet.setPacketId(needAuthSwitched ? 4 : 2);packet.write(connection);……}
mysql>多路复用在 DBLE 中是如何实现的
其实这个问题的答案,如果仔细看前面代码章节的话就已经能够知道了。DBLE 的多路复用其实就是通过 JAVA 的多路复用选择器 Selector 来实现的,通过将连接注册给 Selector,这样只是在连接中有数据时候才进行读取,能够实现一个线程监听多个连接。
请求的异步化处理
DBLE 在读取完数据后,并没有在当前线程中处理这些数据,而是将数据封装成任务提交到队列中去,然后通过另外的线程来进行处理,这即是请求异步化处理,能够极大的提高性能,这在上面的源码解读章节里也进行了说明。
总结
今天从一个实例出发,从源码角度详细解读了 DBLE 对网络数据包的处理流程。通过 Selector 实现多路复用,将接收到的数据封装成任务提交到队列以进行异步处理。这是 DBLE 高性能网络 IO 处理的秘密。当然可能还有一些代码细节在文章中没有讲到,大家如果有疑问的地方可以进一步阅读源码,也可以评论区留言。