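Author: 蒋乐兴 (Jiang Lexing)

MySQL DBA, skilled in Python and SQL; currently maintains two open source projects on GitHub, mysqltools and dbmc, and an independent blog: https://www.sqlpy.com.

Source: original contribution

*Produced by the 爱可生 (ActionTech) open source community. Original content may not be used without authorization; to repost it, please contact the editor and credit the source.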

The question

In Python, this is how we connect to the database.

    In [1]: from mysql import connector

    In [2]: cnx = connector.connect(host="172.16.192.100", port=3306, user="appuser", password="xxxxxx")

But what actually happens behind the scenes when we connect?

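The answer

When we connect to a MySQL server through a driver (mysql-connector-python, pymysql), we hand the connection parameters to the driver, and the driver uses them to open a TCP connection to the MySQL server.

Once the TCP connection is established, the driver and the server exchange packets in a specific format and order, both of which are defined by the MySQL protocol.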
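To make "a specific format" concrete: every MySQL packet starts with a 4-byte header, made of a 3-byte little-endian payload length followed by a 1-byte sequence number. The sketch below packs and unpacks that header. The names frame and unframe are made up for this illustration and do not come from any driver.

```python
import struct


def frame(payload: bytes, seq: int) -> bytes:
    """Prefix payload with a MySQL packet header (illustrative helper)."""
    if len(payload) >= (1 << 24) - 1:
        raise ValueError("payload too large for a single packet")
    # "<I" yields 4 little-endian bytes; the header only uses the low 3.
    return struct.pack("<I", len(payload))[:3] + struct.pack("<B", seq) + payload


def unframe(packet: bytes):
    """Split one packet into (payload_length, sequence_number, payload)."""
    length = int.from_bytes(packet[:3], "little")
    seq = packet[3]
    return length, seq, packet[4:4 + length]


if __name__ == "__main__":
    pkt = frame(b"\x0e", seq=0)   # one-byte payload (0x0e is COM_PING)
    print(pkt.hex())              # 010000000e
    print(unframe(pkt))           # (1, 0, b'\x0e')
```

Payloads of 16 MB - 1 bytes or more have to be split across several packets, which this sketch deliberately refuses to do.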

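Over the course of the connection, the MySQL server and the driver send the following packets, in this order.

1. The MySQL server sends the client a handshake packet, which carries the MySQL-Server version, the default authentication plugin, and the password salt (auth-data).
2. The MySQL client sends an SSL connection request packet (if needed).
3. The MySQL client sends the handshake response packet, which carries the user name, the scrambled password, the client attributes, and other information.
4. The MySQL server sends a reply packet, which says whether the login succeeded; if it did not, the packet also carries the error message.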
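Steps 1 and 3 are linked: the handshake response does not carry the password itself, only a value derived from the password and the salt received in step 1. For the caching_sha2_password plugin that value is SHA256(password) XOR SHA256(SHA256(SHA256(password)) + salt). The sketch below is a minimal stand-alone version built on hashlib. The function name scramble_caching_sha2 and the sample salt are made up for illustration, and a real driver may structure this differently.

```python
import hashlib


def scramble_caching_sha2(password: str, salt: bytes) -> bytes:
    """Return the 32-byte scramble for caching_sha2_password (illustrative)."""
    if not password:
        # An empty password is sent as an empty auth response.
        return b""
    h1 = hashlib.sha256(password.encode("utf8")).digest()
    # Hash of (SHA256(h1) followed by the server's salt).
    h2 = hashlib.sha256(hashlib.sha256(h1).digest() + salt).digest()
    # XOR the two digests byte by byte.
    return bytes(a ^ b for a, b in zip(h1, h2))


if __name__ == "__main__":
    sample_salt = bytes(range(20))   # made-up 20-byte salt, not from a server
    token = scramble_caching_sha2("123456", sample_salt)
    print(len(token), token.hex())
```

Because the salt changes on every connection, the scramble does too, so a captured handshake response cannot simply be replayed.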

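Connecting to MySQL over raw TCP

From the above we can see that drivers such as mysql-connector-python and pymysql are no gods or emperors; each is just an ordinary TCP client. So can we write a program of our own that performs the "connect" step? We certainly can.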
    #!/usr/bin/env python3
    """
    Log in to a MySQL server over a raw TCP socket, without a driver.
    """

    import os
    import ssl
    import sys
    import time
    import socket
    import struct
    import logging
    import argparse

    # get_auth_plugin comes from a separate plugins module that is not
    # listed in this article.
    from plugins import get_auth_plugin

    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(name)s - %(threadName)s - %(levelname)s - %(lineno)s - %(message)s')



    def read_str(packet, ends=None, size=None):
        """Read a string from the front of packet.

        The string ends either at the one-byte terminator `ends` or after
        `size` bytes. Returns a (remaining_bytes, string_bytes) tuple.
        """
        if ends is None and size is None:
            raise ValueError("either ends or size must be given.")

        if not isinstance(packet, (bytes, bytearray)):
            raise ValueError('packet must be bytes or bytearray.')

        if ends is not None:
            index = packet.index(ends)
            # Skip the terminator itself when returning the remainder.
            return packet[index + 1:], packet[0:index]

        if size <= len(packet):
            return packet[size:], packet[0:size]

        raise ValueError('size must not exceed len(packet)')



    class MySQLTcpSocket(object):
        """Wraps a TCP connection to a MySQL server (synchronous I/O)."""

        def __init__(self, host=None, port=3306, user=None, password=None):
            self._host = host
            self._port = port
            self._user = user
            self._password = password
            self._packet_number = 0
            # Set up front so the error path and __del__ can always test it.
            self._sock = None

            addrinfos = socket.getaddrinfo(
                self._host, self._port, socket.AF_INET, socket.SOCK_STREAM, 0)

            for info in addrinfos:
                try:
                    family, socket_type, proto, _, addrs = info
                    self._sock = socket.socket(family, socket_type, proto)
                    self._sock.settimeout(3)
                    self._sock.connect(addrs)
                    break
                except Exception as err:
                    # I/O errors and anything else are handled alike:
                    # log, close the socket, and give up.
                    logging.exception(str(err))
                    if self._sock is not None:
                        self._sock.close()
                    sys.exit(1)

        @property
        def packet_number(self):
            """Return the sequence number for the next outgoing packet.

            Reading this property advances the counter. The sequence number
            is a single byte, so it wraps from 255 back to 0.
            """
            self._packet_number = (self._packet_number + 1) % 256
            return self._packet_number

        def prepare_packets(self, packet, packet_number=None):
            """Split the payload into several packets if it exceeds 16 MB - 1."""
            # 16 MB - 1, the largest payload a single packet can carry
            max_length_packet = (1 << 24) - 1
            if packet_number is None:
                packet_number = self.packet_number

            # Anything beyond the limit goes out in full-size packets,
            # whose length field is 0xffffff.
            packets = []
            while len(packet) > max_length_packet:
                pct = b'\xff\xff\xff' + struct.pack("<B", packet_number)
                packet_number = (packet_number + 1) % 256
                packets.append(pct + packet[0:max_length_packet])
                packet = packet[max_length_packet:]

            # The remainder is sent with its actual length.
            packet_len = len(packet)
            pct = struct.pack("<I", packet_len)[0:3] + \
                struct.pack("<B", packet_number) + packet

            packets.append(pct)
            return packets

        def recv(self):
            """Receive one packet from the MySQL server."""
            # Header: 3-byte payload length + 1-byte sequence number.
            rst = 4
            header = b''
            while rst > 0:
                chunk = self._sock.recv(rst)
                if not chunk:
                    # An empty read means the peer closed the connection.
                    raise ConnectionError("connection closed by MySQL-Server.")
                header = header + chunk
                rst = rst - len(chunk)

            # The header is complete; now read the payload bytes.
            packet_length, *_ = struct.unpack("<I", header[0:3] + b'\x00')
            packet = bytearray(packet_length)
            pvm = memoryview(packet)
            rst = packet_length

            while rst > 0:
                chunk = self._sock.recv(rst)
                if not chunk:
                    raise ConnectionError("connection closed by MySQL-Server.")
                chunk_len = len(chunk)
                pvm[0:chunk_len] = chunk
                pvm = pvm[chunk_len:]
                rst = rst - chunk_len

            return header + packet

        def send(self, packet):
            """Send a payload to MySQL-Server (split into packets automatically)."""
            for pct in self.prepare_packets(packet):
                self._sock.sendall(pct)

        def change_to_ssl_mode(self):
            """Switch the socket to SSL client mode."""
            context = ssl.create_default_context()
            # Hostname and certificate checks are switched off here, which
            # is only acceptable for a demo.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            context.load_default_certs()
            self._sock = context.wrap_socket(self._sock)

        def __del__(self):
            sock = getattr(self, '_sock', None)
            if sock is not None:
                logging.info("close tcp socket object.")
                sock.close()



    class MySQLProtocol(object):
        """Packs and unpacks the various MySQL packets."""

        def parser_init_packet(self, packet):
            """Parse the handshake packet sent by MySQL-Server."""
            # Header: 3-byte payload length + 1-byte sequence number.
            payload_len, *_ = struct.unpack("<I", packet[0:3] + b'\x00')
            packet_number = packet[3]
            packet = packet[4:]

            # protocol-version (1 byte)
            protocol_version = packet[0]
            packet = packet[1:]

            # mysql-version (NUL-terminated string)
            packet, mysql_version = read_str(packet, ends=b'\x00')
            mysql_version = mysql_version.decode("utf8")

            # connection id (4 bytes), auth-data part 1 (8 bytes),
            # filler (1 byte), lower 2 bytes of the capability flags
            connection_id, auth_data_1, capability_lower = struct.unpack(
                "<I8sx2s", packet[0:15])
            packet = packet[15:]

            # Defaults in case the server sends nothing beyond this point.
            charset = status_flags = auth_plugin = None
            capability_upper = b'\x00\x00'
            auth_data_2 = b''

            if len(packet) > 0:
                # charset (1 byte), status flags (2 bytes),
                # upper 2 bytes of the capability flags
                charset, status_flags, capability_upper = struct.unpack(
                    "<BH2s", packet[0:5])
                packet = packet[5:]

                auth_plugin_data_length = packet[0]
                auth_data_2_len = max(13, (auth_plugin_data_length - 8))

                # Skip the length byte plus 10 reserved bytes.
                packet = packet[11:]
                auth_data_2 = packet[0:auth_data_2_len]
                if auth_data_2.endswith(b'\x00'):
                    auth_data_2 = auth_data_2[0:-1]

                # Move past auth-data part 2, then read the plugin name.
                packet = packet[auth_data_2_len:]
                packet, auth_plugin = read_str(packet, ends=b'\x00')
                auth_plugin = auth_plugin.decode('utf8')

            capabilities, *_ = struct.unpack(
                "<I", capability_lower + capability_upper)
            res = {
                'protocol': protocol_version,
                'server_version_original': mysql_version,
                'server_threadid': connection_id,
                'charset': charset,
                'server_status': status_flags,
                'auth_plugin': auth_plugin,
                'auth_data': auth_data_1 + auth_data_2,
                'capabilities': capabilities
            }

            return res

        def make_ssl_request_pakcet(self, client_flags=1813005, charset=45, max_allowed_packet=1073741824):
            """Build the SSL connection request packet."""
            # "<" forces little-endian with no padding: 4 + 4 + 1 + 23 = 32 bytes.
            packet = struct.pack("<IIB23x", client_flags,
                                 max_allowed_packet, charset)
            return packet

        def make_auth_response_packet(self, auth_data=None, plugin_name='caching_sha2_password',
                                      username=None, password=None, database=None, charset=45,
                                      client_flags=0, max_allowed_packet=1073741824, ssl_enabled=False,
                                      auth_plugin=None, conn_attrs=None):
            """Build the handshake response packet."""
            # client_flags, max_allowed_packet, charset, then 23 filler bytes
            packet = struct.pack("<IIB23x", client_flags,
                                 max_allowed_packet, charset)

            # username (NUL-terminated)
            username_bytes = username.encode("utf8")
            packet = packet + username_bytes + b'\x00'

            # capabilities & CLIENT_SECURE_CONNECTION == True
            # The scrambled password, prefixed with its 1-byte length.
            auth = get_auth_plugin('caching_sha2_password')(
                auth_data, username, password, ssl_enabled)
            auth_response = auth.auth_response()
            auth_response_len = len(auth_response)
            packet = packet + struct.pack("B", auth_response_len) + auth_response

            # No default database is selected, so the database name is an
            # empty NUL-terminated string.
            packet = packet + b'\x00'

            # capabilities & CLIENT_PLUGIN_AUTH == True
            packet = packet + (auth_plugin or plugin_name).encode('utf8') + b'\x00'

            # capabilities & CLIENT_CONNECT_ATTRS == True
            # Encode first so that lengths are counted in bytes. A 1-byte
            # length prefix is only valid for lengths below 251.
            attrs = [(name.encode('utf8'), value.encode('utf8'))
                     for name, value in (conn_attrs or {}).items()]
            attrs_len = sum(2 + len(name) + len(value) for name, value in attrs)
            packet = packet + struct.pack("B", attrs_len)

            for name, value in attrs:
                packet = packet + struct.pack("B", len(name)) + name
                packet = packet + struct.pack("B", len(value)) + value

            return packet



    def main(host="sqlstudio", port=3306,
             user='appuser', password='123456'):

        # Open the TCP connection to MySQL-Server
        logging.info(f"Opening a TCP connection to {host}:{port} .")
        sock = MySQLTcpSocket(host=host, port=port,
                              user=user, password=password)

        # Receive the handshake packet from MySQL-Server
        init_packet = sock.recv()
        logging.info("Received the handshake packet from MySQL-Server .")

        # Parse the handshake packet
        protocol = MySQLProtocol()
        init_dict = protocol.parser_init_packet(init_packet)
        logging.info("Handshake packet parsed")

        # Send the SSL connection request
        ssl_connection_request_packet = protocol.make_ssl_request_pakcet()
        sock.send(ssl_connection_request_packet)
        sock.change_to_ssl_mode()
        logging.info("Switched to ssl mode .")

        # Pack the client attributes. They are not the point here, so they
        # are hard-coded (AMD Yes!), and we casually tell MySQL-Server that
        # our operating system is Windows-100.
        conn_attrs = {
            "_pid": f"{os.getpid()}",
            "_platform": "x86_64",
            "_source_host": "AMD-Yes",
            "_client_name": "pure-socket",
            "_client_license": "GPL-2.0",
            "_client_version": "0.0.1",
            "_os": "Windows-100"
        }
        auth_response = protocol.make_auth_response_packet(
            auth_data=init_dict['auth_data'],
            auth_plugin='caching_sha2_password', username=user, password=password, database='',
            charset=45, client_flags=1813005, max_allowed_packet=1073741824,
            ssl_enabled=True, conn_attrs=conn_attrs)
        # Send the handshake response packet
        sock.send(auth_response)
        logging.info("Handshake response packet sent .")

        # Handle the second authentication round
        packet = sock.recv()

        # A first payload byte of 0x01 means the server sent extra
        # authentication data instead of the final result.
        if packet[4] == 1:
            auth_data = packet[5:]

            # 0x04 asks for full authentication: send the password itself,
            # which is acceptable here because the connection is already
            # encrypted.
            if len(auth_data) == 1 and auth_data[0] == 4:
                auth_response = password.encode('utf8') + b'\x00'
                sock._packet_number = 3
                sock.send(auth_response)
                logging.info("Second authentication round completed .")

            # The final login result arrives in the next packet.
            packet = sock.recv()

        logging.info("Received the login confirmation packet from MySQL-Server.")
        if packet[4] == 0:
            logging.info("Got an OK packet, login succeeded!")
        else:
            logging.info("Login to MySQL-Server failed!")
            logging.info(f"{packet}")

        # Keep the connection open for 30 seconds
        time.sleep(30)


    if __name__ == "__main__":
        parser = argparse.ArgumentParser('mysql-login')
        parser.add_argument('--host', type=str,
                            default="172.16.192.100", help="mysql-server ip")
        parser.add_argument('--port', type=int, default=3306,
                            help="mysql-server listening port")
        parser.add_argument('--user', type=str,
                            default='appuser', help="mysql user")
        parser.add_argument('--password', type=str,
                            default="123456", help="password of mysql user")

        args = parser.parse_args()
        main(host=args.host, port=args.port, user=args.user, password=args.password)

Run the program and look at the output.

    python3 mysql-login.py
    2020-05-26 15:51:20,494 - root - MainThread - INFO - 265 - Opening a TCP connection to 172.16.192.100:3306 .
    2020-05-26 15:51:20,498 - root - MainThread - INFO - 271 - Received the handshake packet from MySQL-Server .
    2020-05-26 15:51:20,499 - root - MainThread - INFO - 276 - Handshake packet parsed
    2020-05-26 15:51:20,503 - root - MainThread - INFO - 282 - Switched to ssl mode .
    2020-05-26 15:51:20,503 - root - MainThread - INFO - 300 - Handshake response packet sent .
    2020-05-26 15:51:20,504 - root - MainThread - INFO - 316 - Received the login confirmation packet from MySQL-Server.
    2020-05-26 15:51:20,504 - root - MainThread - INFO - 318 - Got an OK packet, login succeeded!
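The run above shows the successful case. When login fails, the script only logs the raw bytes of the reply. The first payload byte identifies the packet type (0x00 for OK, 0xff for ERR), and under the 4.1 protocol an ERR payload carries a 2-byte error code, a '#' marker followed by a 5-character SQLSTATE, and then the message text. The sketch below decodes such a packet. The name parse_err_packet is made up for this illustration, and the sample bytes are built by hand rather than captured from a server.

```python
import struct


def parse_err_packet(packet: bytes) -> dict:
    """Parse an ERR packet (4-byte header included), protocol 4.1 layout."""
    payload = packet[4:]
    if not payload or payload[0] != 0xFF:
        raise ValueError("not an ERR packet")
    (code,) = struct.unpack("<H", payload[1:3])
    rest = payload[3:]
    sqlstate = None
    if rest[:1] == b"#":
        # '#' marker, then exactly 5 bytes of SQLSTATE.
        sqlstate = rest[1:6].decode("ascii")
        rest = rest[6:]
    return {
        "code": code,
        "sqlstate": sqlstate,
        "message": rest.decode("utf8", "replace"),
    }


if __name__ == "__main__":
    # Hand-built sample, not captured from a real server.
    payload = b"\xff" + struct.pack("<H", 1045) + b"#28000" + b"Access denied"
    sample = struct.pack("<I", len(payload))[:3] + b"\x02" + payload
    print(parse_err_packet(sample))
```

In the script, a call like this could replace the final logging of the raw packet in the failure branch.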

Now look at the connection attributes on the MySQL server side.

    mysql> show processlist;
    +----+---------+--------------------+------+---------+------+----------+------------------+
    | Id | User    | Host               | db   | Command | Time | State    | Info             |
    +----+---------+--------------------+------+---------+------+----------+------------------+
    |  7 | monitor | 127.0.0.1:45088    | NULL | Sleep   |    4 |          | NULL             |
    |  8 | root    | 127.0.0.1:45090    | NULL | Query   |    0 | starting | show processlist |
    | 12 | appuser | 172.16.192.1:55290 | NULL | Sleep   |    4 |          | NULL             |
    +----+---------+--------------------+------+---------+------+----------+------------------+
    3 rows in set (0.00 sec)

    mysql> select * from performance_schema.session_connect_attrs where processlist_id=12;
    +----------------+-----------------+-------------+------------------+
    | PROCESSLIST_ID | ATTR_NAME       | ATTR_VALUE  | ORDINAL_POSITION |
    +----------------+-----------------+-------------+------------------+
    |             12 | _pid            | 4885        |                0 |
    |             12 | _platform       | x86_64      |                1 |
    |             12 | _source_host    | AMD-Yes     |                2 |
    |             12 | _client_name    | pure-socket |                3 |
    |             12 | _client_license | GPL-2.0     |                4 |
    |             12 | _client_version | 0.0.1       |                5 |
    |             12 | _os             | Windows-100 |                6 |
    +----------------+-----------------+-------------+------------------+
    7 rows in set (0.01 sec)
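The rows above are the key/value pairs the script sent in its handshake response. On the wire, each key and each value is a length-encoded string, and the whole block is prefixed with its total length as a length-encoded integer. The script gets away with single-byte lengths because everything it sends is shorter than 251 bytes; the sketch below handles the general case. The names lenenc_int and encode_conn_attrs are made up for this illustration.

```python
import struct


def lenenc_int(n: int) -> bytes:
    """Encode n as a MySQL length-encoded integer."""
    if n < 251:
        return struct.pack("<B", n)
    if n < 1 << 16:
        return b"\xfc" + struct.pack("<H", n)
    if n < 1 << 24:
        # 0xfd is followed by a 3-byte little-endian integer.
        return b"\xfd" + struct.pack("<I", n)[:3]
    return b"\xfe" + struct.pack("<Q", n)


def encode_conn_attrs(attrs: dict) -> bytes:
    """Encode connection attributes as they appear in the handshake response."""
    body = b""
    for name, value in attrs.items():
        key_bytes = name.encode("utf8")
        value_bytes = value.encode("utf8")
        body += lenenc_int(len(key_bytes)) + key_bytes
        body += lenenc_int(len(value_bytes)) + value_bytes
    # The total length of all pairs comes first.
    return lenenc_int(len(body)) + body


if __name__ == "__main__":
    print(encode_conn_attrs({"_os": "Windows-100"}))
    # b'\x10\x03_os\x0bWindows-100'
```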

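Without a driver, it took more than 300 lines of code just to connect to MySQL. Life is hard! So is raw TCP useless? Not quite: if you plan to build your own read/write splitting middleware, this knowledge is still necessary.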
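A middleware of that kind has to negotiate capability flags with both the client and the server, so it helps to be able to read a flag value. As a small illustration, the sketch below decodes the client_flags value 1813005 that the script hard-codes. The bit values are those of the MySQL protocol, but the table is only a subset, and the name decode_capabilities is made up for this illustration.

```python
# A subset of the capability bits defined by the MySQL protocol.
CAPABILITY_FLAGS = {
    "CLIENT_LONG_PASSWORD": 0x00000001,
    "CLIENT_FOUND_ROWS": 0x00000002,
    "CLIENT_LONG_FLAG": 0x00000004,
    "CLIENT_CONNECT_WITH_DB": 0x00000008,
    "CLIENT_PROTOCOL_41": 0x00000200,
    "CLIENT_SSL": 0x00000800,
    "CLIENT_TRANSACTIONS": 0x00002000,
    "CLIENT_SECURE_CONNECTION": 0x00008000,
    "CLIENT_MULTI_STATEMENTS": 0x00010000,
    "CLIENT_MULTI_RESULTS": 0x00020000,
    "CLIENT_PS_MULTI_RESULTS": 0x00040000,
    "CLIENT_PLUGIN_AUTH": 0x00080000,
    "CLIENT_CONNECT_ATTRS": 0x00100000,
    "CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA": 0x00200000,
}


def decode_capabilities(flags: int):
    """Return (names of the known bits set in flags, leftover unknown bits)."""
    names = [name for name, bit in CAPABILITY_FLAGS.items() if flags & bit]
    known = sum(bit for bit in CAPABILITY_FLAGS.values() if flags & bit)
    return names, flags & ~known


if __name__ == "__main__":
    names, unknown = decode_capabilities(1813005)
    for name in names:
        print(name)
    print("unknown bits:", hex(unknown))
```

The value includes CLIENT_SSL, CLIENT_PLUGIN_AUTH and CLIENT_CONNECT_ATTRS, which matches the SSL request, the plugin name and the connection attributes the script sends. CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA is not set, which is why the scrambled password is prefixed with a plain 1-byte length.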

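Miscellaneous

More about the MySQL protocol can be found in the MySQL Internals documentation. The sample code above is also available on GitHub.

Category: Technical sharing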