• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

flink写入到kafka 大坑。

武飞扬头像
cclovezbf
帮助1

1.kafka能不能发送null消息?

   能!

学新通

2 flink能不能发送null消息到kafka?

不能!

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment
            .getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"9.135.68.201:9092" );
    FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>( "cc_test",new SimpleStringSchema(),properties);
    env.fromCollection(Lists.newArrayList("111", "222", "333"))
        .map(s->{return s.equals("222")?null:s;})
            .addSink(flinkKafkaProducer);
    env.execute("ContractLabelJob");
}

学新通 

 学新通

这里就报了java的最常见错误 空指针,原因就是flink要把kafka的消息getbytes。所以flink不能发送null到kafka。

这种问题会造成什么后果?

flink直接挂掉。

如果我们采取了失败重试机制会怎样?

env.setRestartStrategy(  RestartStrategies.fixedDelayRestart(3, Time.seconds(5))  );

数据重复或者丢失。

学新通

还有此时kafka的offset由flink在管理, 消费的offset 一直没有被commit,所以一直重复消费。

来个demo

  1.  
    public static void main(String[] args) throws Exception {
  2.  
    StreamExecutionEnvironment env = StreamExecutionEnvironment
  3.  
    .getExecutionEnvironment();
  4.  
    env.enableCheckpointing(6000);
  5.  
    // env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
  6.  
    Properties properties = new Properties();
  7.  
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"9.135.68.201:9092" );
  8.  
    FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>( "cc_test2",new SimpleStringSchema(),properties);
  9.  
    KafkaSource<String> source = KafkaSource.<String>builder()
  10.  
    .setTopics("cc_test")
  11.  
    .setGroupId("cc_test1234")
  12.  
    .setBootstrapServers("9.135.68.201:9092")
  13.  
    .setValueOnlyDeserializer(new SimpleStringSchema())
  14.  
    .setStartingOffsets(OffsetsInitializer.earliest())
  15.  
    .build();
  16.  
    DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka");
  17.  
    // stringDataStreamSource.print("kafka msg");
  18.  
    stringDataStreamSource
  19.  
    .addSink(sink);
  20.  
    env.execute("test");
学新通

 从topic cc_test消费 然后写到cc_test2里面去

cc_test里的数据

学新通

 cc_test_2里写入的数据

学新通

 可以看到一个null 报错了,然后它分区的333就会一直被提交。

总之大家小心这个问题。

不加检查点 flink报错后就会直接停掉。。

加了检查点env.enableCheckpointing(6000); flink失败后会一直重试

加了重试机制 env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5000, TimeUnit.SECONDS),Time.of(5000,TimeUnit.SECONDS))); 失败的任务只会重试几次。

还是得熟悉源码呀。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhiagacg
系列文章
更多 icon
同类精品
更多 icon
继续加载