Canal+RabbitMQ实现数据增量同步

1/5/2022 CanalRabbitMQ

# Canal+RabbitMQ实现数据增量同步🧨

# Canal是什么?🧐

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费;

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

# 工作原理🤔

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

    canal1

Canal分为服务端和客户端:

  • 服务端:负责解析MySQL的binlog日志,传递增量数据给客户端或者消息中间件
  • 客户端:负责解析服务端传过来的数据,然后定制自己的业务处理。

目前为止支持的消息中间件很全面了,比如KafkaRocketMQRabbitMQ

# 数据同步还有其他中间件吗?🤔

canal2

# Canal服务端安装🛻

服务端下载:链接 (opens new window)

下载后解压可得(canal.deployer-1.1.5)

canal3

# 1、打开MySQL的binlog日志

修改MySQL的日志文件,my.cnf 配置如下:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
1
2
3
4

# 2、设置MySQL的配置

需要设置服务端配置文件中的MySQL配置,这样Canal才能知道需要监听哪个库、哪个表的日志文件。

一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,这里我就用 example 实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加实例的名称。

修改canal.deployer-1.1.5\conf\example\instance.properties配置文件

# url
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# 监听的数据库
canal.instance.defaultDatabaseName=mydb

# 监听的表,可以指定,多个用逗号分割,这里正则是监听所有
canal.instance.filter.regex=.*\\..*

1
2
3
4
5
6
7
8
9
10
11

# 3、设置RabbitMQ的配置

服务端默认的传输方式是tcp,需要在配置文件中设置MQ的相关信息。

这里需要修改两处配置文件,如下

1、canal.deployer-1.1.5\conf\canal.properties

这个配置文件主要是设置MQ相关的配置,比如URL,用户名、密码...

# 传输方式:tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host =/
# exchange
rabbitmq.exchange =canal.exchange
# 用户名、密码
rabbitmq.username =guest
rabbitmq.password =guest
## 是否持久化
rabbitmq.deliveryMode = 2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

2、canal.deployer-1.1.5\conf\example\instance.properties

这个文件设置MQ的路由KEY,这样才能路由到指定的队列中,如下:

canal.mq.topic=canal.routing.key

1
2

# 4、RabbitMQ新建exchange和Queue

在RabbitMQ管理控制台中需要新建一个名称为:canal.exchange(必须和配置中的相同且交换机类型为topic)的exchange和一个名称为 canal.queue(名称随意)的队列。

其中绑定的路由KEY为:canal.routing.key(必须和配置中的相同),如下图:

canal4

# 5、启动服务端

点击bin目录下的脚本,windows直接双击startup.bat

启动可能报错:

canal5

修改方案:去除图中标记的JVM配置

canal6

启动成功如下:

canal7

# 6、测试

在本地数据库中插入一条数据,如下:

INSERT INTO T_ZB_BX VALUES (1,1,1);
1

此时查看MQ中的canal.queue已经有了数据,如下:

canal8

{
	"data": [{
		"ID": "1",
		"ZB_ID": "1",
		"BX_ID": "1"
	}],
	"database": "w2db",
	"es": 1641285182000,
	"id": 2,
	"isDdl": false,
	"mysqlType": {
		"ID": "int(11)",
		"ZB_ID": "int(11)",
		"BX_ID": "int(11)"
	},
	"old": null,
	"pkNames": ["ID"],
	"sql": "",
	"sqlType": {
		"ID": 4,
		"ZB_ID": 4,
		"BX_ID": 4
	},
	"table": "T_ZB_BX",
	"ts": 1641285179543,
	"type": "INSERT"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

该JSON数据表述很明确,包含表名称、方法、参数、参数类型、参数值等等

客户端要做的就是监听MQ获取JSON数据,然后将其解析出来,处理自己的业务逻辑。

# Canal客户端搭建🌋

客户端很简单实现,要做的就是消费Canal服务端传递过来的消息,监听canal.queue这个队列。

# 1、创建消息实体类

创建个实体类接收MQ传递过来的数据

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;

/**
 * @version 1.0.0
 * @className: CanalMessage
 * @description: 阿里Canal中间件消息接收实体类
 * @author: LiJunYi
 * @create: 2022/1/4 16:44
 */
@NoArgsConstructor // 无参构造器  必须!!!
@Data
public class CanalMessage<T>
{
    @JsonProperty("data")
    private List<T> data;

    @JsonProperty("database")
    private String database;

    @JsonProperty("es")
    private Long es;

    @JsonProperty("id")
    private Integer id;

    @JsonProperty("isDdl")
    private Boolean isDdl;

    @JsonProperty("mysqlType")
    private Object mysqlType;

    @JsonProperty("old")
    private List<T> old;

    @JsonProperty("pkNames")
    private List<String> pkNames;

    @JsonProperty("sql")
    private String sql;

    @JsonProperty("sqlType")
    private Object sqlType;

    @JsonProperty("table")
    private String table;

    @JsonProperty("ts")
    private Long ts;

    @JsonProperty("type")
    private String type;
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

# 2、MQ消息监听业务

接下来就是监听队列,一旦有Canal服务端有数据推送能够及时的消费。

import cn.hutool.json.JSONUtil;
import com.seecen.springbootrabbitmq.model.CanalMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

/**
 * @version 1.0.0
 * @className: CanalRabbitMQListener
 * @description: 监听MQ获取Canal增量的数据消息
 * @author: LiJunYi
 * @create: 2022/1/4 16:49
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class CanalRabbitMQListener
{
   @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "canal.queue", durable = "true"),
                    exchange = @Exchange(value = "canal.exchange", type = "topic"),
                    key = "canal.routing.key"
            )
    })
    public void handleDataChange(Message message)
   {
       // 注意此处MQ传来的是Byte[]
       byte[] ba = message.getBody();
       String s = new String(ba, StandardCharsets.UTF_8);
       String jsonStr = JSONUtil.toJsonStr(s);
       //使用Hutool工具将message转换为CanalMessage
       CanalMessage canalMessage = JSONUtil.toBean(jsonStr,CanalMessage.class);
       String tableName = canalMessage.getTable();
       //TODO 后续逻辑完善...............
    }
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

# 3、MQ的数据解析配置

@Configuration
public class MyAMQPConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
1
2
3
4
5
6
7
8

# 4、测试

向表中插入数据,看下接收的消息是什么样的,SQL如下:

INSERT INTO T_ZB_BX VALUES (1,1,1);
1

客户端转换后的消息如下图:

canal9

图上显示,MQ发送过来的所有的数据都已经成功接收到,后续可根据实际需求完善业务逻辑代码即可。

十年
陈奕迅