Docker部署Canal+SpringBoot集成
标签:
SpringBoot
前言
Canal是阿里巴巴开发并开源的MySQL数据库日志解析中间件。原理是将自己伪装成MySQL从库,订阅binlog并将增量数据投递到下游消费者。
Docker镜像版本
mysql:8.0.39
canal/canal-server:v1.1.8
docker安装mysql
a.创建my.cnf文件
# Create the host directory that will hold the MySQL configuration file.
mkdir -p /root/docker/mysql/conf
# Create and edit the configuration file.
vi /root/docker/mysql/conf/my.cnf
内容如下:
[client]
# Default character set for client programs: utf8mb4
default-character-set=utf8mb4

[mysql]
# Default character set for the mysql command-line client: utf8mb4
default-character-set=utf8mb4

[mysqld]
# Server ID of this instance; set now in preparation for future replication/clustering
server-id = 1
# Enable the binary log, which records the SQL operations performed on the database; also in preparation for future replication/clustering
log-bin=mysql-bin
# Purge binary logs older than 30 days so they do not pile up and exhaust the server (2592000 seconds = 30 days)
binlog_expire_logs_seconds = 2592000
# Work around the GROUP BY issue in MySQL 8.0
sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'
# Maximum number of connections allowed
max_connections=1000
# Disable symbolic links to prevent assorted security risks
symbolic-links=0
# Set the time zone to UTC+8
default-time_zone = '+8:00'
b.运行mysql镜像
# Start the mysql:8.0.39 image in the background: publish port 3306, restart automatically,
# and mount the host log directory, data directory and my.cnf into the container.
docker run \
  -p 3306:3306 \
  --restart=always \
  --name mysql \
  --privileged=true \
  -v /root/docker/mysql/log:/var/log/mysql \
  -v /root/docker/mysql/data:/var/lib/mysql \
  -v /root/docker/mysql/conf/my.cnf:/etc/mysql/my.cnf \
  -e MYSQL_ROOT_PASSWORD=imooc202507 \
  -d mysql:8.0.39
创建canal账户
#进入容器 docker exec -it mysql /bin/sh #root账号登录,输入密码 mysql -u root -p # 新建用户 用户名:canal 密码:canal CREATE USER canal IDENTIFIED by 'canal'; # 授权 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; # 刷新MySQL的系统权限相关表 FLUSH PRIVILEGES;
安装canal
# Start canal once so that the default properties configuration file can be copied out of the container
docker run -p 11111:11111 --name canal -d canal/canal-server:v1.1.8
# Create the host directory first: docker cp does not create a missing destination directory
mkdir -p /root/docker/canal/conf
# Path of the properties file inside the container: /home/admin/canal-server/conf/example/instance.properties
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /root/docker/canal/conf/
# Edit the configuration
vi /root/docker/canal/conf/instance.properties
instance.properties修改内容如下:
# Address of the MySQL master that canal reads the binlog from; replace with the actual IP and port
canal.instance.master.address=10.35.101.139:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# table regex (matches every table of every schema)
canal.instance.filter.regex=.*\\..*
# Remove the old canal container; -f is required because the container started earlier is still running
docker rm -f canal
# Redeploy with the edited configuration file mounted into the container
docker run -p 11111:11111 \
  --name canal \
  -v /root/docker/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
  -d canal/canal-server:v1.1.8
后续修改配置后只要重启容器就行了。
Spring Boot集成
在pom.xml中添加canal依赖
<!-- Canal client; the logging artifacts it pulls in transitively are excluded -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.8</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.8</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Spring Boot -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.7.14</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    <version>1.18.28</version>
</dependency>
在application.yaml中添加配置信息
# Canal client settings; the nesting matches the property keys read by CanalOptions
canal:
  server:
    # Address of the canal server
    host: 10.35.101.139
    port: 11111
  destination: example
  username: ""
  password: ""
  batch-size: 1000
配置项CanalOptions.java
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Canal client settings bound from the {@code canal.*} application properties.
 *
 * <p>{@code canal.server.host} and {@code canal.destination} are required. The remaining
 * properties fall back to the defaults given in their placeholders when they are omitted,
 * so a minimal configuration no longer aborts application startup.
 */
@Getter
@Component
public class CanalOptions {

    /** Host name or IP address of the canal server. */
    @Value("${canal.server.host}")
    private String host;

    /** TCP port of the canal server; defaults to 11111. */
    @Value("${canal.server.port:11111}")
    private int port;

    /** Name of the canal destination (instance) to connect to. */
    @Value("${canal.destination}")
    private String destination;

    /** User name passed to the canal server; defaults to the empty string. */
    @Value("${canal.username:}")
    private String username;

    /** Password passed to the canal server; defaults to the empty string. */
    @Value("${canal.password:}")
    private String password;

    /** Number of entries requested per fetch; defaults to 1000. */
    @Value("${canal.batch-size:1000}")
    private int batchSize;
}
CanalConfig.java
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.net.InetSocketAddress; @Configuration public class CanalConfig { private final CanalOptions canalOptions; public CanalConfig(CanalOptions canalOptions) { this.canalOptions = canalOptions; } @Bean public CanalConnector canalConnector(){ //创建连接 CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress(canalOptions.getHost(),canalOptions.getPort()), canalOptions.getDestination(), canalOptions.getUsername(), canalOptions.getPassword() ); connector.connect(); connector.subscribe(".*\\..*"); return connector; } }
CanalClient2.java
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import demo.canal.config.CanalOptions; import java.util.List; @Slf4j @Component @RequiredArgsConstructor public class CanalClient2 implements ApplicationRunner { private final CanalConnector canalConnector; private final CanalOptions canalOptions; @Override public void run(ApplicationArguments args){ log.info("开始监听canal..."); startListening(); } public void startListening(){ new Thread(() -> { try { while (true){ Message message = canalConnector.getWithoutAck(canalOptions.getBatchSize()); long batchId = message.getId(); int size = message.getEntries().size(); if(batchId != -1 && size > 0){ processEntries(message.getEntries()); } canalConnector.ack(batchId); } } catch (Exception e) { log.error("Canal监听异常",e); throw new RuntimeException("Canal监听异常",e); //canalConnector.rollback(); } finally { canalConnector.disconnect(); } }, "canal-listener-thread").start(); } private void processEntries(List<CanalEntry.Entry> entries){ for (CanalEntry.Entry entry : entries){ //跳过事务开始/结束事件 if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){ continue; } try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); CanalEntry.EventType eventType = rowChange.getEventType(); String tableName = entry.getHeader().getTableName(); String schemaName = entry.getHeader().getSchemaName(); long executeTime = entry.getHeader().getExecuteTime(); if(rowChange.getIsDdl()){ log.info("监听到binlog DDL => 数据库:{},操作类型:{},sql:{}",schemaName,rowChange.getEventType(),rowChange.getSql()); continue; } for (CanalEntry.RowData 
rowData : rowChange.getRowDatasList()){ switch (eventType){ case INSERT: log.info("监听到binlog insert => 数据库:{} 表:{},columns:{}",schemaName,tableName,rowData.getAfterColumnsList()); break; case UPDATE: log.info("监听到binlog update => 数据库:{} 表:{},columns:{}",schemaName,tableName,rowData.getAfterColumnsList()); break; case DELETE: log.info("监听到binlog update => 数据库:{} 表:{},columns:{}",schemaName,tableName,rowData.getBeforeColumnsList()); break; default: log.error("不支持的eventType:{}",eventType); break; } } } catch (Exception e){ log.error("解析binlog事件失败",e); throw new RuntimeException(e); } } } }
点击查看更多内容
为 TA 点赞
评论
评论
共同学习,写下你的评论
评论加载中...
作者其他优质文章
正在加载中
感谢您的支持,我会继续努力的~
扫码打赏,你说多少就多少
赞赏金额会直接到老师账户
支付方式
打开微信扫一扫,即可进行扫码打赏哦