• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Canal(Mysql)数据同步工具使用

武飞扬头像
xiongyi01
帮助1

1. Canal介绍

阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)

2.Canal工作原理

2.1 Mysql主从复制原理

1)Master 主库将改变记录,写到二进制日志(Binary Log)中;
2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝
到它的中继日志(relay log);
3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
学新通

2.2 Mysql bin-log开启

详情见Maxwell简单使用2.2.1章节

3 Canal使用

3.1 Canal部署

Canal需要部署到linux服务器上

  1. 上传canal.deployer-1.1.2.tar.gz包到服务器的/opt/software
  2. 解压 tar -zxvf canal.deployer-1.1.2.tar.gz -C ./canal/ 目录下
  3. 配置canal.propertiesinstance.properties文件中内容
    cancal.properties
    学新通

instance.properties
学新通
4. 启动canal ./bin/startup.sh

3.2客户端连接Canal

  1. 创建一个java的maven工程
  2. pom.xml中添加如下配置
		<dependency>
			<groupId>com.alibaba.otter</groupId>
			<artifactId>canal.client</artifactId>
			<version>1.1.2</version>
		</dependency>
  1. 创建一个CanalClient.java文件
  2. 内容如下
  //连接服务端canal
        CanalConnector example = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.10.130", 11111), "example", "", "");
        //开始循环订阅数据
        while (true) {
            //开始连接
            example.connect();
            //订阅某个库或者表数据
            example.subscribe("test-canal.*");
            //获取变化数据
            Message message = example.get(100);
            //拿到数据
            List<CanalEntry.Entry> entries = message.getEntries();
            //判断是否存在数据
            if (entries.size() <= 0) {
                log.info("没有数据");
                Thread.sleep(1000L);
            } else {
            //开始遍历数据
                for (CanalEntry.Entry e : entries) {
                    String tableName = e.getHeader().getTableName();
                    CanalEntry.EntryType entryType = e.getEntryType();
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        ByteString storeValue = e.getStoreValue();
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        for (CanalEntry.RowData row : rowDatasList) {
                            Map<String, Object> beforeMap = new HashMap<>(1 << 4);
                            List<CanalEntry.Column> beforeColumnsList = row.getBeforeColumnsList();
                            for (CanalEntry.Column c : beforeColumnsList) {
                                beforeMap.put(c.getName(), c.getValue());
                            }
                            Map<String, Object> afterMap = new HashMap<>(1 << 4);
                            List<CanalEntry.Column> afterColumnsList = row.getAfterColumnsList();
                            for (CanalEntry.Column c : afterColumnsList) {
                                afterMap.put(c.getName(), c.getValue());
                            }
                            log.info("表名:{},之前数据:{},之后数据:{}", tableName, JSON.toJSON(beforeMap), JSON.toJSON(afterMap));
                        }
                    }
                }
            }
        }
学新通
  1. 数据变化格式
    学新通

4 Canal使用总结

上面为一个canal的简单使用。具体详细使用可以查看Canal的gitHub项目地址

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhiacjhb
系列文章
更多 icon
同类精品
更多 icon
继续加载