• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

kafka知识点

武飞扬头像
zero _s
帮助1

一 、kafka结构

1.kafka 基础结构

kafka有两种消息队列的模式 即点对点 和主题模式;

为了方便扩展,并提高吞吐量,一个topic被切分成多个pertition
一个主机对应一个broker,每个break里面又被分成topic;

学新通

(1)Producer:消息生产者,就是向Kafka broker 发消息的客户端。

(2)Consumer:消息消费者,向Kafka broker 取消息的客户端。

(3)Consumer Group(CG):消费者组,由多个consumer 组成。消费者组内每个消
    费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不
    影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

(4)Broker:一台Kafka 服务器就是一个broker。一个集群由多个broker 组成。一个broker 可以容纳多个topic。

(5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。

(6)Partition:为了实现扩展性,一个非常大的topic 可以分布到多个broker(即服务器)上,一个topic 可以分为多个                
     partition,每个partition 是一个有序的队列。

(7)Replica:副本。一个topic 的每个分区都有若干个副本,一个Leader 和若干个Follower。

(8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。

(9)Follower:每个分区多个副本中的“从”,实时从Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 
      Follower 会成为新的Leader。

常见题目

2、常用配置

配置文件为config包下的service.property 文件

  1.  
    broker 的 全局唯一编号,不能重复 ,只能是数字 。
  2.  
    **broker.id=0**
  3.  
     
  4.  
    处理网络请求的线程数量
  5.  
    num.network.threads=3
  6.  
    用来处理磁盘IO 的线程数量
  7.  
    num.io.threads=8
  8.  
    发送套接字的缓冲区大小
  9.  
    socket.send.buffer.bytes=102400
  10.  
    接收套接字的缓冲区大小
  11.  
    socket.receive.buffer.bytes=102400
  12.  
    请求套接字的缓冲区大小
  13.  
    socket.request.max.bytes=104857600
  14.  
    kafka 运行日志数据存放的路径 ,路径不需要提前创建 kafka 自动帮你创建 ,可以
  15.  
    配置多个磁盘路径,路径与路径之间可以用分隔
  16.  
    log.dirs=/opt/module/kafka/ datas
  17.  
    topic 在当前 broker 上的分区个数
  18.  
    num. partitions=1
  19.  
    用来恢复和清理 data 下数据的线程数量
  20.  
    num.recovery.threads.per.data.dir=1
  21.  
    每个 topic 创建时的副本数,默认时 1 个副本
  22.  
    offsets.topic.replication.factor=1
  23.  
    segment 文件保留的最长时间,超时将被删除
  24.  
    log.retention.hours=168
  25.  
    每个 segment 文件的大小,默认最大 1G
  26.  
    log.segment.bytes=1073741824
  27.  
    检查过期数据的时间,默认 5 分钟检查一次是否数据过期
  28.  
    log.retention.check.interval.ms=300000
  29.  
    配置连接 Zookeeper 集群 地址 (在 zk 根目录下创建 kaf ka ,方便管理
  30.  
    zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181 /kafka

集群部署kafka的shell脚本

学新通

#! /bin/bash
case $1 in
"
for i in hadoop102 hadoop103 hadoop104
do
echo " 启动 $i Kafka
ssh $i "/opt/module/kafka/bin/kafka server start.sh
daemon /opt/module/kafka/config/server.properties"
done
"
for i in hadoop102 hadoop103 hadoop104
do
echo " 停止 $i Kafka
ssh $i "/opt/module/kafka/bin/kafka server stop.sh "
done
esac

二、kafka生产者

在消息发送的过程中,涉及到了两个线程——main 线程和Sender 线程。在main 线程中创建了一个双端队列RecordAccumulator。main 线程将消息发送给RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取消息发送到Kafka Broker。

学新通

buffer.memory  RecordAccumulator 缓冲区总大小, 默认 32 m ;
reques是批量应答的时候用到,broker最多缓存几个requst;

1、三种生产者发送消息的方式

发送消息的三种方式

<dependencies>
  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.0.0</version>
  </dependency>
</dependencies>

1、普通异步发送的生产者代码,不关注返回结果

  1.  
    package com.atguigu.kafka.producer;
  2.  
     
  3.  
    import org.apache.kafka.clients.producer.KafkaProducer;
  4.  
    import org.apache.kafka.clients.producer.ProducerRecord;
  5.  
    import java.util.Properties;
  6.  
     
  7.  
    public class CustomProducer {
  8.  
    public static void main(String[] args) throws InterruptedException {
  9.  
     
  10.  
    // 1. 创建 kafka 生产者的配置对象
  11.  
    Properties properties = new Properties();
  12.  
     
  13.  
    // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
  14.  
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  15.  
     
  16.  
    // key,value 序列化(必须):key.serializer,value.serializer
  17.  
     
  18.  
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  19.  
     
  20.  
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  21.  
     
  22.  
    // 3. 创建 kafka 生产者对象
  23.  
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
  24.  
     
  25.  
    // 4. 调用 send 方法,发送消息
  26.  
    for (int i = 0; i < 5; i ) {
  27.  
    kafkaProducer.send(new ProducerRecord<>("first","atguigu " i));
  28.  
     
  29.  
     
  30.  
    // 异步发送 默认
  31.  
    // kafkaProducer.send(new ProducerRecord<>("first","kafka" i));
  32.  
     
  33.  
    // 同步发送
  34.  
    kafkaProducer.send(new ProducerRecord<>("first","kafka" i)).get();
  35.  
     
  36.  
    //send的返回值是一个Future对象,可以通过get方法取出其中存储的发送消息的一些元信息RecordMetadata
  37.  
    Future<RecordMetadata> sendResult = producer.send(record);
  38.  
    RecordMetadata recordMetadata = sendResult.get();
  39.  
     
  40.  
    }
  41.  
    // 5. 关闭资源
  42.  
    kafkaProducer.close();
  43.  
    }
  44.  
    }

2、带回调函数的异步发送,通过回调函数获取返回结果,且回调函数是异步的

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

  1.  
    package com.atguigu.kafka.producer;
  2.  
    import org.apache.kafka.clients.producer.*;
  3.  
    import java.util.Properties;
  4.  
     
  5.  
    public class CustomProducerCallback {
  6.  
    public static void main(String[] args) throws InterruptedException {
  7.  
     
  8.  
    // 1. 创建 kafka 生产者的配置对象
  9.  
    Properties properties = new Properties();
  10.  
    // 2. 给 kafka 配置对象添加配置信息
  11.  
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  12.  
     
  13.  
    // key,value 序列化(必须):key.serializer,value.serializer
  14.  
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  15.  
     
  16.  
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  17.  
     
  18.  
    // 3. 创建 kafka 生产者对象
  19.  
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
  20.  
     
  21.  
    // 4. 调用 send 方法,发送消息
  22.  
    for (int i = 0; i < 5; i ) {
  23.  
    // 添加回调
  24.  
    kafkaProducer.send(new ProducerRecord<>("first", "atguigu " i), new Callback() {
  25.  
     
  26.  
    // 该方法在 Producer 收到 ack 时调用,为异步调用
  27.  
    @Override
  28.  
    public void onCompletion(RecordMetadata metadata, Exception exception) {
  29.  
    if (exception == null) {
  30.  
     
  31.  
    // 没有异常,输出信息到控制台
  32.  
    System.out.println(" 主题: " metadata.topic() "->" "分区:" metadata.partition());
  33.  
    } else {
  34.  
    // 出现异常打印
  35.  
    exception.printStackTrace();
  36.  
    }
  37.  
    }
  38.  
    });
  39.  
    // 延迟一会会看到数据发往不同分区
  40.  
    Thread.sleep(2);
  41.  
    }
  42.  
    // 5. 关闭资源
  43.  
    kafkaProducer.close();
  44.  
    }
  45.  
    }

3、同步发送,关注返回结果,同步的会被阻塞

  1.  
    public static void sendMessageWithCareResult() throws ExecutionException, InterruptedException {
  2.  
     
  3.  
    //ProducerRecord的三个参数,topic,发送的key,发送的value
  4.  
    ProducerRecord<String,String> record = new ProducerRecord<>("user-info-topic","name","路飞");
  5.  
     
  6.  
    //send的返回值是一个Future对象,可以通过get方法取出其中存储的发送消息的一些元信息RecordMetadata
  7.  
    Future<RecordMetadata> sendResult = producer.send(record);
  8.  
    RecordMetadata recordMetadata = sendResult.get();
  9.  
     
  10.  
    //打印下发送消息的topic,partition,offset
  11.  
    System.out.println(String.format("发送结果:topic:%s,存储的partition:%s,offset:%s",
  12.  
    recordMetadata.topic(),
  13.  
    recordMetadata.partition(),
  14.  
    recordMetadata.offset()));
  15.  
     
  16.  
    producer.close();
  17.  
    }

学新通

通过topic里面的pertition分区来提高消息处理的效率

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。 (2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

在IDEA中全局查找(ctrl n)ProducerRecord类,在类中可以看到构造方法:

学新通

  1. // 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl p 查看参数)

kafkaProducer.send(new ProducerRecord<>("first", 1,"","atguigu " i)); 第一个参数是topic,第二个参数是pertition,第三个是消息messenger

  1. 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值

// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余, 分别发往 1、2、0

kafkaProducer.send(new ProducerRecord<>("first", "a","atguigu " i), new Callback() { });

2、自定义分区器

定义类实现 Partitioner 接口来重写分区方法;

  1.  
    package com.atguigu.kafka.producer;
  2.  
    import org.apache.kafka.clients.producer.Partitioner;
  3.  
    import org.apache.kafka.common.Cluster;
  4.  
    import java.util.Map;
  5.  
     
  6.  
    /**
  7.  
    * 1. 实现接口 Partitioner
  8.  
    * 2. 实现 3 个方法:partition,close,configure
  9.  
    * 3. 编写 partition 方法,返回分区号
  10.  
    */
  11.  
    public class MyPartitioner implements Partitioner {
  12.  
     
  13.  
    /**
  14.  
    * 返回信息对应的分区
  15.  
    * @param topic 主题
  16.  
    * @param key 消息的 key
  17.  
    * @param keyBytes 消息的 key 序列化后的字节数组
  18.  
    * @param value 消息的 value
  19.  
    * @param valueBytes 消息的 value 序列化后的字节数组
  20.  
    * @param cluster 集群元数据可以查看分区信息
  21.  
    * @return
  22.  
    */
  23.  
    @Override
  24.  
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  25.  
    // 获取消息
  26.  
    String msgValue = value.toString();
  27.  
     
  28.  
    // 创建 partition
  29.  
    int partition;
  30.  
     
  31.  
    // 判断消息是否包含 atguigu
  32.  
    if (msgValue.contains("atguigu")){
  33.  
    partition = 0;
  34.  
    }else {
  35.  
    partition = 1;
  36.  
    }
  37.  
     
  38.  
    // 返回分区号
  39.  
    return partition;
  40.  
    }
  41.  
     
  42.  
    // 关闭资源
  43.  
    @Override
  44.  
    public void close() {
  45.  
    }
  46.  
     
  47.  
    // 配置方法
  48.  
    @Override
  49.  
    public void configure(Map<String, ?> configs) {
  50.  
    }
  51.  
    }

使用分区器的方法,在生产者的配置中添加分区器参数。其实就是再properties里面引入分区的配置类

  1.  
    package com.atguigu.kafka.producer;
  2.  
    import org.apache.kafka.clients.producer.*;
  3.  
    import java.util.Properties;
  4.  
     
  5.  
    public class CustomProducerCallbackPartitions {
  6.  
    public static void main(String[] args) throws InterruptedException {
  7.  
     
  8.  
    Properties properties = new Properties();
  9.  
     
  10.  
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
  11.  
     
  12.  
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  13.  
     
  14.  
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  15.  
     
  16.  
    // 添加自定义分区器
  17.  
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");
  18.  
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  19.  
     
  20.  
    for (int i = 0; i < 5; i ) {
  21.  
     
  22.  
    kafkaProducer.send(new ProducerRecord<>("first", "atguigu " i), new Callback() {
  23.  
    @Override
  24.  
    public void onCompletion(RecordMetadata metadata, Exception e) {
  25.  
    if (e == null){
  26.  
    System.out.println(" 主题: " metadata.topic() "->" "分区:" metadata.partition()
  27.  
    );
  28.  
     
  29.  
     
  30.  
    }else {
  31.  
    e.printStackTrace();
  32.  
    }
  33.  
    }
  34.  
    });
  35.  
     
  36.  
    }
  37.  
    kafkaProducer.close();
  38.  
    }
  39.  
    }

为了提高生成者的效率,还可以通过配置以下内容

// batch.size:批次大小,默认 16K properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

// linger.ms:等待时间,默认 0 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd

properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

三、消息可靠性问题

1、可靠性和重复性

学新通

学新通

可靠性总结:

acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

同意是通过在properties里面配置ack的机制

  1.  
    // 设置 acks
  2.  
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
  3.  
    // 重试次数 retries,默认是 int 最大值,2147483647
  4.  
    properties.put(ProducerConfig.RETRIES_CONFIG, 3);

数据重复性问题

• 至少一次(At Least Once)= ACK级别设置为-1   分区副本大于等于2   ISR里应答的最小副本数量大于等于2
• 最多一次(At Most Once)= ACK级别设置为0

• 总结:
At Least Once可以保证数据不丢失,但是不能保证数据不重复;
At Most Once可以保证数据不重复,但是不能保证数据不丢失。
• 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。

2、幂等性和事务

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

精确一次(Exactly Once) = 幂等性 至少一次( ack=-1 分区副本数>=2 ISR最小副本数量>=2) 。

重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。

所以幂等性只能保证的是在单分区单会话内不重复。

开启幂等性参数 :enable.idempotence 默认为 true,false 关闭。

事务:

学新通

开启事务前必须开启幂等性

Kafka 的事务一共有如下 5 个 API

  1.  
    // 1 初始化事务
  2.  
    void initTransactions();
  3.  
     
  4.  
    // 2 开启事务
  5.  
    void beginTransaction() throws ProducerFencedException;
  6.  
     
  7.  
    // 3 在事务内提交已经消费的偏移量(主要用于消费者)
  8.  
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException;
  9.  
     
  10.  
    // 4 提交事务
  11.  
    void commitTransaction() throws ProducerFencedException;
  12.  
     
  13.  
    // 5 放弃事务(类似于回滚事务的操作)
  14.  
     
  15.  
    void abortTransaction() throws ProducerFencedException;
  16.  
    单个 Producer,使用事务保证消息的仅一次发送
  17.  
     
  18.  
    package com.atguigu.kafka.producer;
  19.  
    import org.apache.kafka.clients.producer.KafkaProducer;
  20.  
    import org.apache.kafka.clients.producer.ProducerRecord;
  21.  
    import java.util.Properties;
  22.  
     
  23.  
    public class CustomProducerTransactions {
  24.  
     
  25.  
    public static void main(String[] args) throws InterruptedException {
  26.  
    // 1. 创建 kafka 生产者的配置对象
  27.  
    Properties properties = new Properties();
  28.  
     
  29.  
    // 2. 给 kafka 配置对象添加配置信息
  30.  
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  31.  
     
  32.  
    // key,value 序列化
  33.  
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  34.  
     
  35.  
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  36.  
     
  37.  
    // 设置事务 id(必须),事务 id 任意起名
  38.  
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
  39.  
     
  40.  
    // 3. 创建 kafka 生产者对象
  41.  
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
  42.  
     
  43.  
    // 初始化事务
  44.  
    kafkaProducer.initTransactions();
  45.  
    // 开启事务
  46.  
    kafkaProducer.beginTransaction();
  47.  
     
  48.  
    try {
  49.  
    // 4. 调用 send 方法,发送消息
  50.  
    for (int i = 0; i < 5; i ) {
  51.  
    // 发送消息
  52.  
    kafkaProducer.send(new ProducerRecord<>("first", "atguigu " i));
  53.  
    }
  54.  
     
  55.  
    // int i = 1 / 0;
  56.  
    // 提交事务
  57.  
    kafkaProducer.commitTransaction();
  58.  
     
  59.  
    } catch (Exception e) {
  60.  
    // 终止事务
  61.  
    kafkaProducer.abortTransaction();
  62.  
    } finally {
  63.  
     
  64.  
    // 5. 关闭资源
  65.  
    kafkaProducer.close();
  66.  
    }
  67.  
    }
  68.  
    }

3、生产者消息的有序性

学新通

主要是开启幂等性后会通过其序号来落盘,如果失败,则会缓存起来知道正序的到来才落盘

四、broker工作流程

本章主要介绍kafka如何存储数据的

学新通

在zookeeper的服务端存储的Kafka相关信息:

1)/kafka/brokers/ids [0,1,2] 记录有哪些服务器

2)/kafka/brokers/topics/first/partitions/0/state {"leader":1 ,"isr":[1,0,2] } 记录谁是Leader,有哪些服务器可用
    Zookeeper中存储的Kafka 信息

3)/kafka/controller  {“brokerid”:0}  辅助选举Leader

ISR 是lead 跟follow里面通讯正常的节点

1、节点的服役和退役

2、kafka的副本

(1 Kafka 副本作用:提高数据可靠性 。
(2 Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加 磁盘存储空间,增加网络上数据传输,
    降低效率。
(3 Kafka 中副本分为: Leader 和 Follower 。 Kafka 生产者只会把数据发往 Leader然后 Follower 找 Leader 
    进行同步数据。
(4 Kafka 分区中的所有副本统称为 AR Assigned Repllicas )。AR =ISR   OSR
  ISR表示 和 Leader 保持同步的 Follower 集合。 如果 Follower 长时间未向 Leader 发送通信请求或同步数据,
  则该Follower 将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader 发生故障之后,
  就会从ISR 中选举新的Leader。OSR,表示Follower 与Leader 副本同步时,延迟过多的副本。

3、lead选举流程

Kafka 集群中有一个broker 的Controller 会被选举为Controller Leader,负责管理集群
broker 的上下线,所有topic 的分区副本分配和Leader 选举等工作。
Controller 的信息同步工作是依赖于Zookeeper 的。

学新通

4. lead 和follow故障处理

1)Follower故障

(1) Follower发生故障后会被临时踢出ISR (2) 这个期间Leader和Follower继续接收数据 (3)待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。 (4)等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。

2)Leader故障

(1) Leader发生故障之后,会从ISR中选出一个新的Leader (2)为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

5. Leader Partition 负载平衡

正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。
但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,
其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。

学新通

6. 生产经验——增加副本因子

 在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

7.topic 里面文件存储的机制

Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数 据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制, 将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称 分区序号,例如:first-0。

学新通

说明:日志存储参数配置

参数描述 log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。

log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引。

8.文件清理策略

(1)检查是否过期的配置

     Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
      log.retention.hours,最低优先级小时,默认 7 天。
      log.retention.minutes,分钟。
      log.retention.ms,最高优先级毫秒。
      log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。


(2)过期清楚策略

    Kafka 中提供的日志清理策略有 delete 和 compact 两种。

    delete 日志删除:将过期数据删除
     log.cleanup.policy = delete 所有数据启用删除策略
    (1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
    (2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。
        log.retention.bytes,默认等于-1,表示无穷大。
        思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?


    compact日志压缩:对于相同key的不同value值,只保留最后一个版本。

        压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大
        的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。
        这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息
        集里就保存了所有用户最新的资料。
        log.cleanup.policy = compact 所有数据启用压缩策

读数据采用稀疏索引,可以快速定位要消费的数据,写入的时候是通过顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

零拷贝: Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用 走应用层,传输效率高。

PageCache页缓存: Kafka重度依赖底层操作系统提供的PageCache功 能。当上层有写操作时,操作系统只是将数据写入 PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存 都当做了磁盘缓存

五、kafka的消费者

消费者组 :由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。 • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

1、基础消费者代码:

  1.  
    import org.apache.kafka.clients.consumer.ConsumerRecord;
  2.  
    import org.apache.kafka.clients.consumer.ConsumerRecords;
  3.  
    import org.apache.kafka.clients.consumer.KafkaConsumer;
  4.  
    import java.time.Duration;
  5.  
    import java.util.ArrayList;
  6.  
    import java.util.Properties;
  7.  
     
  8.  
    public class CustomConsumer {
  9.  
    public static void main(String[] args) {
  10.  
     
  11.  
    // 1.创建消费者的配置对象
  12.  
    Properties properties = new Properties();
  13.  
    // 2.给消费者配置对象添加参数
  14.  
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  15.  
     
  16.  
    // 配置序列化 必须
  17.  
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  18.  
     
  19.  
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  20.  
     
  21.  
    // 配置消费者组(组名任意起名) 必须
  22.  
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
  23.  
     
  24.  
    // 创建消费者对象
  25.  
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
  26.  
     
  27.  
    // 注册要消费的主题(可以消费多个主题)
  28.  
    ArrayList<String> topics = new ArrayList<>();
  29.  
    topics.add("first");
  30.  
    kafkaConsumer.subscribe(topics);
  31.  
     
  32.  
    // 拉取数据打印
  33.  
    while (true) {
  34.  
    // 设置 1s 中消费一批数据
  35.  
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
  36.  
    // 打印消费到的数据
  37.  
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  38.  
    System.out.println(consumerRecord);
  39.  
    }
  40.  
    }
  41.  
    }
  42.  
    }

如果是消费某个topic里面特定的partitions,需要在配置里面注明

// 消费某个主题的某个分区数据
 ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
 topicPartitions.add(new TopicPartition("first", 0));// 消费first主题下的0分区
 kafkaConsumer.assign(topicPartitions);

复制一份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者,代码与上面的基础消费者完全一样,消费者组名还是test 这样就能在test消费者组里面启动两个消费者

2、分区的分配以及再平衡

(1、一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。

(2、Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,
 修改分区的分配策略。默认策略是Range   CooperativeSticky。Kafka可以同时使用多个分区分配策略。

学新通

常见参数的配置:

heartbeat.interval.ms : Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于session.timeout.ms,也不应该高于session.timeout.ms 的 1/3。

session.timeout.ms : Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms: 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。

partition.assignment.strategy: 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range   CooperativeSticky。Kafka 可以同时使用多个分区分配策略。

可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky

3、四种分区分配策略

1)Range 分区策略原理

学新通

  1. RoundRobin 分区策略原理

    RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后 通过轮询算法来分配 partition 给到各个消费者。

// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

学新通

0 号消费者宕机后,0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,
 分别由 1 号消费者或者 2 号消费者消费。

3)Sticky 以及再平衡

  粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,
  尽量少的调整分配的变动,可以节省大量的开销。粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,
  首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
  1.  
    // 修改分区分配策略
  2.  
    ArrayList<String> startegys = new ArrayList<>();
  3.  
    startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
  4.  
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);

4、offset的位移

消费完一批数据后,需要提交offset,可以设置自动提交和手动提交;默认是自动提交

学新通

学新通

设置自动自交offset和提交时间;这种方式时间不好控制

// 是否自动提交 offset
 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
 // 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

同步提交offset和异步提交:

  虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因
  此Kafka还提供了手动提交offset的API。手动提交offset的方法有两种:分别是commitSync(同步提交)
  和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,
  同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);
  而异步提交则没有失败重试机制,故有可能提交失败。

      • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
      • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
  1.  
    package com.atguigu.kafka.consumer;
  2.  
    import org.apache.kafka.clients.consumer.ConsumerConfig;
  3.  
    import org.apache.kafka.clients.consumer.ConsumerRecord;
  4.  
    import org.apache.kafka.clients.consumer.ConsumerRecords;
  5.  
    import org.apache.kafka.clients.consumer.KafkaConsumer;
  6.  
    import java.util.Arrays;
  7.  
    import java.util.Properties;
  8.  
     
  9.  
    public class CustomConsumerByHandSync {
  10.  
    public static void main(String[] args) {
  11.  
     
  12.  
    // 1. 创建 kafka 消费者配置类
  13.  
    Properties properties = new Properties();
  14.  
    // 2. 添加配置参数
  15.  
    // 添加连接
  16.  
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  17.  
     
  18.  
    // 配置序列化
  19.  
     
  20.  
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  21.  
    "org.apache.kafka.common.serialization.StringDeserializer");
  22.  
     
  23.  
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  24.  
    "org.apache.kafka.common.serialization.StringDeserializer");
  25.  
     
  26.  
    // 配置消费者组
  27.  
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
  28.  
     
  29.  
    // 是否自动提交 offset
  30.  
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  31.  
     
  32.  
    //3. 创建 kafka 消费者
  33.  
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  34.  
     
  35.  
    //4. 设置消费主题 形参是列表
  36.  
    consumer.subscribe(Arrays.asList("first"));
  37.  
    //5. 消费数据
  38.  
    while (true){
  39.  
    // 读取消息
  40.  
    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
  41.  
     
  42.  
    // 输出消息
  43.  
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  44.  
    System.out.println(consumerRecord.value());
  45.  
    }
  46.  
     
  47.  
    // 同步提交 offset
  48.  
    consumer.commitSync();
  49.  
     
  50.  
    // 异步提交 offset
  51.  
    consumer.commitAsync();
  52.  
    }
  53.  
    }
  54.  
    }

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此 吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式

5、指定offser消费

auto.offset.reset = earliest | latest | none 默认是 latest。

当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

(1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning。
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
(4)任意指定 offset 位移开始消费
  1.  
    // 2 订阅一个主题
  2.  
    ArrayList<String> topics = new ArrayList<>();
  3.  
    topics.add("first");
  4.  
    kafkaConsumer.subscribe(topics);
  5.  
     
  6.  
    Set<TopicPartition> assignment= new HashSet<>();
  7.  
    while (assignment.size() == 0) {
  8.  
    kafkaConsumer.poll(Duration.ofSeconds(1));
  9.  
    // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
  10.  
    assignment = kafkaConsumer.assignment();
  11.  
    }
  12.  
     
  13.  
    // 遍历所有分区,并指定 offset 从 1700 的位置开始消费
  14.  
    for (TopicPartition tp: assignment) {
  15.  
    kafkaConsumer.seek(tp, 1700);
  16.  
    }
  17.  
     
  18.  
    // 3 消费该主题数据
  19.  
    while (true) {
  20.  
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
  21.  
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  22.  
    System.out.println(consumerRecord);
  23.  
    }
  24.  
    }

(5)指定时间消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?
  1.  
    package com.atguigu.kafka.consumer;
  2.  
    import org.apache.kafka.clients.consumer.*;
  3.  
    import org.apache.kafka.common.TopicPartition;
  4.  
    import org.apache.kafka.common.serialization.StringDeserializer;
  5.  
    import java.time.Duration;
  6.  
    import java.util.*;
  7.  
     
  8.  
    public class CustomConsumerForTime {
  9.  
    public static void main(String[] args) {
  10.  
     
  11.  
    // 0 配置信息
  12.  
     
  13.  
    Properties properties = new Properties();
  14.  
    // 连接
  15.  
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  16.  
     
  17.  
    // key value 反序列化
  18.  
     
  19.  
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  20.  
     
  21.  
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  22.  
     
  23.  
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
  24.  
     
  25.  
    // 1 创建一个消费者
  26.  
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
  27.  
     
  28.  
    // 2 订阅一个主题
  29.  
    ArrayList<String> topics = new ArrayList<>();
  30.  
    topics.add("first");
  31.  
    kafkaConsumer.subscribe(topics);
  32.  
     
  33.  
    Set<TopicPartition> assignment = new HashSet<>();
  34.  
     
  35.  
    while (assignment.size() == 0) {
  36.  
    kafkaConsumer.poll(Duration.ofSeconds(1));
  37.  
    // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
  38.  
    assignment = kafkaConsumer.assignment();
  39.  
    }
  40.  
     
  41.  
    HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
  42.  
     
  43.  
    // 封装集合存储,每个分区对应一天前的数据
  44.  
    for (TopicPartition topicPartition : assignment) {
  45.  
    timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
  46.  
    }
  47.  
     
  48.  
    // 获取从 1 天前开始消费的每个分区的 offset
  49.  
    Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
  50.  
     
  51.  
    // 遍历每个分区,对每个分区设置消费时间。
  52.  
    for (TopicPartition topicPartition : assignment) {
  53.  
    OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
  54.  
    // 根据时间指定开始消费的位置
  55.  
    if (offsetAndTimestamp != null){
  56.  
    kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
  57.  
    }
  58.  
    }
  59.  
     
  60.  
    // 3 消费该主题数据
  61.  
    while (true) {
  62.  
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
  63.  
     
  64.  
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  65.  
    System.out.println(consumerRecord);
  66.  
    }
  67.  
    }
  68.  
    }
  69.  
    }

6、重复消费和漏消费问题

重复消费:已经消费了数据,但是 offset 没提交。

漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

学新通

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比 如MySQL)。这部分知识会在后续项目部分涉及。

7. 数据积压问题

学新通

六、kafka集成其他框架

1.kafka与flink集成

kafka作为生产者发送消息到flink,也可以kafka作为消费者从flink里面消费数据``

  1.  
    org.apache.flink flink-java 1.13.1
  2.  
     
  3.  
    <dependency>
  4.  
    <groupId>org.apache.flink</groupId>
  5.  
    <artifactId>flink-streaming-java_2.12</artifactId>
  6.  
    <version>1.13.1</version>
  7.  
    </dependency>
  8.  
     
  9.  
    <dependency>
  10.  
    <groupId>org.apache.flink</groupId>
  11.  
    <artifactId>flink-clients_2.12</artifactId>
  12.  
    <version>1.13.1</version>
  13.  
    </dependency>
  14.  
     
  15.  
    <dependency>
  16.  
    <groupId>org.apache.flink</groupId>
  17.  
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  18.  
    <version>1.7.2</version>
  19.  
    </dependency>
  20.  
     
  21.  
    </dependencies>
生产者
  1.  
    public class MyFlinkKafkaProducer1 {
  2.  
     
  3.  
    public static void main(String[] args) throws Exception {
  4.  
    // 准备环境
  5.  
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6.  
    env.setParallelism(3);
  7.  
    // 准备数据源
  8.  
    ArrayList<String> strings = new ArrayList<>();
  9.  
    strings.add("test1");
  10.  
    strings.add("test2");
  11.  
    DataStreamSource<String> stream = env.fromCollection(strings);
  12.  
    //创建kafka生产者 -- 未完善
  13.  
    Properties properties = new Properties();
  14.  
     
  15.  
    FlinkKafkaProducer<String> first = new FlinkKafkaProducer<>("first", new SimpleStringSchema(), properties);
  16.  
    // 添加数据源
  17.  
    stream.addSink((SinkFunction<String>) first);
  18.  
    // 执行
  19.  
    env.execute();
  20.  
     
  21.  
    }
  22.  
    }
消费者
  1.  
    public class MyFlinkKafkaConsumer {
  2.  
     
  3.  
    public static void main(String[] args) {
  4.  
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.  
    env.setParallelism(3);
  6.  
     
  7.  
    Properties properties = new Properties();
  8.  
     
  9.  
    FlinkKafkaConsumer<String> first =
  10.  
    new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), properties);
  11.  
     
  12.  
    }
  13.  
    }

 2、springboot与kafka的集成

org.springframework.kafka spring-kafka 

  1.  
    #配置连接的集群
  2.  
    spring.kafka.bootstrap-servers=ip:socket,ip1:socket
  3.  
    # key value 的序列化
  4.  
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  5.  
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

生产者

  1.  
    @RestController
  2.  
    @RequestMapping("api/v1/kafka")
  3.  
    public class ProducerController {
  4.  
     
  5.  
    @Autowired
  6.  
    KafkaTemplate<String,String> kafkaTemplate;
  7.  
     
  8.  
    @GetMapping("/producedata")
  9.  
    public String data(String msg){
  10.  
    kafkaTemplate.send("first",msg);
  11.  
    return "true";
  12.  
    }
  13.  
    }

消费者

  1.  
    # 反序列化
  2.  
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  3.  
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  4.  
     
  5.  
    #消费者组id
  6.  
    spring.kafka.consumer.group-id=testcon
  1.  
    @Configuration
  2.  
    public class ConsumerController {
  3.  
     
  4.  
    @Autowired
  5.  
    KafkaTemplate<String,String> kafkaTemplate;
  6.  
     
  7.  
    @KafkaListener(topics = "first")
  8.  
    public void getData(String msg){
  9.  
    System.out.println(msg);
  10.  
    }
  11.  
    }`

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhigiebi
系列文章
更多 icon
同类精品
更多 icon
继续加载