第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機(jī)立即綁定

RocketMq原理項目實戰(zhàn):面向初學(xué)者的簡易教程

標(biāo)簽:
中間件
概述

本文将详细介绍RocketMq原理项目实战,包括RocketMQ的基本概念、开发环境搭建、核心概念讲解、基本操作教程以及集群部署与管理等内容,帮助读者全面理解并掌握RocketMQ的使用。

RocketMQ简介与环境搭建
RocketMQ的基本概念

RocketMQ是一个由阿里巴巴开发并捐赠给Apache基金会的分布式消息中间件,用于构建大规模分布式系统的异步通信和可靠消息传输。RocketMQ具有高吞吐量、低延迟和高可用性等特性,适用于大数据实时处理、日志收集和交易系统等场景。

主要特点

  • 高吞吐量:支持每秒百万条消息的吞吐量。
  • 低延迟:单条消息的延迟在毫秒级别。
  • 高可用性:支持主备部署,增强系统可靠性。
  • 消息过滤:支持多种消息过滤方式。
  • 事务消息:支持消息的事务处理。
开发环境的搭建

安装JDK

RocketMQ基于Java开发,因此需要先安装JDK。请安装最新版本的JDK,并确保JAVA_HOME环境变量已正确配置。

# 检查JDK安装
java -version

下载RocketMQ

从Apache官网下载最新版本的RocketMQ:

# 下载RocketMQ
wget https://downloads.apache.org/rocketmq/rocketmq-release-4.9.3-bin.tar.gz

# 解压RocketMQ
tar -zxvf rocketmq-release-4.9.3-bin.tar.gz
cd rocketmq-4.9.3

启动NameServer和Broker

RocketMQ的运行需要启动NameServer和Broker。NameServer主要负责集群的路由信息管理,而Broker主要负责消息的存储和转发。

# 启动NameServer
nohup sh bin/mqnamesrv &

# 启动Broker
nohup sh bin/mqbroker -n localhost:9876 & 

检查服务是否启动

通过查询端口号来检查NameServer和Broker是否成功启动:

# 检查NameServer
netstat -an | grep 9876

# 检查Broker
netstat -an | grep 10911

如果端口被监听,说明服务已经成功启动。

RocketMQ的核心概念讲解
Topic与Tag

Topic

Topic是消息分类的基本单位,类似于消息队列的概念。每个Topic可以有多个消费者(Consumer)订阅,每个Producer可以向某个Topic发送消息。

Tag

Tag是Topic下的细分分类,用于进一步区分消息的类型。不同Tag的消息可以由不同的Consumer处理。

Producer与Consumer

Producer

Producer负责将消息发送到指定的Topic。每个Producer实例可以向一个或多个Topic发送消息。

// 创建Producer实例
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

// 配置NameServer地址
producer.setNamesrvAddr("localhost:9876");

// 启动Producer
producer.start();

// 创建消息
Message msg = new Message("TestTopic", "TagTest", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));

// 发送消息
SendResult sendResult = producer.send(msg);

// 关闭Producer
producer.shutdown();

Consumer

Consumer负责从指定的Topic订阅消息,并处理这些消息。每个Consumer实例可以订阅一个或多个Topic。

// 创建Consumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");

// 配置NameServer地址
consumer.setNamesrvAddr("localhost:9876");

// 订阅Topic
consumer.subscribe("TestTopic", "*");

// 注册消息处理函数
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.printf("%s Receive New Messages: %s %s %n", Thread.currentThread().getName(), new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET), msg.getStoreTimestamp());
    }
    return ConsumeMsgResult.CONSUME_SUCCESS;
});

// 启动Consumer
consumer.start();

// 等待关闭
Thread.sleep(86400000);
消息模型与消息类型

消息模型

RocketMQ支持两种消息模型:顺序消息和广播消息。

  • 顺序消息:确保消息按照发送顺序消费。
  • 广播消息:每个Consumer都能接收到每条消息。

消息类型

RocketMQ支持多种消息类型,包括:

  • 普通消息:最基本的单向消息。
  • 有序消息:确保消息的发送顺序。
  • 事务消息:支持事务操作的消息。
  • 定时消息:消息在指定时间后发送。
RocketMQ的基本操作教程
发送消息

同步发送

同步发送消息时,Producer会等待消息发送成功后才继续执行。

// 创建Producer实例
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

// 配置NameServer地址
producer.setNamesrvAddr("localhost:9876");

// 启动Producer
producer.start();

// 创建消息
Message msg = new Message("TestTopic", "TagTest", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));

// 发送消息
SendResult sendResult = producer.send(msg);

// 关闭Producer
producer.shutdown();

异步发送

异步发送消息时,Producer通过回调函数获取发送结果。

// 创建Producer实例
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

// 配置NameServer地址
producer.setNamesrvAddr("localhost:9876");

// 启动Producer
producer.start();

// 创建消息
Message msg = new Message("TestTopic", "TagTest", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));

// 发送消息
producer.send(msg, (sendResult) -> {
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), sendResult);
});

// 关闭Producer
producer.shutdown();
接收消息

拉取消息

Consumer通过主动拉取的方式获取消息。

// 创建Consumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");

// 配置NameServer地址
consumer.setNamesrvAddr("localhost:9876");

// 订阅Topic
consumer.subscribe("TestTopic", "*");

// 注册消息处理函数
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.printf("%s Receive New Messages: %s %s %n", Thread.currentThread().getName(), new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET), msg.getStoreTimestamp());
    }
    return ConsumeMsgResult.CONSUME_SUCCESS;
});

// 启动Consumer
consumer.start();

// 等待关闭
Thread.sleep(86400000);

推送消息

推送消息是Consumer被动接收消息的方式。

// 创建Consumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");

// 配置NameServer地址
consumer.setNamesrvAddr("localhost:9876");

// 订阅Topic
consumer.subscribe("TestTopic", "*");

// 注册消息处理函数
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.printf("%s Receive New Messages: %s %s %n", Thread.currentThread().getName(), new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET), msg.getStoreTimestamp());
    }
    return ConsumeMsgResult.CONSUME_SUCCESS;
});

// 启动Consumer
consumer.start();

// 等待关闭
Thread.sleep(86400000);
消息过滤与重试

消息过滤

Consumer可以通过Tag来过滤消息。

// 创建Consumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");

// 配置NameServer地址
consumer.setNamesrvAddr("localhost:9876");

// 订阅带有特定Tag的消息
consumer.subscribe("TestTopic", "TagTest");

// 注册消息处理函数
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.printf("%s Receive New Messages: %s %s %n", Thread.currentThread().getName(), new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET), msg.getStoreTimestamp());
    }
    return ConsumeMsgResult.CONSUME_SUCCESS;
});

// 启动Consumer
consumer.start();

// 等待关闭
Thread.sleep(86400000);

消息重试

当消息消费失败时,RocketMQ会自动重试。可以通过设置重试策略来控制重试次数和间隔。

// 创建Consumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");

// 配置NameServer地址
consumer.setNamesrvAddr("localhost:9876");

// 订阅Topic
consumer.subscribe("TestTopic", "*");

// 设置重试策略
consumer.setConsumeRetryMax(3);

// 注册消息处理函数
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.printf("%s Receive New Messages: %s %s %n", Thread.currentThread().getName(), new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET), msg.getStoreTimestamp());
    }
    return ConsumeMsgResult.CONSUME_SUCCESS;
});

// 启动Consumer
consumer.start();

// 等待关闭
Thread.sleep(86400000);
RocketMQ的集群部署与管理
集群模式下的部署

集群模式下,RocketMQ可以实现高可用和负载均衡。通过部署多个NameServer和Broker,可以提高系统的可靠性和性能。

部署NameServer

NameServer用于管理Broker的路由信息。部署步骤如下:

# 启动NameServer
nohup sh bin/mqnamesrv -n localhost:9876 &

部署Broker

Broker负责消息的存储和转发。部署多个Broker可以实现负载均衡。

# 启动Broker
nohup sh bin/mqbroker -n localhost:9876 -c broker-a.properties &
nohup sh bin/mqbroker -n localhost:9876 -c broker-b.properties &

配置Broker

Broker的配置文件中需要指定NameServer地址和其他参数。

# broker.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
namesrvAddr=localhost:9876
storePathRootDir=/opt/message
storePathCommitLog=/opt/message/commitlog
NameServer与Broker的角色与配置

NameServer

NameServer主要负责管理和维护所有Broker的路由信息。每台Broker启动时都会主动向NameServer注册,并定期发送心跳。

Broker

Broker负责消息的存储和转发。每个Broker实例都有一个唯一的brokerId,可以通过配置文件进行指定。

# broker-a.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
namesrvAddr=localhost:9876
storePathRootDir=/opt/message
storePathCommitLog=/opt/message/commitlog
RocketMQ实战项目案例
实战项目需求分析

假设我们需要构建一个日志收集系统,将不同服务的日志发送到RocketMQ,并由其他系统消费这些日志进行处理。

需求分析

  1. 生产者:负责从各个服务收集日志,并发送到RocketMQ。
  2. 消费者:负责从RocketMQ消费日志,并进行进一步处理。
实战项目开发步骤与技巧

步骤1:创建生产者

编写一个Java程序,作为生产者,负责收集日志并发送到RocketMQ。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class LogProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("LogProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("LogTopic", "LogTag", "Hello Log!".getBytes());
        SendResult sendResult = producer.send(msg);
        System.out.println("Message Sent: " + sendResult);
        producer.shutdown();
    }
}

步骤2:创建消费者

编写一个Java程序,作为消费者,从RocketMQ消费日志并进行处理。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class LogConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("LogConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("LogTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeMsgResult consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Received Message: " + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
                }
                return ConsumeMsgResult.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

步骤3:部署和运行

将上述生产者和消费者程序部署到相应的服务器上,并启动它们。

# 启动生产者
java -cp ./lib/* LogProducer

# 启动消费者
java -cp ./lib/* LogConsumer

步骤4:监控和维护

定期监控RocketMQ的运行状态,包括消息的发送和接收情况,及时处理异常和故障。

# 检查Broker状态
sh mqadmin brokerStatus -n localhost:9876 -b broker-a
常见问题与调试技巧
常见问题汇总
  1. 消息发送失败:检查网络连接和NameServer地址配置。
  2. 消息接收延迟:检查Consumer的配置和负载情况。
  3. 消息重复:检查消息的重试机制和消费处理函数。
调试与监控工具介绍

RocketMQ提供了多种调试工具和监控插件,帮助开发者更好地理解和维护系统。

mqadmin工具

mqadmin是一个命令行工具,可以查看和管理RocketMQ的运行状态。

# 查看NameServer状态
sh mqadmin clusterList -n localhost:9876

# 查看Broker状态
sh mqadmin brokerStatus -n localhost:9876 -b broker-a

RocketMQ Console

RocketMQ Console是一个Web界面的监控工具,可以实时查看RocketMQ的各项指标和状态。

# 启动RocketMQ Console
sh console/bin/mqadmin start

通过这些工具,可以有效地监控和调试RocketMQ系统,确保其稳定运行。

點擊查看更多內(nèi)容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學(xué)習(xí),寫下你的評論
感謝您的支持,我會繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊有機(jī)會得

100積分直接送

付費專欄免費學(xué)

大額優(yōu)惠券免費領(lǐng)

立即參與 放棄機(jī)會
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號

舉報

0/150
提交
取消