第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定

Spring Cloud Stream消息驅(qū)動(dòng)@SendTo和消息降級(jí)

標(biāo)簽:
Java

参考程序员DD大佬的文章,自己新建demo学习学习,由于需要消息回执,看到了@SendTo这个注解能够实现,下面开始学习demo,新建两个项目cloud-stream-consumer消费端 和 cloud-stream-consumer 生产端

public interface StreamReceive {

    @Input("MQRece")
    SubscribableChannel mqReceive();
}

添加一个StreamReceive接口,定义@input通道

@Component
@Slf4j
public class ReceiveListener {

    @StreamListener("MQRece")
    public byte[] receive(byte[] bytes){
        log.info("接受消息:"+new String(bytes));
        return "ok".getBytes();
    }

}

添加消息监听,接受消息定义为byte[]

添加application.properties配置文件信息

spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.211.11:9876
spring.cloud.stream.bindings.MQRece.destination=message-topic
spring.cloud.stream.bindings.MQRece.group=rece-group
server.port=19999

为MQRece通道添加主题message-topic,组名rece-group

到此Stream 客户端消费就完成了,本节需要把@SendTo注解用起来,需要新建一个MessageChannel进行产生消息

public interface MsgBackPush {
    
    @Output("back-push")
    MessageChannel backPush();
}

然后在ReceiveListener添加@SendTo

@Component
@Slf4j
public class ReceiveListener {

    @StreamListener("MQRece")
    @SendTo("back-push")
    public byte[] receive(byte[] bytes){
        log.info("接受消息:"+new String(bytes));
        return "ok".getBytes();
    }

}

新增通道配置application.properties

spring.cloud.stream.bindings.back-push.destination=back-topic
spring.cloud.stream.bindings.back-push.group=back-group

SpringBoot启动类记得添加 @EnableBinding(value = {StreamReceive.class,MsgBackPush.class})

@SpringBootApplication
@EnableBinding(value = {StreamReceive.class,MsgBackPush.class})
public class CloudStreamConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(CloudStreamConsumerApplication.class, args);
    }

}

到此,cloud-stream-consumer这个demo就完成了

接下来看看 cloud-stream-producer

public interface StreamPush {

    @Output("MQPush")
    MessageChannel mqPush();
}

定义一个通道名为MQPush,进行消息生产

public interface ProducerReceive {


    @Input("producer-receive")
    SubscribableChannel producerReceive();
}

定义一个通道名为producer-receive,进行回执消息的消费

@Component
@Slf4j
public class ProducerListener {
    @StreamListener("producer-receive")
    public void producerReceive(byte[] bytes){
        log.info("come back message:"+new String(bytes));
    }
}

具体回执消息处理逻辑,再来看看application.properties

spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.214.191:9876
spring.cloud.stream.bindings.MQPush.destination=message-topic
spring.cloud.stream.bindings.MQPush.group=push-group


spring.cloud.stream.bindings.producer-receive.destination=back-topic
spring.cloud.stream.bindings.producer-receive.group=back-group


server.port=20000

为通道设置topic和group,新建一个Http接口测试一下成果

@SpringBootApplication
@EnableBinding(value = {StreamPush.class,ProducerReceive.class})
@RestController
public class CloudStreamProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(CloudStreamProducerApplication.class, args);
    }

    @Autowired
    private StreamPush streamPush;

    @GetMapping("/sendMessage")
    public String sendMessage(){
        streamPush.mqPush().send(MessageBuilder.withPayload("message body".getBytes()).build());
        return "ok";
    }
}

访问http://localhost:20000/sendMessage,结果图如下

cloud-stream-consumer日志输出

file

cloud-stream-producer日志输出

file

学习@ServiceActivator这个注解,上面的项目cloud-stream-consumer ReceiveListener类中添加

@Component
@Slf4j
public class ReceiveListener {

    @StreamListener("MQRece")
    @SendTo("back-push")
    public byte[] receive(byte[] bytes){
        log.info("接受消息:"+new String(bytes));
				// 抛出异常
        if(1==1){
            throw new RuntimeException("Message consumer failed!");
        }
        return "ok".getBytes();
    }

    @Autowired
    private MsgBackPush msgBackPush;

    @ServiceActivator(inputChannel = "message-topic.rece-group.errors")
    public void error(Message<?> message){
        log.info("消费者消费消息失败:"+message);
        msgBackPush.backPush().send(MessageBuilder.withPayload("消息消费失败".getBytes()).build());
    }
}

通过使用@ServiceActivator(inputChannel = “test-topic.stream-exception-handler.errors”)指定了某个通道的错误处理映射。其中,inputChannel的配置中对应关系如下:

  • message-topic:消息通道对应的目标(destination,即:spring.cloud.stream.bindings.MQRece.destination的配置)
  • rece-group:消息通道对应的消费组(group,即:spring.cloud.stream.bindings.MQRece.group的配置)

访问http://localhost:20000/sendMessage,结果图如下

cloud-stream-consumer日志输出

file

cloud-stream-producer日志输出

file

點(diǎn)擊查看更多內(nèi)容
TA 點(diǎn)贊

若覺(jué)得本文不錯(cuò),就分享一下吧!

評(píng)論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評(píng)論
  • 收藏
  • 共同學(xué)習(xí),寫(xiě)下你的評(píng)論
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說(shuō)多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開(kāi)微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊(cè)有機(jī)會(huì)得

100積分直接送

付費(fèi)專欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會(huì)
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)

舉報(bào)

0/150
提交
取消