news 2026/4/21 3:40:36

Canal - 数据同步

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Canal - 数据同步

一、简介

1、介绍

Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。 目前Canal 主要支持了MySQL的Binlog解析,解析完成后利用Canal Client来处理获得相关数据。(数据库同步需要阿里的Otter中间件,基于Canal)。

GitHub地址https://github.com/alibaba/canal

2、MySQL的binlog

(1) 什么是binlog

MySQL 的二进制日志可以说MySQL最重要的日志了,它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。一般来说开启二进制日志大概会有1%的性能损耗。

二进制有两个最重要的使用场景:

  • 其一:MySQL Replication在Master端开启binlog,Master 把它的二进制日志传递给slaves 来达到master-slave 数据一致的目的。
  • 其二:数据恢复,通过使用mysqlbinlog工具来使恢复数据。

二进制日志包括两类文件:

  • 二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,
  • 二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)语句事件。

(2) binlog的开启

MySQL配置文件的位置

  • Linux: /etc/my.cnf 如果/etc目录下没有,可以通过locate my.cnf查找位置
  • Windows: \my.ini

在mysql的配置文件下,修改配置 在[mysqld]区块,添加log-bin=mysql-bin这个表示binlog日志的前缀是mysql-bin,以后生成的日志文件就是 mysql-bin.000001 的文件后面的数字按顺序生成,每次mysql重启或者到达单个文件大小的阈值时,新生一个 文件,按顺序编号。

(3) binlog的分类设置

mysql的binlog格式有三种,分别是STATEMENT,MIXED,ROW。 在配置文件中可以选择配置binlog_format= statement|mixed|row

三种格式的区别:

a、statement 语句级

binlog会记录每次一执行写操作的语句。 相对row模式节省空间,但是可能产生不一致性,比如 update test set create_date=now(); 如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。

优点: 节省空间 缺点: 有可能造成数据不一致。

b、row 行级

binlog会记录每次操作后每行记录的变化。

优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。 缺点:占用较大空间。

c、mixed 混合级别

statement的升级版,一定程度上解决了statement模式因为一些情况而造成的数据不一致问题。

默认还是statement,在某些情况下,譬如:

  • 当函数中包含 UUID() 时;
  • 包含 AUTO_INCREMENT 字段的表被更新时;
  • 执行 INSERT DELAYED 语句时;
  • 用 UDF 时;

会按照 ROW的方式进行处理

优点:节省空间,同时兼顾了一定的一致性。 缺点:还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对 binlog监控的情况都不方便。

3、工作原理

就是把自己伪装成Slave,从Master复制数据。

二、安装

1、MySQL环境准备

(1)Binlog设置

修改mysql的配置文件,开启MySQL Binlog设置
sudo vim /etc/my.cnf
在[mysqld]模块下添加一下内容
[mysqld]
server_id=1
log-bin=mysql-bin
binlog_format=row
# 需要监控的库
binlog-do-db=test_maxwell
并重启Mysql服务
sudo systemctl restart mysqld
登录mysql并查看是否修改完成
mysql -uroot -p123456
mysql> show variables like '%binlog%';
查看下列属性
binlog_format | ROW

win:

(2)查看binlog文件

进入/var/lib/mysql目录,查看MySQL生成的binlog文件

注:MySQL生成的binlog文件初始大小一定是154字节,前缀是log-bin参数配置的,后缀是默认从.000001,然后依次递增。除了binlog文件文件以外,MySQL还会额外生产一个.index索引文件用来记录当前使用的binlog文件。

(3)创建账号

分配一个账号可以操作该数据库
GRANT ALL ON *.* TO 'canal'@'%' IDENTIFIED BY '123456';
GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO canal@'%';
刷新mysql表权限
flush privileges;

2、安装Canal

(1)上传并解压

注意:canal解压后是分散的,我们在指定解压目录的时候需要将canal指定上

mkdir /opt/module/canal

tar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal/

(2)修改配置文件

1)修改canal.properties的配置文件

......

canal.port = 11111

......

# tcp, kafka, RocketMQ
canal.serverMode = tcp

......

#################################################
######### destinations #############
#################################################
canal.destinations = example

说明:canal端口号默认就是11111,修改canal的输出model,默认tcp,改为输出到kafka

多实例配置如果创建多个实例,一个canal服务中可以有多个instance,conf/下的每一个example即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例example,如果需要多个实例处理不同的MySQL数据的话,直接拷贝出多个example,并对其重新命名,命名和配置文件中指定的名称一致,修改 canal.properties中的canal.destinations=实例1,实例2,实例3

2)修改instance.properties配置文件

修改conf/example目录下的配置文件,如果是多个实例,可以配置多个配置文件。

## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=10

canal.instance.master.address=192.168.10.139:3306
......

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root

canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =test
# enable druid Decrypt database password
canal.instance.enableDruid=false

(3)启动

./bin/startup.sh

三、实时监控

1、TCP监控

(1)数据结构

(2)创建maven项目

(3)添加依赖

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.hk</groupId> <artifactId>hadoopDemo</artifactId> <version>1.0-SNAPSHOT</version> </parent> <artifactId>canalDemo</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> </dependencies> </project>

(4)编写客户端代码

package com.hk; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; public class Main { public static void main(String[] args) throws InvalidProtocolBufferException { // 1.获取canal连接对象 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hd01", 11111), "example", "", ""); // 循环监听 while (true) { // 获取连接 canalConnector.connect(); // 要监听的数据库和表 canalConnector.subscribe("test_maxwell.*"); // 获取message,一次获取10条修改 Message message = canalConnector.get(10); // 获取entry List<CanalEntry.Entry> entries = message.getEntries(); // 遍历entry if(entries.size() == 0) { try { System.out.println("暂无数据修改......"); Thread.sleep(3000); } catch (InterruptedException e) { throw new RuntimeException(e); } } else { for (CanalEntry.Entry entry : entries) { // 获取表名 String tableName = entry.getHeader().getTableName(); // 获取entry类型 CanalEntry.EntryType entryType = entry.getEntryType(); // 判断entryType是否为ROWDATA if(entryType == CanalEntry.EntryType.ROWDATA) { // 序列化数据 ByteString storeValue = entry.getStoreValue(); // 反序列化 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); // 获取事件类型 CanalEntry.EventType eventType = rowChange.getEventType(); // 获取具体的数据 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); // 打印数据 for (CanalEntry.RowData rowData : rowDatasList) { // 获取修改前的数据 List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); JSONObject beforeData = new JSONObject(); for (CanalEntry.Column column : beforeColumnsList) { beforeData.put(column.getName(), column.getValue()); } // 获取修改后的数据 List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); JSONObject afterData = new JSONObject(); for (CanalEntry.Column column : afterColumnsList) { afterData.put(column.getName(), column.getValue()); } System.out.println("TableName:" + tableName + ",EventType:" + eventType + ",Before:" + beforeData + ",After:" + afterData); } } } } } } }

2、发送到Kafka

(1)启动Kafka和zookeeper

(2)修改配置文件

修改canal.properties中canal的输出model,默认tcp,改为输出到kafka

# tcp, kafka, RocketMQ

canal.serverMode = kafka

......

##################################################
######### MQ #############
##################################################
canal.mq.servers = hd01:6667,hd02:6667,hd03:6667

修改instance.properties输出到Kafka的主题以及分区数

# mq config

canal.mq.topic=canal_test

canal.mq.partitionsNum=1

# hash partition config

#canal.mq.partition=0

#canal.mq.partitionHash=mytest.person:id,mytest.role:id

注意:默认还是输出到指定Kafka主题的一个kafka分区,因为多个分区并行可能会打 乱binlog的顺序,如果要提高并行度,首先设置kafka的分区数>1,然后设置 canal.mq.partitionHash属性

(3)启动Canal

bin/startup.sh

(4)测试

向MySQL中插入数据后查看消费者控制台

插入数据

INSERT INTO test VALUES('1001','zhangsan'),('1002','lisi');

Kafka 消费者控制台

{"data":[{"id":"1001","name":"zhangsan"},{"id":"1002","name":"lisi"}],"database":"test-maxwwell","es":1639360729000,"id":1,"isDdl":false,"mysqlType":{"id":" varchar(255)","name":"varchar(255)"},"old":null,"sql":"","sqlType":{"id":12,"name":12,"table":"test","ts":1639361038454,"type":"INSERT"}

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/21 3:39:09

3步突破:如何免费解锁Cursor Pro完整AI编程功能?

3步突破&#xff1a;如何免费解锁Cursor Pro完整AI编程功能&#xff1f; 【免费下载链接】cursor-free-vip [Support 0.45]&#xff08;Multi Language 多语言&#xff09;自动注册 Cursor Ai &#xff0c;自动重置机器ID &#xff0c; 免费升级使用Pro 功能: Youve reached yo…

作者头像 李华
网站建设 2026/4/21 3:35:14

JDK 7、8、13 和 20区别深度了解

Java 的每一次版本迭代&#xff0c;都是一场为开发者体验而生的进化。JDK 7 到 20 的演变&#xff0c;清晰地展现了 Java 语言从“稳健保守”到“敏捷创新”的转型之路。 下面&#xff0c;我们就以 JDK 7、8、13 和 20 这四个标志性版本为切片&#xff0c;看看 Java 是如何一步…

作者头像 李华
网站建设 2026/4/21 3:33:25

Transformer 技术

Transformer 技术详解&#xff1a;从原理到实战&#xff0c;一篇读懂新时代 AI 基石 一、引言 1.1 Transformer 是什么&#xff1f; Transformer 是 2017 年由 Google 团队在论文《Attention Is All You Need》中提出的深度学习模型架构。 它彻底改变了自然语言处理&#xff08…

作者头像 李华