• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Kafka间件

武飞扬头像
去撒哈拉当海盗
帮助2

1.Kafka简介和应用

Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。常应用于大数据环境下的数据异步处理如批量上传文件,还有高负载任务如电商的秒杀活动!
Kafka和其他消息中间件的对比:
学新通

2.Kafka相关概念

Producer:消息的生产者,生产者将消息发送到topic的partition中,broker接收到partition的消息并存储起来。
Consumer:消息的消费者,消费者从broker中消费消息。
Topic:主题,用来划分数据的所属类,是一个逻辑上的概念
Partition:分区,一个topic下可以有多个pertition,方便topic的扩展
Broker:一台kafka服务器就是一个broker,broker用来存储partition的数据
Zookeeper:用来维护和协调broker集群,当系统中新增了broker或者哪个broker失效,由zookeeper通知到生产者和消费者

3.SpringBoot2.x整合Kafka实战

3.1Kafka和Zookeeper的本地安装和启动

3.2新建SpringBoot项目并加载相关依赖

Demo代码整体结构如下:
学新通

		<!--kafka依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!--lombok依赖-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--通用依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
            <scope>provided</scope>
        </dependency>
        <!-- druid连接池 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.9</version>
        </dependency>
学新通

注意这里的spring-kafka不指定版本,由maven自适应版本,因为SpringBoot整合kafka经常遇到版本不兼容的问题!

3.3kafka相关属性配置

application.yml:

server:
  port: 8082
  tomcat:
    uri-encoding: UTF-8

spring:
  kafka:
    #    consumer:
    #      auto-offset-reset: latest
    #      group-id: opensdkProcesser
    #      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    #      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    #    producer:
    #      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
    #      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
    group-id: logger
    topic: opensdk
    bootstrap-servers: localhost:9092
学新通

KafkaConfigProperties:

@Data
@Configuration
public class KafkaConfigProperties {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.topic}")
    private String topic;

    @Value("${spring.kafka.group-id}")
    private String groupId;
}

通过全局配置文件来配置kafka的相关属性,在修改的时候更加灵活!

3.4生产者框架搭建

KafkaProducerConfig:这是生产者的配置类,向Spring注入ProducerFactory生产者工厂和KafkaTemplate模板。生产者工厂配置kafka属性,kafka模板配置发送的消息key-value数据类型,这是使用的是<String,String>,还有一种是<String,Object>支持实体传参!

@Configuration
public class KafkaProducerConfig {

    @Resource
    private KafkaConfigProperties kafkaConfigProperties;

    @Bean
    public ProducerFactory<String,String> producerFactory() {
        Map<String,Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapServers());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String,String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
学新通

ReturnJsonFormatUntil:JSON格式转换工具类

public class ReturnJsonFormatUntil {
    
    public static String json(boolean flag, Object result){
        Map<String, Object> map = new HashMap<String, Object>();
        if (flag){
            map.put("success",true);
            map.put("message","请求成功");
            map.put("entity",result);
        }else{
            map.put("success",false);
            map.put("message",result);
            map.put("entity","");
        }

        return JSONUtils.toJSONString(map);
    }

    public static String jsonString(boolean flag, Object result){
        Map<String, Object> map = new HashMap<String, Object>();
        if (flag){
            map.put("success",true);
            map.put("message","请求成功");
            map.put("entity",result);
        }else{
            map.put("success",false);
            map.put("message",result);
            map.put("entity","");
        }

        return JSON.toJSONString(map);
    }
    public static String jsonStrings(boolean success, String message,Object result){
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("success",success);
        map.put("message",message);
        map.put("entity", result);

        return JSON.toJSONString(map);
    }
}
学新通

MessageProducer:这里用于发送消息,调用KafkaTemplate的send方法,第一个属性是topic主题,第二个属性是传送的数据,该发送方法会先生成producerRecord对象,对象里头包含了kafka的所有信息,然后再传递给doSend方法。另外使用ListenableFuture接收异步发送任务的一个同步返回,用来判断是否发送成功!

@Component
@Slf4j
public class MessageProducer {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @Resource
    private KafkaConfigProperties kafkaConfigProperties;

    public void sendMessage(String msg) {
        ListenableFuture<SendResult<String, String>> future  = null;

        try {
            String jsonMessage = JSON.toJSONString(msg);
            future = kafkaTemplate.send(kafkaConfigProperties.getTopic(), jsonMessage);
        } catch (Exception e) {
            System.out.println(e.getMessage());
            log.error(ReturnJsonFormatUntil.jsonString(false,"发送kafka日志信息,无法转化成FastJSON格式"));
        }

        if (future != null) {
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    //System.out.println("发送kafka信息失败,失败信息:"   throwable.getMessage());
                    log.error(ReturnJsonFormatUntil.jsonStrings(false,"发送kafka日志信息失败,失败信息:"   throwable.getMessage(),msg));
                }
                @Override
                public void onSuccess(SendResult<String, String> stringStringSendResult) {
                    log.info(ReturnJsonFormatUntil.jsonStrings(true,"发送kafka信息成功!!!",msg));
                }
            });
        }
    }
}
学新通

KafkaSenderUtils:发送消息的工具类,用于外部完成发送操作的统一入口!

@Component
public class KafkaSenderUtils {
    
    private static MessageProducer staticMessageProducer;

    @Autowired
    private MessageProducer messageProducer;

    @PostConstruct
    public void init() {
        staticMessageProducer = messageProducer;
    }
    
    public static void sendMessage(String message) {
        staticMessageProducer.sendMessage(message);
    }
}
学新通

3.5消费者框架搭建

KafkaConsumerConfig:消费者配置类,向Spring注入kafkaListenerContainerFactory消费者监听工厂!

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Resource
    private KafkaConfigProperties kafkaConfigProperties;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaConfigProperties.getBootstrapServers());
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                kafkaConfigProperties.getTopic());
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
学新通

MessageConsumer:消费者监听服务,使用@KafkaListener注解监听生产者发送的消息!

@Component
public class MessageConsumer {

    @KafkaListener(topics = "${spring.kafka.topic}",containerFactory = "kafkaListenerContainerFactory")
    public void listen(String message) {
        System.out.println("接收到:" message);
    }
}

3.5测试生产和消费流程

这里是使用接口调用的方式做的测试
TestController:

@Controller
@Api(value = "test", description = "测试")
@RequestMapping("/test")
public class TestController {

    @Autowired
    private TestService testService;

    @RequestMapping(value = "/sendMessage",method = RequestMethod.POST)
    @ApiOperation(value = "发送消息",notes = "发送消息")
    @ResponseBody
    public String senMessage(@RequestParam String message){
        testService.sendMessage(message);
        return "发送成功";
    }
}
学新通

TestService:

public interface TestService {

    void sendMessage(String message);
}

TestServiceImpl:

@Service
public class TestServiceImpl implements TestService {

    @Override
    public void sendMessage(String message) {
        KafkaSenderUtils.sendMessage(message);
    }
}

启动项目后我们可以在控制台看到已经消费者已经开启消息的监听服务!
学新通
然后我们调用接口,传入字符串:我是生产者一号。再看一下控制台,说明消费者已经成功消费了这条数据!
学新通
注:为了快速搭建kafka案例,这个项目的消费者和生产者都是在一个项目下,实际项目的使用会放在不用的项目并且部署在不同的服务器上!
感谢您的阅读,希望对您有所帮助,不足之处也希望多探讨!

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhiacach
系列文章
更多 icon
同类精品
更多 icon
继续加载