Kafka学习笔记(三):Kafka的第一个HelloWorld


基本概念、基本配置都有啦,Kafka已经启动起来,接下来就是写一个HelloWorld,创建生产者和消费者连接Kafka,做消费和生产的动作啦。

1. 引入依赖

这里引入依赖需要小心,如果没有做相关的了解很容易引入jar包不可用,导致一直有错误。在下载Kafka的时候,Kafka的压缩包的名称都类似这样kafka_2.11-0.10.1.1。其中2.11是代表Kafka的版本,0.10.1.1代表的是客户端的版本。所以在项目中引入Kafka的客户端jar的时候一定要和这个版本对应上,并不是越高越好。

maven的依赖引入如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.1</version>
</dependency>

2. 生产者和消费者

2.1 生产者

生产者是将消息send到Kafka服务上的,连接Kafka服务首先就是服务的地址,然后就是需要发送消息的主题(topic),需要发送的内容。但是还有一点需要注意的是加上一个Key,这个Key是给到分区器,由分区器根据相应的算法判断Key对应到那个分区,并将消息内容发送到对应的分区中,当然这个Key也可以为空。

生产者代码如下:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducerClient {

    public static void main(String[] args) throws Exception {
        //1、构建参数
        Properties properties = new Properties();
        //1.1、Kafka服务地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        //1.2、指定Key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //1.3、指定value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //2、构建生产者和封装消息对象
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record = new ProducerRecord<>
                    ("first-topic", "first-key", "first-value");
             producer.send(record);
        }
    }

代码的封装是很简洁的,做以下几点说明。

  • 在Kafka里面生产者KafkaProducer是线程安全的,整个应用可以共用一个生产者。

  • 在封装properties的时候,其中的键都是用的Kafka自带的配置常量类,这个很人性化,进入这个配置类可以看到每个常量下面都有对应的解释,非常实用,为我们实际实用提供了很大的方便。如下:

    //这是从ProducerConfig类里面复制出来的一段代码,上一行是常量,下一行是对这个常量的说明
    public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
    public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
  • 在指定Kafka服务地址的时候,尽量写全,为什么呢?正常生产环境都是集群,如果只写其中一个,刚好这台机器宕机了,就会导致无法连接集群中其他的Kafka服务。

  • 序列化器,是对键和数据序列化使用的,Kafka提供了7种序列化器,分别是:ByteArraySerializerByteBufferSerializerBytesSerializerLongSerializerIntegerSerializerDoubleSerializerStringSerializer,这些序列化器都实现了Serializer。具体选哪一种看实际的代码,不过最常用的毋庸置疑就是StringSerializer。如果默认的序列化器不能满足实际需求,可以选择自定义或者选择其他的第三方组件。

  • 构建Kafka生产者、封装消息很简单,然后调用生产者的send方法,把封装好的消息发送出去即可。

2.2 生产者进阶

生产者的send方法是有返回值的,进入方法可以看到,返回的对象是Future<RecordMetadata>,从返回值就可以看出来发送消息是异步执行。返回的具体数据是RecordMetadata。看下面的代码:

Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata = future.get();
System.out.println("future-topic:" + recordMetadata.topic());
System.out.println("future-offset:" + recordMetadata.offset());
System.out.println("future-partition:" + recordMetadata.partition());

通过这个返回的数据可以判断消息是否发送成功、消息发送到的分区编号以及偏移量信息。

上面是一种方式获取响应,还可以通过回调的方式来获取响应信息,代码如下:

producer.send(record, (recordMetadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    }
    if (recordMetadata != null) {
        System.out.println("callback-topic:" + recordMetadata.topic());
        System.out.println("callback-offset:" + recordMetadata.offset());
        System.out.println("callback-partition:" + recordMetadata.partition());
    }
});

这段代码也是很好理解的,匿名实现Callback接口,send执行后会回调实现类的方法,将结果响应给调用者。

2.3 消费者

了解生产者,消费者就不难啦。直接上代码:

public class KafkaConsumerClient {

    public static void main(String[] args) {
        //1、构建配置信息
        Properties properties = new Properties();
        //1.1、配置Kafka服务地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        //1.2、配置分组信息
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupA");
        //1.3、配置Key和Value的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //2、构建消费者消费消息,输出消息
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singleton("retention-test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(500);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic:" + record.topic());
                    System.out.println("key:" + record.key());
                    System.out.println("value:" + record.value());
                    System.out.println("partition:" + record.partition());
                    System.out.println("offset:" + record.offset());
                }
            }
        }
    }
}

相对于生产者,消费者要简单很多了,但还是要做以下简单的说明。

  • 相对应生产者的线程安全,消费者KafkaConsumer就是线程不安全的,需要线程独享。
  • Kafka服务地址,怎么配参考生产者即可。
  • 分组信息需要说一下,在第一篇有说到消费者群组的概念,多个消费者可以组成一个群组(当然一个消费者也可以看成一个群组),消费同一个topic下的消息,需要注意的是一个topic下的单个分区只能被消费者群组中的一个消费者消费,但是一个消费者可以消费多个分区。
  • 反序列化器和序列化器是对应的,也是有7个,名称里面多了De,实现的是Deserializer接口。如果默认的反序列化器不能满足实际需求,可以选择自定义或者选择其他的第三方组件。
  • 一个消费者可以消费多个topic的消息,所以在消费者订阅的时候可以传入集合,集合中可以包含多个topic
  • 循环执行consumerpoll方法,不断到Kafka服务拉去数据。
  • 在订阅主题的时候,Kafka不仅支持列表的方式,也提供了正则表达式的方式。

3. 生产者的配置详解

上面代码中写到生产者的配置类ProducerConfig,配置类中的参数很多,在示例中只是配置了必须的几个参数,还有其他一些重要的参数,下面依次了解一下。

3.1 acks

指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。

  • acks=0,生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞吐量高。
  • acks=1,默认配置,只要集群的首领节点收到消息,生产者会收到来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。
  • acks=all,只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。因为正常的生产环境部署都是集群,一个topic下的分区在集群中都会有副本,首领分区收到消息后会分发到各个副本分区完成复制动作。

3.2 buffer.memory

设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果数据产生速度大于向Broker发送的速度,消息会缓存起来,缓存的大小是通过此参数配置(缺省33554432 (32M)),如果最终导致生产者超过缓冲空间大小,producer会阻塞或者抛出异常。

3.3 max.block.ms

指定了在调用send方法或者使用partitionsFor方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到此参数值时(缺省60000ms),生产者会抛出超时异常。

3.4 retries

发送失败时,指定生产者可以重发消息的次数。默认情况下,生产者在每次重试之间等待100ms,可以通过参数retry.backoff.ms参数来改变这个时间间隔。此参数缺省为0,不重发。

3.5 batch.size

当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。但是生产者不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送,具体发送时机是受linger.ms影响。缺省16384(16k)。

3.6 linger.ms

指定了生产者在发送批次前等待更多消息加入批次的时间。它和batch.size以先到者为先。也就是说,一旦我们获得消息的数量够batch.size的数量了,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比batch.size设置要小的多,我们需要看linger.ms指定的时间,时间到就发送,不管批次是否已经满足batch.size要求。linger.ms默认值为0,即没有延迟。具体设置时间长短看服务器对消息延迟的要求,如果延迟要求不高且需要高吞吐量,那么这个值就可以稍微设置高一点。

3.7 compression.type

producer用于压缩数据的压缩类型。默认是无压缩(none)。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好。snappy占用cpu少,提供较好的性能和可观的压缩比,如果比较关注性能和网络带宽,用这个。如果带宽紧张,性能要求不高,用gzip,压缩性能最好,提供更高的压缩比,但是会占用较多的cpu。

3.8 max.in.flight.requests.per.connection

指定了生产者在接收到服务器响应之前可以发送多个消息,值越高,占用的内存越大,当然也可以提升吞吐量。发生错误时,可能会造成数据的发送顺序改变,默认是5。如果需要保证消息在一个分区上的严格顺序,这个值应该设为1。不过这样会严重影响生产者的吞吐量。

3.9 request.timeout.ms

客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求,超过重试次数将抛异常。

3.10 metadata.fetch.timeout.ms

是指我们所获取的一些元数据的第一个时间数据。元数据包含:topic,host,partitions。此项配置是指当等待元数据fetch成功完成所需要的时间,否则会抛出异常给客户端。

3.11 timeout.ms

此配置选项控制broker等待副本确认的最大时间。如果确认的请求数目在此时间内没有实现,则会返回一个错误。这个超时限制是以server端度量的,没有包含请求的网络延迟。这个参数和acks的配置相匹配。如果acks指定为all,那么这个超时时间就要相应的配置高点,否则副本还未复制完就会超时异常。

3.12 max.request.size

控制生产者发送请求最大大小。假设这个值为1M,如果一个请求里只有一个消息,那这个消息不能大于1M,如果一次请求是一个批次,该批次包含了1000条消息,那么1000条消息所占字节总数不能超过1M。

注意:broker具有自己对消息记录尺寸的覆盖,使用的是message.max.bytes参数,在Kafka的server.properties配置文件中配置,默认值是1000000个字节,大概900KB~1MB,如果max.request.size的尺寸小于生产者的message.max.bytes对应的尺寸,会导致消息被拒绝。但是这种不是一定的,因为Broker上的大小配置是针对单条消息的大小。

4. 消费者配置详解

消费者有很多属性可以设置,大部分都有合理的默认值,无需调整。有些参数可能对内存使用,性能和可靠性方面有较大影响。和生产者一样,消费者的所有配置也是有对应的配置类ConsumerConfig

4.1 fetch.min.bytes

每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。缺省为1个字节。多消费者下,可以设大这个值,以降低broker的工作负载。

4.2 fetch.max.wait.ms

如果没有足够的数据能够满足fetch.min.bytes,则此项配置是指在应答fetch请求之前,server会阻塞的最大时间。缺省为500个毫秒。和上面的fetch.min.bytes结合起来,要么满足数据的大小,要么满足时间,就看哪个条件先满足。

4.3 max.partition.fetch.bytes

指定了服务器从每个分区里返回给消费者的最大字节数,默认1MB。假设一个主题有20个分区和5个消费者,那么每个消费者至少要有4MB的可用内存来接收记录,而且一旦有消费者崩溃,这个内存还需更大。注意,这个参数要比服务器的message.max.bytes更大,否则消费者可能无法读取消息。

4.4 session.timeout.ms

心跳超时时间,如果consumer在这段时间内没有发送心跳信息,则它会被认为挂掉了。默认3秒。

4.5 auto.offset.reset

消费者在读取一个没有偏移量的分区或者偏移量无效的情况下,如何处理。默认值是latest,从最新的记录开始读取,另一个值是earliest,表示消费者从起始位置读取分区的记录。

注意:默认值是latest,意思是说,在偏移量无效或者无偏移量的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录),可以先启动生产者,再启动消费者,观察到这种情况。

4.6 enable.auto.commit

默认值true,表明消费者是否自动提交偏移。为了尽量避免重复数据和数据丢失,可以改为false,自行控制何时提交。

4.7 partition.assignment.strategy

分区分配给消费者的策略。系统提供两种策略(rangeroundRobin),默认为range。允许自定义策略。

range:把主题下的分区连续分配给消费者。

roundRobin:把主题的分区循环分配给消费者。

自定义策略:自定义策略类继承AbstractPartitionAssignor,然后再定义消费者的时候,将此参数加入。如下方式:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,SelfPartitionAssignor.class)即可。

4.8 max.poll.records

控制每次poll方法返回的的记录数量。

5. 源代码

码云(gitee):https://gitee.com/itcrud/itcrud-note/tree/master/itcrud-note-1-3

具体代码在com.itcrud.kafka.base包内!!!


文章作者: 程序猿洞晓
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 程序猿洞晓 !
评论
 上一篇
Kafka学习笔记(四):自定义序列化器、反序列化器、分区器以及分区再均衡策略 Kafka学习笔记(四):自定义序列化器、反序列化器、分区器以及分区再均衡策略
序列化器、反序列化器以及分区器都有所了解,Kafka客户端提供默认的7种方式基本够用了,但是对于一个大型复杂的项目群里面,总是会有特殊的要求,这个时候就需要自己去定义相关的序列化器和反序列化器。另外就是自定义分区器,这个应用的可能会多一点,根据实际业务将消息发送到指定的分区中。
2019-06-15
下一篇 
Kafka学习笔记(二):Kafka之常用配置文件核心内容 Kafka学习笔记(二):Kafka之常用配置文件核心内容
上一篇了解了Kafka的基本概念,主要是日志、主题、分区、生产者、消费者、消费者群组、偏移量、消息和批次以及Broker和集群。这些基本概念了解以后就是上手自己下载Kafka,安装起来。安装Kafka很简单,……,同样Kafka启动使用默认配置也可以,但是为了学习Kafka,还是需要了解配置文件中各项内容的含义以及在服务中起到的作用。
2019-05-28
  目录