用Python开发MySQL增强半同步BinlogServer(T2通信篇)

in 编程
关注公众号【好便宜】( ID:haopianyi222 ),领红包啦~
阿里云,国内最大的云服务商,注册就送数千元优惠券:https://t.cn/AiQe5A0g
腾讯云,良心云,价格优惠: https://t.cn/AieHwwKl
搬瓦工,CN2 GIA 优质线路,搭梯子、海外建站推荐: https://t.cn/AieHwfX9

导读

作者:曾永伟,知数堂10期学员,多年JAVA物流行业开发管理经验和PHP/Python跨境电商开发管理经验,对数据库系统情有独钟,善于运用SQL编程简化业务逻辑,去年开始正式从业MySQL DBA, 专注于DB系统自动化运维、MySQL云上实践。

本文为python-mysql-binlogserver系列的第二篇(T2通信篇),之后会陆续发布此系列的其他文章,请大家点击在看或者收藏,自行练习。

一、概述

从前一篇文章我们已经了解了:

在本节中,我们将结合这些内容进一步来了解如何使Python和MySQL进行交互。

二、MySQL通信协议基础

在Python中,使用socket.secv接收数据或send发送数据的都是二进制流对象Bytes,我们需要结合MySQL通信协议来逐字节解析其具体的含义。

MySQL基础通信单位Packet,它由header + payload组成,header由3个字节的payload长度(最大16M字节数)和1个字节的流水号组成,在读取一个Packet时,通常先读4个字节,解析出payload的长度和payload的序号,再根据payload的长度把余下的正文读取出来。

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("127.0.0.1",  3306))
'''
# 先读Header
header = s.recv(4)
length = struct.unpack('<I', header[:3] + b'\x00')[0]
sequenceId = struct.unpack('<B', header[:-1])[0]  

# 再读余下的正文,完成一个Packet的完整读取
body = s.recv(length)

解析Greeting包

当Client连接上MySQL Server后,MySQL会主动发送一个greeting包,把自己的状态和随机密文发送给Client, 等待Client响应帐户和密码等信息,验证失败发送ERR包并主动断开连接,验证成功后发送OK包,保持连接,等待Client发送其它"指令"。

接下来我先看一下Greeting包正文的官方说明:

https://dev.mysql.com/doc/internals/en/connection-phase-packets.html
1                [0a] protocol version
string[NUL]      server version
4                connection id
string[8]        auth-plugin-data-part-1
1                [00] filler
2                capability flags (lower 2 bytes)
1                character set
2                status flags
2                capability flags (upper 2 bytes)
1                length of auth-plugin-data
string[10]       reserved (all [00])
string[$len]     auth-plugin-data-part-2  ($len=MAX(13, length of auth-plugin-data -  8))
string[NUL]      auth-plugin name

为了更加直接的观察和理解,我这里去掉了对低版本的兼容格式,让其显得更加整洁。

我们来尝试用Python解析第一个Greeting包:

import socket
import  struct
from pprint import pprint

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("192.168.56.101",  3306))
 
greeting =  {}

# Header 3+1
greeting["length"]  =  struct.unpack('<I', s.recv(3)  + b'\x00')[0]
greeting["sequenceId"]  =  struct.unpack('<B', s.recv(1))[0]

# 正文开始
greeting["protocolVersion"]  = s.recv(1)

# serverVersion是string[NUL]类型,所以一直循环读取到\x00节束
greeting["serverVersion"]  =  ""
while  True:
      _byte = s.recv(1)
      if _byte == b'\x00':
          break
      greeting["serverVersion"]  += chr(int(_byte.hex(),  16))

# 余下部分请自行参照上方文档进行一一对照
greeting["connectionId"]  =  struct.unpack('<I', s.recv(4))[0]
greeting["challenge1"]  = s.recv(8).decode("utf8")
_filler = s.recv(1)
greeting["capabilityFlags"]  = s.recv(2)
greeting["characterSet"]  = s.recv(1)
greeting["statusFlags"]  = s.recv(2)
greeting["capabilityFlag"]  = s.recv(2)
greeting["authPluginDataLength"]  =  struct.unpack('<B', s.recv(1))[0]
_reserved = s.recv(10)
greeting["challenge2"]  = s.recv(max(13, greeting["authPluginDataLength"]  -  8)).decode("utf8")
greeting["authPluginName"]  =  ""
while  True:
     _byte = s.recv(1)
     if _byte == b'\x00':
         break
     greeting["authPluginName"]  += chr(int(_byte.hex(),  16))
pprint(greeting)

输出:

{'authPluginDataLength':  21,
'authPluginName':  'mysql_native_password',
'capabilityFlag': b'\xff\x81',
'capabilityFlags': b'\xff\xf7',
'challenge1':  'X:u8\x11N4\x1b',
'challenge2':  'dF\x04f~\x1f!%\x14\x1acV\x00',
'characterSet': b'\xe0',
'connectionId':  73,
'length':  78,
'protocolVersion': b'\n',
'sequenceId':  0,
'serverVersion':  '5.7.20-log',
'statusFlags': b'\x02\x00'}

可见用Python解析出Greeting包的内容并没有想象中那么难,只要结合官方文档,用Python struct很容易就把二进制流还原成可读的文本信息。

为了使代码可以复用,我们将上面的代码进行一个简单的函数化封装,以方便给后面的例子调用:

# learn_packet1_greeting.py

import socket
import  struct
from pprint import pprint
    
def get_greeting(s):
    greeting =  {}
    greeting["length"]  =  struct.unpack('<I', s.recv(3)  + b'\x00')[0]
    greeting["sequenceId"]  =  struct.unpack('<B', s.recv(1))[0]
    greeting["protocolVersion"]  = s.recv(1)
    greeting["serverVersion"]  =  ""
    while  True:
        _byte = s.recv(1)
        if _byte == b'\x00':
            break
    greeting["serverVersion"]  += chr(int(_byte.hex(),  16))
    greeting["connectionId"]  =  struct.unpack('<I', s.recv(4))[0]
    greeting["challenge1"]  = s.recv(8).decode("utf8")
_filler = s.recv(1)
    greeting["capabilityFlags"]  = s.recv(2)
    greeting["characterSet"]  = s.recv(1)
    greeting["statusFlags"]  = s.recv(2)
    greeting["capabilityFlag"]  = s.recv(2)
    greeting["authPluginDataLength"]  =  struct.unpack('<B', s.recv(1))[0]
_reserved = s.recv(10)
    greeting["challenge2"]  = s.recv(max(13,          greeting["authPluginDataLength"]  -  8)).decode("utf8")
    greeting["authPluginName"]  =  ""
    while  True:
        _byte = s.recv(1)
        if _byte == b'\x00':
            break
    greeting["authPluginName"]  += chr(int(_byte.hex(),  16))
return greeting 
    
if __name__ ==  "__main__":
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("192.168.1.100",  3306))
    greeting = get_greeting(s)
    pprint(greeting)

封装Response包,完成验证

当Client得到Greeting包后,就可以结合两个随机码,组成认证回应包,完成MySQL的认证:

# learn_packet2_auth.py

import socket
import  struct
import hashlib
from functools import  partial
from py_mysql_binlogserver._tutorial.learn_packet1_greeting import get_greeting 
    
def dump_packet(packet, title=None):
    pass  # 省略代码,节约篇幅 
    
def scramble_native_password(password, message):
    '''
    mysql_native_password
    https://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
    '''
    SCRAMBLE_LENGTH =  20
    sha1_new =  partial(hashlib.new,  'sha1')

    """Scramble used for mysql_native_password"""
    if  not password:
         return b''
         
    password = password.encode("utf-8")  
    message = message.encode("utf-8")
    
    stage1 = sha1_new(password).digest()
    stage2 = sha1_new(stage1).digest()
    s = sha1_new()
    s.update(message[:SCRAMBLE_LENGTH])
    s.update(stage2)
    result = s.digest()
    return _my_crypt(result, stage1)

def _my_crypt(message1, message2):
    result = bytearray(message1)
    for i in range(len(result)):
        result[i]  ^= message2[i]

    return bytes(result)
    
def get_response(s, username, password, challenge1, challenge2):
    '''
    https://dev.mysql.com/doc/internals/en/connection-phase-packets.html
    简化版
    4              capability flags, CLIENT_PROTOCOL_41 always set
    4              max-packet size
    1              character set
    string[23]     reserved (all [0])
    string[NUL]    username
    lenenc-int     length of auth-response
    string[n]      auth-response
    string[NUL]    auth plugin name
    '''
    scramble_password = scramble_native_password(password, challenge1 + challenge2)
    
    response = b''
    response +=  struct.pack('<I',  32482821)
    response +=  struct.pack('<I',  16777216)
    response +=  struct.pack('<b',  33)
    response += b''.join([b'\x00'  for i in range(23)])
    response += username.encode()  + b'\x00'
    response +=  struct.pack('<b', len(scramble_password))
    response += scramble_password
    response +=  "mysql_native_password".encode()  + b'\x00'
    response =  struct.pack('<I', len(response))[:-1]  +       struct.pack('<B',  1)  + response
    return response
    
if __name__ ==  "__main__":
    
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)`
    s.connect(("192.168.1.100",  3306))

    greeting = get_greeting(s)
    
    username =  'repl'
    password =  'repl1234'
    response = get_response(s, username, password,       greeting["challenge1"], greeting["challenge2"])
    s.send(response)
    dump_packet(response,  "Response packet:")
    
    result = s.recv(1024)
    dump_packet(result,  "Result packet:")

向服务器发送帐号及加密的密文,通过验证后,会得到OK包:

Response packet:
00000000  50  00  00  01  05  A6  EF  01  00  00  00  01  21  00  00  00 P....¦ï.  ....!...
00000010  00  00  00  00  00  00  00  00  00  00  00  00  00  00  00  00  ........  ........
00000020  00  00  00  00  72  65  70  6C  00  14  F6  4A  C1  E7  4F  2A  ....repl ..öJÁçO*
00000030  BB  A3  CB  29  3C  5B  50  F9  3C  AF  E3  6C  1C  A9  6D  79  »£Ë)<[Pù  <¯ãl.©my
00000040  73  71  6C  5F  6E  61  74  69  76  65  5F  70  61  73  73  77 sql_nati ve_passw
00000050  6F  72  64  00 ord.
    
Result packet:
00000000  07  00  00  02  00  00  00  02  00  00  00  ........  ...

验证失败会得到如下结果:

Response packet:
00000000  50  00  00  01  05  A6  EF  01  00  00  00  01  21  00  00  00 P....¦ï.  ....!...
00000010  00  00  00  00  00  00  00  00  00  00  00  00  00  00  00  00  ........  ........
00000020  00  00  00  00  72  65  70  6C  00  14  AB  41  90  72  07  98  ....repl ..«A�r.�
00000030  0A  36  21  F8  FC  CC  83  7B  8E  4E  A3  75  A2  DB  6D  79  .6!øüÌ�{  �N£u¢Ûmy
00000040  73  71  6C  5F  6E  61  74  69  76  65  5F  70  61  73  73  77 sql_nati ve_passw
00000050  6F  72  64  00 ord.
    
Result packet:
00000000  4A  00  00  02  FF  15  04  23  32  38  30  30  30  41  63  63 J......#  28000Acc
00000010  65  73  73  20  64  65  6E  69  65  64  20  66  6F  72  20  75 ess deni ed for u
00000020  73  65  72  20  27  72  65  70  6C  27  40  27  31  39  32  2E ser 'rep l'@'192.
00000030  31  36  38  2E  31  2E  31  27  20  28  75  73  69  6E  67  20   168.1.1'  (using
00000040  70  61  73  73  77  6F  72  64  3A  20  59  45  53  29 password : YES)

执行查询

完成认证后,服务器就会保存连接,直到超时或Client主动退出才会中断连接。接下来我们就可以在完成认证后的socket上发送命令了,就像使用mysql client一样,只不过通过socket发送的数据还要是bytes对象,直接上菜:

# learn_packet3_query.py
    
import socket
import  struct
    
from py_mysql_binlogserver._tutorial.learn_packet1_greeting import get_greeting
from py_mysql_binlogserver._tutorial.learn_packet2_auth import dump_packet, get_response
    
def read_packet(skt):
    while  True:
        _header = skt.recv(5)
        _length =  struct.unpack("<I",  (_header[0:3]  + b"\x00"))[0]
        _sequenceId =  struct.unpack("<B", _header[3:4])[0]
        _packetType =  struct.unpack("<B", _header[4:])[0]

        # 每个查询会产生多个数据包,读到 EOF 包后结束
        if _packetType ==  0xfe:  # EOF
            break
   
        _payload = skt.recv(_length -  1)
        dump_packet(_header + _payload, f"read packet [{_sequenceId}]")
    
def get_query(sql):
    """
    https://dev.mysql.com/doc/internals/en/com-query.html
    1              [03] COM_QUERY
    string[EOF]    the query the server shall execute
    """
    query = b''
    query +=  struct.pack("<B",  3)
    query += sql.encode()
    
    query =  struct.pack('<I', len(query))[:-1]  +  struct.pack('<B',  0)  + query
    return query
    
if __name__ ==  "__main__":
    
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("192.168.1.100",  3306))
    greeting = get_greeting(s)
    username =  'repl'
    password =  'repl1234'
    response = get_response(s, username, password,      greeting["challenge1"], greeting["challenge2"])
    s.send(response)
    result = s.recv(1024)
    
    sql =  "select @@version_comment"
    query = get_query(sql)
    dump_packet(query, f"query packet:{sql}")
    s.send(query
    )
    
    
read_packet(s)

三、Binlog文件格式

先简单回顾一下binlog文件中都有些什么内容:

mysql> show binlog events in  'mysql-bin.000009';
+-------------------+-------+-------------------+-------------+---------------+--------------------------------------------------------------------+
|  Log_name         |  Pos  |  Event_type       |  Server_id  |  End_log_pos  |  Info                                                              |
+-------------------+-------+-------------------+-------------+---------------+--------------------------------------------------------------------+
| mysql-bin.000009  |  4    |  Format_desc      |  3306100    |  123          |  Server ver:  5.7.20-log,  Binlog ver:  4                          |
| mysql-bin.000009  |  123  |  Previous_gtids   |  3306100    |  190          | f0ea18e0-3cff-11e9-9488-0800275ae9e7:1-19                          |
| mysql-bin.000009  |  190  |  Gtid             |  3306100    |  251          | SET @@SESSION.GTID_NEXT=  'f0ea18e0-3cff-11e9-9488-0800275ae9e7:20'|
| mysql-bin.000009  |  251  |  Query            |  3306100    |  318          |  BEGIN                                                             |
| mysql-bin.000009  |  318  |  Table_map        |  3306100    |  361          | table_id:  223  (db3.t3)                                           |
| mysql-bin.000009  |  361  |  Write_rows       |  3306100    |  402          | table_id:  223 flags: STMT_END_F                                   |
| mysql-bin.000009  |  402  |  Xid              |  3306100    |  429          | COMMIT /* xid=286 */  |
| mysql-bin.000009  |  429  |  Gtid             |  3306100    |  490          | SET @@SESSION.GTID_NEXT=  'f0ea18e0-3cff-11e9-9488-0800275ae9e7:21'|
| mysql-bin.000009  |  490  |  Query            |  3306100    |  557          |  BEGIN                                                             |
| mysql-bin.000009  |  557  |  Table_map        |  3306100    |  600          | table_id:  223  (db3.t3)                                           |
| mysql-bin.000009  |  600  |  Write_rows       |  3306100    |  641          | table_id:  223 flags: STMT_END_F                                   |
| mysql-bin.000009  |  641  |  Xid              |  3306100    |  668          | COMMIT /* xid=287 */                                               |
| mysql-bin.000009  |  668  |  Rotate           |  3306100    |  711          | mysql-bin.000010;pos=4                                             |
+-------------------+-------+-------------------+-------------+---------------+--------------------------------------------------------------------+
13 rows in  set  (0.00 sec)

可以看出,binlog文件内容就是有很多的Event组成,一个完整的binlog应该是由Format_desc event开始,Rotate event结束,它们充当Binlog文件的元数据,中间Event才是真正和数据相关Event,每一个Event的格式都不尽相同,需要单独作解析。不过我们的BinlogServer并不关心具体的Event内容,只需要把Event作为一个接收,存储和发送的基本单元即可,简单说就是,把Master发送的Event按顺序存储起来,当有Slave change过来以后,再从指定位置把Event一个一个的发送给Slave,仅此而已。

每一个Event都有自己的Header, 描述了它的创建时间、类型、Server Id、长度、下一个Event位置和flags信息,结合Event的长度和下个Event位置信息,我们可以很容易地实现顺序扫描一个Binlog文件中的所有Event:

# learn_bin2_binlog.py
    
import  struct
from py_mysql_binlogserver.constants.EVENT_TYPE import event_type_name
    
event_map = event_type_name()
    
with open("mysql-bin.000009", mode="rb")  as fr:
     _file_header = fr.read(4  
     if _file_header != bytes.fromhex("fe62696e"):
        print("It is not a binlog file.")
        exit()
    
     '''
     https://dev.mysql.com/doc/internals/en/binlog-event-header.html
     4              timestamp
     1              event type
     4              server-id
     4              event-size
     4              log pos
     2              flags
     '''
    while  True:
        event_header = fr.read(19)
        if len(event_header)  ==  0:
           break
        timestamp, event_type, server_id, event_size, log_pos, flags =  struct.unpack('<IBIIIH', event_header)
        print("Binlog Event[%s]: [%s] %s %s"  %  (timestamp,
                                                  event_type,
                                                  event_map.get(event_type), log_pos))
    event_body = fr.read(event_size -  19)

输出:

Binlog  Event[1570889546]:  [15] FORMAT_DESCRIPTION_EVENT 123
Binlog  Event[1570889546]:  [35] PREVIOUS_GTIDS_LOG_EVENT 190
Binlog  Event[1570889807]:  [33] GTID_LOG_EVENT 251
Binlog  Event[1570889807]:  [2] QUERY_EVENT 318
Binlog  Event[1570889807]:  [19] TABLE_MAP_EVENT 361
Binlog  Event[1570889807]:  [30] WRITE_ROWS_EVENT 402
Binlog  Event[1570889807]:  [16] XID_EVENT 429
Binlog  Event[1570889813]:  [33] GTID_LOG_EVENT 490
Binlog  Event[1570889813]:  [2] QUERY_EVENT 557
Binlog  Event[1570889813]:  [19] TABLE_MAP_EVENT 600
Binlog  Event[1570889813]:  [30] WRITE_ROWS_EVENT 641
Binlog  Event[1570889813]:  [16] XID_EVENT 668
Binlog  Event[1570889820]:  [4] ROTATE_EVENT 711

是不是比想象中简单,如果你把基础篇的内容全部理解了,我相信上面这段代码不会难到你,相反如果你还不能理解上面这段代码,请移步回去多看几遍,多练几遍再回来。更多的Binlog相关知识,请参考官方文档。

四、小结

这一节我们把Socket通信的难关攻破了,已经完成了和MySQL服务器进行握手,登陆和执行查询,近距离的接触了MySQL的通信协议,学会了如何运用Python struct进行简单的解包和封包,也简单分析了binlog的组成及使用Python来解析binlog文件,最重要的是学会了结合官方文档来解决我们的实际问题。

下一节我们将在本节的基础上进入实战篇,利用Socket向Master发起Slave注册,发送BinlogDump指令,以获取Master上的Binlog Event并保存到本地文件中。

相关文档

(本周先为大家献上这两篇文章,大家可以多练习,下周会继续发布此系列的文章,请持续关注哦~~)

关注公众号【好便宜】( ID:haopianyi222 ),领红包啦~
阿里云,国内最大的云服务商,注册就送数千元优惠券:https://t.cn/AiQe5A0g
腾讯云,良心云,价格优惠: https://t.cn/AieHwwKl
搬瓦工,CN2 GIA 优质线路,搭梯子、海外建站推荐: https://t.cn/AieHwfX9
扫一扫关注公众号添加购物返利助手,领红包
Comments are closed.

推荐使用阿里云服务器

超多优惠券

服务器最低一折,一年不到100!

朕已阅去看看