第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定

幾種實(shí)現(xiàn)延時(shí)任務(wù)的方式(三)

標(biāo)簽:
Java

上篇文章介绍了使用Redis来实现延时任务,这是一个比较好的方案,但是这种方式是把Redis作为消息队列去使用,而Redis作为消息队列还是有一些缺点的:

  1. Redis本身没有提供监控、管理界面,需要自己去实现。我们无法方便的知道现在队列的情况,比如是否有积压,消费情况是如何的,生产情况又是如何的。

  2. 消息可能被重复消费,如果是幂等性操作也没什么,但是如果非幂等性操作,就需要其他的解决方案来解决这个问题。

  3. Redis本身没有ACK机制,消息没有那么可靠,当然这个缺点在这个案例中,并不是那么明显,因为我们可以在该执行的都执行成功了,才去删除数据。
    ...

当然最根本的问题是Redis本身就不是为了队列而生的,它是为了存储而生的,所以它缺少一些队列才有的功能也是“情理之中”的。不过,Redis5引进了Stream,据说 这也是一个功能很强大的队列,但是我还没去看。这里就不说了。

在本节中,我将用RabbitMQ来实现延时任务。

关于RabbitMQ的安装,我就不做介绍了,网上都有,而且没有什么难度。

在使用方面,RabbitMQ比Redis难很多,毕竟使用的比较少,而且不少公司都对MQ进行了封装,使其更好用,但是同时也隐藏了MQ在使用方面的不少细节。

从基本没有接触过RabbitMQ,到要使用RabbitMQ来完成延时任务,也是一个"跳跃性"的任务。我们应该先了解RabbitMQ一些基础概念,基本使用 等等。仅仅靠一两句话是远远不够的。本文的主题在于“使用RabbitMQ来完成延时任务”。所以在这里我默认大家都有一定的RabbitMQ使用经验了。

好了,让我们开始吧。

首先,让我们引进两个名词:

  1. TTL、死信:
    Time To Live,这个名词也说不上是一个新名词,Redis中也有,就是 存活时间,也就是我们经常说的过期时间了,放在MQ里面,特指 消息的存活时间。消息超过了存活时间,就认为这个消息“死”了,称之为“死信”。

  2. Dead Letter Exchange
    死信交换器。创建死信交换器和创建其他交换器没什么区别,只是我们需要告诉队列,死信需要被推送到死信交换器上。

对于生产者来说,需要创建一个Connection连接,接着在Connection中创建一个Channel,通过Channel申明两个交换器,一个是 用来接收订单数据的交换器,一个是用来接收超时订单数据的交换机,然后申明两个队列,一个是订单数据队列,并且需要告诉这个队列,如果有消息超时了,需要转发到 “用来接收超时订单数据的交换机”,还要申明一个超时订单数据队列。然后把 “用来接收订单数据的交换器”和“订单数据队列”进行绑定,把“用来接收超时订单数据的交换机”和“超时订单数据队列”进行绑定。前置准备工作才算完成,下面就是通过Channel往 “用来接收订单数据的交换器”推数据了。

为了帮助大家更好的理解,我简单的画了一张图:
image.png
希望大家看了文字之后,再对照图片,可以有所理解。

对于生产者来说,就比较简单了,前置工作就是创建Connection连接,再创建Channel,然后通过Channel,消费 “超时订单数据队列” 就OK了。

下面我直接放出代码:
需要在pom中引入依赖:

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.5.0</version>
        </dependency>
public class Main {    static ConnectionFactory connectionFactory;    static Connection connection;    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");        try {
            connection = connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }    public static void main(String[] args) throws Exception {
        producer();

        Thread thread = new Thread(() -> {            try {
                consume();
            } catch (Exception e) {
                e.printStackTrace();
            }

        });
        thread.start();
    }    private static void producer() throws Exception {

        Channel channel = connection.createChannel();//创建一个channel,不管是生产数据,还是消费数据,都是通过channel去操作的

        channel.exchangeDeclare("orderExchange", "direct", true);//定义一个交换机,路由类型为direct,所有的订单会塞给此交换机
        channel.exchangeDeclare("orderDelayExchange", "direct", true);//定义一个交换机,路由类型为direct,延迟的订单会塞给此交换机

        HashMap<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange", "orderDelayExchange");//申明死信交换机是名称为orderDelayExchange的交换机
        channel.queueDeclare("order_queue", true, false, false,
                arguments);//定义一个名称为order_queue的队列,绑定上面定义的参数,这样就告诉rabbit此队列延迟的消息,发送给orderDelayExchange交换机

        channel.queueDeclare("order_delay_queue", true, false, false,                null);//定义一个名称为order_delay_queue的队列

        channel.queueBind("order_queue", "orderExchange",                "delay");//order_queue和orderExchange绑定,路由为delay。路由也为delay的消息会通过orderExchange进入到order_queue队列
        channel.queueBind("order_delay_queue", "orderDelayExchange",                "delay");//order_delay_queue和orderDelayExchange绑定

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.expiration("15000");//设置消息TTL(消息生存时间)
        builder.deliveryMode(2);//设置消息持久化
        AMQP.BasicProperties properties = builder.build();

        Thread productThread = new Thread(() -> {            for (int i = 0; i < 20; i++) {
                String order = "order" + i;                try {
                    channel.basicPublish("orderExchange", "delay",
                            properties, order.getBytes());//通过channel,向orderExchange交换机发送路由为delay的消息,这样就可以进入到order_queue队列
                    String str = "现在时间是" + new Date().toString() + "  " + order + "  的消息产生了";
                    System.out.println(str);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }            try {
                channel.close();
            } catch (Exception ex) {

            }
        });
        productThread.start();

    }    private static void consume() throws Exception {
        Channel channel = connection.createChannel();//创建一个channel,不管是生产数据,还是消费数据,都是通过channel去操作的
        //消费名称为order_delay_queue的队列,且关闭自动应答,需要手动应答
        channel.basicConsume("order_delay_queue", false, new DefaultConsumer(channel) {            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                long deliveryTag = envelope.getDeliveryTag();//消息的标记,应答的时候需要传入这个参数
                String str = "现在时间是" + new Date().toString() + "  " + new String(body) + "  的消息消费了";
                System.out.println(str);
                channel.basicAck(deliveryTag, false);//手动应答,代表这个消息处理完成了
            }
        });
    }
}

下面我们运行一下:
image.png

代码注释写的还是比较清晰的,希望大家可以看懂吧。

这一节,我没有像上两节一样,讲的那么细,因为如果从RabbitMQ的基础讲起,可能需要三四章的内容来做铺垫,这就脱离主题了。如果有机会的话,我会再花一个系列去介绍RabbitMQ。

好了,实现延时任务系列到这里就结束了,当然我这里只是抛砖引玉,大家肯定还有不少更好的实现方式。

原文出处:https://www.cnblogs.com/CodeBear/p/10056810.html  

點(diǎn)擊查看更多內(nèi)容
1人點(diǎn)贊

若覺(jué)得本文不錯(cuò),就分享一下吧!

評(píng)論

作者其他優(yōu)質(zhì)文章

正在加載中
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說(shuō)多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開(kāi)微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊(cè)有機(jī)會(huì)得

100積分直接送

付費(fèi)專欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會(huì)
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)

舉報(bào)

0/150
提交
取消