Springboot-Redis消息发布订阅
Springboot通过Redis实现消息发布、订阅
Redis 发布订阅 (pub/sub) 是一种消息通信模式: 发送者 (pub) 发送消息, 订阅者 (sub) 接收消息。 Redis 客户端可以订阅任意数量的频道。
Redis中的常用命令
-
1
PSUBSCRIBE pattern [pattern ...]
订阅一个或多个符合给定模式的频道。
-
2
PUBSUB subcommand [argument [argument ...]]
查看订阅与发布系统状态。
-
3
PUBLISH channel message
将信息发送到指定的频道。
-
4
PUNSUBSCRIBE [pattern [pattern ...]]
退订所有给定模式的频道。
-
5
SUBSCRIBE channel [channel ...]
订阅给定的一个或多个频道的信息。
-
6
UNSUBSCRIBE [channel [channel ...]]
指退订给定的频道。
RedisTemplate 实现
POM 依赖项
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
代码实现
@SpringBootApplication
public class RedispubsubApplication {
public static void main(String[] args) {
SpringApplication.run(RedispubsubApplication.class, args);
}
}
@Configuration
class RedisConfig {
@Resource
private SubscribeListener listener;
@Bean
public ChannelTopic channelTopic() {
return new ChannelTopic("push");
}
// redistemplate 配置
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 使用Jackson2JsonRedisSerialize 替换默认的jdkSerializeable序列化
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 设置value的序列化规则和 key的序列化规则
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
// Redis 订阅发布配置
@Bean
public RedisMessageListenerContainer messageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listener(), channelTopic());
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
container.setTopicSerializer(seria);
return container;
}
}
// 模拟发布者
@RestController
class TestController {
@Resource
private ChannelTopic channelTopic;
@Resource
private RedisTemplate<String, Object> redisTemplate;
@GetMapping("/send/{task}")
public Result<String> send(@PathVariable(value = "task") String task) {
redisTemplate.convertAndSend(channelTopic.getTopic(), task);
return Result.OK();
}
}
// 订阅者
@Component
@Slf4j
class SubscribeListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] body = message.getBody();
log.info("消费者收到的消息为:{}:{},pattern:{}", new String(message.getChannel()), new String(body), new String(pattern));
}
}
@Data
@JsonInclude(value = JsonInclude.Include.NON_NULL)
class Result<T> implements Serializable {
private Integer code;
private String message;
private T data;
public Result(Integer code, String message, T data) {
this.code = code;
this.message = message;
this.data = data;
}
public static <T> Result<T> OK() {
return new Result<>(200, "request success!", null);
}
}