作者简介

路路,热爱技术、乐于分享的技术人,目前主要从事数据库相关技术的研究。

前言

今天给大家带来 DBLE 的内存管理模块源码解析文章。

本文主要分为两部分,一是内存管理模块结构,二是内存管理模块源码解析。

内存管理模块结构
DBLE 管理的内存的主要目的有以下两点 

1. 网络读写

2. 查询结果集处理

简单说明一下这两点:

第一点比较好理解,网络读的时候需要从通道中读取字节流到内存中,以便进一步处理,网络写的时候也需要将字节流从内存中写入到通道中,以便发送信息。

第二点就比较复杂一些了,DBLE 获取到 MySQL 端的数据后,会经过 DBLE 然后再发送给客户端,如果是简单查询的话,DBLE 端只是起到一个中转的作用,会利用少量内存用于将结果转发给客户端。如果是复杂查询的话,DBLE 会先对结果集进行处理,比如连接、分组排序等操作,然后再将结果发送给客户端,这里涉及到的内存使用就复杂了,对于复杂查询 DBLE 会先使用堆内存,如果使用的堆内存超过限制,则会写内存映射文件,如果写内存映射文件个数再次达到上限,则会写硬盘。是不是很复杂?放心,这一块本文不会介绍,原因我就不说了(太复杂了)……

我们看一下 DBLE 官方内存结构图:

个人觉得上图有一点理解上的疑惑, DirectByteBufferPool 占用的内存应该位于物理内存中,JVM 中只是有着堆外内存的引用对象而已。

下图个人觉得可能更好理解。

结合 DBLE 管理内存的主要两个作用,网络读写主要用到了堆外内存,即 DirectByteBufferPool 管理的部分。结果集处理根据情况有所不同,本文只讨论简单情况,对于简单情况来讲,会使用到堆外内存或堆内存,下文源码中会提到具体情况。

内存管理模块源码解析
来看下内存管理模块涉及到的类:
从上图可以看出内存管理模块涉及到的类不多,只有两个:
1. DirectByteBufferPool 类即为管理堆外内存的内存池类,可以看到它创建了 ByteBuffePage 类,并且与该类有着一对多的关系;
2. ByteBufferPage 类为 DBLE 抽象出的堆外内存页的概念,内存页中又有内存块的概念,内存块是内存分配的最小单元。采用此种内存概念主要是为了减少内存碎片问题。

DBLE 官方对于这两个类的内部结构以图的形式表示的很清楚了:

上图中的参数 bufferPoolPageNumberbufferPoolPageSizebufferPoolChunkSize 可在 Server.xml 中配置 ,bufferPoolPageSize 默认为 2M,bufferPoolPageNumber 默认为 Java 虚拟机的可用的处理器数量 * 20, bufferPoolChunkSize 的参数默认为 4k,该参数最好设为 bufferPoolPageSize 的约数,否则最后一个会造成浪费。

在看源码前,先看下内存分配的逻辑:

1. 如果不指定分配大小,则默认分配一个最小单元(最小单元由 bufferPoolChunkSize  决定);

2. 如果指定分配大小,则分配放得下分配大小的最小单元的整数倍(向上取整) ,举个例子,如果需要 10k 的大小,则在 bufferPoolChunkSize 参数默认为 4k 的情况下,会分配 3 个内存 chunk;

3. 分配逻辑为:
(1)遍历缓冲池从 N+1 页到 bufferPoolPageNumber -1 页(上次分配过的记为第 N 页),然后对单页加锁在每个页中从头寻找未被使用的连续 M 个最小单元 ;
(2)如果没找到,再从第 0 页找到第 N 页;
(3)成功分配内存后更新上次分配页,标记内存页中分配的单元;

(4)如果找不到可存放的单页(比如大于 bufferPoolPageSize ),直接分配 On-Heap 内存。

下面我们就开始看看对应的源码。

内存池初始化的代码调用在 DbleServer#startup 方法中:

  1. //通过配置的相关参数初始化内存池

  2. bufferPool = new DirectByteBufferPool(bufferPoolPageSize, bufferPoolChunkSize, bufferPoolPageNumber);

看下内存池初始化逻辑,代码在 DirectByteBufferPool 类的构造方法中:
  1. public DirectByteBufferPool(int pageSize, short chunkSize, short pageCount) {

  2. //初始化内存总页数

  3. allPages = new ByteBufferPage[pageCount];

  4. //内存页中的内存块大小

  5. this.chunkSize = chunkSize;

  6. //每个内存页的大小

  7. this.pageSize = pageSize;

  8. //内存页总数

  9. this.pageCount = pageCount;

  10. //记录上次分配过的页

  11. prevAllocatedPage = new AtomicInteger(0);

  12. //循环初始化内存页

  13. for (int i = 0; i < pageCount; i++) {

  14. allPages[i] = new ByteBufferPage(ByteBuffer.allocateDirect(pageSize), chunkSize);

  15. }

  16. //记录堆外内存的使用大小

  17. memoryUsage = new ConcurrentHashMap<>();

  18. }

继续看下内存页的初始化逻辑,在 ByteBufferPage 类的构造方法中:
  1. public ByteBufferPage(ByteBuffer buf, int chunkSize) {

  2. //内存块大小

  3. this.chunkSize = chunkSize;

  4. //计算总内存块数

  5. chunkCount = buf.capacity() / chunkSize;

  6. //非常巧妙的位图结构,用于标记内存块的使用情况

  7. chunkAllocateTrack = new BitSet(chunkCount);

  8. //内存页的大小

  9. this.buf = buf;

  10. }

上述代码完成了堆外内存的初始化。

下面让我们看看内存的分配逻辑代码。
分配内存的操作主要在 DirectByteBufferPool#allocate 方法中:
  1. public ByteBuffer allocate(int size) {

  2. //计算需要分配的chunk数

  3. final int theChunkCount = size / chunkSize + (size % chunkSize == 0 ? 0 : 1);

  4. //本次分配的开始页数,为上次分配过的页数(N)+1

  5. int selectedPage = prevAllocatedPage.incrementAndGet() % allPages.length;

  6. //从N+1页到bufferPoolPageNumber-1页遍历分配,allocateBuffer方法下面会进一步分析

  7. ByteBuffer byteBuf = allocateBuffer(theChunkCount, selectedPage, allPages.length);

  8. //没有分配成功,则从0页到N页继续遍历分配

  9. if (byteBuf == null) {

  10. byteBuf = allocateBuffer(theChunkCount, 0, selectedPage);

  11. }

  12. final long threadId = Thread.currentThread().getId();

  13. //分配成功的话,则记录分配到的内存大小

  14. if (byteBuf != null) {

  15. if (memoryUsage.containsKey(threadId)) {

  16. memoryUsage.put(threadId, memoryUsage.get(threadId) + byteBuf.capacity());

  17. } else {

  18. memoryUsage.put(threadId, (long) byteBuf.capacity());

  19. }

  20. }

  21. //如果遍历完所有页还没有分配成功,则直接分配堆上内存

  22. if (byteBuf == null) {

  23. //ByteBuffer.allocate方法为分配JVM堆内存

  24. return ByteBuffer.allocate(size);

  25. }

  26. return byteBuf;

  27. }

继续看下 DirectByteBufferPool#allocateBuffer 方法:

  1. private ByteBuffer allocateBuffer(int theChunkCount, int startPage, int endPage) {

  2. //从指定页数开始遍历分配内存,分配成功则标记当前分配的页数,然后直接返回

  3. for (int i = startPage; i < endPage; i++) {

  4. //调用了ByteBufferPage类的allocateChunk方法进行内存块的分配,该方法下面也会进一步分析

  5. ByteBuffer buffer = allPages[i].allocateChunk(theChunkCount);

  6. if (buffer != null) {

  7. prevAllocatedPage.getAndSet(i);

  8. return buffer;

  9. }

  10. }

  11. return null;

  12. }

继续看一下 ByteBufferPage#allocateChunk 方法,该方法比较长,分配页中的连续内存块逻辑就在此方法中:
  1. public ByteBuffer allocateChunk(int theChunkCount) {

  2. //对页加状态锁,防止并发操作异常

  3. if (!allocLockStatus.compareAndSet(false, true)) {

  4. return null;

  5. }

  6. int startChunk = -1;

  7. int continueCount = 0;

  8. try {

  9. //下面的逻辑为在页中找到符合内存分配大小的连续内存块

  10. for (int i = 0; i < chunkCount; i++) {

  11. if (!chunkAllocateTrack.get(i)) {

  12. if (startChunk == -1) {

  13. startChunk = i;

  14. continueCount = 1;

  15. if (theChunkCount == 1) {

  16. break;

  17. }

  18. } else {

  19. if (++continueCount == theChunkCount) {

  20. break;

  21. }

  22. }

  23. } else {

  24. startChunk = -1;

  25. continueCount = 0;

  26. }

  27. }

  28. //成功找到后则返回分配的内存块,并且标记相应的内存块位置

  29. if (continueCount == theChunkCount) {

  30. int offStart = startChunk * chunkSize;

  31. int offEnd = offStart + theChunkCount * chunkSize;

  32. buf.limit(offEnd);

  33. buf.position(offStart);


  34. ByteBuffer newBuf = buf.slice();

  35. markChunksUsed(startChunk, theChunkCount);

  36. return newBuf;

  37. } else {

  38. //分配失败返回null

  39. return null;

  40. }

  41. } finally {

  42. //解锁

  43. allocLockStatus.set(false);

  44. }

  45. }

到这里,内存分配的逻辑就大概讲完了。

有分配也得有回收,为了文章的完整性,内存回收也顺带讲一下。
内存回收逻辑比较简单,分两种情况,如果是分配的堆内存,则直接 clear 等待 GC 回收,如果是堆外内存,则遍历所有页,找到对应页然后加锁,找到对应块的位置,标记为未使用就可以了。
内存回收的主要代码在 DirectByteBufferPool#recycle 方法中:
  1. public void recycle(ByteBuffer theBuf) {

  2. //堆内存的回收

  3. if (!(theBuf instanceof DirectBuffer)) {

  4. theBuf.clear();

  5. return;

  6. }


  7. final long size = theBuf.capacity();

  8. //堆外内存的回收

  9. boolean recycled = false;

  10. DirectBuffer thisNavBuf = (DirectBuffer)theBuf;

  11. int chunkCount = theBuf.capacity() / chunkSize;

  12. DirectBuffer parentBuf = (DirectBuffer) thisNavBuf.attachment();

  13. int startChunk = (int) ((thisNavBuf.address() - parentBuf.address()) / this.chunkSize);

  14. //遍历页然后将对应块标记为未使用即可

  15. for (ByteBufferPage allPage : allPages) {

  16. if ((recycled = allPage.recycleBuffer((ByteBuffer) parentBuf, startChunk, chunkCount))) {

  17. break;

  18. }

  19. }

  20. final long threadId = Thread.currentThread().getId();


  21. if (memoryUsage.containsKey(threadId)) {

  22. memoryUsage.put(threadId, memoryUsage.get(threadId) - size);

  23. }

  24. if (!recycled) {

  25. LOGGER.info("warning ,not recycled buffer " + theBuf);

  26. }

  27. }

内存的分配与回收到这里也讲完了,还记得开始时候说的内存的主要作用吗?一个是网络读写时使用,一个是结果集暂存时使用。

网络读写时使用的例子可以参考这里,NIOSocketWR#asyncRead 方法:

  1. public void asyncRead() throws IOException {

  2. ByteBuffer theBuffer = con.readBuffer;

  3. if (theBuffer == null) {

  4. //这里分配了从通道读取数据时候的内存

  5. theBuffer = con.processor.getBufferPool().allocate(con.processor.getBufferPool().getChunkSize());

  6. con.readBuffer = theBuffer;

  7. }

  8. int got = channel.read(theBuffer);

  9. con.onReadData(got);

  10. }

暂存结果集使用的例子可以参考这里,SingleNodeHandler#rowResponse 方法:

  1. //省略无关内容

  2. ......

  3. //这里分配内存,将结果集写入,然后待发送到客户端

  4. buffer = session.getSource().writeToBuffer(row, allocBuffer());

  5. ......

当然内存分配的代码调用远不止上面举例的两处,这里只是和内存管理的作用做个呼应。其他地方大家可以自己看。

总结
DBLE 的内存管理模块源码阅读的内容如上所述,希望能够对大家更深入的理解 DBLE 有所帮助。

社区近期动态

No.1

10.26 DBLE 用户见面会 北京站

爱可生开源社区将在 2019 年 10 月 26 日迎来在北京的首场 DBLE 用户见面会,以线下互动分享的会议形式跟大家见面。

时间:10月26日 9:00 – 12:00 AM

地点:HomeCafe 上地店(北京市海淀区上地二街一号龙泉湖酒店对面)


重要提醒:

1. 同日下午还有 dbaplus 社群举办的沙龙:聚焦数据中台、数据架构与优化。

2. 爱可生开源社区会在每年10.24日开源一款高质量产品。本次在 dbaplus 沙龙会议上,爱可生的资深研发工程师闫阿龙,将为大家带来《金融分布式事务实践及txle概述》,并在现场开源。

No.2

「3306π」成都站 Meetup

知数堂将在 2019 年 10 月 26 日在成都举办线下会议,本次会议中邀请了五位数据库领域的资深研发/DBA进行主题分享。

时间:2019年10月26日 13:00-18:00

地点:成都市高新区天府三街198号腾讯成都大厦A座多功能厅

No.3

Mycat 问题免费诊断

诊断范围支持:

Mycat 的故障诊断、源码分析、性能优化

服务支持渠道:

  1. 技术交流群,进群后可提问

    QQ群(669663113)

  2. 社区通道,邮件&电话

    osc@actionsky.com

  3. 现场拜访,线下实地,1天免费拜访

关注“爱可生开源社区”公众号,回复关键字“Mycat”,获取活动详情。

No.4

社区技术内容征稿

征稿内容:

  1. 格式:.md/.doc/.txt

  2. 主题:MySQL、分布式中间件DBLE、数据传输组件DTLE相关技术内容

  3. 要求:原创且未发布过

  4. 奖励:作者署名;200元京东E卡+社区周边

投稿方式:

  1. 邮箱:osc@actionsky.com

  2. 格式:[投稿]姓名+文章标题

  3. 以附件形式发送,正文需注明姓名、手机号、微信号,以便小编及时联系