自定义kafka消息序列化
在kafka消息传递过程中,客户端与服务器需约定使用相同的语法格式。kafka提供的默认的转换(如String、Long等),但也支持特定应用的自定义序列化场景。本文通过示例说明如何自定义序列化实现。
kafka序列化
序列化时转换对象为字节数组的过程,反序列化是其逆过程——转换字节数组为对象。总之,转换内容为可读、可解释的信息。kafka提供了对基础类型提供缺省的序列化实现,但也支持自定义序列化:
上图显示了通过网络发送消息给kafka主题的过程,我们看到生产者在发送消息给主题之前需要序列化转换对象为字节数组。类似的,在消费者正确处理之前需要通过反序列化把字节数组转为对象。
自定义序列化
kafka提供对基本类型的序列化和反序列化实现:
- StringSerializer
- ShortSerializer
- IntegerSerializer
- LongSerializer
- DoubleSerializer
- BytesSerializer
但也提供了自定义序列化能力。为了序列化对象,需要实现org.apache.kafka.common.serialization包下的Serializer接口,类似的反序列化实现接口为Deserializer。两个接口有三个方法需要重写:
configure: 用于实现配置方面的细节
serialize/deserialize: 包括实际序列化和反序列化实现逻辑
close: 用于关闭Kafka session
示例实战
kafka提供了自定义序列化的能力,不仅针对消息值的转换,也可以实现键的转换。
增加依赖
首先需要增加kafka客户端 API依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
为了演示我们定义消息实体,这里使用lombok简化数据传输类的定义:
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDto {
private String message;
private String version;
}
序列化
接下来,我们实现kafka提供的Serializer接口,用于生产者发送消息:
@Slf4j
public class CustomSerializer implements Serializer {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, MessageDto data) {
try {
if (data == null){
System.out.println("Null received at serializing");
return null;
}
System.out.println("Serializing...");
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error when serializing MessageDto to byte[]");
}
}
@Override
public void close() {
}
}
我们覆盖了serialize接口的方法。在实现中使用Jackson ObjectMapper转换自定义对象,然后返回字节流用于给网络发送消息。
反序列化
同样的方式,实现Deserializer接口:
@Slf4j
public class CustomDeserializer implements Deserializer<MessageDto> {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public MessageDto deserialize(String topic, byte[] data) {
try {
if (data == null){
System.out.println("Null received at deserializing");
return null;
}
System.out.println("Deserializing...");
return objectMapper.readValue(new String(data, "UTF-8"), MessageDto.class);
} catch (Exception e) {
throw new SerializationException("Error when deserializing byte[] to MessageDto");
}
}
@Override
public void close() {
}
}
与前节一样,覆盖了deserialize 接口方法,使用Jackson ObjectMapper把字节流转换为自定义对象。
发送和接收消息
下面代码实现是使用自定义序列化类发送消息,使用反序列化类消费消息。首先创建并配置生产者类:
private static KafkaProducer<String, MessageDto> createKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.dataz.kafka.serdes.CustomSerializer");
return new KafkaProducer(props);
}
我们看到key序列化使用StringSerializer,value序列化使用了我们自定义的类。
下面创建并配置消费者类:
private static KafkaConsumer<String, MessageDto> createKafkaConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.dataz.kafka.serdes.CustomDeserializer");
return new KafkaConsumer<>(props);
}
除了指定了key和value的反序列化类,还专门包括了分组ID。除此之外,我们将自动偏移量重置为最早,以确保生产者在消费者启动之前发送所有消息也能够被消费。
上面已创建好生产者和消费者,下面开始发送消息:
MessageDto msgProd = MessageDto.builder().message("test").version("1.0").build();
KafkaProducer<String, MessageDto> producer = createKafkaProducer();
producer.send(new ProducerRecord<String, MessageDto>(TOPIC, "1", msgProd));
System.out.println("Message sent " msgProd);
producer.close();
下面消费者订阅主题并消费消息:
AtomicReference<MessageDto> msgCons = new AtomicReference<>();
KafkaConsumer<String, MessageDto> consumer = createKafkaConsumer();
consumer.subscribe(Arrays.asList(TOPIC));
ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
msgCons.set(record.value());
System.out.println("Message received " record.value());
});
consumer.close();
控制台输出日志信息:
Serializing...
Message sent MessageDto(message=test, version=1.0)
Deserializing...
Message received MessageDto(message=test, version=1.0)
总结
本文展示了Kafka生产者如何使用序列化类通过网络发送和消费消息。此外还介绍了可用的默认序列化器,最重要的是通过示例实现了kafka自定义序列化类和反序列化类的能力。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhigiebj
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
excel图片置于文字下方的方法
PHP中文网 06-27 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
微信提示登录环境异常是什么意思原因
PHP中文网 04-09 -
微信运动停用后别人还能看到步数吗
PHP中文网 07-22 -
微信人名旁边有个图标有什么用
PHP中文网 03-11