Springboot-Redis消息发布订阅

Springboot-Redis消息发布订阅

Springboot通过Redis实现消息发布、订阅

Redis 发布订阅 (pub/sub) 是一种消息通信模式: 发送者 (pub) 发送消息, 订阅者 (sub) 接收消息。 Redis 客户端可以订阅任意数量的频道。

Redis中的常用命令

  • 1 PSUBSCRIBE pattern [pattern ...]

    订阅一个或多个符合给定模式的频道。

  • 2 PUBSUB subcommand [argument [argument ...]]

    查看订阅与发布系统状态。

  • 3 PUBLISH channel message

    将信息发送到指定的频道。

  • 4 PUNSUBSCRIBE [pattern [pattern ...]]

    退订所有给定模式的频道。

  • 5 SUBSCRIBE channel [channel ...]

    订阅给定的一个或多个频道的信息。

  • 6 UNSUBSCRIBE [channel [channel ...]]

    退订给定的一个或多个频道。

RedisTemplate 实现

POM 依赖项

<!-- Spring Data Redis starter: supplies the Redis classes (RedisTemplate, listener container) used by the code below. -->
<!-- NOTE(review): no <version> is declared on any dependency; presumably managed by the Spring Boot parent/BOM - confirm. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Spring MVC starter: needed for the @RestController endpoint that acts as the publisher. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok: used for @Slf4j and @Data in the sample code; marked optional so it is not passed on to dependents. -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

代码实现

/**
 * Entry point of the Redis publish/subscribe demo application.
 */
@SpringBootApplication
public class RedispubsubApplication {

    /**
     * Starts Spring Boot with this class as the primary configuration source.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        final Class<?> primarySource = RedispubsubApplication.class;
        SpringApplication.run(primarySource, args);
    }
}

/**
 * Redis wiring for the pub/sub demo.
 *
 * <p>Declares the channel topic, a {@link RedisTemplate} whose values are written as JSON,
 * and the listener container that delivers messages published on the topic to
 * {@link SubscribeListener}.
 */
@Configuration
class RedisConfig {
    /** Subscriber that the listener container dispatches incoming messages to. */
    @Resource
    private SubscribeListener listener;

    /**
     * The single channel used by both the publisher and the subscriber.
     *
     * @return a topic bound to the channel name {@code "push"}
     */
    @Bean
    public ChannelTopic channelTopic() {
        return new ChannelTopic("push");
    }

    /**
     * Builds the template used for publishing: keys are written as plain strings and values
     * as JSON instead of the default JDK serialization.
     *
     * @param redisConnectionFactory connection factory supplied by Spring
     * @return the fully initialised template
     */
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        // Replace the default JDK serialization with Jackson-based JSON.
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
                new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // Embeds type information in the JSON so values can be read back as their concrete class.
        // NOTE(review): enableDefaultTyping is deprecated in newer Jackson releases and is unsafe
        // for untrusted payloads - consider activateDefaultTyping with a restrictive validator.
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        // Key and value serialization rules.
        // NOTE(review): the key serializer only accepts String keys although the template is
        // declared with Object keys.
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * Builds the container that subscribes to {@link #channelTopic()} and forwards every
     * message to the injected {@link SubscribeListener}.
     *
     * @param connectionFactory connection factory supplied by Spring
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer messageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Channel names must be encoded as plain strings so they match the channel that
        // RedisTemplate.convertAndSend publishes to; a JSON serializer would wrap the name in
        // quotes and subscribe to a different channel. Set it before registering the listener.
        container.setTopicSerializer(new StringRedisSerializer());
        // "listener" is the injected field; there is no listener() method on this class.
        container.addMessageListener(listener, channelTopic());
        return container;
    }
}

// Simulated publisher: an HTTP endpoint that pushes the requested task onto the Redis channel.
@RestController
class TestController {
    // Field names are significant: @Resource resolves these beans by name.
    @Resource
    private ChannelTopic channelTopic;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Publishes the given task on the configured channel.
     *
     * @param task the message payload, taken from the request path
     * @return the success result produced by {@code Result.OK()}
     */
    @GetMapping("/send/{task}")
    public Result<String> send(@PathVariable("task") String task) {
        final String channel = channelTopic.getTopic();
        redisTemplate.convertAndSend(channel, task);
        final Result<String> response = Result.OK();
        return response;
    }
}
// Subscriber: logs every message delivered to it by the listener container.
@Component
@Slf4j
class SubscribeListener implements MessageListener {
    /**
     * Logs the channel, body and matched pattern of a received message.
     *
     * @param message the received message
     * @param pattern the pattern that matched the channel; may be {@code null}
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = decode(message.getChannel());
        String body = decode(message.getBody());
        log.info("消费者收到的消息为:{}:{},pattern:{}", channel, body, decode(pattern));
    }

    /**
     * Decodes raw Redis bytes as UTF-8 rather than the platform default charset.
     *
     * @param raw bytes to decode; may be {@code null}
     * @return the decoded text, or {@code null} when {@code raw} is {@code null}
     */
    private static String decode(byte[] raw) {
        return raw == null ? null : new String(raw, java.nio.charset.StandardCharsets.UTF_8);
    }
}

/**
 * Generic response envelope returned by the HTTP endpoints.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}; fields that are {@code null} are left
 * out of the JSON output because of {@code JsonInclude.Include.NON_NULL}.
 *
 * @param <T> type of the payload carried in {@code data}
 */
@Data
@JsonInclude(JsonInclude.Include.NON_NULL)
class Result<T> implements Serializable {
    /** Status code of the response. */
    private Integer code;
    /** Human-readable status message. */
    private String message;
    /** Optional payload. */
    private T data;

    /**
     * Creates a result with every field given explicitly.
     *
     * @param code    status code
     * @param message status message
     * @param data    payload, may be {@code null}
     */
    public Result(Integer code, String message, T data) {
        this.data = data;
        this.message = message;
        this.code = code;
    }

    /**
     * Creates a success result with no payload.
     *
     * @param <T> payload type expected by the caller
     * @return a result with code 200, the message "request success!" and {@code null} data
     */
    public static <T> Result<T> OK() {
        final Integer successCode = 200;
        final String successMessage = "request success!";
        return new Result<>(successCode, successMessage, null);
    }
}

资料参考