第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定

RocketMQ安裝指南:從零開始搭建消息隊列系統(tǒng)

標簽:
雜七雜八
简介

RocketMQ 是阿里云开发的一款高性能消息队列,它采用分布式架构、支持高并发、高可用、以及事务消息、消息轨迹等功能,广泛应用于分布式系统中。在微服务架构中,RocketMQ 作为消息中间件,可以帮助实现服务间的解耦、异步通信、消息可靠传输等需求。

环境准备

在开始安装 RocketMQ 之前,确保您的计算机满足以下基本系统要求:

  • 操作系统: Linux、Windows 或 macOS。
  • JDK: 安装最新版本的 JDK(建议使用 8 或以上版本)。
  • 构建工具: Maven 或 Gradle(可选)。

安装依赖环境

  1. JDK: 访问 Oracle 官网下载适合您操作系统的 JDK,并按照官方指南进行安装。
  2. Maven: 访问 Apache Maven 官网下载最新版本的 Maven,并添加到系统环境变量中。

安装完毕后,通过终端验证安装情况:

# 验证 JDK
java -version
# 验证 Maven
mvn -version
下载和管理RocketMQ

为了顺利进行 RocketMQ 的安装及管理,您需要从 RocketMQ 官方网站下载合适操作系统的版本,并自行配置存放目录和初始化参数。

从官方网站下载RocketMQ

访问 RocketMQ 官方网站(http://rocketmq.apache.org/zh/),下载最新稳定版本的 RocketMQ 并安装。

配置RocketMQ的存放目录和初始化参数

安装 RocketMQ 时,需要设置特定的目录和参数:

  • logs: 日志文件存放目录。
  • tmp: 临时文件存放目录。
  • data: 消息数据存放目录。

初始化参数通常包括:

  • -d--dataPath:指定 data 目录。
  • -t--tmpPath:指定 tmp 目录。
  • -l--logPath:指定日志文件存放目录。

使用如下命令安装 RocketMQ(以 Linux 为例):

# 创建目录
mkdir -p /data/rocketmq/logs /data/rocketmq/data /data/rocketmq/tmp

# 运行安装脚本(通常在下载的 RocketMQ 文件夹中)
bin/start.sh -n localhost -c /data/rocketmq/conf/rocketmq.conf

启动控制台和管理界面

通常情况下,RocketMQ 会自带控制台和管理界面,无需额外安装。通过访问以下地址查看管理界面:

http://localhost:9876/

确保服务器可以访问该端口。

基础配置与验证

配置服务器通信和端口

为了确保 RocketMQ 能在集群中正常通信,配置服务器间的通信参数,包括:

  • 服务器地址:指明集群中的其他服务器地址。
  • 端口:通常使用 9876 作为控制台和管理界面的端口。

在配置文件 rocketmq.conf 中添加如下参数:

# 配置服务器间的通信参数
nameServer = 127.0.0.1:9876

# 配置端口
nameServerPort = 9876

# 配置日志级别
log4j.rootLogger = "INFO, stdout"

# 配置日志输出
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{ABSOLUTE} %5p %c{1}:%L - %m%n

创建生产者和消费者实例

生产者(Producer)和消费者(Consumer)是 RocketMQ 的核心组件,分别用于消息的发送与接收。

生产者实例创建

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        Properties config = new Properties();
        config.put("producerGroup", "GROUP_1");
        DefaultMQProducer producer = new DefaultMQProducer("GROUP_1", config);

        // 启动生产者
        producer.start();

        // 消息发送
        String msg = "Hello RocketMQ!";
        SendResult result = producer.send(msg.getBytes());

        // 关闭生产者
        producer.shutdown();
    }
}

消费者实例创建

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import java.util.List;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("consumerGroup", "GROUP_2");
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_2", props);

        // 设置名服务器地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 从历史消息队列开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // 设置消息监听器
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                context.setConsumeResult(ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        // 启动消费者
        consumer.start();
        System.out.println("Consumer started.");
    }
}

基本消息发送与接收测试

为了验证生产者和消费者是否正确工作,可以通过控制台发送消息并观察消费者接收情况。

生产者:

java -jar path/to/your/producer-application.jar

消费者:

java -jar path/to/your/consumer-application.jar

使用控制台发送消息:

curl -X POST -H "Content-Type: application/json" -d "{\"message\": \"Hello from the console!\"}" http://localhost:9876/admin/message/admin/getMessage?topic=TopicTest&consumerGroup=GROUP_2

观察消费者接收消息:

通过控制台或终端观察消费者是否正确接收到消息。

实践应用

微服务架构集成实践

在微服务架构中集成 RocketMQ 可以实现服务之间的异步通信和解耦。以下是在微服务架构中集成 RocketMQ 的一个简单示例:

Producer Service:使用 RocketMQ 发送消息。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.core.RocketMQTopic;

@Service
public class NotificationProducer {
    private final RabbitTemplate rabbitTemplate;
    private final RocketMQTemplate rocketMQTemplate;

    @Value("${queue.name}")
    private String queueName;

    @Autowired
    public NotificationProducer(RabbitTemplate rabbitTemplate, RocketMQTemplate rocketMQTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rocketMQTemplate = rocketMQTemplate;
    }

    public void sendNotification(String message) {
        // 发送消息到 RabbitMQ
        rabbitTemplate.convertAndSend(queueName, message);

        // 发送消息到 RocketMQ
        rocketMQTemplate.convertAndSend(RocketMQTopic.TOPIC_NOTIFICATION, message);
    }
}

Consumer Service:监听 RocketMQ 队列接收消息。

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(consumerGroup = "GROUP", topics = "Topic.Notification")
public class NotificationConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received notification: " + message);
    }
}

通过上述示例,展示了如何在微服务架构中集成 RocketMQ,实现消息的异步通信和解耦,从而提升系统的稳定性和可扩展性。

总结

遵循上述步骤和提供的代码示例,您可以顺利安装并配置 RocketMQ 消息队列系统,为您的分布式应用提供高效、可靠的消息传输服务。通过合理配置和应用 RocketMQ,不仅能够优化服务间的通信,还能够提高系统的稳定性和响应效率,满足高并发、高可用的分布式环境需求。

點擊查看更多內(nèi)容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學(xué)習(xí),寫下你的評論
感謝您的支持,我會繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學(xué)

大額優(yōu)惠券免費領(lǐng)

立即參與 放棄機會
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號

舉報

0/150
提交
取消