一、简介
1、介绍
Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。 目前Canal 主要支持了MySQL的Binlog解析,解析完成后利用Canal Client来处理获得相关数据。(数据库同步需要阿里的Otter中间件,基于Canal)。
GitHub地址https://github.com/alibaba/canal
2、MySQL的binlog
(1) 什么是binlog
MySQL 的二进制日志可以说MySQL最重要的日志了,它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。一般来说开启二进制日志大概会有1%的性能损耗。
二进制有两个最重要的使用场景:
- 其一:MySQL Replication在Master端开启binlog,Master 把它的二进制日志传递给slaves 来达到master-slave 数据一致的目的。
- 其二:数据恢复,通过使用mysqlbinlog工具来使恢复数据。
二进制日志包括两类文件:
- 二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,
- 二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)语句事件。
(2) binlog的开启
MySQL配置文件的位置
- Linux: /etc/my.cnf 如果/etc目录下没有,可以通过locate my.cnf查找位置
- Windows: \my.ini
在mysql的配置文件下,修改配置 在[mysqld]区块,添加log-bin=mysql-bin这个表示binlog日志的前缀是mysql-bin,以后生成的日志文件就是 mysql-bin.000001 的文件后面的数字按顺序生成,每次mysql重启或者到达单个文件大小的阈值时,新生一个 文件,按顺序编号。
(3) binlog的分类设置
mysql的binlog格式有三种,分别是STATEMENT,MIXED,ROW。 在配置文件中可以选择配置binlog_format= statement|mixed|row
三种格式的区别:
a、statement 语句级
binlog会记录每次一执行写操作的语句。 相对row模式节省空间,但是可能产生不一致性,比如 update test set create_date=now(); 如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。
优点: 节省空间 缺点: 有可能造成数据不一致。
b、row 行级
binlog会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。 缺点:占用较大空间。
c、mixed 混合级别
statement的升级版,一定程度上解决了statement模式因为一些情况而造成的数据不一致问题。
默认还是statement,在某些情况下,譬如:
- 当函数中包含 UUID() 时;
- 包含 AUTO_INCREMENT 字段的表被更新时;
- 执行 INSERT DELAYED 语句时;
- 用 UDF 时;
会按照 ROW的方式进行处理
优点:节省空间,同时兼顾了一定的一致性。 缺点:还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对 binlog监控的情况都不方便。
3、工作原理
就是把自己伪装成Slave,从Master复制数据。
二、安装
1、MySQL环境准备
(1)Binlog设置
修改mysql的配置文件,开启MySQL Binlog设置
sudo vim /etc/my.cnf
在[mysqld]模块下添加一下内容
[mysqld]
server_id=1
log-bin=mysql-bin
binlog_format=row
# 需要监控的库
binlog-do-db=test_maxwell
并重启Mysql服务
sudo systemctl restart mysqld
登录mysql并查看是否修改完成
mysql -uroot -p123456
mysql> show variables like '%binlog%';
查看下列属性
binlog_format | ROW
win:
(2)查看binlog文件
进入/var/lib/mysql目录,查看MySQL生成的binlog文件
注:MySQL生成的binlog文件初始大小一定是154字节,前缀是log-bin参数配置的,后缀是默认从.000001,然后依次递增。除了binlog文件文件以外,MySQL还会额外生产一个.index索引文件用来记录当前使用的binlog文件。
(3)创建账号
分配一个账号可以操作该数据库
GRANT ALL ON *.* TO 'canal'@'%' IDENTIFIED BY '123456';
GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO canal@'%';
刷新mysql表权限
flush privileges;
2、安装Canal
(1)上传并解压
注意:canal解压后是分散的,我们在指定解压目录的时候需要将canal指定上
mkdir /opt/module/canal
tar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal/
(2)修改配置文件
1)修改canal.properties的配置文件
......
canal.port = 11111
......
# tcp, kafka, RocketMQ
canal.serverMode = tcp......
#################################################
######### destinations #############
#################################################
canal.destinations = example
说明:canal端口号默认就是11111,修改canal的输出model,默认tcp,改为输出到kafka
多实例配置如果创建多个实例,一个canal服务中可以有多个instance,conf/下的每一个example即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例example,如果需要多个实例处理不同的MySQL数据的话,直接拷贝出多个example,并对其重新命名,命名和配置文件中指定的名称一致,修改 canal.properties中的canal.destinations=实例1,实例2,实例3。
2)修改instance.properties配置文件
修改conf/example目录下的配置文件,如果是多个实例,可以配置多个配置文件。
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=10canal.instance.master.address=192.168.10.139:3306
......# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =test
# enable druid Decrypt database password
canal.instance.enableDruid=false
(3)启动
./bin/startup.sh
三、实时监控
1、TCP监控
(1)数据结构
(2)创建maven项目
(3)添加依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.hk</groupId> <artifactId>hadoopDemo</artifactId> <version>1.0-SNAPSHOT</version> </parent> <artifactId>canalDemo</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> </dependencies> </project>(4)编写客户端代码
package com.hk; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; public class Main { public static void main(String[] args) throws InvalidProtocolBufferException { // 1.获取canal连接对象 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hd01", 11111), "example", "", ""); // 循环监听 while (true) { // 获取连接 canalConnector.connect(); // 要监听的数据库和表 canalConnector.subscribe("test_maxwell.*"); // 获取message,一次获取10条修改 Message message = canalConnector.get(10); // 获取entry List<CanalEntry.Entry> entries = message.getEntries(); // 遍历entry if(entries.size() == 0) { try { System.out.println("暂无数据修改......"); Thread.sleep(3000); } catch (InterruptedException e) { throw new RuntimeException(e); } } else { for (CanalEntry.Entry entry : entries) { // 获取表名 String tableName = entry.getHeader().getTableName(); // 获取entry类型 CanalEntry.EntryType entryType = entry.getEntryType(); // 判断entryType是否为ROWDATA if(entryType == CanalEntry.EntryType.ROWDATA) { // 序列化数据 ByteString storeValue = entry.getStoreValue(); // 反序列化 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); // 获取事件类型 CanalEntry.EventType eventType = rowChange.getEventType(); // 获取具体的数据 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); // 打印数据 for (CanalEntry.RowData rowData : rowDatasList) { // 获取修改前的数据 List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); JSONObject beforeData = new JSONObject(); for (CanalEntry.Column column : beforeColumnsList) { beforeData.put(column.getName(), column.getValue()); } // 获取修改后的数据 List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); JSONObject afterData = new JSONObject(); for (CanalEntry.Column column : afterColumnsList) { afterData.put(column.getName(), column.getValue()); } System.out.println("TableName:" + tableName + ",EventType:" + eventType + ",Before:" + beforeData + ",After:" + afterData); } } } } } } }2、发送到Kafka
(1)启动Kafka和zookeeper
(2)修改配置文件
修改canal.properties中canal的输出model,默认tcp,改为输出到kafka
# tcp, kafka, RocketMQ
canal.serverMode = kafka
......
##################################################
######### MQ #############
##################################################
canal.mq.servers = hd01:6667,hd02:6667,hd03:6667
修改instance.properties输出到Kafka的主题以及分区数
# mq config
canal.mq.topic=canal_test
canal.mq.partitionsNum=1
# hash partition config
#canal.mq.partition=0
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
注意:默认还是输出到指定Kafka主题的一个kafka分区,因为多个分区并行可能会打 乱binlog的顺序,如果要提高并行度,首先设置kafka的分区数>1,然后设置 canal.mq.partitionHash属性
(3)启动Canal
bin/startup.sh
(4)测试
向MySQL中插入数据后查看消费者控制台
插入数据
INSERT INTO test VALUES('1001','zhangsan'),('1002','lisi');
Kafka 消费者控制台
{"data":[{"id":"1001","name":"zhangsan"},{"id":"1002","name":"lisi"}],"database":"test-maxwwell","es":1639360729000,"id":1,"isDdl":false,"mysqlType":{"id":" varchar(255)","name":"varchar(255)"},"old":null,"sql":"","sqlType":{"id":12,"name":12,"table":"test","ts":1639361038454,"type":"INSERT"}