作者:徐耀荣

爱可生南区交付服务部 DBA 团队成员,主要负责MySQL故障处理以及相关技术支持。爱好电影,游戏,旅游以及桌球。

本文来源:原创投稿

*爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。


扩容思路

internal_replication参数说明

Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas).

此参数设置为 true 时,写操作只选一个正常的副本写入数据。如果分布式表的本地表是复制表(* ReplicaMergeTree),可以设置为 true,replica 副本之间的数据复制会交 由 ReplicatedMergeTree 自身处理,不再由 Distributed 分布式总表负责,从而为其减负。

若此参数设置为false(默认值),写操作会将数据写入所有副本。实质上,这意味着要分布式表本身来复制数据。这种方式不如使用复制表的好,因为这种方式不会检查副本的一致性,并且随着时间的推移,副本数据可能会不一致(节点宕机、数据插入失败)。

1. ReplicatedMergeTree ENGINE

ReplicatedMergeTree表引擎本身具备同步功能,所以不需要分布式表进行副本之间的数据复制(internal_replication为true),副本的数据同步交由zookeeper进行协同,具体扩容步骤如下:

  • 在新增副本节点的集群配置中添加扩容后集群的完整信息。
  • 历史副本节点修改配置文件,在集群配置中添加新增副本节点信息(历史副本集群不需要停库,配置文件能够进行热更新)。
  • 启动新增副本节点,并创建对应的复制本地表、分布式表(此时该副本节点查询请求可正常路由选择所有的副本节点)。
  • zookeeper会自动将历史副本中的数据信息同步至新增副本节点中,进行数据的同步。

2. MergeTree ENGINE

由于 MergeTree 表引擎本身不具备同步副本的功能,所以集群副本的数据复制需要由分布式总表来负责(internal_replication 为 false )。

所以新增副本节点不会从原历史副本节点同步历史数据,但是对于新增数据,集群副本之间能够正常同步。为此这里采用备份的方式同步历史数据,具体步骤如下:

  • 在新增副本节点的集群配置中添加当前集群的完整信息。
  • 历史副本节点修改配置文件,在集群配置中添加新增副本节点信息(历史副本集群不需要停库,配置文件能够进行热更新)。
  • 启动新增副本节点,并创建对应的复制本地表、分布式表。
  • 在历史副本中,通过筛选导出历史数据,然后将历史数据导入新副本的本地表,以达到数据一致,期间集群的写入并不会受到影响。

案例验证

环境介绍

操作系统版本:CentOS Linux release 7.5.1804 (Core) (4C4G)

软件版本:ClickHouse version 21.8.4.51、zookeeper-3.7.0

hostname ip 端口 角色
node1 10.186.63.71 9000 replica(clickhouse)
node2 10.186.63.74 9000 replica(clickhouse)
node3 10.186.63.48 9000 待添加节点(clickhouse)
node1 10.186.63.71 2181 zookeeper(单节点)

本次预先搭建 clickhouse 两节点,单分片双副本(多副本单分片),测试对象分别为 ReplicatedMergeTree 、MergeTree 两种常见的表引擎。每个节点创建对应引擎的本地表,以及 Distributed 引擎的分布式总表,各个节点上的本地表的写入、查询等操作都由分布式总表进行路由转发。

由于两个表引擎配置中的 internal_replication 参数需求不一致,所以分开进行测试。

ReplicatedMergeTree ENGINE(单分片双副本)

1.集群信息

(1)以下配置信息定义了集群名为 test_action 的单分片双副本集群(metrika.xml)。

<yandex>
  <zookeeper-servers>
    <node index="1">
      <host>node1</host>
      <port>2181</port>
    </node>
  </zookeeper-servers>
  <remote_servers>
    <test_action>      
      <shard>
        <internal_replication>true</internal_replication>        
        <replica>  
          <host>node1</host>
          <port>9000</port>    
        </replica>
        <replica>
          <host>node2</host>
          <port>9000</port>
        </replica>
      </shard>
    </test_action>
  </remote_servers>
  <networks>
    <ip>::/0</ip>
  </networks>
  <macros>
    <cluster>test_action</cluster>
    <shard>1</shard>
    <replica>node1</replica>
  </macros>
</yandex>

注意:集群不同节点的集群信息配置除了 macros 标签有所不同,其余都一样,macros 是该副本的唯一标识,以下是具体说明。

node1 的 macros 标签:

<macros>
    <cluster>test_action</cluster> ##集群名称
    <shard>1</shard>               ##分片shard number
    <replica>node1</replica>       ##副本名称
  </macros>
node2 的 macros 标签:
  <macros>
    <cluster>test_action</cluster>
    <shard>1</shard>
    <replica>node2</replica>
  </macros>

(2)集群信息,以及表结构数据

集群信息:

node1 :) select * from system.clusters where cluster = 'test_action';
SELECT *
FROM system.clusters
WHERE cluster = 'test_action'
Query id: 8495c696-9060-4aac-acc6-c641b2ec7aa2
┌─cluster─────┬─shard_num─┬─shard_weight─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┬─user────┬─default_database─┬─errors_count─┬─slowdowns_count─┬─estimated_recovery_time─┐
│ test_action │         1 │            1 │           1 │ node1     │ 10.186.63.71 │ 9000 │        1 │ default │                  │            0 │               0 │                       0 │
│ test_action │         1 │            1 │           2 │ node2     │ 10.186.63.74 │ 9000 │        0 │ default │                  │            0 │               0 │                       0 │
└─────────────┴───────────┴──────────────┴─────────────┴───────────┴──────────────┴──────┴──────────┴─────────┴──────────────────┴──────────────┴─────────────────┴─────────────────────────┘
2 rows in set. Elapsed: 0.006 sec. 

2.扩容副本

(1)node3 节点创建 metrika.xml ,添加集群信息。

<yandex>
  <zookeeper-servers>
    <node index="1">
      <host>node1</host>
      <port>2181</port>
    </node>
  </zookeeper-servers>
  <remote_servers>
    <test_action>      
      <shard>
        <internal_replication>true</internal_replication>        
        <replica>  
          <host>node1</host>
          <port>9000</port>    
        </replica>
        <replica>
          <host>node2</host>
          <port>9000</port>
        </replica>
        <replica>
          <host>node3</host>
          <port>9000</port>
        </replica>
      </shard>
    </test_action>
  </remote_servers>
  <macros>
    <cluster>test_action</cluster>
    <shard>1</shard>
    <replica>node3</replica>
  </macros>
  <networks>
    <ip>::/0</ip>
  </networks>
</yandex>

(2)修改 node1 和 node2 节点 metrika.xml 文件,添加 node3 的集群信息。

在 shard 标签下添加如下信息:

<replica>
  <host>node3</host>
    <port>9000</port>
    </replica>        

(3)启动node3节点后,所有节点检查集群信息,副本信息均同步完整。

node1 :) select * from system.clusters where cluster = 'test_action';
SELECT *
FROM system.clusters
WHERE cluster = 'test_action'
Query id: c901b3a0-743e-4cfc-b5f8-5add5c21ba42
┌─cluster─────┬─shard_num─┬─shard_weight─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┬─user────┬─default_database─┬─errors_count─┬─slowdowns_count─┬─estimated_recovery_time─┐
│ test_action │         1 │            1 │           1 │ node1     │ 10.186.63.71 │ 9000 │        1 │ default │                  │            0 │               0 │                       0 │
│ test_action │         1 │            1 │           2 │ node2     │ 10.186.63.74 │ 9000 │        0 │ default │                  │            0 │               0 │                       0 │
│ test_action │         1 │            1 │           3 │ node3     │ 10.186.63.48 │ 9000 │        0 │ default │                  │            0 │               0 │                       0 │
└─────────────┴───────────┴──────────────┴─────────────┴───────────┴──────────────┴──────┴──────────┴─────────┴──────────────────┴──────────────┴─────────────────┴─────────────────────────┘
3 rows in set. Elapsed: 0.005 sec. 
 
node2 :) select * from system.clusters where cluster = 'test_action';
SELECT *
FROM system.clusters
WHERE cluster = 'test_action'
Query id: bbdc533d-996c-4c46-a660-13a1b3e7f5cf
┌─cluster─────┬─shard_num─┬─shard_weight─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┬─user────┬─default_database─┬─errors_count─┬─slowdowns_count─┬─estimated_recovery_time─┐
│ test_action │         1 │            1 │           1 │ node1     │ 10.186.63.71 │ 9000 │        0 │ default │                  │            0 │               0 │                       0 │
│ test_action │         1 │            1 │           2 │ node2     │ 10.186.63.74 │ 9000 │        1 │ default │                  │            0 │               0 │                       0 │
│ test_action │         1 │            1 │           3 │ node3     │ 10.186.63.48 │ 9000 │        0 │ default │                  │            0 │               0 │                       0 │
└─────────────┴───────────┴──────────────┴─────────────┴───────────┴──────────────┴──────┴──────────┴─────────┴──────────────────┴──────────────┴─────────────────┴─────────────────────────┘
3 rows in set. Elapsed: 0.003 sec.
 
node3 :) select * from system.clusters where cluster = 'test_action';
SELECT *
FROM system.clusters
WHERE cluster = 'test_action'
Query id: e676da7c-faf0-4876-aba8-c2fb29d9adb5
┌─cluster─────┬─shard_num─┬─shard_weight─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┬─user────┬─default_database─┬─errors_count─┬─slowdowns_count─┬─estimated_recovery_time─┐
│ test_action │         1 │            1 │           1 │ node1     │ 10.186.63.71 │ 9000 │        0 │ default │                  │            0 │               0 │                       0 │
│ test_action │         1 │            1 │           2 │ node2     │ 10.186.63.74 │ 9000 │        0 │ default │                  │            0 │               0 │                       0 │
│ test_action │         1 │            1 │           3 │ node3     │ 10.186.63.48 │ 9000 │        1 │ default │                  │            0 │               0 │                       0 │
└─────────────┴───────────┴──────────────┴─────────────┴───────────┴──────────────┴──────┴──────────┴─────────┴──────────────────┴──────────────┴─────────────────┴─────────────────────────┘
3 rows in set. Elapsed: 0.004 sec. 

3.建立同步

(1)node3 节点创建对应的本地复制表和分布式表结构。
创建本地复制表

create table table_test( label_id UInt32, label_name String, insert_time Date) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_action/1/table_test','node3',insert_time, (label_id, insert_time), 8192);

创建分布式表

CREATE TABLE table_test_all AS table_test ENGINE = Distributed(test_action, default, table_test, rand());

注意:此时创建表时切勿使用分布式 DDL 语法(ON CLUSTER),在本地创建即可,否则会抛出其他节点表已存在的报错。

(2)建立完本地以及分布式表后,zookeeper便会在后台自动同步数据至新副本。同步后检查本地表数据一致,且分布式总表能够正常路由查询。

node3 :) select * from table_test;
SELECT *
FROM table_test
Query id: dffc5029-0a45-49fb-a5c6-c39f488b2a40
┌─label_id─┬─label_name─┬─insert_time─┐
│        4 │ 111        │  2021-07-01 │
└──────────┴────────────┴─────────────┘
┌─label_id─┬─label_name─┬─insert_time─┐
│        1 │ 111        │  2021-06-28 │
│        2 │ 111        │  2021-06-29 │
│        3 │ 111        │  2021-06-30 │
└──────────┴────────────┴─────────────┘
4 rows in set. Elapsed: 0.005 sec. 

node3 :) select * from table_test_all;
SELECT *
FROM table_test_all
Query id: b590017e-a502-4010-86f6-190ab899b15f
┌─label_id─┬─label_name─┬─insert_time─┐
│        4 │ 111        │  2021-07-01 │
└──────────┴────────────┴─────────────┘
┌─label_id─┬─label_name─┬─insert_time─┐
│        1 │ 111        │  2021-06-28 │
│        2 │ 111        │  2021-06-29 │
│        3 │ 111        │  2021-06-30 │
└──────────┴────────────┴─────────────┘
4 rows in set. Elapsed: 0.006 sec. 

(3)通过分布式表插入数据,验证集群数据正常复制。

node1插入新数据。

node1 :) insert into table_test_all values (5,'111','2021-07-02'),(6,'111','2021-07-03');
INSERT INTO table_test_all VALUES
Query id: b3047373-13e4-4667-bda6-17f21c96ace7
Ok.
2 rows in set. Elapsed: 0.017 sec. 
node1 :) select * from table_test_all;
SELECT *
FROM table_test_all
Query id: e942c27d-3193-4303-8ff6-f0da86939558
┌─label_id─┬─label_name─┬─insert_time─┐
│        1 │ 111        │  2021-06-28 │
│        2 │ 111        │  2021-06-29 │
│        3 │ 111        │  2021-06-30 │
└──────────┴────────────┴─────────────┘
┌─label_id─┬─label_name─┬─insert_time─┐
│        4 │ 111        │  2021-07-01 │
└──────────┴────────────┴─────────────┘
┌─label_id─┬─label_name─┬─insert_time─┐
│        5 │ 111        │  2021-07-02 │
│        6 │ 111        │  2021-07-03 │
└──────────┴────────────┴─────────────┘
6 rows in set. Elapsed: 0.006 sec. 
 
node2 :) select * from table_test_all;
SELECT *
FROM table_test_all
Query id: 21f9e176-d314-4823-b4c7-3f4a0fadee4f
┌─label_id─┬─label_name─┬─insert_time─┐
│        1 │ 111        │  2021-06-28 │
│        2 │ 111        │  2021-06-29 │
│        3 │ 111        │  2021-06-30 │
└──────────┴────────────┴─────────────┘
┌─label_id─┬─label_name─┬─insert_time─┐
│        4 │ 111        │  2021-07-01 │
└──────────┴────────────┴─────────────┘
┌─label_id─┬─label_name─┬─insert_time─┐
│        5 │ 111        │  2021-07-02 │
│        6 │ 111        │  2021-07-03 │
└──────────┴────────────┴─────────────┘
6 rows in set. Elapsed: 0.004 sec. 


node3 :) select * from table_test_all;
SELECT *
FROM table_test_all
Query id: 9e66470b-11bb-4b7d-bc4c-40b07ee01ae8
┌─label_id─┬─label_name─┬─insert_time─┐
│        4 │ 111        │  2021-07-01 │
└──────────┴────────────┴─────────────┘
┌─label_id─┬─label_name─┬─insert_time─┐
│        1 │ 111        │  2021-06-28 │
│        2 │ 111        │  2021-06-29 │
│        3 │ 111        │  2021-06-30 │
└──────────┴────────────┴─────────────┘
┌─label_id─┬─label_name─┬─insert_time─┐
│        5 │ 111        │  2021-07-02 │
│        6 │ 111        │  2021-07-03 │
└──────────┴────────────┴─────────────┘
6 rows in set. Elapsed: 0.005 sec. 

MergeTree ENGINE(单分片双副本)

node1、node2 和 node3 节点的 metrika.xml 除了将 internal_replication 设置为 false 外,其余配置、扩容添加副本的操作和上述一致。以下从建立同步开始。

建立同步

(1)node3节点创建对应的本地复制表和分布式表结构。

创建本地复制表

CREATE TABLE t_cluster( id Int16, name String, birth Date )ENGINE = MergeTree() PARTITION BY toYYYYMM(birth) ORDER BY id;

创建分布式表

CREATE TABLE dist_t_cluster as t_cluster engine = Distributed(test_action, default, t_cluster,rand());

(2)node3 查看本地表以及分布式表数据,历史数据并不会自行同步;往分布式总表插入新数据,副本之间能够正常同步。

node1 :) select * from dist_t_cluster;
SELECT *
FROM dist_t_cluster
Query id: 42b3e629-8cf0-4ea5-a9ca-a26caafa03c2
┌─id─┬─name─┬──────birth─┐
│  1 │ aaa  │ 2021-02-01 │
│  2 │ bbb  │ 2021-02-02 │
└────┴──────┴────────────┘
 
node2 :) select * from dist_t_cluster;
SELECT *
FROM dist_t_cluster
Query id: 8fbe57b7-c375-439f-a235-9feb07b33d83
┌─id─┬─name─┬──────birth─┐
│  1 │ aaa  │ 2021-02-01 │
│  2 │ bbb  │ 2021-02-02 │
└────┴──────┴────────────┘
 
node3 :) select * from dist_t_cluster;
SELECT *
FROM dist_t_cluster
Query id: ffa76681-9f09-4016-b7ca-3b97119f9580
Ok.
0 rows in set. Elapsed: 0.005 sec. 

##node3插入新的数据:
insert into dist_t_cluster values(3, 'aaa', '2021-03-01'), (4, 'bbb', '2021-03-02');
##查询数据,副本同步正常,但是新增节点node3只能查询到新增数据,无法同步之前的数据。
node3 :) select * from dist_t_cluster;
SELECT *
FROM dist_t_cluster
Query id: 7ad9f34b-e2a8-47ed-9d6e-a7ed76a6f8c6
┌─id─┬─name─┬──────birth─┐
│  3 │ aaa  │ 2021-03-01 │
│  4 │ bbb  │ 2021-03-02 │
└────┴──────┴────────────┘
 
node2 :) select * from dist_t_cluster;

SELECT *
FROM dist_t_cluster

Query id: 8fbe57b7-c375-439f-a235-9feb07b33d83

┌─id─┬─name─┬──────birth─┐
│  1 │ aaa  │ 2021-02-01 │
│  2 │ bbb  │ 2021-02-02 │
└────┴──────┴────────────┘
┌─id─┬─name─┬──────birth─┐
│  3 │ aaa  │ 2021-03-01 │
│  4 │ bbb  │ 2021-03-02 │
└────┴──────┴────────────┘
 
node1 :) select * from dist_t_cluster;
SELECT *
FROM dist_t_cluster
Query id: 42b3e629-8cf0-4ea5-a9ca-a26caafa03c2
┌─id─┬─name─┬──────birth─┐
│  1 │ aaa  │ 2021-02-01 │
│  2 │ bbb  │ 2021-02-02 │
└────┴──────┴────────────┘
┌─id─┬─name─┬──────birth─┐
│  3 │ aaa  │ 2021-03-01 │
│  4 │ bbb  │ 2021-03-02 │
└────┴──────┴────────────┘

(3)在任一历史副本中,通过筛选将之前的数据导出成 tsv 文件,传输至 node3 节点导入进本地表,检查数据一致。

##node1节点导出数据
[root@node1 ~]# clickhouse-client --query="select * from t_cluster where id <5 " > /var/lib/clickhouse/backup/t_cluster.tsv

##将csv文件传输至node3节点进行导入
[root@node3 config.d]# cat /tmp/t_cluster.tsv | clickhouse-client --query="insert into t_cluster FORMAT TSV"
 
node3 :) select * from t_cluster;
SELECT *
FROM t_cluster
Query id: 0713ab9e-6594-41f0-917d-8b968c609766
┌─id─┬─name─┬──────birth─┐
│  1 │ aaa  │ 2021-02-01 │
│  2 │ bbb  │ 2021-02-02 │
└────┴──────┴────────────┘
┌─id─┬─name─┬──────birth─┐
│  3 │ aaa  │ 2021-03-01 │
│  4 │ bbb  │ 2021-03-02 │
└────┴──────┴────────────┘


avatar
100
  Subscribe  
提醒