Spring Boot Redis Stream 消息队列
Spring Boot Redis Stream 消息队列
发送消息配置
/**
* @author devcxl
*/
@Slf4j
public class RedisPublisher<T> {
private RedisTemplate<String, Object> redisPublisherTemplate;
public RedisPublisher(RedisTemplate<String, Object> redisPublisherTemplate) {
this.redisPublisherTemplate = redisPublisherTemplate;
}
public void push(String streamName, String group, T data) {
Long size = redisPublisherTemplate.opsForStream().size(streamName);
if (size <= 0) {
try {
redisPublisherTemplate.opsForStream().createGroup(streamName, group);
} catch (RedisSystemException ex) {
log.error(ex.getMessage());
}
}
ObjectRecord<String, T> record = StreamRecords.newRecord().in(streamName).ofObject(data).withId(RecordId.autoGenerate());
RecordId recordId = redisPublisherTemplate.opsForStream().add(record);
log.info("publish: stream:{} group:{} recordId:{}", streamName, group,recordId);
}
}
示例:消息发送组件
/**
 * {@link RedisPublisher} specialised for {@code VerificationEmail} payloads,
 * registered as a Spring component.
 *
 * @author devcxl
 */
@Component
public class VerificationPublisher extends RedisPublisher<VerificationEmail> {
/**
 * Passes the given template straight to the {@link RedisPublisher} constructor.
 *
 * @param redisPublisherTemplate template handed to the superclass
 */
public VerificationPublisher(RedisTemplate<String, Object> redisPublisherTemplate) {
super(redisPublisherTemplate);
}
}
接收消息配置
/**
 * Subscribes to the verification-email queue.
 *
 * <p>Builds a {@link StreamMessageListenerContainer} that polls with a one-second timeout
 * and maps records to {@code MyDTO}, registers the listener for the configured consumer
 * group starting at the last consumed offset, logs the subscription and starts the container.
 *
 * @param lettuceConnectionFactory connection factory handed to the listener container
 * @param myEmailStreamListener    listener registered for the stream
 * @return the subscription returned by registering the listener
 */
@Bean
public Subscription myListenerContainer(
        LettuceConnectionFactory lettuceConnectionFactory,
        MyStreamListener myEmailStreamListener
) {
    // Placeholders: replace with the real stream key, consumer group and consumer name.
    String streamKey = "{队列名称}";
    String groupName = "{消费组名称}";
    String consumerName = "{消费者名称}";

    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, MyDTO>> options =
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                    .builder()
                    .pollTimeout(Duration.ofSeconds(1))
                    .targetType(MyDTO.class)
                    .build();

    StreamMessageListenerContainer<String, ObjectRecord<String, MyDTO>> listenerContainer =
            StreamMessageListenerContainer.create(lettuceConnectionFactory, options);

    Consumer consumer = Consumer.from(groupName, consumerName);
    StreamOffset<String> offset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
    Subscription registered = listenerContainer.receive(consumer, offset, myEmailStreamListener);

    log.info("\uD83D\uDE00 Subscription Group: [{}] ConsumerName: [{}]", groupName, consumerName);

    // The listener is registered before the container is started.
    listenerContainer.start();
    return registered;
}
/**
 * Stream listener for verification-email records.
 *
 * <p>The handler body is still a stub: only TODO markers are present.
 *
 * @author devcxl
 */
@Slf4j
@Component
public class MyStreamListener implements StreamListener<String, ObjectRecord<String, MyDTO>> {
/**
 * Callback for one stream record whose value is a {@code MyDTO}.
 *
 * @param message the record handed to this listener
 */
@Override
public void onMessage(ObjectRecord<String, MyDTO> message) {
// TODO: process the message
// TODO: acknowledge (ack) the record
// TODO: delete the item
}
}
MyDTO 可以替换成你自定义的实体类。
记得在实体类上加上如下注解:
@TypeAlias("{完整包名}")