Redis学习笔记(六):Redis之消息发布、订阅机制


消息的发布和订阅,第一想到的是Kafka、RabbitMQ、ActiveMQ等,但是实际上Redis也是有这个功能,这个功能在Redis中实现很简单,也比较粗暴。没有存储,没有各种订阅模式。只要订阅同一个渠道的订阅者就都可以收到发布到该渠道的信息。如果没有订阅者,消息也不会缓存起来,而是直接丢弃。在简单的功能、能够接受这种模式并且有补偿机制的业务中是可以考虑使用的。下面看一下这个到底是怎么玩的。

1. 订阅者(subscribe)

客户端订阅,直接使用Jedis的API即可,没有复杂花哨的内容。看一下代码实现:

Jedis resource = jedisPool.getResource();
resource.subscribe(new JedisPubSub() {//订阅
    @Override
    public void onMessage(String channel, String message) {
        executor.execute(() -> handler(channel, message));//异步处理
    }
}, "MESSAGE_SYNC_TOPIC_1","MESSAGE_SYNC_TOPIC_2");

具体JedisPool实例的构建不多做赘述,通过JedisPool获取Jedis实例,直接使用Jedis的sbuscribe方法即可。

  • 第一个参数是订阅后,消息回调的方法
  • 第二个参数是可变数组,可以同时订阅多个channel,如上代码是订阅了两个channel

注意:这里的回调方法是阻塞的,如果内部实现逻辑复杂,使用异步处理更好,上面的代码使用的是线程池。另外如果是多台服务器,注意加上Redis分布锁的控制,防止多台机器重复消费。

2. 发布者(publish)

发布逻辑相对于订阅逻辑就简单很多,不多做解释。看代码实现:

try (Jedis resource = jedisPool.getResource()) { //获取jedis,这种写法jdk7及以上特性,会自动释放资源
    resource.publish("MESSAGE_SYNC_TOPIC_1", "message_content");//发布
} catch (Exception e) {
    log.error(LogUtils.pattern("异步发布异常,异常信息:{}"), e.getMessage());
}

代码逻辑很简单,这种处理一般都是异步处理,需要对发布消息逻辑做try……catch处理,防止异常导致原逻辑的回滚。特别是页面数据数据较多,本身业务逻辑处理很复杂,回滚后会对本身的功能有较大的影响。

3. 总结

Redis的这种发布订阅方式没有什么难点,只要了解其特性,然后对存在的坑做一下规避即可,具体有哪些特性,我总结了下面几种,如果不全或者存在问题,希望得到您的指正。

  • Jedis客户端回调方法是阻塞的。数据量大,逻辑处理复杂,会导致本地堆集过多的消息,异步处理避坑
  • Jedis订阅方法subscribe本身也是阻塞的,如果你是在系统启动的时候去加载订阅者,注意异步避坑,防止阻塞,系统一直停在这个位置,不能完全启动
  • 消息分发方式是广播,对于同一个channel,每个订阅者都可以收到同样的消息,如果此消息不能被重复消费,注意使用分布式锁避坑
  • 如果没有订阅者,发布者的消息会被丢弃,没有存储机制,所以在发布消息之前,确保订阅者已经完成订阅操作

上面基本都是订阅的坑,发布没有啥,主要是订阅者要先于发布者启动。对于上面说的系统启动的时候完成订阅者的加载,可以使用spring的一些方法。提供以下两种实现方式。(实现方式有很多,这里只做为两种示例)

方式一:实现ApplicationContextAware接口,重写setApplicationContext方法

@Component
public class DemoConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Demo bean = applicationContext.getBean(Demo.class);//获取订阅逻辑实现的bean
        new Thread(() -> bean.subscribeStart()).start();//异步启动
    }
}

方式二:实现InitializingBean接口,重写afterPropertiesSet方法

@Component
public class Demo implements InitializingBean {
    //具体订阅逻辑
    private void sbuscribeStart(){}
    @Override
    public void afterPropertiesSet() throws Exception {
        new Thread(this::sbuscribeStart).start();//异步启动
    }
}

文章作者: 程序猿洞晓
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 程序猿洞晓 !
评论
 上一篇
Redis学习笔记(七):Redis中Lua语言应用和原子性控制 Redis学习笔记(七):Redis中Lua语言应用和原子性控制
Lua脚本在Redis里面使用的范围还是很广的,如从数据库中批量将数据导入到Redis中、分布式锁防止锁误删、多操作原子性要求等,这些都会用到Lua脚本。但是这里还是需要注意的是Lua只能保证原子性,不能保证事务性。另外根据对Redis的了解,其本身是提供事务机制的,但是这个事务机制在很多情况是不能回滚的(鸡肋),所以用起来也更少。这里不说具体的事务性,而是来一起看看Lua脚本实现原子操作。
2019-08-30
下一篇 
Redis学习笔记(五):redis使用的RESP报文格式和手写Redis简易客户端 Redis学习笔记(五):redis使用的RESP报文格式和手写Redis简易客户端
Redis客户端和服务端交互是通过tcp协议,在通讯的报文格式使用的是RESP协议规范,也就是意味只要和Redis服务端建立Scoket连接,通过RESP报文格式传输数据就可以实现Redis客户端和服务端的交互。看起来是很简单的,但是实际上的确是这么简单,RESP报文格式的可读性也是很高的。
2019-08-16
  目录