基本概念、基本配置都有啦,Kafka已经启动起来,接下来就是写一个HelloWorld,创建生产者和消费者连接Kafka,做消费和生产的动作啦。
1. 引入依赖
这里引入依赖需要小心,如果没有做相关的了解很容易引入jar包不可用,导致一直有错误。在下载Kafka的时候,Kafka的压缩包的名称都类似这样kafka_2.11-0.10.1.1
。其中2.11
是代表Kafka的版本,0.10.1.1
代表的是客户端的版本。所以在项目中引入Kafka的客户端jar的时候一定要和这个版本对应上,并不是越高越好。
maven的依赖引入如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>
2. 生产者和消费者
2.1 生产者
生产者是将消息send
到Kafka服务上的,连接Kafka服务首先就是服务的地址,然后就是需要发送消息的主题(topic
),需要发送的内容。但是还有一点需要注意的是加上一个Key,这个Key是给到分区器,由分区器根据相应的算法判断Key对应到那个分区,并将消息内容发送到对应的分区中,当然这个Key也可以为空。
生产者代码如下:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class KafkaProducerClient {
public static void main(String[] args) throws Exception {
//1、构建参数
Properties properties = new Properties();
//1.1、Kafka服务地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
//1.2、指定Key的序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//1.3、指定value的序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//2、构建生产者和封装消息对象
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
ProducerRecord<String, String> record = new ProducerRecord<>
("first-topic", "first-key", "first-value");
producer.send(record);
}
}
代码的封装是很简洁的,做以下几点说明。
在Kafka里面生产者
KafkaProducer
是线程安全的,整个应用可以共用一个生产者。在封装
properties
的时候,其中的键都是用的Kafka自带的配置常量类,这个很人性化,进入这个配置类可以看到每个常量下面都有对应的解释,非常实用,为我们实际实用提供了很大的方便。如下://这是从ProducerConfig类里面复制出来的一段代码,上一行是常量,下一行是对这个常量的说明 public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
在指定Kafka服务地址的时候,尽量写全,为什么呢?正常生产环境都是集群,如果只写其中一个,刚好这台机器宕机了,就会导致无法连接集群中其他的Kafka服务。
序列化器,是对键和数据序列化使用的,Kafka提供了7种序列化器,分别是:
ByteArraySerializer
、ByteBufferSerializer
、BytesSerializer
、LongSerializer
、IntegerSerializer
、DoubleSerializer
、StringSerializer
,这些序列化器都实现了Serializer
。具体选哪一种看实际的代码,不过最常用的毋庸置疑就是StringSerializer
。如果默认的序列化器不能满足实际需求,可以选择自定义或者选择其他的第三方组件。构建Kafka生产者、封装消息很简单,然后调用生产者的
send
方法,把封装好的消息发送出去即可。
2.2 生产者进阶
生产者的send
方法是有返回值的,进入方法可以看到,返回的对象是Future<RecordMetadata>
,从返回值就可以看出来发送消息是异步执行。返回的具体数据是RecordMetadata
。看下面的代码:
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata = future.get();
System.out.println("future-topic:" + recordMetadata.topic());
System.out.println("future-offset:" + recordMetadata.offset());
System.out.println("future-partition:" + recordMetadata.partition());
通过这个返回的数据可以判断消息是否发送成功、消息发送到的分区编号以及偏移量信息。
上面是一种方式获取响应,还可以通过回调的方式来获取响应信息,代码如下:
producer.send(record, (recordMetadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
}
if (recordMetadata != null) {
System.out.println("callback-topic:" + recordMetadata.topic());
System.out.println("callback-offset:" + recordMetadata.offset());
System.out.println("callback-partition:" + recordMetadata.partition());
}
});
这段代码也是很好理解的,匿名实现Callback
接口,send
执行后会回调实现类的方法,将结果响应给调用者。
2.3 消费者
了解生产者,消费者就不难啦。直接上代码:
public class KafkaConsumerClient {
public static void main(String[] args) {
//1、构建配置信息
Properties properties = new Properties();
//1.1、配置Kafka服务地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
//1.2、配置分组信息
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupA");
//1.3、配置Key和Value的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//2、构建消费者消费消息,输出消息
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
consumer.subscribe(Collections.singleton("retention-test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic:" + record.topic());
System.out.println("key:" + record.key());
System.out.println("value:" + record.value());
System.out.println("partition:" + record.partition());
System.out.println("offset:" + record.offset());
}
}
}
}
}
相对于生产者,消费者要简单很多了,但还是要做以下简单的说明。
- 相对应生产者的线程安全,消费者
KafkaConsumer
就是线程不安全的,需要线程独享。 - Kafka服务地址,怎么配参考生产者即可。
- 分组信息需要说一下,在第一篇有说到消费者群组的概念,多个消费者可以组成一个群组(当然一个消费者也可以看成一个群组),消费同一个
topic
下的消息,需要注意的是一个topic
下的单个分区只能被消费者群组中的一个消费者消费,但是一个消费者可以消费多个分区。 - 反序列化器和序列化器是对应的,也是有7个,名称里面多了
De
,实现的是Deserializer
接口。如果默认的反序列化器不能满足实际需求,可以选择自定义或者选择其他的第三方组件。 - 一个消费者可以消费多个
topic
的消息,所以在消费者订阅的时候可以传入集合,集合中可以包含多个topic
。 - 循环执行
consumer
的poll
方法,不断到Kafka服务拉去数据。 - 在订阅主题的时候,Kafka不仅支持列表的方式,也提供了正则表达式的方式。
3. 生产者的配置详解
上面代码中写到生产者的配置类ProducerConfig
,配置类中的参数很多,在示例中只是配置了必须的几个参数,还有其他一些重要的参数,下面依次了解一下。
3.1 acks
指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。
- acks=0,生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞吐量高。
- acks=1,默认配置,只要集群的首领节点收到消息,生产者会收到来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。
- acks=all,只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。因为正常的生产环境部署都是集群,一个
topic
下的分区在集群中都会有副本,首领分区收到消息后会分发到各个副本分区完成复制动作。
3.2 buffer.memory
设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果数据产生速度大于向Broker发送的速度,消息会缓存起来,缓存的大小是通过此参数配置(缺省33554432 (32M)),如果最终导致生产者超过缓冲空间大小,producer会阻塞或者抛出异常。
3.3 max.block.ms
指定了在调用send
方法或者使用partitionsFor
方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到此参数值时(缺省60000ms),生产者会抛出超时异常。
3.4 retries
发送失败时,指定生产者可以重发消息的次数。默认情况下,生产者在每次重试之间等待100ms,可以通过参数retry.backoff.ms
参数来改变这个时间间隔。此参数缺省为0,不重发。
3.5 batch.size
当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。但是生产者不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送,具体发送时机是受linger.ms
影响。缺省16384(16k)。
3.6 linger.ms
指定了生产者在发送批次前等待更多消息加入批次的时间。它和batch.size
以先到者为先。也就是说,一旦我们获得消息的数量够batch.size
的数量了,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比batch.size
设置要小的多,我们需要看linger.ms
指定的时间,时间到就发送,不管批次是否已经满足batch.size
要求。linger.ms
默认值为0,即没有延迟。具体设置时间长短看服务器对消息延迟的要求,如果延迟要求不高且需要高吞吐量,那么这个值就可以稍微设置高一点。
3.7 compression.type
producer用于压缩数据的压缩类型。默认是无压缩(none)。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好。snappy占用cpu少,提供较好的性能和可观的压缩比,如果比较关注性能和网络带宽,用这个。如果带宽紧张,性能要求不高,用gzip,压缩性能最好,提供更高的压缩比,但是会占用较多的cpu。
3.8 max.in.flight.requests.per.connection
指定了生产者在接收到服务器响应之前可以发送多个消息,值越高,占用的内存越大,当然也可以提升吞吐量。发生错误时,可能会造成数据的发送顺序改变,默认是5。如果需要保证消息在一个分区上的严格顺序,这个值应该设为1。不过这样会严重影响生产者的吞吐量。
3.9 request.timeout.ms
客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求,超过重试次数将抛异常。
3.10 metadata.fetch.timeout.ms
是指我们所获取的一些元数据的第一个时间数据。元数据包含:topic,host,partitions。此项配置是指当等待元数据fetch成功完成所需要的时间,否则会抛出异常给客户端。
3.11 timeout.ms
此配置选项控制broker等待副本确认的最大时间。如果确认的请求数目在此时间内没有实现,则会返回一个错误。这个超时限制是以server端度量的,没有包含请求的网络延迟。这个参数和acks的配置相匹配。如果acks指定为all,那么这个超时时间就要相应的配置高点,否则副本还未复制完就会超时异常。
3.12 max.request.size
控制生产者发送请求最大大小。假设这个值为1M,如果一个请求里只有一个消息,那这个消息不能大于1M,如果一次请求是一个批次,该批次包含了1000条消息,那么1000条消息所占字节总数不能超过1M。
注意:broker具有自己对消息记录尺寸的覆盖,使用的是message.max.bytes
参数,在Kafka的server.properties
配置文件中配置,默认值是1000000个字节,大概900KB~1MB,如果max.request.size
的尺寸小于生产者的message.max.bytes
对应的尺寸,会导致消息被拒绝。但是这种不是一定的,因为Broker上的大小配置是针对单条消息的大小。
4. 消费者配置详解
消费者有很多属性可以设置,大部分都有合理的默认值,无需调整。有些参数可能对内存使用,性能和可靠性方面有较大影响。和生产者一样,消费者的所有配置也是有对应的配置类ConsumerConfig
。
4.1 fetch.min.bytes
每次fetch
请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。缺省为1个字节。多消费者下,可以设大这个值,以降低broker的工作负载。
4.2 fetch.max.wait.ms
如果没有足够的数据能够满足fetch.min.bytes
,则此项配置是指在应答fetch
请求之前,server会阻塞的最大时间。缺省为500个毫秒。和上面的fetch.min.bytes
结合起来,要么满足数据的大小,要么满足时间,就看哪个条件先满足。
4.3 max.partition.fetch.bytes
指定了服务器从每个分区里返回给消费者的最大字节数,默认1MB。假设一个主题有20个分区和5个消费者,那么每个消费者至少要有4MB的可用内存来接收记录,而且一旦有消费者崩溃,这个内存还需更大。注意,这个参数要比服务器的message.max.bytes
更大,否则消费者可能无法读取消息。
4.4 session.timeout.ms
心跳超时时间,如果consumer在这段时间内没有发送心跳信息,则它会被认为挂掉了。默认3秒。
4.5 auto.offset.reset
消费者在读取一个没有偏移量的分区或者偏移量无效的情况下,如何处理。默认值是latest
,从最新的记录开始读取,另一个值是earliest
,表示消费者从起始位置读取分区的记录。
注意:默认值是latest,意思是说,在偏移量无效或者无偏移量的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录),可以先启动生产者,再启动消费者,观察到这种情况。
4.6 enable.auto.commit
默认值true
,表明消费者是否自动提交偏移。为了尽量避免重复数据和数据丢失,可以改为false
,自行控制何时提交。
4.7 partition.assignment.strategy
分区分配给消费者的策略。系统提供两种策略(range
和roundRobin
),默认为range
。允许自定义策略。
range
:把主题下的分区连续分配给消费者。
roundRobin
:把主题的分区循环分配给消费者。
自定义策略:自定义策略类继承AbstractPartitionAssignor
,然后再定义消费者的时候,将此参数加入。如下方式:
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,SelfPartitionAssignor.class)即可。
4.8 max.poll.records
控制每次poll方法返回的的记录数量。
5. 源代码
码云(gitee):https://gitee.com/itcrud/itcrud-note/tree/master/itcrud-note-1-3
具体代码在com.itcrud.kafka.base
包内!!!