-
消息確認模式,消息發(fā)出去之后異步的等待。
查看全部 -
默認的主機是/
也可以自己建虛擬主機,回頭看看是怎么用的。
查看全部 -
rabbitmq啟動它擁有的一些插件
查看全部 -
整個安裝過程
查看全部 -
第一步erlan那一個
第二步socat那一個
第三步 rabbitmq那一個
查看全部 -
消息可靠性圖
查看全部 -
消息投靠方案設(shè)計
查看全部 -
9llojjl9llljlll lj5 lll laaaaaa aaaa lcl l jl lll查看全部
-
消息投遞成功設(shè)計方案:
查看全部 -
AMQP核心概念
Server:又稱Broker,接受客戶端的連接,實現(xiàn)AMQP實體服務(wù)
Connection:連接,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接
Chnnel:網(wǎng)絡(luò)通道,幾乎所有的操作都在Channel中進行,Channel是實進行消息讀寫的通道??蛻舳丝梢越⒍鄠€Channel,每個Channel代表一個會話任務(wù)。
Message:消息。服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),由Properties和Body組成。Properties可以對消息進行修飾,比如消息的優(yōu)先級、延遲等高級特性;Body則就是消息體內(nèi)容。
Virtual host:虛擬地址,用于進行邏輯隔離,最上層的消息路由。一個Virtual Host里面可以有若干個Exchange和Queue,同一個Virtual Host里面不能有相同名稱的Exchage或Queue。
Exchange:交換機,接收消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊列。
Binding:Exchange和Queue之間的虛擬連接,binding中可以包含 routing key。
Routing key:一個路由規(guī)則,虛擬機可用它來確定如何路由一個特定消息。
Queue:也稱為Message Queue ,消息隊列,保持消息并將它們轉(zhuǎn)發(fā)給消費者。
查看全部 -
RabbitMQ簡介及AMPQ協(xié)議
RabbitMQ是一個開源的消息代理和隊列服務(wù)器,用來通過普通協(xié)議在完全不同的應(yīng)用之間共享數(shù)據(jù),RabbitMQ 是使用Erlang 語言來編寫的,并且RabbitMQ是基于AMQP協(xié)議的。
RabbitMQ底層采用Erlang 語言進行編寫
開源、性能優(yōu)秀,穩(wěn)定性保障
與SpringAMQP完美的整合、API豐富
集群模式豐富,表達式配置,HA模式,鏡像隊列模型
保證數(shù)據(jù)不丟失的前提做到高可靠性、可用性
AMQP全程:Advanced Message Queuing Protocol
AMQP翻譯:高級消息隊列協(xié)議
AMQP協(xié)議模型
查看全部 -
package?com.mfl.demo.utils; import?com.mfl.demo.entity.Order; import?com.rabbitmq.client.Channel; import?org.springframework.amqp.rabbit.annotation.*; import?org.springframework.amqp.support.AmqpHeaders; import?org.springframework.messaging.handler.annotation.Headers; import?org.springframework.messaging.handler.annotation.Payload; import?org.springframework.stereotype.Component; import?java.util.Map; @Component public?class?MessageConsumerUtils?{ ????@RabbitListener(bindings?=?@QueueBinding(value?=?@Queue(value?=?"mfl-1-queue",durable?=?"true"), ?????????????????????????????????????????????exchange?=?@Exchange(name?=?"mfl-1-exchange",?type?=?"topic"), ??????????????????????????????????????????????key?=?"order.*" ????)) ????@RabbitHandler ????//Payload?指定消息體是order ????public?void?receive(@Payload?Order?order,?@Headers?Map<String,Object>?headers,?Channel?channel)?throws?Exception{ ????????System.out.println("-------收到消息,開始消費---------"); ????????System.out.println(order.getName()); ????????Long?deliveryTag?=?(Long)?headers.get(AmqpHeaders.DELIVERY_TAG); //????????deliveryTag:該消息的index //????????multiple:是否批量.true:將一次性ack所有小于deliveryTag的消息。 ????????channel.basicAck(deliveryTag,false); ????} }
配置文件:
##?springboot整合rabbitmq基本配置 spring.rabbitmq.addresses=192.168.254.135:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 ##?springboot整合rabbitmq消費端配置 #線程數(shù) spring.rabbitmq.listener.simple.concurrency=5 #簽收模式manual:手工簽收??auto自動簽收?none不簽收 spring.rabbitmq.listener.simple.acknowledge-mode=manual #最大線程數(shù) spring.rabbitmq.listener.simple.max-concurrency=10 #限流,每個線程同一時間只能過來一條消息,消費完了再取下一條 spring.rabbitmq.listener.simple.prefetch=1 server.servlet.context-path=/ server.port=8002
查看全部 -
package?com.mfl.demo.utils; import?com.mfl.demo.entity.Order; import?org.springframework.amqp.rabbit.connection.CorrelationData; import?org.springframework.amqp.rabbit.core.RabbitTemplate; import?org.springframework.beans.factory.annotation.Autowired; import?org.springframework.stereotype.Component; @Component public?class?MessageProducerUtils?{ ????@Autowired ????private?RabbitTemplate?rabbitTemplate; ????public?void?send(Order?order)?throws?Exception{ ????????rabbitTemplate.setConfirmCallback((correlationData,?ack,?cause)?->?{ ????????????System.out.println("correlationData:"+correlationData); ????????????if(ack){ ????????????????System.out.println("回調(diào)成功--------"); ????????????}else?{ ????????????????System.out.println("回調(diào)失敗,異常處理............."+cause); ????????????} ?????????????????????}); ????????CorrelationData?correlationData?=?new?CorrelationData(); ????????correlationData.setId(order.getMessageId()); ????????rabbitTemplate.convertAndSend("mfl-1-exchange",?//交換機 ????????????????"order.abcd",//rountingKey ????????????????order,//消息內(nèi)容 ????????????????correlationData);//消息唯一id ????} }
回調(diào)函數(shù)需要在生產(chǎn)者代碼配置文件里配置
spring.rabbitmq.publisher-confirm-type=correlated
查看全部 -
源碼路徑:https://github.com/suxiongwei/springboot-rabbitmq
安裝順序:若提前安裝3,會提示缺少秘鑰
1、?rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
2、rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
3、rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
查看全部 -
RabbitMQ的安裝過程
1.準備:
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
2.下載
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
3.配置:
vim /etc/hosts 以及 /etc/hostname
配置文件:
vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
比如修改密碼、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest,?用于登陸管控臺
啟動:rabbitmq-server start &
停止: rabbitmqctl app_stop
4.管理插件:啟用管控臺
rabbitmq-plugins enable rabbitmq_management
5.訪問地址:http://127.0.0.1:15672/
查看全部
舉報