第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定

架構(gòu)設(shè)計(jì)之NodeJS操作消息隊(duì)列RabbitMQ

標(biāo)簽:
Node.js

一. 什么是消息队列?

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

二. 常用的消息队列有哪些?

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。

甚至现在部分NoSQL也可做消息队列,如Redis。

三. 消息队列的使用场景?

  • 异步处理

  • 应用解耦

  • 流量削峰

四. 使用案例

上规模的公司都会有自己的日志分析系统,日志系统是怎么实现的呢?

 

https://img1.sycdn.imooc.com//5b48c420000195f606710081.jpg

图解:用户在访问应用的时候,我们要记录下用户的操作记录和系统的异常日志,常规的做法是将系统产生的日志保存到服务器磁盘,在服务器中开启定时任务,定时将磁盘的日志信息传入mq中(生产者),也定时将mq中的消息取出并存到相应的数据库,如ElasticSearch或Hive中。

五. 如何安装RabbitMQ?

上面的案例介绍了MQ的一个使用场景,我这里是用RabbitMQ举例,现实项目中可能用到的是Kafka。

  1. 首先安装brew(mac为例)

    ?

    1/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

  2. 安装RabbitMQ

    brew install rabbitmq
  3. 运行RabbitMQ

    进入到 /usr/local/Cellar/rabbitmq/3.7.7,执行

    ?

    1sbin/rabbitmq-server

  4. 启动插件

    进入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin

    ?

    1./rabbitmq-plugins enable rabbitmq_management

  5. 登陆管理界面

    打开浏览器输入:http://localhost:15672,RabbitMQ默认15672端口六. Nodejs操作RabbitMQ

     https://img1.sycdn.imooc.com//5b48c42c0001669b19200519.jpg

 

网上可以找到好几个相应的Node SDK,这里推荐amqplib

1. 生产者

复制代码

/**
 * 对RabbitMQ的封装 */let amqp = require('amqplib');

class RabbitMQ {
    constructor() {        this.hosts = [];        this.index = 0;        this.length = this.hosts.length;        this.open = amqp.connect(this.hosts[this.index]);
    }
    sendQueueMsg(queueName, msg, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {                return conn.createChannel();
            })
            .then(function (channel) {                return channel.assertQueue(queueName).then(function (ok) {                    return channel.sendToQueue(queueName, new Buffer(msg), {
                        persistent: true
                    });
                })
                    .then(function (data) {                        if (data) {
                            errCallBack && errCallBack("success");
                            channel.close();
                        }
                    })
                    .catch(function () {
                        setTimeout(() => {                            if (channel) {
                                channel.close();
                            }
                        }, 500)
                    });
            })
            .catch(function () {
                let num = self.index++;                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index == 0;
                }
            });
    }
}

复制代码

 

2. 消费者

复制代码

/**
 * 对RabbitMQ的封装 */let amqp = require('amqplib');

class RabbitMQ {
    constructor() {        this.open = amqp.connect(this.hosts[this.index]);
    }
    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {                return conn.createChannel();
            })
            .then(function (channel) {                return channel.assertQueue(queueName)
                    .then(function (ok) {                        return channel.consume(queueName, function (msg) {                            if (msg !== null) {
                                let data = msg.content.toString();
                                channel.ack(msg);
                                receiveCallBack && receiveCallBack(data);
                            }
                        })
                            .finally(function () {
                                setTimeout(() => {                                    if (channel) {
                                        channel.close();
                                    }
                                }, 500)
                            });
                    })
            })
            .catch(function () {
                let num = self.index++;                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index = 0;
                    self.open = amqp.connect(self.hosts[0]);
                }
            });
    }

复制代码

3. 通过生产者向MQ发送一个消息,并创建队列

let mq = new RabbitMQ();
mq.sendQueueMsg('testQueue', 'my first message', (error) => {
    console.log(error)
})

执行之后,我们打开管理平台,发现RabbbitMQ已经接受到了一条消息:

https://img1.sycdn.imooc.com//5b48c47d0001cf3912340684.jpg

并且RabbbitMQ新增了一个队列testQueue

https://img1.sycdn.imooc.com//5b48c486000153ae14200780.jpg

4. 获取指定队列的消息

let mq = new RabbitMQ();
mq.receiveQueueMsg('testQueue',(msg) => {    
   console.log(msg)
})// 输出结果:my first message复制代码

此时打开RabbitMQ管理平台,消息数量已经变为0

https://img1.sycdn.imooc.com//5b48c48e00015a7412780686.jpg

综上:我们简单讲述了消息队列及RabbitMQ相关的一些知识,以及我们如何通过nodejs来生产与消费消息,上面讲的比较简单,之后会发表更多文章讲述消息队列集群搭建及容灾的实现。

原文出处

點(diǎn)擊查看更多內(nèi)容
TA 點(diǎn)贊

若覺(jué)得本文不錯(cuò),就分享一下吧!

評(píng)論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評(píng)論
  • 收藏
  • 共同學(xué)習(xí),寫(xiě)下你的評(píng)論
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說(shuō)多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開(kāi)微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊(cè)有機(jī)會(huì)得

100積分直接送

付費(fèi)專欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會(huì)
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)

舉報(bào)

0/150
提交
取消