kafka知识点
一 、kafka结构
1.kafka 基础结构
kafka有两种消息队列的模式 即点对点 和主题模式;
为了方便扩展,并提高吞吐量,一个topic被切分成多个pertition
一个主机对应一个broker,每个break里面又被分成topic;
(1)Producer:消息生产者,就是向Kafka broker 发消息的客户端。
(2)Consumer:消息消费者,向Kafka broker 取消息的客户端。
(3)Consumer Group(CG):消费者组,由多个consumer 组成。消费者组内每个消
费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不
影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
(4)Broker:一台Kafka 服务器就是一个broker。一个集群由多个broker 组成。一个broker 可以容纳多个topic。
(5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。
(6)Partition:为了实现扩展性,一个非常大的topic 可以分布到多个broker(即服务器)上,一个topic 可以分为多个
partition,每个partition 是一个有序的队列。
(7)Replica:副本。一个topic 的每个分区都有若干个副本,一个Leader 和若干个Follower。
(8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
(9)Follower:每个分区多个副本中的“从”,实时从Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个
Follower 会成为新的Leader。
2、常用配置
配置文件为config包下的service.property 文件
-
broker 的 全局唯一编号,不能重复 ,只能是数字 。
-
**broker.id=0**
-
-
处理网络请求的线程数量
-
num.network.threads=3
-
用来处理磁盘IO 的线程数量
-
num.io.threads=8
-
发送套接字的缓冲区大小
-
socket.send.buffer.bytes=102400
-
接收套接字的缓冲区大小
-
socket.receive.buffer.bytes=102400
-
请求套接字的缓冲区大小
-
socket.request.max.bytes=104857600
-
kafka 运行日志数据存放的路径 ,路径不需要提前创建 kafka 自动帮你创建 ,可以
-
配置多个磁盘路径,路径与路径之间可以用分隔
-
log.dirs=/opt/module/kafka/ datas
-
topic 在当前 broker 上的分区个数
-
num. partitions=1
-
用来恢复和清理 data 下数据的线程数量
-
num.recovery.threads.per.data.dir=1
-
每个 topic 创建时的副本数,默认时 1 个副本
-
offsets.topic.replication.factor=1
-
segment 文件保留的最长时间,超时将被删除
-
log.retention.hours=168
-
每个 segment 文件的大小,默认最大 1G
-
log.segment.bytes=1073741824
-
检查过期数据的时间,默认 5 分钟检查一次是否数据过期
-
log.retention.check.interval.ms=300000
-
配置连接 Zookeeper 集群 地址 (在 zk 根目录下创建 kaf ka ,方便管理
-
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181 /kafka
集群部署kafka的shell脚本
#! /bin/bash
case $1 in
"
for i in hadoop102 hadoop103 hadoop104
do
echo " 启动 $i Kafka
ssh $i "/opt/module/kafka/bin/kafka server start.sh
daemon /opt/module/kafka/config/server.properties"
done
"
for i in hadoop102 hadoop103 hadoop104
do
echo " 停止 $i Kafka
ssh $i "/opt/module/kafka/bin/kafka server stop.sh "
done
esac
二、kafka生产者
在消息发送的过程中,涉及到了两个线程——main 线程和Sender 线程。在main 线程中创建了一个双端队列RecordAccumulator。main 线程将消息发送给RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取消息发送到Kafka Broker。
buffer.memory RecordAccumulator 缓冲区总大小, 默认 32 m ;
reques是批量应答的时候用到,broker最多缓存几个requst;
1、三种生产者发送消息的方式
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
1、普通异步发送的生产者代码,不关注返回结果
-
package com.atguigu.kafka.producer;
-
-
import org.apache.kafka.clients.producer.KafkaProducer;
-
import org.apache.kafka.clients.producer.ProducerRecord;
-
import java.util.Properties;
-
-
public class CustomProducer {
-
public static void main(String[] args) throws InterruptedException {
-
-
// 1. 创建 kafka 生产者的配置对象
-
Properties properties = new Properties();
-
-
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
-
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
-
-
// key,value 序列化(必须):key.serializer,value.serializer
-
-
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-
-
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-
-
// 3. 创建 kafka 生产者对象
-
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
-
-
// 4. 调用 send 方法,发送消息
-
for (int i = 0; i < 5; i ) {
-
kafkaProducer.send(new ProducerRecord<>("first","atguigu " i));
-
-
-
// 异步发送 默认
-
// kafkaProducer.send(new ProducerRecord<>("first","kafka" i));
-
-
// 同步发送
-
kafkaProducer.send(new ProducerRecord<>("first","kafka" i)).get();
-
-
//send的返回值是一个Future对象,可以通过get方法取出其中存储的发送消息的一些元信息RecordMetadata
-
Future<RecordMetadata> sendResult = producer.send(record);
-
RecordMetadata recordMetadata = sendResult.get();
-
-
}
-
// 5. 关闭资源
-
kafkaProducer.close();
-
}
-
}
2、带回调函数的异步发送,通过回调函数获取返回结果,且回调函数是异步的
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
-
package com.atguigu.kafka.producer;
-
import org.apache.kafka.clients.producer.*;
-
import java.util.Properties;
-
-
public class CustomProducerCallback {
-
public static void main(String[] args) throws InterruptedException {
-
-
// 1. 创建 kafka 生产者的配置对象
-
Properties properties = new Properties();
-
// 2. 给 kafka 配置对象添加配置信息
-
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
-
-
// key,value 序列化(必须):key.serializer,value.serializer
-
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
-
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
-
// 3. 创建 kafka 生产者对象
-
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
-
-
// 4. 调用 send 方法,发送消息
-
for (int i = 0; i < 5; i ) {
-
// 添加回调
-
kafkaProducer.send(new ProducerRecord<>("first", "atguigu " i), new Callback() {
-
-
// 该方法在 Producer 收到 ack 时调用,为异步调用
-
-
public void onCompletion(RecordMetadata metadata, Exception exception) {
-
if (exception == null) {
-
-
// 没有异常,输出信息到控制台
-
System.out.println(" 主题: " metadata.topic() "->" "分区:" metadata.partition());
-
} else {
-
// 出现异常打印
-
exception.printStackTrace();
-
}
-
}
-
});
-
// 延迟一会会看到数据发往不同分区
-
Thread.sleep(2);
-
}
-
// 5. 关闭资源
-
kafkaProducer.close();
-
}
-
}
3、同步发送,关注返回结果,同步的会被阻塞
-
public static void sendMessageWithCareResult() throws ExecutionException, InterruptedException {
-
-
//ProducerRecord的三个参数,topic,发送的key,发送的value
-
ProducerRecord<String,String> record = new ProducerRecord<>("user-info-topic","name","路飞");
-
-
//send的返回值是一个Future对象,可以通过get方法取出其中存储的发送消息的一些元信息RecordMetadata
-
Future<RecordMetadata> sendResult = producer.send(record);
-
RecordMetadata recordMetadata = sendResult.get();
-
-
//打印下发送消息的topic,partition,offset
-
System.out.println(String.format("发送结果:topic:%s,存储的partition:%s,offset:%s",
-
recordMetadata.topic(),
-
recordMetadata.partition(),
-
recordMetadata.offset()));
-
-
producer.close();
-
}
通过topic里面的pertition分区来提高消息处理的效率
(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。 (2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
在IDEA中全局查找(ctrl n)ProducerRecord类,在类中可以看到构造方法:
- // 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl p 查看参数)
kafkaProducer.send(new ProducerRecord<>("first", 1,"","atguigu " i)); 第一个参数是topic,第二个参数是pertition,第三个是消息messenger
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余, 分别发往 1、2、0
kafkaProducer.send(new ProducerRecord<>("first", "a","atguigu " i), new Callback() { });
2、自定义分区器
定义类实现 Partitioner 接口来重写分区方法;
-
package com.atguigu.kafka.producer;
-
import org.apache.kafka.clients.producer.Partitioner;
-
import org.apache.kafka.common.Cluster;
-
import java.util.Map;
-
-
/**
-
* 1. 实现接口 Partitioner
-
* 2. 实现 3 个方法:partition,close,configure
-
* 3. 编写 partition 方法,返回分区号
-
*/
-
public class MyPartitioner implements Partitioner {
-
-
/**
-
* 返回信息对应的分区
-
* @param topic 主题
-
* @param key 消息的 key
-
* @param keyBytes 消息的 key 序列化后的字节数组
-
* @param value 消息的 value
-
* @param valueBytes 消息的 value 序列化后的字节数组
-
* @param cluster 集群元数据可以查看分区信息
-
* @return
-
*/
-
-
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
-
// 获取消息
-
String msgValue = value.toString();
-
-
// 创建 partition
-
int partition;
-
-
// 判断消息是否包含 atguigu
-
if (msgValue.contains("atguigu")){
-
partition = 0;
-
}else {
-
partition = 1;
-
}
-
-
// 返回分区号
-
return partition;
-
}
-
-
// 关闭资源
-
-
public void close() {
-
}
-
-
// 配置方法
-
-
public void configure(Map<String, ?> configs) {
-
}
-
}
使用分区器的方法,在生产者的配置中添加分区器参数。其实就是再properties里面引入分区的配置类
-
package com.atguigu.kafka.producer;
-
import org.apache.kafka.clients.producer.*;
-
import java.util.Properties;
-
-
public class CustomProducerCallbackPartitions {
-
public static void main(String[] args) throws InterruptedException {
-
-
Properties properties = new Properties();
-
-
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
-
-
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
-
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
-
// 添加自定义分区器
-
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");
-
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
-
-
for (int i = 0; i < 5; i ) {
-
-
kafkaProducer.send(new ProducerRecord<>("first", "atguigu " i), new Callback() {
-
-
public void onCompletion(RecordMetadata metadata, Exception e) {
-
if (e == null){
-
System.out.println(" 主题: " metadata.topic() "->" "分区:" metadata.partition()
-
);
-
-
-
}else {
-
e.printStackTrace();
-
}
-
}
-
});
-
-
}
-
kafkaProducer.close();
-
}
-
}
为了提高生成者的效率,还可以通过配置以下内容
// batch.size:批次大小,默认 16K properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认 0 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
三、消息可靠性问题
1、可靠性和重复性
可靠性总结:
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
同意是通过在properties里面配置ack的机制
-
// 设置 acks
-
properties.put(ProducerConfig.ACKS_CONFIG, "all");
-
// 重试次数 retries,默认是 int 最大值,2147483647
-
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
数据重复性问题
• 至少一次(At Least Once)= ACK级别设置为-1 分区副本大于等于2 ISR里应答的最小副本数量大于等于2
• 最多一次(At Most Once)= ACK级别设置为0
• 总结:
At Least Once可以保证数据不丢失,但是不能保证数据不重复;
At Most Once可以保证数据不重复,但是不能保证数据不丢失。
• 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
2、幂等性和事务
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once) = 幂等性 至少一次( ack=-1 分区副本数>=2 ISR最小副本数量>=2) 。
重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。
开启幂等性参数 :enable.idempotence 默认为 true,false 关闭。
事务:
开启事务前必须开启幂等性
Kafka 的事务一共有如下 5 个 API
-
// 1 初始化事务
-
void initTransactions();
-
-
// 2 开启事务
-
void beginTransaction() throws ProducerFencedException;
-
-
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
-
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException;
-
-
// 4 提交事务
-
void commitTransaction() throws ProducerFencedException;
-
-
// 5 放弃事务(类似于回滚事务的操作)
-
-
void abortTransaction() throws ProducerFencedException;
-
单个 Producer,使用事务保证消息的仅一次发送
-
-
package com.atguigu.kafka.producer;
-
import org.apache.kafka.clients.producer.KafkaProducer;
-
import org.apache.kafka.clients.producer.ProducerRecord;
-
import java.util.Properties;
-
-
public class CustomProducerTransactions {
-
-
public static void main(String[] args) throws InterruptedException {
-
// 1. 创建 kafka 生产者的配置对象
-
Properties properties = new Properties();
-
-
// 2. 给 kafka 配置对象添加配置信息
-
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
-
-
// key,value 序列化
-
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
-
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
-
// 设置事务 id(必须),事务 id 任意起名
-
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
-
-
// 3. 创建 kafka 生产者对象
-
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
-
-
// 初始化事务
-
kafkaProducer.initTransactions();
-
// 开启事务
-
kafkaProducer.beginTransaction();
-
-
try {
-
// 4. 调用 send 方法,发送消息
-
for (int i = 0; i < 5; i ) {
-
// 发送消息
-
kafkaProducer.send(new ProducerRecord<>("first", "atguigu " i));
-
}
-
-
// int i = 1 / 0;
-
// 提交事务
-
kafkaProducer.commitTransaction();
-
-
} catch (Exception e) {
-
// 终止事务
-
kafkaProducer.abortTransaction();
-
} finally {
-
-
// 5. 关闭资源
-
kafkaProducer.close();
-
}
-
}
-
}
3、生产者消息的有序性
主要是开启幂等性后会通过其序号来落盘,如果失败,则会缓存起来知道正序的到来才落盘
四、broker工作流程
本章主要介绍kafka如何存储数据的
在zookeeper的服务端存储的Kafka相关信息:
1)/kafka/brokers/ids [0,1,2] 记录有哪些服务器
2)/kafka/brokers/topics/first/partitions/0/state {"leader":1 ,"isr":[1,0,2] } 记录谁是Leader,有哪些服务器可用
Zookeeper中存储的Kafka 信息
3)/kafka/controller {“brokerid”:0} 辅助选举Leader
ISR 是lead 跟follow里面通讯正常的节点
1、节点的服役和退役
2、kafka的副本
(1 Kafka 副本作用:提高数据可靠性 。
(2 Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加 磁盘存储空间,增加网络上数据传输,
降低效率。
(3 Kafka 中副本分为: Leader 和 Follower 。 Kafka 生产者只会把数据发往 Leader然后 Follower 找 Leader
进行同步数据。
(4 Kafka 分区中的所有副本统称为 AR Assigned Repllicas )。AR =ISR OSR
ISR表示 和 Leader 保持同步的 Follower 集合。 如果 Follower 长时间未向 Leader 发送通信请求或同步数据,
则该Follower 将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader 发生故障之后,
就会从ISR 中选举新的Leader。OSR,表示Follower 与Leader 副本同步时,延迟过多的副本。
3、lead选举流程
Kafka 集群中有一个broker 的Controller 会被选举为Controller Leader,负责管理集群
broker 的上下线,所有topic 的分区副本分配和Leader 选举等工作。
Controller 的信息同步工作是依赖于Zookeeper 的。
4. lead 和follow故障处理
1)Follower故障
(1) Follower发生故障后会被临时踢出ISR (2) 这个期间Leader和Follower继续接收数据 (3)待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。 (4)等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。
2)Leader故障
(1) Leader发生故障之后,会从ISR中选出一个新的Leader (2)为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
5. Leader Partition 负载平衡
正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。
但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,
其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。
6. 生产经验——增加副本因子
在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。
7.topic 里面文件存储的机制
Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数 据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制, 将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称 分区序号,例如:first-0。
说明:日志存储参数配置
参数描述 log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引。
8.文件清理策略
(1)检查是否过期的配置
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
log.retention.hours,最低优先级小时,默认 7 天。
log.retention.minutes,分钟。
log.retention.ms,最高优先级毫秒。
log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。
(2)过期清楚策略
Kafka 中提供的日志清理策略有 delete 和 compact 两种。
delete 日志删除:将过期数据删除
log.cleanup.policy = delete 所有数据启用删除策略
(1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。
log.retention.bytes,默认等于-1,表示无穷大。
思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?
compact日志压缩:对于相同key的不同value值,只保留最后一个版本。
压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大
的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。
这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息
集里就保存了所有用户最新的资料。
log.cleanup.policy = compact 所有数据启用压缩策
读数据采用稀疏索引,可以快速定位要消费的数据,写入的时候是通过顺序写磁盘
Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
零拷贝: Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用 走应用层,传输效率高。
PageCache页缓存: Kafka重度依赖底层操作系统提供的PageCache功 能。当上层有写操作时,操作系统只是将数据写入 PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存 都当做了磁盘缓存
五、kafka的消费者
消费者组 :由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。 • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
1、基础消费者代码:
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
-
import org.apache.kafka.clients.consumer.ConsumerRecords;
-
import org.apache.kafka.clients.consumer.KafkaConsumer;
-
import java.time.Duration;
-
import java.util.ArrayList;
-
import java.util.Properties;
-
-
public class CustomConsumer {
-
public static void main(String[] args) {
-
-
// 1.创建消费者的配置对象
-
Properties properties = new Properties();
-
// 2.给消费者配置对象添加参数
-
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
-
-
// 配置序列化 必须
-
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
-
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
-
// 配置消费者组(组名任意起名) 必须
-
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
-
-
// 创建消费者对象
-
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
-
-
// 注册要消费的主题(可以消费多个主题)
-
ArrayList<String> topics = new ArrayList<>();
-
topics.add("first");
-
kafkaConsumer.subscribe(topics);
-
-
// 拉取数据打印
-
while (true) {
-
// 设置 1s 中消费一批数据
-
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
-
// 打印消费到的数据
-
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
-
System.out.println(consumerRecord);
-
}
-
}
-
}
-
}
如果是消费某个topic里面特定的partitions,需要在配置里面注明
// 消费某个主题的某个分区数据
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0));// 消费first主题下的0分区
kafkaConsumer.assign(topicPartitions);
复制一份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者,代码与上面的基础消费者完全一样,消费者组名还是test 这样就能在test消费者组里面启动两个消费者
2、分区的分配以及再平衡
(1、一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。
(2、Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,
修改分区的分配策略。默认策略是Range CooperativeSticky。Kafka可以同时使用多个分区分配策略。
常见参数的配置:
heartbeat.interval.ms : Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于session.timeout.ms,也不应该高于session.timeout.ms 的 1/3。
session.timeout.ms : Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms: 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
partition.assignment.strategy: 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range CooperativeSticky。Kafka 可以同时使用多个分区分配策略。
可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky
3、四种分区分配策略
1)Range 分区策略原理
-
RoundRobin 分区策略原理
RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后 通过轮询算法来分配 partition 给到各个消费者。
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
0 号消费者宕机后,0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,
分别由 1 号消费者或者 2 号消费者消费。
3)Sticky 以及再平衡
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,
尽量少的调整分配的变动,可以节省大量的开销。粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,
首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
-
// 修改分区分配策略
-
ArrayList<String> startegys = new ArrayList<>();
-
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
-
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);
4、offset的位移
消费完一批数据后,需要提交offset,可以设置自动提交和手动提交;默认是自动提交
设置自动自交offset和提交时间;这种方式时间不好控制
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
同步提交offset和异步提交:
虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因
此Kafka还提供了手动提交offset的API。手动提交offset的方法有两种:分别是commitSync(同步提交)
和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,
同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);
而异步提交则没有失败重试机制,故有可能提交失败。
• commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
• commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
-
package com.atguigu.kafka.consumer;
-
import org.apache.kafka.clients.consumer.ConsumerConfig;
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
-
import org.apache.kafka.clients.consumer.ConsumerRecords;
-
import org.apache.kafka.clients.consumer.KafkaConsumer;
-
import java.util.Arrays;
-
import java.util.Properties;
-
-
public class CustomConsumerByHandSync {
-
public static void main(String[] args) {
-
-
// 1. 创建 kafka 消费者配置类
-
Properties properties = new Properties();
-
// 2. 添加配置参数
-
// 添加连接
-
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
-
-
// 配置序列化
-
-
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-
"org.apache.kafka.common.serialization.StringDeserializer");
-
-
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-
"org.apache.kafka.common.serialization.StringDeserializer");
-
-
// 配置消费者组
-
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
-
-
// 是否自动提交 offset
-
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
-
//3. 创建 kafka 消费者
-
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
-
-
//4. 设置消费主题 形参是列表
-
consumer.subscribe(Arrays.asList("first"));
-
//5. 消费数据
-
while (true){
-
// 读取消息
-
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
-
-
// 输出消息
-
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
-
System.out.println(consumerRecord.value());
-
}
-
-
// 同步提交 offset
-
consumer.commitSync();
-
-
// 异步提交 offset
-
consumer.commitAsync();
-
}
-
}
-
}
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此 吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
5、指定offser消费
auto.offset.reset = earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
(1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning。
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
(4)任意指定 offset 位移开始消费
-
// 2 订阅一个主题
-
ArrayList<String> topics = new ArrayList<>();
-
topics.add("first");
-
kafkaConsumer.subscribe(topics);
-
-
Set<TopicPartition> assignment= new HashSet<>();
-
while (assignment.size() == 0) {
-
kafkaConsumer.poll(Duration.ofSeconds(1));
-
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
-
assignment = kafkaConsumer.assignment();
-
}
-
-
// 遍历所有分区,并指定 offset 从 1700 的位置开始消费
-
for (TopicPartition tp: assignment) {
-
kafkaConsumer.seek(tp, 1700);
-
}
-
-
// 3 消费该主题数据
-
while (true) {
-
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
-
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
-
System.out.println(consumerRecord);
-
}
-
}
(5)指定时间消费
需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?
-
package com.atguigu.kafka.consumer;
-
import org.apache.kafka.clients.consumer.*;
-
import org.apache.kafka.common.TopicPartition;
-
import org.apache.kafka.common.serialization.StringDeserializer;
-
import java.time.Duration;
-
import java.util.*;
-
-
public class CustomConsumerForTime {
-
public static void main(String[] args) {
-
-
// 0 配置信息
-
-
Properties properties = new Properties();
-
// 连接
-
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
-
-
// key value 反序列化
-
-
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
-
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
-
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
-
-
// 1 创建一个消费者
-
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
-
-
// 2 订阅一个主题
-
ArrayList<String> topics = new ArrayList<>();
-
topics.add("first");
-
kafkaConsumer.subscribe(topics);
-
-
Set<TopicPartition> assignment = new HashSet<>();
-
-
while (assignment.size() == 0) {
-
kafkaConsumer.poll(Duration.ofSeconds(1));
-
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
-
assignment = kafkaConsumer.assignment();
-
}
-
-
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
-
-
// 封装集合存储,每个分区对应一天前的数据
-
for (TopicPartition topicPartition : assignment) {
-
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
-
}
-
-
// 获取从 1 天前开始消费的每个分区的 offset
-
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
-
-
// 遍历每个分区,对每个分区设置消费时间。
-
for (TopicPartition topicPartition : assignment) {
-
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
-
// 根据时间指定开始消费的位置
-
if (offsetAndTimestamp != null){
-
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
-
}
-
}
-
-
// 3 消费该主题数据
-
while (true) {
-
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
-
-
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
-
System.out.println(consumerRecord);
-
}
-
}
-
}
-
}
6、重复消费和漏消费问题
重复消费:已经消费了数据,但是 offset 没提交。
漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。
如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比 如MySQL)。这部分知识会在后续项目部分涉及。
7. 数据积压问题
六、kafka集成其他框架
1.kafka与flink集成
kafka作为生产者发送消息到flink,也可以kafka作为消费者从flink里面消费数据``
-
org.apache.flink flink-java 1.13.1
-
-
<dependency>
-
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-streaming-java_2.12</artifactId>
-
<version>1.13.1</version>
-
</dependency>
-
-
<dependency>
-
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-clients_2.12</artifactId>
-
<version>1.13.1</version>
-
</dependency>
-
-
<dependency>
-
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
-
<version>1.7.2</version>
-
</dependency>
-
-
</dependencies>
生产者
-
public class MyFlinkKafkaProducer1 {
-
-
public static void main(String[] args) throws Exception {
-
// 准备环境
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
env.setParallelism(3);
-
// 准备数据源
-
ArrayList<String> strings = new ArrayList<>();
-
strings.add("test1");
-
strings.add("test2");
-
DataStreamSource<String> stream = env.fromCollection(strings);
-
//创建kafka生产者 -- 未完善
-
Properties properties = new Properties();
-
-
FlinkKafkaProducer<String> first = new FlinkKafkaProducer<>("first", new SimpleStringSchema(), properties);
-
// 添加数据源
-
stream.addSink((SinkFunction<String>) first);
-
// 执行
-
env.execute();
-
-
}
-
}
消费者
-
public class MyFlinkKafkaConsumer {
-
-
public static void main(String[] args) {
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
env.setParallelism(3);
-
-
Properties properties = new Properties();
-
-
FlinkKafkaConsumer<String> first =
-
new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), properties);
-
-
}
-
}
2、springboot与kafka的集成
org.springframework.kafka spring-kafka
-
#配置连接的集群
-
spring.kafka.bootstrap-servers=ip:socket,ip1:socket
-
# key value 的序列化
-
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
-
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
生产者
-
-
-
public class ProducerController {
-
-
-
KafkaTemplate<String,String> kafkaTemplate;
-
-
-
public String data(String msg){
-
kafkaTemplate.send("first",msg);
-
return "true";
-
}
-
}
消费者
-
# 反序列化
-
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
-
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
-
-
#消费者组id
-
spring.kafka.consumer.group-id=testcon
-
-
public class ConsumerController {
-
-
-
KafkaTemplate<String,String> kafkaTemplate;
-
-
-
public void getData(String msg){
-
System.out.println(msg);
-
}
-
}`
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhigiebi
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
excel下划线不显示怎么办
PHP中文网 06-23 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel图片置于文字下方的方法
PHP中文网 06-27 -
微信运动停用后别人还能看到步数吗
PHP中文网 07-22