SpringbootRedisStream消息队列

SpringbootRedisStream消息队列

Springboot RedisStream 消息队列

发送消息配置

/**
 * @author devcxl
 */
@Slf4j
public class RedisPublisher<T> {


    private RedisTemplate<String, Object> redisPublisherTemplate;

    public RedisPublisher(RedisTemplate<String, Object> redisPublisherTemplate) {
        this.redisPublisherTemplate = redisPublisherTemplate;
    }

    public void push(String streamName, String group, T data) {
        Long size = redisPublisherTemplate.opsForStream().size(streamName);
        if (size <= 0) {
            try {
                redisPublisherTemplate.opsForStream().createGroup(streamName, group);
            } catch (RedisSystemException ex) {
                log.error(ex.getMessage());
            }
        }
        ObjectRecord<String, T> record = StreamRecords.newRecord().in(streamName).ofObject(data).withId(RecordId.autoGenerate());
        RecordId recordId = redisPublisherTemplate.opsForStream().add(record);
        log.info("publish: stream:{} group:{} recordId:{}", streamName, group,recordId);
    }
}

eg:信息发送组件

/**
 * @author devcxl
 */
@Component
public class VerificationPublisher extends RedisPublisher<VerificationEmail> {
    public VerificationPublisher(RedisTemplate<String, Object> redisPublisherTemplate) {
        super(redisPublisherTemplate);
    }
}

接收消息配置


    /**
     * 订阅验证邮件队列
     *
     * @return
     */
    @Bean
    public Subscription myListenerContainer(
        LettuceConnectionFactory lettuceConnectionFactory,
        MyStreamListener myEmailStreamListener
    ) {

        StreamMessageListenerContainer<String, ObjectRecord<String, MyDTO>> container =
                StreamMessageListenerContainer.create(
                        lettuceConnectionFactory,
                        StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                                .builder()
                                .pollTimeout(Duration.ofSeconds(1))
                                .targetType(MyDTO.class)
                                .build()
                );

        String customer = "{消费者名称}";

        Subscription subscription = container.receive(
                Consumer.from("{消费组名称}", customer),
                StreamOffset.create("{队列名称}", ReadOffset.lastConsumed()),
                myEmailStreamListener
        );
        log.info("\uD83D\uDE00 Subscription Group: [{}] ConsumerName: [{}]", "{消费组名称}", customer);
        container.start();
        return subscription;
    }

/**
 * 验证邮件
 *
 * @author devcxl
 */
@Slf4j
@Component
public class MyStreamListener implements StreamListener<String, ObjectRecord<String, MyDTO>> {

    @Override
    public void onMessage(ObjectRecord<String, MyDTO> message) {
        // todo:处理消息
        // todo:ack
        // todo:del item
    }
}

MyDTO 可以替换成你自定义的实体类
实体类上记得要加上注解
@TypeAlias("{完整包名}")