• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Python3.6.9 Flink 1.15.2消费Kafaka Topic

武飞扬头像
Shen Liang
帮助1

查看Linux发行版和内核版本

root@shenliang-VirtualBox:~# cat /proc/version

Linux version 5.4.0-135-generic (buildd@lcy02-amd64-053) (gcc version 7.5.0 (Ubuntu 7.5.0-3ubuntu1~18.04)) #152~18.04.2-Ubuntu SMP Tue Nov 29 08:23:49 UTC 2022

注:pyflink 1.15.2对python3的版本有要求,当前验证环境为python 3.6.9。

学新通

安装Pyflink 1.15.2

pip3 install apache-flink==1.15.2 -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

学新通

下载Kafka

启动Zookeeper

进入kafka主目录,在该目录内启动zookeeper。

bin/zookeeper-server-start.sh config/zookeeper.properties

:可以通过nohup命令使其在后台运行,以下执行的命令类似。

启动kafka

bin/kafka-server-start.sh config/server.properties

创建Topic主题

通过virtualenv命令创建虚拟目录

bin/kafka-topics.sh --create --zookeeper localhost:2181 -replication-factor 1 --partitions 1 --topic flink_kafakasource

查看当前创建的Topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

学新通

获取kafka的版本

find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*

学新通

从结果可以看出scala的版本是2.12,kafka的版本是2.8.2

配置Flink Kafka连接

在https://mvnrepository.com/里输入 flink kafka寻找对应版本的连接器。

学新通

 选择Flink对应的版本1.15.2,点击jar,分别下载flink-connector-basekafka-clients对应的jar包。

学新通

将该jar包放置在python的lib目录下。

/usr/local/lib/python3.6/dist-packages/pyflink/lib

学新通

注:

flink-connector-kafka-1.15.2.jar

kafka-clients-2.8.1.jar

flink-connector-base-1.15.2.jar

 编写并运行Flink程序

# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)
st_env = StreamTableEnvironment.create(s_env)#, TableConfig())
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")
sourceKafkaDdl = """
create table sourceKafka(
id int,name varchar
)
with(
 'connector'='kafka',
 'topic'='flink_kafakasource',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='latest-offset',
 'format'='json'
)
"""
st_env.execute_sql(sourceKafkaDdl)
fieldNames = ["id", "name"]
fieldTypes = [DataTypes.INT()
, DataTypes.STRING()]
csvSink = CsvTableSink(fieldNames
, fieldTypes, "/root/tiamaes/result.csv", ",", 1, WriteMode.OVERWRITE)
st_env.register_table_sink("csvTableSink", csvSink)
st_env.execute_sql("""
    INSERT INTO csvTableSink
        select * from sourceKafka
""").wait()

#执行程序
python flinkdemo.py

打开kafka生产者

打开kafka生产者,通过客户端生产数据。

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink_kafakasource

学新通

查看Flink侧结果 

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhhkbahi
系列文章
更多 icon
同类精品
更多 icon
继续加载