序列化器、反序列化器以及分区器都有所了解,Kafka客户端提供默认的7种方式基本够用了,但是对于一个大型复杂的项目群里面,总是会有特殊的要求,这个时候就需要自己去定义相关的序列化器和反序列化器。另外就是自定义分区器,这个应用的可能会多一点,根据实际业务将消息发送到指定的分区中。
1. 自定义序列化器
自定的序列化器并不难,只要遵循序列化器的规矩就可以。实现Kafka提供的Serializer
序列化接口,在serialize
方法中写入具体序列化的实现代码即可。如下代码:
//实现Serializer<T>接口
public class SelfSerializer implements Serializer<SelfUser> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String s, SelfUser selfUser) {
//序列化逻辑,具体实现略
return JSON.toJSONBytes(selfUser);//直接用fastjson来获取字节数组
}
@Override
public void close() {
}
}
在构建生产者的时候,指定序列化器的时候,将自定义的序列化器写入即可。如下:
//对值采用自定义序列化器
properties = ProducerConfig.addSerializerToConfig(properties, new StringSerializer(), new SelfSerializer());
到此生产者的自定义序列化器就完成啦。下面看一下自定义反序列化器的定义。
2. 自定义反序列化器
反序列化器的定义和序列化器的定义是完全相同的。不多做赘述,直接看代码。
自定义反序列化器代码:
//实现Deserializer<T>接口
public class SelfDeserializer implements Deserializer<SelfUser> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public SelfUser deserialize(String s, byte[] bytes) {
//反序列化逻辑,具体逻辑略
return JSONObject.parseObject(bytes, SelfUser.class);//直接用fastjson将字节数据转换成对象
}
@Override
public void close() {
}
}
消费者配置代码修改:
//对值采用自定义反序列化器
properties = ProducerConfig.addSerializerToConfig(properties, new StringSerializer(), new SelfSerializer());
测试一下即可,这里不演示测试过程了,具体可以参考文末提供的代码示例。
3. 自定义分区器
自定义分区器创建方式和自定义序列化器、反序列化器的套路都是相同的。无非就是实现的接口不同,实现的方法不同,核心逻辑不同。
自定义分区器代码:
public class SelfPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//可以根据topic、key、value来进行自定义分区方式,将消息发送到指定的分区
//根据key的hash值来获取指定的分区(假设key都是String类型)
//cluster.partitionCountForTopic(topic):获取当前topic的分区个数
return ((String) key).hashCode() % cluster.partitionCountForTopic(topic);
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
这里简单的使用当前key的hash值对当前topic分区个数取余,最终确定分区编号。演示代码文末提供。(如果想更明显的看到效果,可以直接返回一个常量,如:0或者1等)
生产者中使用自定义分区器:
//1、直接用全限定类名
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.itcrud.kafka.selfpartition.SelfPartitioner");
//2、使用class
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,SelfPartitioner.class);
3.1 默认自定义分区器
这里补充说明一下默认分区器,在默认的逻辑里面首先就是对传入的key
进行非空判断。
如果key
是null
,那么默认分区器就会采用轮询的方式,将消息下发到topic
下的分区中。
如果key
不为null
,默认分区器根据自定义的散列算法,这个散列算法是针对key
的,这样就可以达到,将同一个key
的数据发到同一个分区中。但是这样还是存在问题的,什么问题?
采用这种方式有一个弊端,如果这个topic
下的分区数量是固定的,不会做增加的操作,这种算法是没有问题的,也能达到预想的效果。但是当分区增加时,虽然key
是不变的,但是key
算出来的分区就可能会发生变化。
如果在业务需求中必须需强固定分区且一个topic
下分区数变化,那么默认的分区器就不能使用,只能根据业务来自定义分区器啦。
4. 分区和消费者
4.1 分区个数对消费者的影响
在单个topic下可以创建多个分区,在一个消费者群组中,一个分区只能被一个消费者消费,那么消费者群组成员的个数就不能超过当前消费的topic下分区个数,当存在超过时,超过的消费者也是闲置。如果消费者在消费消息的时候耗时太长,就会导致消息阻塞,不能被及时的消费。这个时候怎么解决呢?如果再添加一个消费者群组呢?这样同样无法解决消息阻塞的问题,同时还导致了消息重复消费的问题。
解决方案:
- 增加分区个数,从而横向扩展消费者个数,让单位时间内消费的速度大于生产的速度,至少要持平。
- 消费者单线程拉取数据,将拉取的数据多线程消费。但是需要注意,如果单程拉取后直接交给子线程执行,而没等到子线程执行结束,就会出现消费端不断的拉取数据,一直开启新线程处理新的数据,消耗消费者大量的性能。即使使用了线程池,也可能会导致线程池过饱和,大量数据堆积在消费端。此时可以考虑线程池和
CountDownLatch
配合使用,数据交给线程池处理,用CountDownLatch
保证每次拉取的数据都已经处理结束。
4.2 分区再均衡策略
分区再均衡策略在Kafka中是非常重要的,保证了消费能均衡的被每个消费者消费,在什么情况下会触发分区再平衡策略呢?
- 当有消费者退出:比如消费者群组中的某一台机器宕机了,又或者是消费者过剩,需要削减消费者个数。此时此机器消费的分区就要分配到群组中的其他机器中。
- 当新增消费者:比如原来10个分区对应5个消费者,可以正常运行,后来业务量增加,5个消费者压力太大,此时就需要添加机器来分担消息的消费任务。
- 当新增分区:消费者群组需要将新增的分区均衡分配到现有消费者群组成员上。
需要注意的是分区再均衡的过程消费者群组会有短时间拉取不到数据的问题,导致短暂的消费延迟问题。
5. 源代码
码云(gitee):https://gitee.com/itcrud/itcrud-note/tree/master/itcrud-note-1-3
自定义序列化器+自定义反序列化器在com.itcrud.kafka.selfserializer
包内!!!
自定义分区器在com.itcrud.kafka.selfpartition
包内!!!