第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定

RabbitMQ與.net core(二)Producer與Exchange

標(biāo)簽:
C#

正文

Producer:消息的生产者,也就是创建消息的对象

Exchange:消息的接受者,也就是用来接收消息的对象,Exchange接收到消息后将消息按照规则发送到与他绑定的Queue中。下面我们来定义一个Producer与Exchange。

回到顶部

1.新建.netcore console项目,并引入RabbitMQ.Client的Nuget包

回到顶部

2.创建Exchange

复制代码

using RabbitMQ.Client;namespace RabbitMQConsole
{    class Program
    {        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";            var exchange = "change2";            var route = "route2";            var queue = "queue2";            using (var connection = factory.CreateConnection())
            {                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type:"direct", durable: true, autoDelete: false);   //创建Exchange
                    
                }
            }
        }
    }
}

复制代码

可以看到Echange的参数有:

type:可选项为,fanout,direct,topic,headers。区别如下:

    fanout:发送到所有与当前Exchange绑定的Queue中

    direct:发送到与消息的routeKey相同的Rueue中

    topic:fanout的模糊版本

    headers:发送到与消息的header属性相同的Queue中

durable:持久化

autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。

 运行程序,可以在可视化界面看到change2

接下来我们可以创建与change2绑定的queue

回到顶部

3.创建Queue

复制代码

                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);  #创建queue2
                    channel.QueueBind(queue, exchange, route);  #将queue2绑定到exchange2
                }

复制代码

可以看到Echange的参数有:

durable:持久化

exclusive:如果为true,则queue只在channel存在时存在,channel关闭则queue消失

autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。

去可视化界面看Queue

回到顶部

4.发送消息

复制代码

                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue, exchange, route);                    var props = channel.CreateBasicProperties();
                    props.Persistent = true; #持久化
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                }

复制代码

回到顶部

5.消费消息

复制代码

using RabbitMQ.Client;using System;using System.Text;namespace RabbitMQClient
{    class Program
    {        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };        static void Main(string[] args)
        {            var exchange = "change2";            var route = "route2";            var queue = "queue2";            using (IConnection conn = rabbitMqFactory.CreateConnection())            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);                while (true)
                {                    var message = channel.BasicGet(queue, true);  #第二个参数说明自动释放消息,如为false需手动释放消息                    if(message!=null)
                    {                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    }
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }
            }
        }
    }
}

复制代码

运行查看结果

查看可视化界面

回到顶部

6.手动释放消息

复制代码

                while (true)
                {                    var message = channel.BasicGet(queue, false);#设置为手动释放                    if(message!=null)
                    {                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    }
                    channel.BasicAck(message.DeliveryTag, false); #手动释放
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }

复制代码

我们再发一条消息,然后开始消费,加个断点调试一下

查看一下Queue中消息状态

然后直接取消调试,不让程序走到释放的那一步,再查看一下消息状态

这么说来只要不走到 channel.BasicAck(message.DeliveryTag, false);这一行,消息就不会被释放掉,我们让程序直接走到这一行代码,查看一下消息的状态

如图已经被释放了

回到顶部

7.让失败的消息回到队列中

复制代码

                while (true)
                {                    var message = channel.BasicGet(queue, false);                    if(message!=null)
                    {                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        Console.WriteLine(message.DeliveryTag);    #当前消息被处理的次序数                        if (1==1)
                            channel.BasicReject(message.DeliveryTag, true);
                    }
                    
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }

复制代码

重新发送4条消息

开始消费

我们可以看到消息一直没有没消费,因为消息被处理之后又放到了队尾

回到顶部

8.监听消息

复制代码

 using (IConnection conn = rabbitMqFactory.CreateConnection())            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);

                channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);  #一次接受10条消息,否则rabbit会把所有的消息一次性推到client,会增大client的负荷
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    Byte[] body = ea.Body;
                    String message = Encoding.UTF8.GetString(body);
                    Console.WriteLine( message+Thread.CurrentThread.ManagedThreadId);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
                Console.ReadLine();
            }

复制代码

原文出处: https://www.cnblogs.com/chenyishi/p/10233629.html  

點(diǎn)擊查看更多內(nèi)容
TA 點(diǎn)贊

若覺(jué)得本文不錯(cuò),就分享一下吧!

評(píng)論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評(píng)論
  • 收藏
  • 共同學(xué)習(xí),寫(xiě)下你的評(píng)論
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說(shuō)多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開(kāi)微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊(cè)有機(jī)會(huì)得

100積分直接送

付費(fèi)專欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會(huì)
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)

舉報(bào)

0/150
提交
取消