Canal(Mysql)数据同步工具使用
1. Canal介绍
阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)
2.Canal工作原理
2.1 Mysql主从复制原理
1)Master 主库将改变记录,写到二进制日志(Binary Log)中;
2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝
到它的中继日志(relay log);
3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
2.2 Mysql bin-log开启
略
3 Canal使用
3.1 Canal部署
Canal需要部署到linux服务器上
- 上传canal.deployer-1.1.2.tar.gz包到服务器的
/opt/software
下 - 解压
tar -zxvf canal.deployer-1.1.2.tar.gz -C ./canal/
目录下 - 配置
canal.properties
和instance.properties
文件中内容
cancal.properties
instance.properties
4. 启动canal ./bin/startup.sh
3.2客户端连接Canal
- 创建一个java的maven工程
- pom.xml中添加如下配置
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
- 创建一个CanalClient.java文件
- 内容如下
//连接服务端canal
CanalConnector example = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.10.130", 11111), "example", "", "");
//开始循环订阅数据
while (true) {
//开始连接
example.connect();
//订阅某个库或者表数据
example.subscribe("test-canal.*");
//获取变化数据
Message message = example.get(100);
//拿到数据
List<CanalEntry.Entry> entries = message.getEntries();
//判断是否存在数据
if (entries.size() <= 0) {
log.info("没有数据");
Thread.sleep(1000L);
} else {
//开始遍历数据
for (CanalEntry.Entry e : entries) {
String tableName = e.getHeader().getTableName();
CanalEntry.EntryType entryType = e.getEntryType();
if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
ByteString storeValue = e.getStoreValue();
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData row : rowDatasList) {
Map<String, Object> beforeMap = new HashMap<>(1 << 4);
List<CanalEntry.Column> beforeColumnsList = row.getBeforeColumnsList();
for (CanalEntry.Column c : beforeColumnsList) {
beforeMap.put(c.getName(), c.getValue());
}
Map<String, Object> afterMap = new HashMap<>(1 << 4);
List<CanalEntry.Column> afterColumnsList = row.getAfterColumnsList();
for (CanalEntry.Column c : afterColumnsList) {
afterMap.put(c.getName(), c.getValue());
}
log.info("表名:{},之前数据:{},之后数据:{}", tableName, JSON.toJSON(beforeMap), JSON.toJSON(afterMap));
}
}
}
}
}
- 数据变化格式
4 Canal使用总结
上面为一个canal的简单使用。具体详细使用可以查看Canal的gitHub项目地址
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhiacjhb
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
微信运动停用后别人还能看到步数吗
PHP中文网 07-22 -
excel打印预览压线压字怎么办
PHP中文网 06-22