消息的发布和订阅,第一想到的是Kafka、RabbitMQ、ActiveMQ等,但是实际上Redis也是有这个功能,这个功能在Redis中实现很简单,也比较粗暴。没有存储,没有各种订阅模式。只要订阅同一个渠道的订阅者就都可以收到发布到该渠道的信息。如果没有订阅者,消息也不会缓存起来,而是直接丢弃。在简单的功能、能够接受这种模式并且有补偿机制的业务中是可以考虑使用的。下面看一下这个到底是怎么玩的。
1. 订阅者(subscribe)
客户端订阅,直接使用Jedis的API即可,没有复杂花哨的内容。看一下代码实现:
Jedis resource = jedisPool.getResource();
resource.subscribe(new JedisPubSub() {//订阅
@Override
public void onMessage(String channel, String message) {
executor.execute(() -> handler(channel, message));//异步处理
}
}, "MESSAGE_SYNC_TOPIC_1","MESSAGE_SYNC_TOPIC_2");
具体JedisPool实例的构建不多做赘述,通过JedisPool获取Jedis实例,直接使用Jedis的sbuscribe
方法即可。
- 第一个参数是订阅后,消息回调的方法
- 第二个参数是可变数组,可以同时订阅多个channel,如上代码是订阅了两个channel
注意:这里的回调方法是阻塞的,如果内部实现逻辑复杂,使用异步处理更好,上面的代码使用的是线程池。另外如果是多台服务器,注意加上Redis分布锁的控制,防止多台机器重复消费。
2. 发布者(publish)
发布逻辑相对于订阅逻辑就简单很多,不多做解释。看代码实现:
try (Jedis resource = jedisPool.getResource()) { //获取jedis,这种写法jdk7及以上特性,会自动释放资源
resource.publish("MESSAGE_SYNC_TOPIC_1", "message_content");//发布
} catch (Exception e) {
log.error(LogUtils.pattern("异步发布异常,异常信息:{}"), e.getMessage());
}
代码逻辑很简单,这种处理一般都是异步处理,需要对发布消息逻辑做try……catch
处理,防止异常导致原逻辑的回滚。特别是页面数据数据较多,本身业务逻辑处理很复杂,回滚后会对本身的功能有较大的影响。
3. 总结
Redis的这种发布订阅方式没有什么难点,只要了解其特性,然后对存在的坑做一下规避即可,具体有哪些特性,我总结了下面几种,如果不全或者存在问题,希望得到您的指正。
- Jedis客户端回调方法是阻塞的。数据量大,逻辑处理复杂,会导致本地堆集过多的消息,异步处理避坑
- Jedis订阅方法
subscribe
本身也是阻塞的,如果你是在系统启动的时候去加载订阅者,注意异步避坑,防止阻塞,系统一直停在这个位置,不能完全启动 - 消息分发方式是广播,对于同一个channel,每个订阅者都可以收到同样的消息,如果此消息不能被重复消费,注意使用分布式锁避坑
- 如果没有订阅者,发布者的消息会被丢弃,没有存储机制,所以在发布消息之前,确保订阅者已经完成订阅操作
上面基本都是订阅的坑,发布没有啥,主要是订阅者要先于发布者启动。对于上面说的系统启动的时候完成订阅者的加载,可以使用spring的一些方法。提供以下两种实现方式。(实现方式有很多,这里只做为两种示例)
方式一:实现ApplicationContextAware
接口,重写setApplicationContext
方法
@Component
public class DemoConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Demo bean = applicationContext.getBean(Demo.class);//获取订阅逻辑实现的bean
new Thread(() -> bean.subscribeStart()).start();//异步启动
}
}
方式二:实现InitializingBean
接口,重写afterPropertiesSet
方法
@Component
public class Demo implements InitializingBean {
//具体订阅逻辑
private void sbuscribeStart(){}
@Override
public void afterPropertiesSet() throws Exception {
new Thread(this::sbuscribeStart).start();//异步启动
}
}