Skip to content

client-adapter使用

canal 1.1.1版本之后, 增强客户端数据落地的适配及启动功能,不用开发者再手动编写代码即可实现--关系型数据库的数据同步(表对表同步)功能。目前支持功能:

  • 客户端启动器
  • 同步管理REST接口
  • 日志适配器, 作为DEMO演示
  • 关系型数据库的数据同步(表对表同步), ETL功能
  • HBase的数据同步(表对表同步), ETL功能
  • ElasticSearch多表数据同步,ETL功能
  • kudu多表数据同步,ETL功能
  • phoenix多表数据同步,ETL功能
  • clickhouse多表数据同步,ETL功能

1. 适配器介绍

client-adapter目前实现的有logger适配器、Hbase适配器、RDB适配器、ES适配器、kudu适配器、phoenix适配器、clickhouse适配器。

1.1 logger适配器

数据最简单的处理, 将收到的变更事件通过日志打印的方式进行输出。

1.2 Hbase适配器

将数据同步和导入到Hbase中。

1.3 RDB适配器

用于适配mysql到任意关系型数据库和数据分析OLAP数据库(需支持jdbc)的数据同步及导入,支持的数据库有:MySQL、Oracle、Postgress、SQLServer等。

1.4 ES适配器

目前支持Elastic Search 6、7、8的数据导入和同步。

1.5 kudu适配器

将数据同步和导入(与Hbase类似的列式存储分布式数据库)Kudu中。

1.6 phoenix适配器

支持通过phoenix将数据导入HBase中。

1.7 clickhouse适配器

将数据同步和导入(与Hbase类似的列式存储分布式数据库)clickhouse中。

2. client-adapter目录介绍

sh
[jack@hadoop102 software]$ mkdir /opt/module/canal.adapter
[jack@hadoop102 software]$ tar -xvf canal.adapter-1.1.7.tar.gz -C /opt/module/canal.adapter
[jack@hadoop102 module]$ cd canal.adapter/
[jack@hadoop102 canal.adapter]$ ll
总用量 8
drwxrwxr-x.  2 jack jack   76 4月  23 23:56 bin
drwxrwxr-x. 10 jack jack  172 10月  9 2023 conf
drwxrwxr-x.  2 jack jack 4096 4月  23 23:56 lib
drwxrwxr-x.  2 jack jack    6 10月  9 2023 logs
drwxrwxr-x.  2 jack jack 4096 10月  9 2023 plugin
  1. client-adapter是一个SpringBoot工程,配置文件在conf目录下,对外提供REST管理接口。
  2. 在plugin目录下存放适配器jar, 每个适配器包含自己所需的依赖, 原理是通过以SPI的方式让启动器动态加载。
  3. logs目录存放client-adapter的日志信息。
  4. lib目录是client-adapter的所有核心内部jar包存放位置,数据库驱动包也需要放到这里。
  5. bin目录是存放启动脚本的地方。

2. RDB适配器配置文件说明

2.1 client-adapter总配置文件

conf/application.yml为client-adapter的总配置文件

yml
# springboot的相关配置
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
# canal adapter的配置
canal.conf: 
  # canal-adapter的连接canal服务器方式: tcp kafka rocketMQ  
  mode: tcp 
  # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效    
  flatMessage: true      
  # 每次同步的批数量                      
  syncBatchSize: 1000   
  # 重试次数, -1为无限重试                    
  retries: 0 
  # 同步超时时间, 单位毫秒                               
  timeout:      
  # 访问canal-server服务端秘钥
  accessKey:
  # canal-adapter的REST接口秘钥
  secretKey:
  # canal-server服务端信息配置,和mode配置项对应
  consumerProperties:
    # 对应单机模式下的canal server的ip:port
    canal.tcp.server.host: 127.0.0.1:11111
    # 对应集群模式下的zk地址, 如果配置了canal.tcp.server.host, 则以canal.tcp.server.host为准
    canal.tcp.zookeeper.hosts:
    # 每次获取数据的批大小, 单位为K  
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka
    # kafka集群地址
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ
    rocketmq.namespace:  
    # rocketMQ集群地址   
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ
    # kafka集群地址
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
   # 源库配置信息            
  srcDataSources:         
    # 自定义名称                 
    defaultDS:                        
      # jdbc url       
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true 
      # jdbc 账号  
      username: root    
      # jdbc 密码                                        
      password: 121212  
  # 适配器配置项列表                                        
  canalAdapters:      
  # canal 实例名或者 MQ topic 名                      
  - instance: example 
    # 分组列表                      
    groups:          
    # 分组id, 如果是MQ模式将用到该值                       
    - groupId: g1    
      # 分组内适配器列表                       
      outerAdapters:    
      # 日志适配器:logger, 数据库适配器:rdb, es适配器:es6/es7/es8, hbase适配器:hbase                    
      - name: rdb           
        # 指定adapter的唯一key, 与表映射配置中outerAdapterKey对应                                   
        key: oracle1                                            
        properties:
          # jdbc驱动名, 部分jdbc的jar包需要自行放致lib目录下
          jdbc.driverClassName: oracle.jdbc.OracleDriver     
          # jdbc url   
          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE    
          # jdbc username    
          jdbc.username: mytest    
          # jdbc password                             
          jdbc.password: m121212     
          # 并行执行的线程数, 默认为1                              
          threads: 5 
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
          jdbc.username: root
          jdbc.password: 121212
          druid.stat.enable: false
          druid.stat.slowSqlMillis: 1000

说明:

  1. 一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters, 比如例子中oracle和mysql。
  2. 目前client adapter数据订阅的方式支持两种,直连canal server或者订阅kafka/RocketMQ的消息。
  3. name属性值为rdb, 那么adapter将会自动加载conf/rdb下的所有.yml结尾的表映射配置文件

2.2 RDB表映射文件

conf/rdb文件夹下为表映射文件,默认有mytest_user.yml文件。

yml
# 源数据源的key, 对应上面配置的srcDataSources中的值
dataSourceKey: defaultDS        
# cannal的instance或者MQ的topic
destination: example      
# 对应MQ模式下的groupId, 只会同步对应groupId的数据      
groupId: g1                
# adapter key, 对应上面配置outAdapters中的key       
outerAdapterKey: oracle1        
# 是否按主键hash并行同步, 并行同步的表必须保证主键不会更改及主键不能为其他同步表的外键!!
concurrent: true                
dbMapping:
  # 源数据源的database/shcema
  database: mytest       
  # 源数据源表名      
  table: user                   
  # 目标数据源的库名.表名
  targetTable: mytest.tb_user   
  # 主键映射
  targetPk:              
    # 如果是复合主键可以换行映射多个       
    id: id                      
    # 是否整表映射, 要求源表和目标表字段名一模一样 (如果targetColumns也配置了映射,则以targetColumns配置为准)
#  mapAll: true           
  # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填      
  targetColumns:                
    id:
    name:
    role_id:
    c_time:
    test1: 

## 直接同步Mysql库
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  mirrorDb: true
  # 源库和目标库的schema名称,要求两库的schema要一模一样,建议相同类型数据使用该方式
  database: mytest

导入的类型以目标表的元类型为准。

3. 适配器实操(Mysql=>Mysql)

下面通过数据库对接数据库方式,直接将源数据库数据同步到目标数据库中,采用这种方式要求数据库一模一样,adapter底层没有针对不同数据库进行sql翻译,所以有一定的限制:要求数据库类型相同。如只需同步单表可参考远程加载实操

3.1 准备数据库

  1. 我已经提前安装mysql8.0.35在192.168.101.105机器上,查看mysql目前日志binlog相关信息:
sql
-- 查询binlog是否开启
show VARIABLES like '%log_bin%';

Alt text

sql
-- 查询目前binlog记录状态
show MASTER STATUS;

可以看到涉及到binlog的数据库是demo、test。 Alt text

  1. 使用docker安装mysql作为目标数据库, 目标mysql数据库为mysql5.7.44,不用开启日志。
sh
[jack@hadoop102 module]$ sudo docker pull mysql:5.7.44
5.7.44: Pulling from library/mysql
20e4dcae4c69: Pull complete 
1c56c3d4ce74: Pull complete 
e9f03a1c24ce: Pull complete 
68c3898c2015: Pull complete 
6b95a940e7b6: Pull complete 
90986bb8de6e: Pull complete 
ae71319cb779: Pull complete 
ffc89e9dfd88: Pull complete 
43d05e938198: Pull complete 
064b2d298fba: Pull complete 
df9a4d85569b: Pull complete 
Digest: sha256:4bc6bc963e6d8443453676cae56536f4b8156d78bae03c0145cbe47c2aad73bb
Status: Downloaded newer image for mysql:5.7.44
docker.io/library/mysql:5.7.44
## 查看本地所有的镜像信息,可以看到mysql镜像已经拉取下来了
[jack@hadoop102 module]$ sudo docker images -a
REPOSITORY    TAG       IMAGE ID       CREATED         SIZE
postgres      12.18     735b07d34bf1   2 months ago    419MB
mysql         5.7.44    5107333e08a8   4 months ago    501MB
hello-world   latest    d2c94e258dcb   12 months ago   13.3kB
## 启动mysql,镜像名为mysql-ods实例数据库
[jack@hadoop102 module]$ sudo docker run -itd --name mysql-ods -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 mysql:5.7.44
440b18ce65e0fd2231e6b6442c7ab01d345dc7cf863ebb7336d8a869fdfcb585
## 查看mysql进程
[jack@hadoop102 module]$ sudo docker ps
CONTAINER ID   IMAGE          COMMAND                   CREATED         STATUS         PORTS                                                  NAMES
440b18ce65e0   mysql:5.7.44   "docker-entrypoint.s…"   7 seconds ago   Up 7 seconds   0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp   mysql-ods
  1. 使用远程连接mysql-ods数据库 Alt text
  2. 创建和源数据库mysql中同名数据库test
sql
CREATE DATABASE `test` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';

3.2 预准备canal

  1. 需要提前启动canal-server, 我在本机127.0.0.1上面启动了canal-server。至于canal-server的配置,请参考配置canal.properties配置instance.properties
  2. 将mysql库的jdbc驱动jar包放入lib文件夹。(发现lib文件下已经有mysql驱动包)

3.3 配置相关配置文件

主要涉及application.yml和rdb/sync_test.yml文件

yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.101.105:3306/test?useUnicode=true&useSSL=false&characterEncoding=UTF-8
      username: root
      password: root
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://192.168.101.102:3306/test?useUnicode=true&useSSL=false&characterEncoding=UTF-8
          jdbc.username: root
          jdbc.password: 123456
          druid.stat.enable: false
          druid.stat.slowSqlMillis: 1000
yml
## Mirror schema synchronize config
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  mirrorDb: true
  database: test

3.4 启动脚本

sh
[jack@hadoop102 canal.adapter]$ ./bin/startup.sh
[jack@hadoop102 logs]$ cd logs/adapter/
[jack@hadoop102 adapter]$ ll
总用量 4
-rw-rw-r--. 1 jack jack 3152 4月  24 00:44 adapter.log
-rw-rw-r--. 1 jack jack    0 4月  24 00:44 rocketmq_client.log
[jack@hadoop102 adapter]$ tail -f adapter.log 
2024-04-24 00:44:58.098 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## syncSwitch refreshed.
2024-04-24 00:44:58.099 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## start the canal client adapters.
2024-04-24 00:44:58.122 [main] INFO  c.a.otter.canal.client.adapter.support.ExtensionLoader - extension classpath dir: /opt/module/canal.adapter/plugin
2024-04-24 00:44:58.242 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2024-04-24 00:44:58.276 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /opt/module/canal.adapter/plugin
2024-04-24 00:44:58.340 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
2024-04-24 00:44:58.341 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2024-04-24 00:44:58.354 [Thread-2] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2024-04-24 00:44:58.369 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 6.694 seconds (JVM running for 8.06)
2024-04-24 00:44:58.522 [Thread-2] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============

3.5 测试数据库同步

  1. 使用mysql客户端连接192.168.101.105上的mysql,创建下面t_user表
sql
create table test.t_user(
id int auto_increment,
name varchar(300),
createtime timestamp,
desc_info text,
wallet  DECIMAL(6,2), 
is_live boolean,
id_card char(18),
sex TINYINT(1),
PRIMARY KEY(id)
);

可以发现在192.168.101.102上面已经同步好了t_user Alt text

sql
-- 查看同步的表结构
desc t_user;

Alt text 2. 在源端数据库插入数据

sql
insert into t_user(id, name, createtime, desc_info, wallet, is_live, id_card, sex) VALUES
(1, 'jack', now(), 'data_test', 123.56, 1, '51018220240501001', 1)

查看mysql-ods上面,发现数据已经同步过来 Alt text

4. adapter管理REST接口

adapter运行起来后,可以通过相应restapi进行查询控制数据同步。

4.1 查看adapter所有已经订阅同步的canal instance或MQ topic

sh
[jack@hadoop102 ~]$ curl http://192.168.101.102/destinations
[{"destination":"example","status":"on"}]

4.2 数据同步开关

sh
[jack@hadoop102 ~]$ curl http://192.168.101.102/syncSwitch/example/off -X PUT
{"code":20000,"message":"实例: example 关闭同步成功"}

针对example这个canal instance/MQ topic进行开关操作. off代表关闭, instance/topic下的同步将阻塞或者断开连接不再接收数据, on代表开启

提示

如果在配置文件中配置了zookeeperHosts项, 则会使用分布式锁来控制HA中的数据同步开关, 如果是单机模式则使用本地锁来控制开关

4.3 数据同步开关状态

查看指定canal instance/MQ topic的数据同步开关状态

sh
[jack@hadoop102 ~]$ curl http://192.168.101.102/syncSwitch/example
{"stauts":"off"}

4.4 动态导入配置并启动任务

格式为:/etl/{type}/{key}/{task}/{params}, 其中type为适配器类型,key为adapter的key,key可以不传, task为配置文件名,params是where条件参数,params可以不传,为空表示全部导入。

sh
[jack@hadoop102 ~]$ curl http://192.168.101.102/etl/rdb/oracle1/mytest_user.yml -X POST
[jack@hadoop102 ~]$ curl http://192.168.101.102/etl/hbase/mytest_person2.yml -X POST

4.5 统计同步的表数目

格式为:/count/{type}/{key}/{task}, 其中type为适配器类型,key为adapter的key,key可以不传, task为配置文件名。

sh
## 目前使用的是数据库同步方式,所以不会显示具体表
[jack@hadoop102 ~]$ curl http://192.168.101.102/count/rdb/mysql1/sync_test.yml
{"targetTable":"`null`"}
[jack@hadoop102 ~]$ curl http://192.168.101.102/count/rdb/sync_test.yml
{"targetTable":"`null`"}

4.6 获取所有同步任务信息

sh
[jack@hadoop102 ~]$ curl http://192.168.101.102/destinations
[{"destination":"example","status":"on"}]

5. 远程加载配置

canal-adapter支持使用远程数据库作为配置中心,进行统一配置管理。

5.1 远程库表结构

canal_adapter库表使用和canal-admin组件(下一章会介绍✍️)同一套数据库表结构,脚本就在canal-admin压缩包的conf/canal_manager.sql中。
canal_adapter涉及到的表有canal_adapter_config、canal_config表:

表结构明细
sql
CREATE DATABASE /*!32312 IF NOT EXISTS*/ `canal_manager` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;

USE `canal_manager`;

SET NAMES utf8;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for canal_adapter_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_adapter_config`;
CREATE TABLE `canal_adapter_config` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `category` varchar(45) NOT NULL,
  `name` varchar(45) NOT NULL,
  `status` varchar(45) DEFAULT NULL,
  `content` text NOT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_cluster
-- ----------------------------
DROP TABLE IF EXISTS `canal_cluster`;
CREATE TABLE `canal_cluster` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(63) NOT NULL,
  `zk_hosts` varchar(255) NOT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_config`;
CREATE TABLE `canal_config` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `cluster_id` bigint(20) DEFAULT NULL,
  `server_id` bigint(20) DEFAULT NULL,
  `name` varchar(45) NOT NULL,
  `status` varchar(45) DEFAULT NULL,
  `content` text NOT NULL,
  `content_md5` varchar(128) NOT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `sid_UNIQUE` (`server_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_instance_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_instance_config`;
CREATE TABLE `canal_instance_config` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `cluster_id` bigint(20) DEFAULT NULL,
  `server_id` bigint(20) DEFAULT NULL,
  `name` varchar(45) NOT NULL,
  `status` varchar(45) DEFAULT NULL,
  `content` text NOT NULL,
  `content_md5` varchar(128) DEFAULT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `name_UNIQUE` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_node_server
-- ----------------------------
DROP TABLE IF EXISTS `canal_node_server`;
CREATE TABLE `canal_node_server` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `cluster_id` bigint(20) DEFAULT NULL,
  `name` varchar(63) NOT NULL,
  `ip` varchar(63) NOT NULL,
  `admin_port` int(11) DEFAULT NULL,
  `tcp_port` int(11) DEFAULT NULL,
  `metric_port` int(11) DEFAULT NULL,
  `status` varchar(45) DEFAULT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_user
-- ----------------------------
DROP TABLE IF EXISTS `canal_user`;
CREATE TABLE `canal_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `username` varchar(31) NOT NULL,
  `password` varchar(128) NOT NULL,
  `name` varchar(31) NOT NULL,
  `roles` varchar(31) NOT NULL,
  `introduction` varchar(255) DEFAULT NULL,
  `avatar` varchar(255) DEFAULT NULL,
  `creation_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

SET FOREIGN_KEY_CHECKS = 1;

-- ----------------------------
-- Records of canal_user
-- ----------------------------
BEGIN;
INSERT INTO `canal_user` VALUES (1, 'admin', '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28');
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;

5.2 初始化数据

将application.yml、库表映射配置文件内容手动录入到canal_config、canal_adapter_config表中。

  1. canal_config表初始化
    canal_config表id=2的数据对应adapter下的application.yml文件。其中name字段为文件名, content字段为文件内容,modified_time字段为时间字段,content_md5字段可以随机填。

为何application.yml数据在表中id=2

可以参考canal源码:DbRemoteConfigLoader的第95行,代码中id=2就写死了的😵

  1. canal_adapter_config表初始化
    该表对应每个adapter的子配置文件。比如application.yml文件配置rdb适配器,那么子配置文件就是rdb目录下的所有yml文件。canal_adapter_config表中category字段为适配器类型,name字段为文件名字,content字段为文件内容, 每次修改之后生效需要修改一下修改时间。

5.3 修改bootstrap.yml

将cannal-manager数据库地址相关信息配置到里面,然后将本地application.yml文件和其他子配置文件删除或清空,启动工程将自动从远程加载配置。

6. 远程加载实操

下面通过192.168.101.105的test.t_user表数据同步到192.168.101.102的mytest.t_user为例进行操作。两张表的表结构信息如下:
Alt text

6.1 初始化数据

在192.168.101.105上执行canal_manager.sql脚本如下图所示。 Alt text application.yml和目录rdb/sync_t_user.yml配置信息如下:

yml
server:
  port: 8088
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp
  flatMessage: true
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.101.105:3306/test?useUnicode=true&useSSL=false
      username: root
      password: root
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://192.168.101.102:3306/mytest?useUnicode=true&useSSL=false
          jdbc.username: root
          jdbc.password: 123456
          druid.stat.enable: false
          druid.stat.slowSqlMillis: 1000
yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: test
  table: t_user
  targetTable: t_user
  targetPk:
    id: id
  targetColumns:   # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填
    id: id
    name2: name
    createtime:
    desc_info:
    is_live:
    sex2: sex
  commitBatch: 300

将其分别初始化到表中:
Alt textAlt text

6.2 修改bootstrap.yml

修改bootstrap.yml如下:

yml
canal:
  manager:
    jdbc:
      url: jdbc:mysql://192.168.101.102:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8&useSSL=false
      username: root
      password: 123456

6.3 启动canal-server

  1. 需要提前启动canal-server, 我在本机127.0.0.1上面启动了canal-server。至于canal-server的配置,请参考配置canal.properties配置instance.properties
  2. 将mysql库的jdbc驱动jar包放入lib文件夹。(发现lib文件下已经有mysql驱动包)

6.3 修改start.sh

在实操过程中发现,canal-adapter并不会去加载数据库中的配置,也不会下载到本地生成yml文件,同时启动报错,报错各种配置信息没拿到导致空指针, 按照官网说明步骤执行不下去。

  • 原因分析: canal-adapter并没有去读取bootstrap.yml文件,在canal最新版(v1.1.7)中,spring-cloud-context依赖组件版本为3.0.6版本,spring-boot版本2.5.4。而springboot2.4版本之后配置文件加载方式进行了重构。另外也有配置的默认值变化,比如spring.cloud.bootstrap.enabled属性由原来的true变为了false。
  • 解决办法:在启动脚本start.sh添加参数:-Dspring.cloud.bootstrap.enabled=true
sh
[jack@hadoop102 ~]$ cd /opt/module/canal-adapter/bin
[jack@hadoop102 bin]$ vi startup.sh
# 在83行添加参数:    
ADAPTER_OPTS="-Dspring.cloud.bootstrap.enabled=true -DappName=canal-adapter"

6.4 启动start.sh

  1. 将本地application.yml文件和其他子配置文件删除或清空。
sh
mi@JieBaBa-PC UCRT64 /d/canal/canal.adapter-1.1.7/conf
$ rm -rf application.yml es6 es7 es8 hbase/ kudu/ rdb/ tablestore/

mi@JieBaBa-PC UCRT64 /d/canal/canal.adapter-1.1.7/conf
$ ll
总用量 12
drwxr-xr-x 1 mi None    0  5月 18 10:15 .
drwxr-xr-x 1 mi None    0  5月 16 07:18 ..
-rw-r--r-- 1 mi None 3106 10月  9  2023 logback.xml
drwxr-xr-x 1 mi None    0  4月 23 07:14 META-INF
  1. 执行start.sh启动
sh
mi@JieBaBa-PC UCRT64 /d/canal/canal.adapter-1.1.7/bin
$ sh startup.sh
  1. 查看启动日志,可以看到开始监听example:
    Alt text

6.5 测试数据同步

  1. 插入数据测试 Alt text 查询mytest.t_user表数据 Alt text 由于同步没有配置wallet、id_card字段,所以同步过来是null,特别的sex同步过来为sex2字段数据为1,源端test表中为2,那是因为tinyint(1)类型数据,会被转化为boolean类型,而boolean类型在表中只显示0或1,tinyint(1)类型数据若为0则为false显示0,若非0则为true都显示1。所以这里同步过来的数据显示为2,以下为MySQL官网的原话:

BOOL, BOOLEAN
These types are synonyms for TINYINT(1). A value of zero is considered false. Nonzero values are considered true

如果想要让tinyint(1)转化为int而不是boolean类型,可以在jdbcUrl内指定属性tinyInt1isBitfalse即可。
2. 更新数据测试
Alt text 查询mytest.t_user表数据 Alt text 查看cancal-adapter日志:
Alt text

6.5 远程加载配置总结

  1. 如果使用多个canal-adapter进行接收数据,需要使用各自canal_manager的库表,可以避免id=2的冲突。
  2. 容易和canal-admin配置管理组件(下一节会介绍它)的数据信息冲突,下面是canal_config表结构,主键id是自增长的,id=2需要给canal-adapter预留。
    Alt text
  3. 不一定非要使用远程加载配置功能特性!!!按照目前发展以后很可能canal-adapter会被官方集成到canal-admin中的。但是目前canal-admin还没有提供相关adapter的功能,只是有了相关的表,并没有可视化页面提供可用进行操作,没有页面那么adapter配置到表中和之前使用配置文件方式工作量并没有简化多少🤣,所以不推荐远程加载配置方式。
  4. 使用中发现,由于配置信息都集中在canal_config表、canal_adapter_config表一个字段叫content的大文本类型字段中存放,相比配置文件方式不是很直观能够查看和修改配置信息👎。