Kafka间件
1.Kafka简介和应用
Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。常应用于大数据环境下的数据异步处理如批量上传文件,还有高负载任务如电商的秒杀活动!
Kafka和其他消息中间件的对比:
2.Kafka相关概念
Producer:消息的生产者,生产者将消息发送到topic的partition中,broker接收到partition的消息并存储起来。
Consumer:消息的消费者,消费者从broker中消费消息。
Topic:主题,用来划分数据的所属类,是一个逻辑上的概念
Partition:分区,一个topic下可以有多个pertition,方便topic的扩展
Broker:一台kafka服务器就是一个broker,broker用来存储partition的数据
Zookeeper:用来维护和协调broker集群,当系统中新增了broker或者哪个broker失效,由zookeeper通知到生产者和消费者
3.SpringBoot2.x整合Kafka实战
3.1Kafka和Zookeeper的本地安装和启动
3.2新建SpringBoot项目并加载相关依赖
Demo代码整体结构如下:
<!--kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--通用依赖-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
<scope>provided</scope>
</dependency>
<!-- druid连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.9</version>
</dependency>
注意这里的spring-kafka不指定版本,由maven自适应版本,因为SpringBoot整合kafka经常遇到版本不兼容的问题!
3.3kafka相关属性配置
application.yml:
server:
port: 8082
tomcat:
uri-encoding: UTF-8
spring:
kafka:
# consumer:
# auto-offset-reset: latest
# group-id: opensdkProcesser
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# producer:
# key-serializer: org.apache.kafka.common.serialization.StringDeserializer
# value-serializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: logger
topic: opensdk
bootstrap-servers: localhost:9092
KafkaConfigProperties:
@Data
@Configuration
public class KafkaConfigProperties {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.topic}")
private String topic;
@Value("${spring.kafka.group-id}")
private String groupId;
}
通过全局配置文件来配置kafka的相关属性,在修改的时候更加灵活!
3.4生产者框架搭建
KafkaProducerConfig:这是生产者的配置类,向Spring注入ProducerFactory生产者工厂和KafkaTemplate模板。生产者工厂配置kafka属性,kafka模板配置发送的消息key-value数据类型,这是使用的是<String,String>,还有一种是<String,Object>支持实体传参!
@Configuration
public class KafkaProducerConfig {
@Resource
private KafkaConfigProperties kafkaConfigProperties;
@Bean
public ProducerFactory<String,String> producerFactory() {
Map<String,Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String,String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
ReturnJsonFormatUntil:JSON格式转换工具类
public class ReturnJsonFormatUntil {
public static String json(boolean flag, Object result){
Map<String, Object> map = new HashMap<String, Object>();
if (flag){
map.put("success",true);
map.put("message","请求成功");
map.put("entity",result);
}else{
map.put("success",false);
map.put("message",result);
map.put("entity","");
}
return JSONUtils.toJSONString(map);
}
public static String jsonString(boolean flag, Object result){
Map<String, Object> map = new HashMap<String, Object>();
if (flag){
map.put("success",true);
map.put("message","请求成功");
map.put("entity",result);
}else{
map.put("success",false);
map.put("message",result);
map.put("entity","");
}
return JSON.toJSONString(map);
}
public static String jsonStrings(boolean success, String message,Object result){
Map<String, Object> map = new HashMap<String, Object>();
map.put("success",success);
map.put("message",message);
map.put("entity", result);
return JSON.toJSONString(map);
}
}
MessageProducer:这里用于发送消息,调用KafkaTemplate的send方法,第一个属性是topic主题,第二个属性是传送的数据,该发送方法会先生成producerRecord对象,对象里头包含了kafka的所有信息,然后再传递给doSend方法。另外使用ListenableFuture接收异步发送任务的一个同步返回,用来判断是否发送成功!
@Component
@Slf4j
public class MessageProducer {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Resource
private KafkaConfigProperties kafkaConfigProperties;
public void sendMessage(String msg) {
ListenableFuture<SendResult<String, String>> future = null;
try {
String jsonMessage = JSON.toJSONString(msg);
future = kafkaTemplate.send(kafkaConfigProperties.getTopic(), jsonMessage);
} catch (Exception e) {
System.out.println(e.getMessage());
log.error(ReturnJsonFormatUntil.jsonString(false,"发送kafka日志信息,无法转化成FastJSON格式"));
}
if (future != null) {
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
//System.out.println("发送kafka信息失败,失败信息:" throwable.getMessage());
log.error(ReturnJsonFormatUntil.jsonStrings(false,"发送kafka日志信息失败,失败信息:" throwable.getMessage(),msg));
}
@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
log.info(ReturnJsonFormatUntil.jsonStrings(true,"发送kafka信息成功!!!",msg));
}
});
}
}
}
KafkaSenderUtils:发送消息的工具类,用于外部完成发送操作的统一入口!
@Component
public class KafkaSenderUtils {
private static MessageProducer staticMessageProducer;
@Autowired
private MessageProducer messageProducer;
@PostConstruct
public void init() {
staticMessageProducer = messageProducer;
}
public static void sendMessage(String message) {
staticMessageProducer.sendMessage(message);
}
}
3.5消费者框架搭建
KafkaConsumerConfig:消费者配置类,向Spring注入kafkaListenerContainerFactory消费者监听工厂!
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Resource
private KafkaConfigProperties kafkaConfigProperties;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaConfigProperties.getBootstrapServers());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
kafkaConfigProperties.getTopic());
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
MessageConsumer:消费者监听服务,使用@KafkaListener注解监听生产者发送的消息!
@Component
public class MessageConsumer {
@KafkaListener(topics = "${spring.kafka.topic}",containerFactory = "kafkaListenerContainerFactory")
public void listen(String message) {
System.out.println("接收到:" message);
}
}
3.5测试生产和消费流程
这里是使用接口调用的方式做的测试
TestController:
@Controller
@Api(value = "test", description = "测试")
@RequestMapping("/test")
public class TestController {
@Autowired
private TestService testService;
@RequestMapping(value = "/sendMessage",method = RequestMethod.POST)
@ApiOperation(value = "发送消息",notes = "发送消息")
@ResponseBody
public String senMessage(@RequestParam String message){
testService.sendMessage(message);
return "发送成功";
}
}
TestService:
public interface TestService {
void sendMessage(String message);
}
TestServiceImpl:
@Service
public class TestServiceImpl implements TestService {
@Override
public void sendMessage(String message) {
KafkaSenderUtils.sendMessage(message);
}
}
启动项目后我们可以在控制台看到已经消费者已经开启消息的监听服务!
然后我们调用接口,传入字符串:我是生产者一号。再看一下控制台,说明消费者已经成功消费了这条数据!
注:为了快速搭建kafka案例,这个项目的消费者和生产者都是在一个项目下,实际项目的使用会放在不用的项目并且部署在不同的服务器上!
感谢您的阅读,希望对您有所帮助,不足之处也希望多探讨!
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhiacach
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
微信运动停用后别人还能看到步数吗
PHP中文网 07-22 -
excel打印预览压线压字怎么办
PHP中文网 06-22