作者:路路
热爱技术、乐于分享的技术人,目前主要从事数据库相关技术的研究。
本文来源:原创投稿
*爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。
前言
在上一篇文章中,我讲了网络 IO 的基础知识,本篇文章将从源码角度具体讲解 DBLE 的网络模块:包括 DBLE 是如何处理 MySQL 包的,多路复用在 DBLE 中是如何实现的,以及请求的异步化处理相关逻辑。
DBLE 是如何处理 MySQL 包的?
1、 客户端发起 connect 连接;
2、服务端发送握手包;
3、客户端回复握手包;
4、服务端返回 OK 包,表示连接建立完成,进入命令阶段。
我们直接看源码:
public void run() {
//这里的selector即IO多路复用选择器,一个selector可以处理多个客户端连接请求
final Selector tSelector = this.selector;
for (; ; ) {
try {
tSelector.select(1000L);
Set<SelectionKey> keys = tSelector.selectedKeys();
try {
for (SelectionKey key : keys) {
if (key.isValid() && key.isAcceptable()) {
//当连接有效且可接受时,处理客户端连接
accept();
} else {
key.cancel();
}
}
} catch (final Throwable e) {
LOGGER.warn("caught Throwable err: ", e);
} finally {
keys.clear();
}
} catch (Exception e) {
LOGGER.info(getName(), e);
}
}
}
private void accept() {
SocketChannel channel = null;
try {
//与客户端建立TCP连接
channel = serverChannel.accept();
channel.configureBlocking(false);
NIOSocketWR socketWR = new NIOSocketWR();
FrontendConnection c = factory.make(channel, socketWR);
socketWR.initFromConnection(c);
c.setId(ID_GENERATOR.getId());
IOProcessor processor = DbleServer.getInstance().nextFrontProcessor();
c.setProcessor(processor);
NIOReactor reactor = reactorPool.getNextReactor();
//将已建立好的连接注册给NIOReactor
reactor.postRegister(c);
} catch (Exception e) {
LOGGER.info(getName(), e);
closeChannel(channel);
}
}
2、服务端发送握手包
//该方法将连接放入队列,并唤醒reactorR的selector,其中reactorR为NIOReactor的内部类RW
void postRegister(AbstractConnection c) {
reactorR.registerQueue.offer(c);
reactorR.selector.wakeup();
}
private void register(Selector finalSelector) {
……
while ((c = registerQueue.poll()) != null) {
try {
//下面这行代码需要注意,该代码将连接注册到了RW的selector多路复用选择器中,使得该选择器后续能够读取该连接发送过来的数据
((NIOSocketWR) c.getSocketWR()).register(finalSelector);
//服务端发送握手包的逻辑在下面这个方法中
c.register();
}
……
}
public void register() throws IOException {
//终于看到了greeting,该方法向客户端发送了握手包
greeting();
this.connection.getSocketWR().asyncRead();
}
private void greeting() {
// generate auth data
byte[] rand1 = RandomUtil.randomBytes(8);
byte[] rand2 = RandomUtil.randomBytes(12);
// save auth data
byte[] rand = new byte[rand1.length + rand2.length];
System.arraycopy(rand1, 0, rand, 0, rand1.length);
System.arraycopy(rand2, 0, rand, rand1.length, rand2.length);
this.seed = rand;
HandshakeV10Packet hs = new HandshakeV10Packet();
hs.setPacketId(0);
hs.setProtocolVersion(Versions.PROTOCOL_VERSION); // [0a] protocol version V10
hs.setServerVersion(Versions.getServerVersion());
hs.setThreadId(connection.getId());
hs.setSeed(rand1);
hs.setServerCapabilities(getServerCapabilities());
int charsetIndex = CharsetUtil.getCharsetDefaultIndex(SystemConfig.getInstance().getCharset());
hs.setServerCharsetIndex((byte) (charsetIndex & 0xff));
hs.setServerStatus(2);
hs.setRestOfScrambleBuff(rand2);
hs.setAuthPluginName(pluginName.name().getBytes());
//这里的调用即发送握手包到客户端
hs.write(connection);
}
3、DBLE 处理客户端的握手回复包
public void run() {
final Selector finalSelector = this.selector;
Set<SelectionKey> keys = null;
for (; ; ) {
……
//当连接中有数据的时候,这里会返回相应的selection keys
keys = finalSelector.selectedKeys();
if (keys.size() == 0) {
continue;
}
//对有相应事件的连接进行处理
executeKeys(keys);
}
……
}
private void executeKeys(Set<SelectionKey> keys) {
for (SelectionKey key : keys) {
AbstractConnection con = null;
Object att = key.attachment();
con = (AbstractConnection) att;
if (key.isValid() && key.isReadable()) {
//这里即为读取客户端发送过来的数据
con.asyncRead();
}
}
}
public void asyncRead() throws IOException {
ByteBuffer theBuffer = con.findReadBuffer();
//读取客户端发送过来的数据到缓存theBuffer中
int got = channel.read(theBuffer);
//处理相应的数据
con.onReadData(got);
}
public void handle(ByteBuffer dataBuffer) {
this.sessionStart();
boolean hasReming = true;
int offset = 0;
while (hasReming) {
//下面这行代码实际处理了客户端传过来的数据包,里面包含计算包总长度、判断读取的数据包是否完整等逻辑
ProtoHandlerResult result = proto.handle(dataBuffer, offset, isSupportCompress);
switch (result.getCode()) {
//客户端首次发来的握手包,所以是完整的数据包,进入这里的处理逻辑,这里将读取的数据封装成task任务,提交到队列中,然后通过线程异步处理
case REACH_END_BUFFER:
connection.readReachEnd();
byte[] packetData = result.getPacketData();
if (packetData != null) {
taskCreate(packetData);
}
dataBuffer.clear();
hasReming = false;
break;
case BUFFER_PACKET_UNCOMPLETE:
connection.compactReadBuffer(dataBuffer, result.getOffset());
hasReming = false;
break;
case BUFFER_NOT_BIG_ENOUGH:
connection.ensureFreeSpaceOfReadBuffer(dataBuffer, result.getOffset(), result.getPacketLength());
hasReming = false;
break;
case STLL_DATA_REMING:
byte[] partData = result.getPacketData();
if (partData != null) {
taskCreate(partData);
}
offset = result.getOffset();
continue;
default:
throw new RuntimeException("unknown error when read data");
}
}
}
4、DBLE 异步处理任务并返回 OK 包
public void execute(ServiceTask task) {
task.increasePriority();
handleData(task);
}
private void handleAuthPacket(byte[] data) {
//将读取到的数据转换为AuthPacket
AuthPacket auth = new AuthPacket();
auth.read(data);
this.authPacket = auth;
……
//检查用户名和密码
auth();
……
}
private void checkForResult(AuthResultInfo info) {
……
AbstractService service = BusinessServiceFactory.getBusinessService(info, connection);
connection.setService(service);
//验证通过后,拼装并发送OK包给客户端
MySQLPacket packet = new OkPacket();
packet.setPacketId(needAuthSwitched ? 4 : 2);
packet.write(connection);
……
}
mysql>
多路复用在 DBLE 中是如何实现的
其实这个问题的答案,如果仔细看前面代码章节的话就已经能够知道了。DBLE 的多路复用其实就是通过 JAVA 的多路复用选择器 Selector 来实现的,通过将连接注册给 Selector,这样只是在连接中有数据时候才进行读取,能够实现一个线程监听多个连接。
请求的异步化处理
DBLE 在读取完数据后,并没有在当前线程中处理这些数据,而是将数据封装成任务提交到队列中去,然后通过另外的线程来进行处理,这即是请求异步化处理,能够极大的提高性能,这在上面的源码解读章节里也进行了说明。
总结
今天从一个实例出发,从源码角度详细解读了 DBLE 对网络数据包的处理流程。通过 Selector 实现多路复用,将接收到的数据封装成任务提交到队列以进行异步处理。这是 DBLE 高性能网络 IO 处理的秘密。当然可能还有一些代码细节在文章中没有讲到,大家如果有疑问的地方可以进一步阅读源码,也可以评论区留言。