作者:陈怡

爱可生南分团队 DBA,负责公司自动化运维平台维护和处理客户问题。

本文来源:原创投稿

*爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。


前言

DTLE 是开源的数据传输组件,支持 MySQL 多种使用场景的数据传输。 可能会遇到这样的场景,将数据传输到目的端时,目的端的库名想要与源端的库名不一样。或者传输到目的端时,库名与源端的一样,但是想重命名表名与源端的不同。本文将简单介绍 DTLE 如何设置满足这样的场景。

安装部署

1、3.21.10.0 版本 rpm 包下载地址

https://github.com/actiontech/dtle/releases/download/v3.21.10.1/dtle-ce-3.21.10.1.x86_64.rpm

2、安装

rpm -ivh dtle-ce-3.21.10.1.x86_64.rpm --prefix=/data/dtle

安装完成后,dtle 的相关日志会位于 /data/dtle/var/log 目录下

3、启动 dtle

systemctl start dtle-consul dtle-nomad

启动 dtle 之后,我们就可以创建任务来实现我们的数据传输了。

测试环境准备

1、准备两个 5.7 版本的 MySQL 实例,分别用作源端和目的端数据库。

2、在源库,创建测试数据如下

mysql> create database dtle_d1;
Query OK, 1 row affected (0.00 sec)

mysql> use dtle_d1
Database changed
mysql> create table dtle_t1(id int,name varchar(20),PRIMARY KEY(id));
Query OK, 0 rows affected (0.02 sec)

mysql> insert into dtle_t1 values (1,'xiaoming'),(2,'xiaohong'),(3,'xiaofang');
Query OK, 3 rows affected (0.00 sec)
Records: 3  Duplicates: 0  Warnings: 0

mysql> select * from dtle_t1;
+----+----------+
| id | name     |
+----+----------+
|  1 | xiaoming |
|  2 | xiaohong |
|  3 | xiaofang |
+----+----------+
3 rows in set (0.00 sec)

mysql> create database dtle_db2;
Query OK, 1 row affected (0.01 sec)

mysql> use dtle_db2
Database changed
mysql> create table dtle_table2(id int,name varchar(20),tag varchar(20),PRIMARY KEY(id));
Query OK, 0 rows affected (0.01 sec)

mysql> insert into dtle_table2 values(1,'mao','0'),(2,'gou','0'),(3,'tu',0),(4,'shu','0'),(5,'yu','0');
Query OK, 5 rows affected (0.00 sec)
Records: 5  Duplicates: 0  Warnings: 0

mysql> select * from dtle_table2;
+----+------+------+
| id | name | tag  |
+----+------+------+
|  1 | mao  | 0    |
|  2 | gou  | 0    |
|  3 | tu   | 0    |
|  4 | shu  | 0    |
|  5 | yu   | 0    |
+----+------+------+
5 rows in set (0.00 sec)

3、安装 jq 工具以方便在 Linux 上查询任务状态。此为可选项,非必选项。

yum install jq -y

实现数据传输

在启动 dtle 组件后,先创建作业配置,然后启动作业,就可以实现通过 dtle 完成 MySQL 到 MySQL 的数据传输。作业配置一般采用 json ( HTTP API 提交) 或 hcl ( nomad 命令行工具提交)文件。对应文件的样例模板位于 /data/dtle/usr/share/dtle/scripts 目录下。 本文将介绍采用 HTTP API 提交 json 文件 配置的方式启动 job 实现数据传输。

场景一: 1、实现将源端 dtle_d1 库数据传输到目的端时,目的端库名修改为 dtle_d1_new,表名不变 。 创建 /data/dtle/usr/share/dtle/scripts/job1.json 文件如下:

{
  "Job": {
    "ID": "job1",                                # 指定 job id,查看 job 状态时需要用到该 id
    "Datacenters": ["dc1"],
    "TaskGroups": [
      {
        "Name": "src", 
        "Tasks": [{  
          "Name": "src",
          "Driver": "dtle",
          "Config": {
            "ReplicateDoDb": [{                  # 指定复制的库名/表名,当该配置为空时,则复制整个实例
              "TableSchema": "dtle_d1",          # 指定复制的库名
              "TableSchemaRename": "dtle_d1_new" # 设置库复制到目的端时,新的数据库名
            }],
            "GroupMaxSize": 1024,                # 源端发送数据时, 等待数据包达到一定大小后发送该包. 单位为字节. 值为 1 时则表示即刻发送数据
            "GroupTimeout": 100,                 # 等待包超时时间。如果等待指定时间后,数据包大小还未到达 GroupMaxSize 值,则直接发送当前数据包
            "DropTableIfExists": true,
            "Gtid": "",                          # 1、为空时,则全量+增量进行复制。2、填写为已复制的 GTID 集合, 则将从未复制的 GTID 开始增量复制
            "ChunkSize": 2000,                   # 可以控制全量复制时,每次读取-传输-写入的行数
            "ConnectionConfig": {                # 配置访问源端的方式,ip、端口、用户名、密码
              "Host": "10.186.65.16",
              "Port": 3316,
              "User": "root",
              "Password": "root"
            }
          }
        }],
        "RestartPolicy": {     
          "Attempts": 3,
          "Interval": 600000000000,
          "Delay": 15000000000,
          "Mode": "delay"
        }
      }, {
        "Name": "dest",
        "Tasks": [{
          "Name": "dest",
          "Driver": "dtle",
          "Config": {
            "ConnectionConfig": {                # 配置访问目的端的数据库访问方式
              "Host": "10.186.65.15",
              "Port": 3316,
              "User": "root",
              "Password": "root"
            }
          }
        }],
        "RestartPolicy": {  
          "Attempts": 3,
          "Interval": 600000000000,
          "Delay": 15000000000,
          "Mode": "delay"
        }
      }
    ],
    "ReschedulePolicy": {  
      "Attempts": 1,
      "Interval": 1800000000000,
      "Unlimited": false
    }
  }
}

注意:上述注释部分是为了方便快速介绍各配置项意思,实际使用时,需要去掉注释部分。

2、启动 job

[root@10-186-65-5 ~]# cd /data/dtle/usr/share/dtle/scripts/
[root@10-186-65-5 scripts]# curl -XPOST "http://10.186.65.5:4646/v1/jobs" -d @job1.json -s | jq
{
  "EvalID": "98023d12-82e7-76f6-9711-2ca17b500221",
  "EvalCreateIndex": 2950,
  "JobModifyIndex": 2950,
  "Warnings": "",
  "Index": 2950,
  "LastContact": 0,
  "KnownLeader": false
}

3、根据json文件中的 job id 可查看到任务的状态是否 running,如果不记得 job id,也可以执行命令获取 job id

[root@10-186-65-5 ~]#  curl -s -XGET 127.0.0.1:4646/v1/jobs | jq '.[].ID'
"job1"
[root@10-186-65-5 ~]#  curl -s -XGET 127.0.0.1:4646/v1/job/job1 | jq '.Status'
"running"

4、查看目的端数据库,源端数据已全量到目的端,且测试增量数据也能正常同步。

##### 目的端查看数据同步情况
mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| dtle               |
| dtle_d1_new        |
| mysql              |
| performance_schema |
| sys                |
+--------------------+
6 rows in set (0.00 sec)


mysql> select * from dtle_d1_new.dtle_t1;
+----+----------+
| id | name     |
+----+----------+
|  1 | xiaoming |
|  2 | xiaohong |
|  3 | xiaofang |
+----+----------+
3 rows in set (0.00 sec)

##### 源端新增数据
mysql> insert into dtle_t1 value(4,"xiaobai"),(5,"xiaosu");
Query OK, 2 rows affected (0.10 sec)
Records: 2  Duplicates: 0  Warnings: 0

##### 目的端查询到数据已经同步过来
mysql> select * from dtle_d1_new.dtle_t1;
+----+----------+
| id | name     |
+----+----------+
|  1 | xiaoming |
|  2 | xiaohong |
|  3 | xiaofang |
|  4 | xiaobai  |
|  5 | xiaosu   |
+----+----------+
5 rows in set (0.00 sec)

场景二: 1、实现将源端 dtle_db2.dtle_table2 表数据传输到目的端时,目的端表名修改为 dtle_db2.dtle_table2_new。 创建 /data/dtle/usr/share/dtle/scripts/job2.json 文件如下:

{
  "Job": {
    "ID": "job2",                               # 指定 job id,查看 job 状态时需要用到该 id
    "Datacenters": ["dc1"],
    "TaskGroups": [
      {
        "Name": "src",
        "Tasks": [{
          "Name": "src",
          "Driver": "dtle",
          "Config": {
            "ReplicateDoDb": [{
              "TableSchema": "dtle_db2",        # 指定源端要复制的库名
              "Tables": [{
                "TableName": "dtle_table2",     # 指定源端要复制的表
                "TableRename":"dtle_table2_new" # 设置库复制到目的端时,新的表名
              }]
            }],
            "GroupMaxSize": 1024,
            "GroupTimeout": 100,
            "DropTableIfExists": true,
            "Gtid": "",
            "ChunkSize": 2000,
            "ConnectionConfig": {
              "Host": "10.186.65.16",
              "Port": 3316,
              "User": "root",
              "Password": "root"
            }
          }
        }],
        "RestartPolicy": {
          "Attempts": 3,
          "Interval": 600000000000,
          "Delay": 15000000000,
          "Mode": "delay"
        }
      }, {
        "Name": "dest",
        "Tasks": [{
          "Name": "dest",
          "Driver": "dtle",
          "Config": {
            "ConnectionConfig": {
              "Host": "10.186.65.15",
              "Port": 3316,
              "User": "root",
              "Password": "root"
            }
          }
        }],
        "RestartPolicy": {
          "Attempts": 3,
          "Interval": 600000000000,
          "Delay": 15000000000,
          "Mode": "delay"
        }
      }
    ],
    "ReschedulePolicy": {
      "Attempts": 1,
      "Interval": 1800000000000,
      "Unlimited": false
    }
  }
}

注意:上述注释部分是为了方便快速介绍各配置项意思,实际使用时,需要去掉注释部分。

2、启动 job

[root@10-186-65-5 ~]# cd /data/dtle/usr/share/dtle/scripts/
[root@10-186-65-5 scripts]# curl -XPOST "http://10.186.65.5:4646/v1/jobs" -d @job2.json -s | jq
{
  "EvalID": "9a54b81f-e139-e65a-20ec-a1b9f622f27e",
  "EvalCreateIndex": 7039,
  "JobModifyIndex": 7039,
  "Warnings": "",
  "Index": 7039,
  "LastContact": 0,
  "KnownLeader": false
}

3、查看 job 执行状态

[root@10-186-65-5 scripts]# curl -s -XGET 10.186.65.5:4646/v1/job/job2 | jq '.Status'
"running"

4、检查数据同步情况,查看目的端数据库,源端数据已全量到目的端,且测试增量数据也能正常同步。

##### 目的端查看
mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| dtle               |
| dtle_d1_new        |
| dtle_db2           |
| mysql              |
| performance_schema |
| sys                |
+--------------------+
7 rows in set (0.00 sec)

mysql> use dtle_db2
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+--------------------+
| Tables_in_dtle_db2 |
+--------------------+
| dtle_table2_new    |
+--------------------+
1 row in set (0.00 sec)

mysql> select * from dtle_table2_new;
+----+------+------+
| id | name | tag  |
+----+------+------+
|  1 | mao  | 0    |
|  2 | gou  | 0    |
|  3 | tu   | 0    |
|  4 | shu  | 0    |
|  5 | yu   | 0    |
+----+------+------+
5 rows in set (0.00 sec)

##### 源端更新数据
mysql> update dtle_table2 set tag='1' where id >3;
Query OK, 2 rows affected (0.00 sec)
Rows matched: 2  Changed: 2  Warnings: 0

##### 目的端查看更新语句已经同步
mysql> select * from dtle_table2_new;
+----+------+------+
| id | name | tag  |
+----+------+------+
|  1 | mao  | 0    |
|  2 | gou  | 0    |
|  3 | tu   | 0    |
|  4 | shu  | 1    |
|  5 | yu   | 1    |
+----+------+------+
5 rows in set (0.00 sec)

其他

如果要停止删除掉启动的 job ,比如删除 job2 任务,如下命令操作:

[root@10-186-65-5 scripts]#  curl -s -XGET 10.186.65.5:4646/v1/jobs | jq '.[].ID'
"job1"
"job2"
[root@10-186-65-5 scripts]# curl -s -XDELETE 10.186.65.5:4646/v1/job/job2?purge=true |jq
{
  "EvalID": "42889a00-9239-21e4-761c-6684abb0316c",
  "EvalCreateIndex": 7067,
  "JobModifyIndex": 7067,
  "VolumeEvalID": "",
  "VolumeEvalIndex": 0,
  "Index": 7067,
  "LastContact": 0,
  "KnownLeader": false
}
[root@10-186-65-5 scripts]#  curl -s -XGET 10.186.65.5:4646/v1/jobs | jq '.[].ID'
"job1"

dtle 用户手册链接:

https://actiontech.github.io/dtle-docs-cn/


avatar
100
  Subscribe  
提醒