Spring Boot + Redis Streams 實(shí)現(xiàn)消息隊(duì)列,采用注解消費(fèi)
前言
Redis Streams在Redis5.0中引入,主要用于消息队列和事件流的存储与传递,是一个高性能、持久化的日志数据结构。
一、依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
二、配置文件
在配置文件中添加redis
spring: redis: host: ****** port: 6379 database: 10 password: ******
三、定义注解和抽象类
定义MsgStreamListener注解
@Target(ElementType.METHOD)
@Retention(value = RetentionPolicy.RUNTIME)
public @interface MsgStreamListener
{
String stream();
String group();
String name();
}定义抽象类AbstractMsgService
public abstract class AbstractMsgService {
}四、创建容器
@Configuration
@Slf4j
public class RedisStreamConfig
{
private final RedisTemplate<String,String> redisTemplate;
private final ApplicationContext applicationContext;
public RedisStreamConfig(RedisTemplate<String, String> redisTemplate, ApplicationContext applicationContext) {
this.redisTemplate = redisTemplate;
this.applicationContext = applicationContext;
}
@Bean
public StreamMessageListenerContainer<String, ObjectRecord<String,String>> streamMessageListenerContainer(RedisConnectionFactory connectionFactory)
{
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String,String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.pollTimeout(java.time.Duration.ofSeconds(1))
.targetType(String.class)
.build();
//创建监听redis流的消息监听容器
StreamMessageListenerContainer<String, ObjectRecord<String,String>> listenerContainer =
StreamMessageListenerContainer.create(connectionFactory, options);
//找到所有继承AbstractMsgService的类
var serviceArray = applicationContext.getBeansOfType(AbstractMsgService.class).values().toArray();
for (var service : serviceArray) {
for (Method method : service.getClass().getMethods()) {
if(method.isAnnotationPresent(MsgStreamListener.class)){
MsgStreamListener annotation = method.getAnnotation(MsgStreamListener.class);
String stream = annotation.stream();
String group = annotation.group();
String name = annotation.name();
StreamListener<String,ObjectRecord<String,String>> listener = (StreamListener<String, ObjectRecord<String,String>>) message -> {
try {
method.invoke(service,message);
}catch (Exception e){
log.warn(e.getMessage());
}
};
//创建redis流的消息监听器
listenerContainer.receive(Consumer.from(group,name),
StreamOffset.create(stream, ReadOffset.lastConsumed()),
listener);
initializeStream(stream,name);
}
}
}
listenerContainer.start();
return listenerContainer;
}
public void initializeStream(String stream,String group) {
StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
// 创建一个流
try {
streamOperations.createGroup(stream, ReadOffset.from("0"), group);
} catch (Exception e) {
// 流可能已存在,忽略异常
}
}
}五、创建生产者和消费者
生产者
@Service
@RequiredArgsConstructor
public class RedisMessageProducer
{
private final RedisTemplate<String,String> redisTemplate;
public void sendMsg(String streamKey,String msgKey,String msg){
Map<String,String> msgMap = new HashMap<>();
msgMap.put(msgKey,msg);
RecordId recordId = redisTemplate.opsForStream().add(streamKey,msgMap);
if(recordId == null){
throw new RuntimeException("发送消息失败");
}
}
} 消费者
因为在RediStreamConfig中会根据注解自动创建消息监听器,所以只需要添加MsgStreamListener注解就可以自动消费事件。
@Service
@Slf4j
public class MessageHandlerService extends AbstractMsgService
{
@MsgStreamListener(group = "test1",name = "test1",stream = "test1")
public void onMessage(ObjectRecord<String, String> message)
{
var stream = message.getStream();
var msgId = message.getId().toString();
var msgBody = message.getValue();
log.info("receive test1 msg stream:{} msgId:{} msgBody:{}",stream,msgId,msgBody);
}
@MsgStreamListener(group = "test2",name = "test2",stream = "test2")
public void onMessageFail(ObjectRecord<String, String> message)
{
var stream = message.getStream();
var msgId = message.getId().toString();
var msgBody = message.getValue();
log.info("receive test2 msg stream:{} msgId:{} msgBody:{}",stream,msgId,msgBody);
}
}这样的话只要添加注解,就可以消费不同stream事件。
参考:
https://blog.csdn.net/Mrxiao_bo/article/details/135191850
點(diǎn)擊查看更多內(nèi)容
為 TA 點(diǎn)贊
評論
評論
共同學(xué)習(xí),寫下你的評論
評論加載中...
作者其他優(yōu)質(zhì)文章
正在加載中
感謝您的支持,我會繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進(jìn)行掃碼打賞哦