第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時(shí)綁定郵箱和手機(jī)立即綁定

Docker部署Canal+SpringBoot集成

標(biāo)簽:
SpringBoot
  • 前言

      Canal是阿里巴巴开发并开源的MySQL数据库日志解析中间件。原理是将自己伪装成MySQL从库,订阅binlog并将增量数据投递到下游消费者。

  • Docker镜像版本

    mysql:8.0.39

    canal/canal-server:v1.1.8

  • docker安装mysql

    a.创建my.cnf文件

mkdir -p /root/docker/mysql/conf
vi /root/docker/mysql/conf/my.cnf

        内容如下:

[client]
#设置客户端默认字符集utf8mb4
default-character-set=utf8mb4
[mysql]
#设置服务器默认字符集为utf8mb4
default-character-set=utf8mb4
[mysqld]
#配置服务器的服务号,具备日后需要集群做准备
server-id = 1
#开启MySQL数据库的二进制日志,用于记录用户对数据库的操作SQL语句,具备日后需要集群做准备
log-bin=mysql-bin
#设置清理超过30天的日志,以免日志堆积造过多成服务器内存爆满。2592000秒等于30天的秒数
binlog_expire_logs_seconds = 2592000
#解决MySQL8.0版本GROUP BY问题
sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'
#允许最大的连接数
max_connections=1000
# 禁用符号链接以防止各种安全风险
symbolic-links=0
# 设置东八区时区
default-time_zone = '+8:00'

        b.运行mysql镜像

docker run \
-p 3306:3306 \
--restart=always \
--name mysql \
--privileged=true \
-v /root/docker/mysql/log:/var/log/mysql \
-v /root/docker/mysql/data:/var/lib/mysql \
-v /root/docker/mysql/conf/my.cnf:/etc/mysql/my.cnf \
-e MYSQL_ROOT_PASSWORD=imooc202507 \
-d mysql:8.0.39

https://img1.sycdn.imooc.com/f0b49b6809819b8304990166.jpg

  • 创建canal账户

#进入容器
docker exec -it mysql /bin/sh

#root账号登录,输入密码
mysql -u root -p

# 新建用户 用户名:canal  密码:canal 
CREATE USER canal IDENTIFIED by 'canal';

# 授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

# 刷新MySQL的系统权限相关表
FLUSH PRIVILEGES;

https://img1.sycdn.imooc.com/17bac56809819da105930332.jpg

  • 安装canal

#启动canal,复制properties配件文件
docker run -p 11111:11111 --name canal -d canal/canal-server:v1.1.8

# /home/admin/canal-server/conf/example/instance.properties 容器中properties配置文件地址
docker cp canal:/home/admin/canal-server/conf/example/instance.properties  /root/docker/canal/conf/

#修改配置
vi /root/docker/canal/conf/instance.properties

        instance.properties修改内容如下:

#canal地址,替换成实际的ip和端口
canal.instance.master.address=10.35.101.139:3306

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

# table regex
canal.instance.filter.regex=.\*\\\\..\*

      https://img1.sycdn.imooc.com/09c0d86809819e8811580846.jpg

#删除旧的canal
docker rm canal

#重新部署,添加挂载
docker run -p 11111:11111 \
--name canal \
-v /root/docker/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-d canal/canal-server:v1.1.8

      后续修改配置后只要重启容器就行了。

  • Spring Boot集成

    在pom.xml中添加canal依赖

<!-- Canal 客户端 -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.8</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.8</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!--Spring boot-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.7.14</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    <version>1.18.28</version>
</dependency>

      在application.yaml中添加配置信息

canal:
  server:
    host: 10.35.101.139
    port: 11111
  destination: example
  username: ""
  password: ""
  batch-size: 1000

      配置项CanalOptions.java

import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Getter
@Component
public class CanalOptions
{
    @Value("${canal.server.host}")
    private String host;
    @Value("${canal.server.port}")
    private int port;
    @Value("${canal.destination}")
    private String destination;
    @Value("${canal.username}")
    private String username;
    @Value("${canal.password}")
    private String password;
    @Value("${canal.batch-size}")
    private int batchSize;
}

    CanalConfig.java

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

@Configuration
public class CanalConfig
{

    private final CanalOptions canalOptions;

    public CanalConfig(CanalOptions canalOptions)
    {
        this.canalOptions = canalOptions;
    }

    @Bean
    public CanalConnector canalConnector(){

        //创建连接
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalOptions.getHost(),canalOptions.getPort()),
                canalOptions.getDestination(),
                canalOptions.getUsername(),
                canalOptions.getPassword()
        );

        connector.connect();
        connector.subscribe(".*\\..*");

        return connector;
    }
}

      CanalClient.java

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import demo.canal.config.CanalOptions;

import java.util.List;

@Slf4j
@Component
@RequiredArgsConstructor
public class CanalClient2 implements ApplicationRunner {

    private final CanalConnector canalConnector;
    private final CanalOptions canalOptions;

    @Override
    public void run(ApplicationArguments args){

        log.info("开始监听canal...");
        startListening();
    }

    public void startListening(){
        new Thread(() -> {
            try {
                while (true){

                    Message message = canalConnector.getWithoutAck(canalOptions.getBatchSize());
                    long batchId = message.getId();
                    int size = message.getEntries().size();

                    if(batchId != -1 && size > 0){
                        processEntries(message.getEntries());
                    }

                    canalConnector.ack(batchId);

                }
            } catch (Exception e) {

                log.error("Canal监听异常",e);
                throw new RuntimeException("Canal监听异常",e);
                //canalConnector.rollback();
            } finally {
                canalConnector.disconnect();
            }

        }, "canal-listener-thread").start();
    }

    private void processEntries(List<CanalEntry.Entry> entries){
        for (CanalEntry.Entry entry : entries){

            //跳过事务开始/结束事件
            if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
                continue;
            }

            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

                CanalEntry.EventType eventType = rowChange.getEventType();
                String tableName = entry.getHeader().getTableName();
                String schemaName = entry.getHeader().getSchemaName();
                long executeTime = entry.getHeader().getExecuteTime();


                if(rowChange.getIsDdl()){
                    log.info("监听到binlog DDL => 数据库:{},操作类型:{},sql:{}",schemaName,rowChange.getEventType(),rowChange.getSql());

                    continue;
                }

                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()){
                    switch (eventType){
                        case INSERT:

                            log.info("监听到binlog insert => 数据库:{} 表:{},columns:{}",schemaName,tableName,rowData.getAfterColumnsList());

                            break;
                        case UPDATE:

                            log.info("监听到binlog update => 数据库:{} 表:{},columns:{}",schemaName,tableName,rowData.getAfterColumnsList());
                            break;
                        case DELETE:

                            log.info("监听到binlog update => 数据库:{} 表:{},columns:{}",schemaName,tableName,rowData.getBeforeColumnsList());
                            break;
                        default:
                            log.error("不支持的eventType:{}",eventType);
                            break;
                    }
                }

            } catch (Exception e){

                log.error("解析binlog事件失败",e);

                throw new RuntimeException(e);
            }
        }
    }

}


點(diǎn)擊查看更多內(nèi)容
TA 點(diǎn)贊

若覺得本文不錯(cuò),就分享一下吧!

評論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學(xué)習(xí),寫下你的評論
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊有機(jī)會(huì)得

100積分直接送

付費(fèi)專欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會(huì)
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號

舉報(bào)

0/150
提交
取消