前言…
最近工作中遇到了一个数据同步的问题
我们这边系统的一个子业务需要依赖另一个系统的数据,当另一个系统数据变更时,我们这边的数据库要对数据进行同步…
那么我自己想到的同步方式呢就两种:
1、MQ订阅,另一个系统数据变更后将变更数据方式到MQ 我们这边订阅接受
2、数据库的触发器
但是呢,两者都被组长paas了!
1、MQ呢,会造成代码侵入,但是另一个系统暂时不会做任何代码更改…
2、数据库的触发器会直接跟生产数据库强关联,会抢占资源,甚至有可能造成生产数据库的不稳定…
对此很是苦恼…
于是啊,只能借由强大的google、百度,看看能不能解决我这个问题!一番搜索,有学习了一个很有趣的东西…
Canal
canal:阿里开源mysql binlog 数据组件
官网解释的相当详细了(国产牛逼)…下边我也是照搬过来的…
官网地址如下:https://github.com/alibaba/canal/wiki
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。ps. 目前内部使用的同步,已经支持mysql5.x和oracle部分版本的日志解析
canal [k?’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
工作原理
canal呢,实际是就是运用了Mysql的主从复制原理…
MySQL主从复制实现
复制遵循三步过程:
如何运作
原理很简单:
通过官网的介绍,让我们了解到,canal实际上就是伪装为了一个从库,我们只需要订阅到数据变更的主库,那么canal就会以从库的身份读取到其主库的binlog日志!我们拿到canal解析好的binlog日志信息,就等于拿到了变更的数据啦!…
这样的话呢,我们即保证了不影响其系统数据库正常使用,又不会侵入他的项目代码,一举两得
ok,接下来开始实战篇…
使用canal呢,有一个前提条件,即被订阅的数据库需要开启binlog
如何查看是否开启binlog呢?
登录服务器上数据库或在可视化工具中 执行查询语句: 如果出现 log_bin ON 表示已开启Binlog
1 |
show variables like 'log_bin'; |
如果服务器上的数据库为自己安装的,则找到配置文件my.conf 添加以下内容,如果买的云实例,则询问厂商开启即可
在my.conf文件中的 [mysqld] 下添加以下三行内容
1 2 3 |
log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 读行 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 |
canaltest:作为slave 角色的账户 Canal123…:为密码
1 2 3 4 |
CREATE USER canaltest IDENTIFIED BY 'Canal123..'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canaltest'@'%'; GRANT ALL PRIVILEGES ON *.* TO 'canaltest'@'%' ; FLUSH PRIVILEGES; |
连接测试
那么到这里,准备工作就好了!
可能呢,有的小伙伴有点懵,你这是在干啥?那么咱们就来理那么一理! 敲黑板了哈!
1、事前准备,是针对于订阅数据库的(即主库)
2、实际步骤也就两步 1:更改配置,开启binlog 2:设置新账号,赋予slave权限,供canal读取Binlog桥梁使用
3、以上操作与canal本身没啥关系,仅仅是使用canal的前提条件罢辽…
canal admin 是 一个可视化的 canal web管理运维工程,脱离以往服务器运维,面向web…
canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作
canal-admin的限定依赖:
下载
1 |
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz |
解压
1 2 |
mkdir /usr/local/canal-admin tar zxvf canal.admin-1.1.4.tar.gz -C /usr/local/canal-admin |
进入canal-admin目录下查看
1 |
cd /usr/local/canal-admin |
修改配置
1 |
vim conf/application.yml |
里边的配置 按照自己的实际情况更改…
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 #这里是配置canal-admin 所依赖的数据库,,,存放web管理中设置的配置等,,, spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: root password: 123456 driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 # 连接所用的账户密码 canal: adminUser: admin adminPasswd: leitest |
导入canaladmin 所需要的数据库文件
这里需要注意了,要和 application.yml中的数据库名对应,你可以选择命令导入,也可以Navicat 可视化拖sql文件导入…一切…看你喜欢.
我这个玩canal的服务器呢,是新安装的,mysql直接用docker安装即可,具体可查看我的博客:
Docker在CentOS7下不能下载镜像timeout的解决办法(图解)
CentOS 7安装Docker
需要注意的是,使用docker 安装的mysql 是无法直接使用 mysql -uroot -p 命令的哦,需要先将脚本复制到容器中,docker不熟练或觉得麻烦的同鞋,请直接使用Navicat可视化工具…
导入canal-admin服务所必需的sql文件
如果是服务器软件软件安装的mysql 则直接执行以下命令即可
1 2 3 4 |
mysql -uroot -p #......... # 导入初始化SQL > source conf/canal_manager.sql |
启动
直接执行启动脚本即可
1 2 |
cd bin ./startup.sh |
默认账户密码:
1 |
admin:123456 |
canal-server 才是canal的核心我们前边所讲的canal的功能,实际上讲述的就是canal-server的功能…admin 仅仅只是一个web管理而已,不要搞混主次关系…
下载
1 |
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz |
解压
1 2 |
mkdir /usr/local/canal-server tar zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal-server |
启动,并连接到canal-admin web端
首先,我们需要修改配置文件
1 2 3 |
cd /usr/local/canal-server
vim /conf/canal_local.properties |
注意了,密码如何加密!!!
要记得,前边 canal-admin 的 aplication.yml 中设置了账户密码为 admin:leitest
1 2 3 4 |
# 连接所用的账户密码 canal: adminUser: admin adminPasswd: leitest |
所以,我们这里需要对明文 leitest 加密并替换即可
使用数据库函数 PASSWORD 加密即可
SELECT PASSWORD(‘要加密的明文’),然后去掉前边的* 号就行
启动并连接到admin
1 |
sh bin/startup.sh local |
查看端口看是否有 11110 、11111、11112
netstat -untlp 看了一下,发现没有,说明server 没有启动成功
看下日志
1 |
vim logs/canal/canal.log |
解决办法:
1、canal-admin 先停止后从起
2、canal server 先以之前的形式运行,不输入后边 local 命令
3、关闭canal server
4、再以canal server 连接 admin 形式启动
admin页面上新建server
修改配置,注释 (instance连接信息,我们还是以前边设置的 admin:leitest 为准,所有这里需要注释掉,如果不注释,那么我们代码中连接则需要使用此账号以及密码)
接下来咱们创建instance
如何理解server 和instance 呢,我认为,可以把它当做 java 中的 class 和 bean 即 类和对象
server 为类 instance 为其具体的实例对象 ,可创建多个不同的实例…
而我们这边监听到主库变化的呢,则是根据业务,对不同的实例即(instance )做不同配置即可…
根据自己情况进行过滤数据
canal.instance.filter.regex | mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\) 常见例子:1. 所有表:.* or .\… 2. canal schema下所有表: canal\…* 3. canal下的以canal打头的表:canal\.canal.* 4. canal schema下的一张表:canal\.test15. 多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔) | ||
---|---|---|---|
canal.instance.filter.druid.ddl | 是否使用druid处理所有的ddl解析来获取库和表名 | true | |
canal.instance.filter.query.dcl | 是否忽略dcl语句 | false | |
canal.instance.filter.query.dml | 是否忽略dml语句 (mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档) | false | |
canal.instance.filter.query.ddl | 是否忽略ddl语句 | false |
更多设置请见官网:https://github.com/alibaba/canal/wiki/AdminGuide
如此一来,一个简单的canal环境就搭建好了,接下来,咱们开始测试吧!
引入canal所需依赖
1 2 3 4 5 |
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency> |
配置
1 2 3 4 5 6 7 8 9 10 11 |
canal: # instance 实例所在ip host: 192.168.96.129 # tcp通信端口 port: 11111 # 账号 canal-admin application.yml 设置的 username: admin # 密码 password: leitest #实例名称 instance: test |
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
package com.leilei; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.util.List; /** * @author lei * @version 1.0 * @date 2020/9/27 22:23 * @desc 读取binlog日志 */ @Component public class ReadBinLogService implements ApplicationRunner { @Value("${canal.host}") private String host; @Value("${canal.port}") private int port; @Value("${canal.username}") private String username; @Value("${canal.password}") private String password; @Value("${canal.instance}") private String instance; @Override public void run(ApplicationArguments args) throws Exception { CanalConnector conn = getConn(); while (true) { conn.connect(); //订阅实例中所有的数据库和表 conn.subscribe(".*\\..*"); // 回滚到未进行ack的地方 conn.rollback(); // 获取数据 每次获取一百条改变数据 Message message = conn.getWithoutAck(100); long id = message.getId(); int size = message.getEntries().size(); if (id != -1 && size > 0) { // 数据解析 analysis(message.getEntries()); }else { Thread.sleep(1000); } // 确认消息 conn.ack(message.getId()); // 关闭连接 conn.disconnect(); } } /** * 数据解析 */ private void analysis(List<CanalEntry.Entry> entries) { for (CanalEntry.Entry entry : entries) { // 只解析mysql事务的操作,其他的不解析 if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN) { continue; if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { // 解析binlog CanalEntry.RowChange rowChange = null; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("解析出现异常 data:" + entry.toString(), e); if (rowChange != null) { // 获取操作类型 CanalEntry.EventType eventType = rowChange.getEventType(); // 获取当前操作所属的数据库 String dbName = entry.getHeader().getSchemaName(); // 获取当前操作所属的表 String tableName = entry.getHeader().getTableName(); // 事务提交时间 long timestamp = entry.getHeader().getExecuteTime(); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, tableName, eventType, timestamp); System.out.println("-------------------------------------------------------------"); } * 解析具体一条Binlog消息的数据 * * @param dbName 当前操作所属数据库名称 * @param tableName 当前操作所属表名称 * @param eventType 当前操作类型(新增、修改、删除) private static void dataDetails(List<CanalEntry.Column> beforeColumns, List<CanalEntry.Column> afterColumns, String dbName, String tableName, CanalEntry.EventType eventType, long timestamp) { System.out.println("数据库:" + dbName); System.out.println("表名:" + tableName); System.out.println("操作类型:" + eventType); if (CanalEntry.EventType.INSERT.equals(eventType)) { System.out.println("新增数据:"); printColumn(afterColumns); } else if (CanalEntry.EventType.DELETE.equals(eventType)) { System.out.println("删除数据:"); printColumn(beforeColumns); } else { System.out.println("更新数据:更新前数据--"); System.out.println("更新数据:更新后数据--"); System.out.println("操作时间:" + timestamp); private static void printColumn(List<CanalEntry.Column> columns) { for (CanalEntry.Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); * 获取连接 public CanalConnector getConn() { return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password); } |
测试查看
数据库修改数据库时
数据新增数据时
删除数据(把我们才添加的小明删掉)
当我们操作监控的数据库DM L操作的时候呢,会被canal监听到…我们呢,通过canal监听,拿到修改的库,修改的表,修改的字段,便可以根据自己业务进行数据处理了!
哎,这个时候啊,可能有小伙伴就要问了,那么,我能不能直接获取其操作的sql语句呢?
目前,我是自己解析其列来手动拼接的sql语句实现了
话不多说,先上效果:
canal 监听到主库sql变化----> update students set id = '2', age = '999', name = '小三', city = '11', date = '2020-09-27 17:41:44', birth = '2020-09-27 18:00:48' where id=2
canal 监听到主库sql变化----> delete from students where id=6
canal 监听到主库sql变化----> insert into students (id,age,name,city,date,birth) VALUES ('89','98','测试新增','深圳','2020-09-27 22:46:53','')
canal 监听到主库sql变化----> update students set id = '89', age = '98', name = '测试新增', city = '深圳', date = '2020-09-27 22:46:53', birth = '2020-09-27 22:46:56' where id=89
实际上呢,我们也就是拿到其执行前列数据变化 执行后列数据变化,自己拼接了一个sql罢了…附上代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
package com.leilei; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.exception.CanalClientException; import com.google.protobuf.InvalidProtocolBufferException; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; /** * @author lei * @version 1.0 * @date 2020/9/27 22:33 * @desc 读取binlog日志 */ @Component public class ReadBinLogToSql implements ApplicationRunner { //读取的binlog sql 队列缓存 一边Push 一边poll private Queue<String> canalQueue = new ConcurrentLinkedQueue<>(); @Value("${canal.host}") private String host; @Value("${canal.port}") private int port; @Value("${canal.username}") private String username; @Value("${canal.password}") private String password; @Value("${canal.instance}") private String instance; @Override public void run(ApplicationArguments args) throws Exception { CanalConnector conn = getConn(); while (true) { try { conn.connect(); //订阅实例中所有的数据库和表 conn.subscribe(".*\\..*"); // 回滚到未进行ack的地方 conn.rollback(); // 获取数据 每次获取一百条改变数据 Message message = conn.getWithoutAck(100); long id = message.getId(); int size = message.getEntries().size(); if (id != -1 && size > 0) { // 数据解析 analysis(message.getEntries()); } else { Thread.sleep(1000); } // 确认消息 conn.ack(message.getId()); } catch (CanalClientException | InvalidProtocolBufferException | InterruptedException e) { e.printStackTrace(); } finally { // 关闭连接 conn.disconnect(); } } } private void analysis(List<Entry> entries) throws InvalidProtocolBufferException { for (Entry entry : entries) { if (EntryType.ROWDATA == entry.getEntryType()) { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); EventType eventType = rowChange.getEventType(); if (eventType == EventType.DELETE) { saveDeleteSql(entry); } else if (eventType == EventType.UPDATE) { saveUpdateSql(entry); } else if (eventType == EventType.INSERT) { saveInsertSql(entry); } } } } /** * 保存更新语句 * * @param entry */ private void saveUpdateSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> dataList = rowChange.getRowDatasList(); for (RowData rowData : dataList) { List<Column> afterColumnsList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set "); for (int i = 0; i < afterColumnsList.size(); i++) { sql.append(" ") .append(afterColumnsList.get(i).getName()) .append(" = '").append(afterColumnsList.get(i).getValue()) .append("'"); if (i != afterColumnsList.size() - 1) { sql.append(","); } } sql.append(" where "); List<Column> oldColumnList = rowData.getBeforeColumnsList(); for (Column column : oldColumnList) { if (column.getIsKey()) { sql.append(column.getName()).append("=").append(column.getValue()); break; } } canalQueue.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 保存删除语句 * * @param entry */ private void saveDeleteSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> columnList = rowData.getBeforeColumnsList(); StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where "); for (Column column : columnList) { if (column.getIsKey()) { sql.append(column.getName()).append("=").append(column.getValue()); break; } } canalQueue.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 保存插入语句 * * @param entry */ private void saveInsertSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> datasList = rowChange.getRowDatasList(); for (RowData rowData : datasList) { List<Column> columnList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " ("); for (int i = 0; i < columnList.size(); i++) { sql.append(columnList.get(i).getName()); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(") VALUES ("); for (int i = 0; i < columnList.size(); i++) { sql.append("'" + columnList.get(i).getValue() + "'"); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(")"); canalQueue.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 获取连接 */ public CanalConnector getConn() { return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password); } /** * 模拟消费canal转换的sql语句 */ public void executeQueueSql() { int size = canalQueue.size(); for (int i = 0; i < size; i++) { String sql = canalQueue.poll(); System.out.println("canal 监听到主库sql变化----> " + sql); } } } |
当然了,这只是简单的demo 演示,您可根据自己的业务进行修改完善即可…
上边的安装步骤呢,我也是不断的测试过,没有问题,当然可能或多或少有些坑没有踩到,但是如果您按照我的步骤来,大概率是一马平川的…
附上项目源码:springboot-canal https://github.com/leilei0220/springboot-learn