FlinkSQL Knowledge Points
0 Introduction
-
What is FlinkSQL?
A high-level library on top of the Flink core architecture that processes structured data with SQL semantics.
-
What is the overall workflow?
SQL needs tables first: a data source is bound to a schema and registered as a table in a catalog -> the user expresses the computation logic in SQL -> the table planner parses the SQL with Apache Calcite, binds metadata, builds the syntax tree, and turns it into a logical execution plan (tree) which is then optimized -> the logical plan plus concrete operator implementations yields the physical execution plan -> code generation produces the operator tree -> it is converted into a job graph and handed to the cluster for execution.
-
What are the two optimizers?
RBO (rule-based) and CBO (cost-based); roughly, the former rewrites the tree structure while the latter swaps in lower-cost operators.
Take the following SQL as an example:
select t1.id, 1 + 2 + t1.value AS v from t1 join t2 where t1.id = t2.id and t2.id < 1000
Optimization process:
First parsed into a syntax tree (from-join-where-select) -> logical execution plan (scan t1,t2 - join - filter - project) -> RBO optimization (constant folding; predicate pushdown, which reduces the data entering the join; projection pushdown, i.e. column pruning before the join), giving (scan t1 + project, scan t2 + project + filter - join - project) -> CBO optimization yields the physical execution plan (scan - calc - broadcastHashJoin - calc), which includes choosing broadcastHashJoin instead of hashJoin (broadcasting the join stream to the key-partitioned side implements a map-side join instead of a reduce-side join) and implementing project + filter with the physical calc operator.
-
What is a dynamic table?
Because streaming processes unbounded streams, the source table keeps receiving input, FlinkSQL keeps computing, and the result table keeps emitting output; the tables involved in this process are dynamic tables.
"Dynamic" shows up as appends, retractions and updates of the result table's data.
Therefore each record in the underlying data stream of a dynamic table carries a special attribute, a change-mode flag (RowKind: +I, -U, +U, -D); the corresponding stream is the changelog stream.
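A minimal sketch (the table name t_events and its columns are illustrative, not from the notes) of how a dynamic result table surfaces as a changelog stream, where each Row carries its change-mode flag (RowKind):
// Sketch: observing the changelog stream behind a dynamic result table (illustrative table/column names)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tev = StreamTableEnvironment.create(env);
// assume t_events is a registered source table with a column `gender`
Table result = tev.sqlQuery("select gender, count(*) as cnt from t_events group by gender");
// each Row carries a RowKind: +I (insert), -U (retract old), +U (new value), -D (delete)
DataStream<Row> changelog = tev.toChangelogStream(result);
changelog.print();   // e.g. +I[male, 1]  -U[male, 1]  +U[male, 2]
env.execute();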
1 Programming Basics
-
What are the main functions of the entry environment TableEnvironment?
Registering catalogs, registering tables into a catalog, loading modules, SQL queries (parsing, plan generation, job submission), registering UDFs, and converting between streams and tables.
// Option 1: create the table environment directly (unified stream/batch API)
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tabEnv = TableEnvironment.create(settings);
// Option 2: derive it from a stream execution environment (reuses the stream environment), commonly used for stream<->table conversion
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tev = StreamTableEnvironment.create(env);
-
Table SQL programming example?
Four steps: create the entry environment -> execute the table DDL (map the data source via a connector) -> run the SQL query -> insert into the target table.
public class Demo01_TableSql {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tabEnv = TableEnvironment.create(settings);
        tabEnv.executeSql(
                " create table t_kafka( "
              + "   id int "
              + "   ,name string "
              + "   ,age int "
              + "   ,gender string "
              + " ) WITH ( "
              + "  'connector' = 'kafka', "
              + "  'topic' = 'doit0204', "
              + "  'properties.bootstrap.servers' = 'hadoop102:9092', "
              + "  'properties.group.id' = 'zxk', "
              + "  'scan.startup.mode' = 'earliest-offset', "
              + "  'format' = 'json' "
              + " ) ");
        tabEnv.executeSql("select gender,avg(age) as age_avg from t_kafka group by gender").print();
    }
}
-
Table API programming example?
Notes: build the table not with create but with from, passing a TableDescriptor (which is itself built from a connector + schema) to get a Table object; then execute the query to get a TableResult, which can be printed directly; you can also call the API keywords with field names (they must be wrapped as expressions with $; at table-creation time only the physical fields that computed fields are based on are usable); finally you can either execute or insert into a sink table.
public class Demo02_TableApi { public static void main(String[] args) { EnvironmentSettings settings = EnvironmentSettings.inStreamingMode(); TableEnvironment tabEnv = TableEnvironment.create(settings); Table table = tabEnv.from(TableDescriptor.forConnector("kafka") .schema(Schema.newBuilder() .column("id", DataTypes.STRING()) .column("name", DataTypes.STRING()) .column("age", DataTypes.INT()) .column("gender", DataTypes.STRING()) .build()) .format("json") .option("topic", "doit0204") .option("properties.bootstrap.servers", "hadoop102:9092") .option("properties.group.id", "zxk") .option("scan.startup.mode", "earliest-offset") .build()); Table table2 = table.groupBy($("gender")) .select($("gender"),$("age").avg().as("age_avg")); table2.execute().print(); } }
2 Table Concepts
-
How is a table identified?
There are three identifier levels: the first is the catalog, the second and third are the database and the table.
How are they referenced in code? (see the sketch below)
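A minimal sketch (using the default in-memory catalog and database names; t_kafka is assumed to be registered) of how the three-level identifier is used in code:
// Sketch: working with the three-level identifier catalog.database.table
TableEnvironment tabEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
// set the current catalog and database so that unqualified table names resolve against them
tabEnv.useCatalog("default_catalog");
tabEnv.useDatabase("default_database");
// or reference a table by its fully qualified identifier
tabEnv.executeSql("select * from default_catalog.default_database.t_kafka").print();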
-
What is the difference between catalogs?
Different catalogs store their metadata in different backends (and therefore manage databases and tables differently). A Hive catalog connects to Hive's metastore, i.e. the table schema is stored in Hive's MySQL (Hive can see the metadata but cannot query the data, because a Hive query needs path, format, etc., while a Flink query needs connector options, etc.), whereas the built-in (in-memory) catalog keeps the schema only in the Flink session's local memory for the lifetime of the session.
-
Where are catalogs stored and how are they managed?
When a FlinkSQL program runs, it first creates the table environment (which creates the pluggable modules and a CatalogManager; the default catalog it sets is GenericInMemoryCatalog, whose constructor stores the second/third-level identifiers in two LinkedHashMaps). The TableEnvironment holds a Map that records the registered catalogs.
-
What is the difference between a view and a table?
A table is mapped from actual data, while a view is derived from query logic over tables.
-
What is the difference between permanent and temporary tables?
The difference is whether the schema information is persisted, which depends on the catalog: a temporary table's schema lives only in the memory of the owning Flink session at runtime and cannot be shared across sessions; a permanent table's schema is truly persisted only when it is recorded in an external, persistable metadata manager, so that after a restart the metadata is still available, the table need not be recreated, and it can be shared across sessions. If it is recorded in the built-in in-memory catalog, it is effectively no different from a temporary table.
Therefore, to share table metadata across sessions you must use a permanent table and, in addition, persist it in an external catalog.
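A minimal sketch contrasting the two create calls (the datagen descriptor is just a stand-in source, not from the notes); the "permanent" table only truly persists if the current catalog is an external one such as HiveCatalog:
// Sketch: temporary vs permanent table creation (stand-in datagen source)
TableDescriptor desc = TableDescriptor.forConnector("datagen")
        .schema(Schema.newBuilder().column("id", DataTypes.INT()).build())
        .build();
tabEnv.createTemporaryTable("t_tmp", desc);   // schema lives only in this session's memory
tabEnv.createTable("t_perm", desc);           // recorded in the current catalog; survives restarts
                                              // only if that catalog is external (e.g. HiveCatalog)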
-
What if a temporary table and a permanent table share the same name?
Phenomenon: if you create a temporary table under the Hive catalog in FlinkSQL, a query shows it is in no catalog at all, because it is stored in the Map that holds temporary tables; its first-level identifier is then merely a prefix.
Problem: when a temporary table and a permanent table share the same name, i.e. all three identifier levels are identical but the table types differ, the temporary table takes precedence in queries.
-
How does Flink use Hive's metadata space?
Connect to Hive's metadata management system: add the dependency, connect to the metastore service on port 9083, provide the configuration files, point to the config path when registering the HiveCatalog, and before creating tables you must issue a USE CATALOG statement to switch to the hive catalog.
//了解catalog public class Demo05_CatalogDemo { public static void main(String[] args) { EnvironmentSettings settings = EnvironmentSettings.inStreamingMode(); TableEnvironment tabEnv = TableEnvironment.create(settings); //创建catalog实现对象 HiveCatalog hiveCatalog = new HiveCatalog("hive",null,"F:\\IDEAProject\\flink_doit20230110\\conf\\hiveconf"); //注册到表环境中 tabEnv.registerCatalog("myCatalog",hiveCatalog); tabEnv.executeSql( " create table t_kafka( " " id int " " ,name string " " ,age int " " ,gender string " " ) WITH ( " " 'connector' = 'kafka', " " 'topic' = 'doit0204', " " 'properties.bootstrap.servers' = 'hadoop102:9092', " " 'properties.group.id' = 'zxk', " " 'scan.startup.mode' = 'earliest-offset', " " 'format' = 'json' " " ) "); //使用内置catalog /*tabEnv.executeSql("show catalogs").print(); tabEnv.executeSql("use catalog default_catalog"); tabEnv.executeSql("show databases").print(); tabEnv.executeSql("use default_database"); tabEnv.executeSql("show tables").print();*/ //使用外部catalog tabEnv.executeSql("show catalogs").print(); tabEnv.executeSql("use catalog myCatalog"); tabEnv.executeSql("show databases").print(); tabEnv.executeSql("show tables").print(); } }
3 Table Creation
-
How to obtain a Table object?
Methods starting with tableEnv.from..., which accept: an identifier (SQL table name) / a TableDescriptor / a Row stream / a JavaBean stream (if it is not a JavaBean, the default schema has a single field f0; only JavaBeans allow the schema to be inferred by reflection; you can also define the schema manually, but fields cannot be renamed) / test values (renaming fields requires passing data types) / the result of a Table API query / the return value of sqlQuery.
Note: since a table maps data while a view is based on tables, whichever method you call, only the one that takes a TableDescriptor produces a real table; the others produce views.
//Table对象获取方法 public class Demo03_TableObjectCreate { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tev = StreamTableEnvironment.create(env); //标识 /*tev.executeSql( " create table t_kafka( " " id int " " ,name string " " ,age int " " ,gender string " " ) WITH ( " " 'connector' = 'kafka', " " 'topic' = 'doit0204', " " 'properties.bootstrap.servers' = 'hadoop102:9092', " " 'properties.group.id' = 'zxk', " " 'scan.startup.mode' = 'earliest-offset', " " 'format' = 'json' " " ) "); Table table1 = tev.from("t_kafka");*/ //描述器 /*Table table2 = tev.from(TableDescriptor.forConnector("kafka") .schema(Schema.newBuilder() .column("id", DataTypes.STRING()) .column("name", DataTypes.STRING()) .column("age", DataTypes.INT()) .column("gender", DataTypes.STRING()) .build()) .format("json") .option("topic", "doit0204") .option("properties.bootstrap.servers", "hadoop102:9092") .option("properties.group.id", "zxk") .option("scan.startup.mode", "earliest-offset") .build());*/ //javabean流 /*KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) .setGroupId("zxk") .setTopics("doit0204") .setBootstrapServers("hadoop102:9092") .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); //如果该流直接生成Table对象,没指定format无法拆分出字段解析出schema,则使用默认的Schema DataStreamSource<String> kfk = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk"); //转为javabean流才能拆分出字段 SingleOutputStreamOperator<Person> mapStream = kfk.map(json -> JSON.parseObject(json, Person.class)); //自动推断schema // Table table3 = tev.fromDataStream(mapStream); //手动指定schema Table table3 = tev.fromDataStream(mapStream, Schema.newBuilder() .column("id", DataTypes.INT()) .column("name", DataTypes.STRING()) .column("age", DataTypes.INT()) .column("gender", DataTypes.STRING()) .build()); table3.execute().print(); env.execute();*/ //测试数据 //单字段 /*Table table4 = tev.fromValues(1, 2, 3, 4); table4.printSchema(); table4.execute().print();*/ //多字段 Table table4 = tev.fromValues( //没传入数据类型则没有字段名 DataTypes.ROW( DataTypes.FIELD("id",DataTypes.INT()), DataTypes.FIELD("name",DataTypes.STRING()), DataTypes.FIELD("age",DataTypes.DOUBLE()) ), Row.of(1, "zs", 18.2), Row.of(2, "bb", 28.2), Row.of(3, "cc", 16.2), Row.of(4, "dd", 38.2) ); table4.printSchema(); table4.execute().print(); } @NoArgsConstructor @AllArgsConstructor public static class Person{ public int id; public String name; public int age; public String gender; } }
-
How to create SQL tables?
Methods starting with tableEnv.create...: ① pass a TableDescriptor to create a temporary/permanent table; ② pass a stream (preferably a JavaBean stream, same caveats as above) or a Table object to create a temporary view (this is where "views are based on tables" shows); ③ execute a DDL statement (here a view can be created either from a query or from a connector).
//sql表的创建 public class Demo04_SqlTableCreate { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tev = StreamTableEnvironment.create(env); //表描述器建sql表标识 tev.createTable("t_kafka" , TableDescriptor.forConnector("filesystem") .schema(Schema.newBuilder() .column("id",DataTypes.INT()) .column("name",DataTypes.STRING()) .column("age",DataTypes.INT()) .column("gender",DataTypes.STRING()) .build()) .format("csv") .option("path","data/sqldemo/a.txt") .build()); // tev.executeSql("select gender,max(age) from t_kafka group by gender").print(); //流建视图 /*DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop102", 9999); SingleOutputStreamOperator<Person> stream = socketTextStream.map(s -> new Person(Integer.parseInt(s.split(",")[0]), s.split(",")[1], Integer.parseInt(s.split(",")[2]), s.split(",")[3])); tev.createTemporaryView("t_kafka", stream); tev.executeSql("select gender,max(age) from t_kafka group by gender").print();*/ //表建视图 tev.createTemporaryView("x_kafka", tev.from("t_kafka")); tev.executeSql("select gender,max(age) from x_kafka group by gender").print(); } @NoArgsConstructor @AllArgsConstructor public static class Person{ public int id; public String name; public int age; public String gender; } }
4 Table Definition
4.1 Schema Field Definitions
There are three kinds of fields: ① physical fields, ② computed (expression) fields, ③ metadata fields.
-
Defining fields in Flink SQL:
//{"id":1,"name":"zs","nick":"tiedan","age":18,"gender":"male"} //schema定义详解之字段 public class Demo07_ColumnDetail01 { public static void main(String[] args) { EnvironmentSettings settings = EnvironmentSettings.inStreamingMode(); TableEnvironment tabEnv = TableEnvironment.create(settings); tabEnv.executeSql( " create table t_kafka( " " id int " " ,name string " " ,nick string " " ,age int " " ,gender string " " ,guid as id "//逻辑字段实现重命名 " ,big_age as age 10 "//逻辑字段 " ,offs bigint metadata from 'offset' "//元数据字段 " ,ts TIMESTAMP_LTZ(3) metadata from 'timestamp'"//元数据字段 " ) WITH ( " " 'connector' = 'kafka', " " 'topic' = 'doit0204', " " 'properties.bootstrap.servers' = 'hadoop102:9092', " " 'properties.group.id' = 'zxk', " " 'scan.startup.mode' = 'latest-offset', " " 'format' = 'json' " " ) "); tabEnv.executeSql("desc t_kafka").print(); tabEnv.executeSql("select id,name,nick,age,gender,guid,big_age,offs,ts from t_kafka").print(); } }
-
Defining fields with the Table API:
//{"id":1,"name":"zs","nick":"tiedan","age":18,"gender":"male"} //schema定义详解之字段 public class Demo07_ColumnDetail02 { public static void main(String[] args) { EnvironmentSettings settings = EnvironmentSettings.inStreamingMode(); TableEnvironment tabEnv = TableEnvironment.create(settings); tabEnv.createTable("t_kafka", TableDescriptor.forConnector("kafka") .schema(Schema.newBuilder() .column("id", DataTypes.INT()) .column("name", DataTypes.STRING()) .column("nick", DataTypes.STRING()) .column("age", DataTypes.INT()) .column("gender", DataTypes.STRING()) .columnByExpression("guid","id")//这里使用$他还不知道id是哪个字段 // .columnByExpression("big_age",$("age").plus(10)) .columnByExpression("big_age","age 10") .columnByMetadata("offs",DataTypes.BIGINT(),"offset",true) .columnByMetadata("ts",DataTypes.TIMESTAMP_LTZ(3),"timestamp",true) // .primaryKey("id") .build()) .format("json") .option("topic", "doit0204") .option("properties.bootstrap.servers", "hadoop102:9092") .option("properties.group.id", "zxk") .option("scan.startup.mode", "latest-offset") .build()); tabEnv.executeSql("desc t_kafka").print(); tabEnv.executeSql("select id,name,nick,age,gender,guid,big_age,offs,ts from t_kafka").print(); } }
4.2 Formats in Detail
-
CSV file parsing
The connector is filesystem and columns are mapped by position; here the filesystem connector only supports batch mode. Options include: quote character, comment-line handling, null literal, and ignoring parse errors.
//csvformat解析 public class Demo08_CsvFormat { public static void main(String[] args) { EnvironmentSettings settings = EnvironmentSettings.inStreamingMode(); TableEnvironment tabEnv = TableEnvironment.create(settings); // tabEnv.executeSql( " create table t_kafka( " " id int " " ,name string " " ,age int " " ) WITH ( " " 'connector' = 'filesystem', " " 'path' = 'data/csv', " " 'csv.quote-character' = '|', " " 'csv.allow-comments' = 'true', " " 'csv.ignore-parse-errors' = 'true', " " 'csv.null-literal' = '\\N', " " 'format' = 'csv' " " ) "); tabEnv.executeSql("desc t_kafka").print(); tabEnv.executeSql("select * from t_kafka").print(); } }
-
JSON format parsing
JSON data uses three kinds of symbols: {} for objects, [] for arrays, and : for a key/value pair; the primitive types are only strings and numbers, and other types have to be mapped via string; arrays and {} can be nested, and the outermost element may also be [].
How to access nested fields in JSON? For row use a dot (.), for map use ['key'], for array use [index] starting from 1.
//jsonformat解析 public class Demo08_JsonFormat { public static void main(String[] args) { EnvironmentSettings settings = EnvironmentSettings.inStreamingMode(); TableEnvironment tabEnv = TableEnvironment.create(settings); /*//定义为map类型 tabEnv.executeSql( " create table t_kafka( " " id int " " ,name map<string,string> " " ,big_id as id*10 " " ) WITH ( " " 'connector' = 'filesystem', " " 'path' = 'data/json/qiantao', " " 'format' = 'json' " " ) "); tabEnv.executeSql("desc t_kafka").print(); tabEnv.executeSql("select * from t_kafka").print(); tabEnv.executeSql("select id,name['nick'] as nick from t_kafka").print(); System.out.println("======================="); //定义为row类型 tabEnv.createTable("t", TableDescriptor.forConnector("filesystem") .schema(Schema.newBuilder() .column("id", DataTypes.INT()) .column("name",DataTypes.ROW( DataTypes.FIELD("nick",DataTypes.STRING()) ,DataTypes.FIELD("formal",DataTypes.STRING()) ,DataTypes.FIELD("height",DataTypes.INT()) )) .build()) .format("json") .option("path","data/json/qiantao2") .build()); tabEnv.executeSql("desc t").print(); tabEnv.executeSql("select * from t").print(); tabEnv.executeSql("select id,name.nick,name.formal,name.height as nick from t").print(); System.out.println("=======================");*/ //复杂嵌套类型 tabEnv.executeSql( " create table t_json3( " " id int " " ,friends array<row<name string,info map<string,string>>>" " ) WITH ( " " 'connector' = 'filesystem', " " 'path' = 'data/json/qiantao3', " " 'format' = 'json' " " ) "); tabEnv.executeSql("desc t_json3").print(); tabEnv.executeSql("select * from t_json3").print(); tabEnv.executeSql("select id" ",friends[1].name as name1" ",friends[1].info['addr'] as addr1" ",friends[1].info['gender'] as gender1" ",friends[2].name as name2" ",friends[2].info['addr'] as addr2" ",friends[2].info['gender'] as gender2 " "from t_json3").print(); } }
4.3 Watermarks and Time Attributes in Detail
-
How to define time-related fields: watermark, rowtime, processing time?
When creating a table with a connector, the watermark is declared much like an expression field; current_watermark() takes the event-time field; the time field must be of type TIMESTAMP(3), i.e. millisecond precision. If the time field is a 13-digit epoch timestamp (bigint), use a computed field to convert the physical field and declare the watermark on that computed field. The processing-time field is declared and queried just like a computed field (proctime()).
//时间语义和watermark定义 /*测试数据: {"guid":1,"eventId":"e02","eventTime":"2022-06-12 14:35:10.200","pageId":"p001"} {"guid":1,"eventId":"e02","eventTime":"2022-06-12 14:35:11.200","pageId":"p001"} {"guid":1,"eventId":"e02","eventTime":1655017433000,"pageId":"p001"} {"guid":1,"eventId":"e03","eventTime":1655017434000,"pageId":"p001"} {"guid":1,"eventId":"e04","eventTime":1655017435000,"pageId":"p001"} {"guid":1,"eventId":"e05","eventTime":1655017436000,"pageId":"p001"} {"guid":1,"eventId":"e06","eventTime":1655017437000,"pageId":"p001"} {"guid":1,"eventId":"e07","eventTime":1655017438000,"pageId":"p001"} {"guid":1,"eventId":"e08","eventTime":1655017439000,"pageId":"p001"}*/ public class Demo09_EventTimeAndWatermark { public static void main(String[] args) { EnvironmentSettings settings = EnvironmentSettings.inStreamingMode(); TableEnvironment tabEnv = TableEnvironment.create(settings); /*//timestamp类型时间 tabEnv.executeSql( " create table t_kafka( " " guid int " " ,eventId string " " ,eventTime timestamp(3) " " ,pageId string " " ,watermark for eventTime as eventTime - interval '0.001' second " " ) WITH ( " " 'connector' = 'kafka', " " 'topic' = 'doit0204', " " 'properties.bootstrap.servers' = 'hadoop102:9092', " " 'properties.group.id' = 'zxk', " " 'scan.startup.mode' = 'latest-offset', " " 'format' = 'json' " " ) ");*/ //13位长整型类型时间 tabEnv.executeSql( " create table t_kafka( " " guid int " " ,eventId string " " ,eventTime bigint " " ,pageId string " " ,rt as to_timestamp_ltz(eventTime,3) "//事件时间逻辑字段 " ,watermark for rt as rt - interval '0.001' second "//事件时间 " ,pt as proctime() "//处理时间 " ) WITH ( " " 'connector' = 'kafka', " " 'topic' = 'doit0204', " " 'properties.bootstrap.servers' = 'hadoop102:9092', " " 'properties.group.id' = 'zxk', " " 'scan.startup.mode' = 'latest-offset', " " 'format' = 'json' " " ) "); tabEnv.executeSql("desc t_kafka").print(); tabEnv.executeSql("select guid,eventId,eventTime,pageId,current_watermark(rt),pt from t_kafka").print(); } }
-
How are watermarks inherited between tables and streams?
First, take a stream that already has watermarks, convert it to a table with create..., get a Table with from..., then convert back to a stream with to...: the table has no watermark (i.e. neither the time attribute nor the watermark is inherited automatically when converting a stream to a table). Also, the stream's element type is Row, which has a RowKind attribute describing the change mode (+I, -U, +U, -D) plus an array holding the data. Therefore, when converting a watermarked stream to a table you still have to re-declare the watermark field:
① Redefine it: pass table name + stream + schema to the create method and define the watermark field in the schema (if the time field's type does not match, first derive a conforming computed field);
② Reuse the stream's definition: this relies on the metadata fields of the hidden connector underlying the stream-to-table changelog conversion. First define an event-time metadata field (mind the arguments; if the two field names are identical you don't need to pass the metadata key); it already has the required TIMESTAMP(3)-style type. Then reuse the stream's watermark strategy via source_watermark().
When converting a table to a stream, the watermark is inherited automatically.
//流转表处理watermark定义 /*测试数据: {"guid":1,"eventId":"e02","eventTime":"2022-06-12 14:35:10.200","pageId":"p001"} {"guid":1,"eventId":"e02","eventTime":"2022-06-12 14:35:11.200","pageId":"p001"} {"guid":1,"eventId":"e02","eventTime":1655017433000,"pageId":"p001"} 1,e02,1655017433000,p001 1,e02,1655017434000,p001 1,e02,1655017435000,p001 1,e02,1655017436000,p001 {"guid":1,"eventId":"e03","eventTime":1655017434000,"pageId":"p001"} {"guid":1,"eventId":"e04","eventTime":1655017435000,"pageId":"p001"} {"guid":1,"eventId":"e05","eventTime":1655017436000,"pageId":"p001"} {"guid":1,"eventId":"e06","eventTime":1655017437000,"pageId":"p001"} {"guid":1,"eventId":"e07","eventTime":1655017438000,"pageId":"p001"} {"guid":1,"eventId":"e08","eventTime":1655017439000,"pageId":"p001"}*/ public class Demo09_EventTimeAndWatermark02 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);//并行度不设置为1观察不到水印的推进 StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env); DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999); SingleOutputStreamOperator<Event> stream = source.map(new MapFunction<String, Event>() { @Override public Event map(String s) throws Exception { String[] strings = s.split(","); return new Event(Integer.parseInt(strings[0]), strings[1], Long.parseLong(strings[2]), strings[3]); } }).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event event, long l) { return event.getEventTime(); } })); //拿到有水印流,开始操作表 tabEnv.createTemporaryView("t_kafka",stream, Schema.newBuilder() .column("guid", DataTypes.INT()) .column("eventId", DataTypes.STRING()) .column("eventTime", DataTypes.BIGINT()) .column("pageId", DataTypes.STRING()) // .columnByExpression("rt","to_timestamp_ltz(eventTime,3)")//重新定义事件时间 // .watermark("rt","rt - interval '1' second")//重新设计水印生成 .columnByMetadata("rt",DataTypes.TIMESTAMP_LTZ(3),"rowtime")//沿用元数据字段事件时间 .watermark("rt","source_watermark()")//沿用流中水印生成策略,sql中作为表达式放在as后 .build()); tabEnv.executeSql("desc t_kafka").print(); tabEnv.executeSql("select guid,eventId,eventTime,pageId,current_watermark(rt) from t_kafka").print(); env.execute(); } @Data @NoArgsConstructor @AllArgsConstructor public static class Event { public int guid; public String eventId; public long eventTime; public String pageId; } }
4.4 Connectors in Detail
-
How to understand connectors?
① Different connectors require different jar dependencies
② Each has its own connector name
③ Each exposes its own required options
④ Each provides metadata fields
⑤ The connector of a source/sink table is really a wrapper around a source/sink operator
-
The kafka connector?
① How are fields in a Kafka record's key, value and headers mapped? When producing messages, set the key, value and headers (the command line can only produce values, so use a tool here). How do you distinguish same-named fields parsed from the key JSON and the value JSON? Configure key parsing and give the key fields a prefix in the declaration ('key.fields-prefix'). When defining a metadata field in SQL, if its name matches the metadata key you can omit "from ...". In query results the header values are strings backed by byte arrays and can be CAST to string.
//kafka连接器详解,需升级到2023版idea的bigdatatools /*测试数据: {"guid":1,"eventId":"e02","eventTime":"2022-06-12 14:35:10.200","pageId":"p001"} {"guid":1,"eventId":"e02","eventTime":"2022-06-12 14:35:11.200","pageId":"p001"} {"guid":1,"eventId":"e02","eventTime":1655017433000,"pageId":"p001"} {"guid":1,"eventId":"e03","eventTime":1655017434000,"pageId":"p001"} {"guid":1,"eventId":"e04","eventTime":1655017435000,"pageId":"p001"} {"guid":1,"eventId":"e05","eventTime":1655017436000,"pageId":"p001"} {"guid":1,"eventId":"e06","eventTime":1655017437000,"pageId":"p001"} {"guid":1,"eventId":"e07","eventTime":1655017438000,"pageId":"p001"} {"guid":1,"eventId":"e08","eventTime":1655017439000,"pageId":"p001"}*/ public class Demo10_KafkaConnectorDetail { public static void main(String[] args) { EnvironmentSettings settings = EnvironmentSettings.inStreamingMode(); TableEnvironment tabEnv = TableEnvironment.create(settings); //13位长整型类型时间 tabEnv.executeSql( " create table t_kafka( " " guid int " " ,eventId string " " ,eventTime bigint " " ,pageId string " " ,k1 int " " ,k2 string " " ,inKey_k1 int " " ,inKey_k2 string " " ,rec_ts timestamp(3) metadata from 'timestamp' " " ,`offset` bigint metadata " " ,rt as to_timestamp_ltz(eventTime,3) "//事件时间逻辑字段 " ,watermark for rt as rt - interval '0.001' second "//事件时间 " ,pt as proctime() "//处理时间 " ) WITH ( " " 'connector' = 'kafka', " " 'topic' = 'doit0204', " " 'properties.bootstrap.servers' = 'hadoop102:9092', " " 'properties.group.id' = 'zxk', " " 'scan.startup.mode' = 'latest-offset', " " 'value.fields-include' = 'EXCEPT_KEY', " " 'key.fields-prefix' = 'inKey_', " " 'key.fields' = 'inKey_k1;inKey_k2', " " 'key.format' = 'json', " " 'value.format' = 'json' " " ) "); tabEnv.executeSql("desc t_kafka").print(); tabEnv.executeSql("select guid,eventId,eventTime,pageId,k1,k2,inKey_k1,inKey_k2,rec_ts,`offset`,current_watermark(rt),pt from t_kafka").print(); } }
② The upsert-kafka connector? The plain kafka connector only accepts and produces append-only streams (+I mode). upsert-kafka, as a source, turns the records it reads into (+I, -U, +U) and turns null payloads into -D; as a sink it does the reverse. The two connectors differ in the change modes they support and in whether the table definition has a primary key.
③ The left-join problem with upsert-kafka? First, when left-join results are written to Kafka through upsert-kafka, three changes are produced: +I, -D, +I (for the same primary key there may also be -U and +U), and Kafka correspondingly receives three messages: the wrong (not-yet-joined) row, a null, and the right (joined) row. A downstream job that consumes the topic with upsert-kafka can reconstruct those three changes (how can a null recover the wrong record? The wrong record is kept in state, and on a null it is restored via the primary key of the very next correct record). After that, further matching records with the same primary key produce -U/+U, which is the duplicate-update case, similar to group by; the -U that Kafka "swallows" for group by is likewise reconstructed via the +U's primary key.
// left join test: +I, -D stream
public class Demo11_UpsertKafkaConnectorTest02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> source1 = env.socketTextStream("hadoop102", 9998);
        SingleOutputStreamOperator<Bean1> stream1 = source1.map(new MapFunction<String, Bean1>() {
            @Override
            public Bean1 map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Bean1(Integer.parseInt(fields[0]), fields[1]);
            }
        });
        tabEnv.createTemporaryView("t1", stream1);

        DataStreamSource<String> source2 = env.socketTextStream("hadoop102", 9997);
        SingleOutputStreamOperator<Bean2> stream2 = source2.map(new MapFunction<String, Bean2>() {
            @Override
            public Bean2 map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Bean2(Integer.parseInt(fields[0]), fields[1]);
            }
        });
        tabEnv.createTemporaryView("t2", stream2);

        tabEnv.executeSql(
                "create table t_nick_kafka2( "
              + "   id int primary key not enforced"
              + "   ,gender string"
              + "   ,name string "
              + " ) WITH ( "
              + "  'connector' = 'upsert-kafka', "
              + "  'topic' = 'doit0205', "
              + "  'properties.bootstrap.servers' = 'hadoop102:9092', "
              + "  'key.format' = 'json' ,"
              + "  'value.format' = 'json' "
              + " ) ");
        tabEnv.executeSql("insert into t_nick_kafka2 "
              + "select t1.id,t1.gender,t2.name from t1 left join t2 on t1.id=t2.id");
        tabEnv.executeSql("select * from t_nick_kafka2").print();

        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Bean1 {
        public int id;
        public String gender;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Bean2 {
        public int id;
        public String name;
    }
}
④ group by with upsert-kafka? The sink table's primary key is declared NOT ENFORCED, meaning there is a primary key but inserts are not idempotent on it; that is also why the left-join sink table has a primary key yet can receive multiple duplicate rows.
//groupby测试 I,-U, U流 public class Demo11_UpsertKafkaConnectorTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env); DataStreamSource<String> source = env.socketTextStream("hadoop102", 9998); SingleOutputStreamOperator<Bean1> stream = source.map(new MapFunction<String, Bean1>() { @Override public Bean1 map(String s) throws Exception { String[] fields = s.split(","); return new Bean1(Integer.parseInt(fields[0]), fields[1]); } }); tabEnv.createTemporaryView("t", stream); tabEnv.executeSql( "create table t_nick_kafka( " " gender string primary key not enforced" " ,cnt bigint " " ) WITH ( " " 'connector' = 'upsert-kafka', " " 'topic' = 'doit0205', " " 'properties.bootstrap.servers' = 'hadoop102:9092', " " 'key.format' = 'json' ," " 'value.format' = 'json' " " ) "); tabEnv.executeSql("insert into t_nick_kafka " "select gender,count(distinct id) from t group by gender"); tabEnv.executeSql("select * from t_nick_kafka").print(); env.execute(); } @Data @NoArgsConstructor @AllArgsConstructor public static class Bean1{ public int id; public String gender; } }
-
The jdbc connector?
① As a source it has two modes. Scan mode takes a one-off snapshot (no updates), producing a bounded stream (the partitioning options enable parallel reads split by a column; without them the read runs with parallelism 1), and the job exits when the read finishes; for an unbounded stream that picks up updates you need a CDC source instead. (See the partitioned-scan sketch after the example below.)
// jdbc as a source: test
public class Demo12_JdbcConnectorTest01 {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tabEnv = TableEnvironment.create(settings);
        tabEnv.executeSql(
                " create table flink_stu( "
              + "   id int primary key"
              + "   ,name string "
              + "   ,age int "
              + "   ,gender string "
              + " ) WITH ( "
              + "  'connector' = 'jdbc', "
              + "  'url' = 'jdbc:mysql://hadoop102:3306/gmall', "
              + "  'table-name' = 'score', "
              + "  'username' = 'root', "
              + "  'password' = '123456' "
              + " ) ");
        tabEnv.executeSql("select * from flink_stu").print();
    }
}
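A hedged sketch of the partitioned-scan options mentioned above (the bounds and partition count are illustrative); all four scan.partition.* options must be set together to get parallel splits:
// Sketch: jdbc source with a partitioned scan for parallel reads (illustrative bounds)
tabEnv.executeSql(
        " create table flink_stu_parallel( "
      + "   id int primary key"
      + "   ,name string "
      + " ) WITH ( "
      + "  'connector' = 'jdbc', "
      + "  'url' = 'jdbc:mysql://hadoop102:3306/gmall', "
      + "  'table-name' = 'score', "
      + "  'username' = 'root', "
      + "  'password' = '123456', "
      + "  'scan.partition.column' = 'id', "         // numeric/date column used to split the scan
      + "  'scan.partition.num' = '4', "             // number of splits read in parallel
      + "  'scan.partition.lower-bound' = '1', "
      + "  'scan.partition.upper-bound' = '10000' "
      + " ) ");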
② As a sink it supports append and upsert semantics. The upsert semantics differ from kafka: the -U is swallowed and the +U really is an idempotent insert (duplicates trigger ON DUPLICATE KEY UPDATE ...). Whether the Flink table declares a primary key decides append vs upsert mode; inserting a retract stream into a table without a primary key fails, whereas with a primary key identical keys update the previous row (fully idempotent insert: even if the MySQL table itself has no primary key, only the single latest joined row is kept for a given key).
//jdbc作为sink测试 public class Demo12_JdbcConnectorTest02 { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env); DataStreamSource<String> source1 = env.socketTextStream("hadoop102", 9998); SingleOutputStreamOperator<Demo11_UpsertKafkaConnectorTest02.Bean1> stream1 = source1.map(new MapFunction<String, Demo11_UpsertKafkaConnectorTest02.Bean1>() { @Override public Demo11_UpsertKafkaConnectorTest02.Bean1 map(String s) throws Exception { String[] fields = s.split(","); return new Demo11_UpsertKafkaConnectorTest02.Bean1(Integer.parseInt(fields[0]), fields[1]); } }); tabEnv.createTemporaryView("t1", stream1); DataStreamSource<String> source2 = env.socketTextStream("hadoop102", 9997); SingleOutputStreamOperator<Demo11_UpsertKafkaConnectorTest02.Bean2> stream2 = source2.map(new MapFunction<String, Demo11_UpsertKafkaConnectorTest02.Bean2>() { @Override public Demo11_UpsertKafkaConnectorTest02.Bean2 map(String s) throws Exception { String[] fields = s.split(","); return new Demo11_UpsertKafkaConnectorTest02.Bean2(Integer.parseInt(fields[0]), fields[1]); } }); tabEnv.createTemporaryView("t2", stream2); tabEnv.executeSql( " create table flink_stu2( " " id int primary key" " ,gender string " " ,name string " " ) WITH ( " " 'connector' = 'jdbc', " " 'url' = 'jdbc:mysql://hadoop102:3306/gmall', " " 'table-name' = 'flinksql_test2', " " 'username' = 'root', " " 'password' = '123456' " " ) "); tabEnv.executeSql("insert into flink_stu2 " "select t1.id,t1.gender,t2.name from t1 left join t2 on t1.id=t2.id"); } @Data @NoArgsConstructor @AllArgsConstructor public static class Bean1{ public int id; public String gender; } @Data @NoArgsConstructor @AllArgsConstructor public static class Bean2{ public int id; public String name; } }
-
The filesystem connector?
It can read and write. As a sink it supports partitioned files (the partition field is a real column, unlike Hive), partition commit policies (transactional, delayed commit of partition files, lifecycle control), rolling policies for files within a partition, and it requires checkpointing to be enabled to commit data.
//文件系统连接器作为sink public class Demo13_FilesystemConnectorTest { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint"); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env); DataStreamSource<String> source1 = env.socketTextStream("hadoop102", 9998); SingleOutputStreamOperator<Tuple4<String, Double, String, String>> stream1 = source1.map(new MapFunction<String, Tuple4<String, Double, String, String>>() { @Override public Tuple4<String, Double, String, String> map(String s) throws Exception { String[] fields = s.split(","); return Tuple4.of(fields[0], Double.parseDouble(fields[1]), fields[2], fields[3]); } }); tabEnv.createTemporaryView("t1", stream1); tabEnv.executeSql( " create table fs( " " user_id string primary key" " ,order_amount double " " ,dt string " " ,`hour` string " " ) partitioned by (dt,`hour`) WITH ( " " 'connector' = 'filesystem', " " 'path' = 'file:///d:/filetable/', " " 'format' = 'json', " " 'sink.partition-commit.delay'='1 h',\n" " 'sink.partition-commit.policy.kind'='success-file',\n" " 'sink.rolling-policy.file-size' = '8M',\n" " 'sink.rolling-policy.rollover-interval'='30 min',\n" " 'sink.rolling-policy.check-interval'='10 second'\n" " ) "); tabEnv.executeSql("insert into fs " "select * from t1"); } }
5 CDC Connectors
-
How does it compare with the plain jdbc connector?
The jdbc connector's scan mode only gives a bounded stream, whereas CDC reads the binlog: insert/delete/update operations map exactly onto the +I/-D/-U/+U change stream, so you get a changelog-stream dynamic source table right at the source. Combined with a sink connector that has upsert semantics, the correct result can be materialized directly in the downstream store.
-
How does it compare with a real-time ingestion architecture?
Take canal + kafka as an example: subsequent operations on the same primary key cannot be handled properly, e.g. a delete record: once Flink has already processed that key, the data cannot be deleted; it cannot, like the jdbc connector used as a sink, automatically delete the row in MySQL and arrive at the correct result.
-
Creating a table with the CDC connector?
In the SQL client the SET command can configure checkpoint options. Among the table options, scan.startup.mode = initial means an initial full snapshot followed by real-time incremental reading. For field type mappings see the jdbc connector page on the Flink website. Note that in initial mode, keeping the offsets up to date requires Flink checkpointing to be enabled.
-
Querying on top of a CDC source table?
Testing a group-by query: every kind of operation on the source table flows through the downstream group-by logic and yields the correct result (i.e. Flink's aggregation operators maintain state). For more complex logic the result table has no declared primary key yet still emits -U/+U; this is the effect of automatic primary-key inference (e.g. for group by, join, row_number and their combinations). When writing the result table to MySQL you must declare the same primary key so that results are updated automatically; if the downstream sink connector supports upsert semantics, as jdbc does, you can produce real-time reports directly.
//mysql的cdc连接器使用测试 public class Demo14_MysqlCdcConnector { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint"); env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); StreamTableEnvironment tev = StreamTableEnvironment.create(env); tev.executeSql(" create table flink_score( " " id int primary key not enforced" " ,name string " " ,gender string " " ,score double " " ) WITH ( " " 'connector' = 'mysql-cdc', " " 'hostname' = 'hadoop102', " " 'port' = '3306', " " 'username'='root',\n" " 'password'='123456',\n" " 'database-name' = 'gmall',\n" " 'table-name'='score',\n" " 'scan.startup.mode'='latest-offset'\n" " ) "); tev.executeSql("select * from flink_score"); tev.executeSql("select gender,avg(score) from flink_score group by gender"); tev.executeSql( "create table flink_rank(\n" " gender string , \n" " name string, \n" " score_amt double, \n" " rn bigint , \n" " primary key(gender,rn) not enforced \n" ") with (\n" " 'connector' = 'jdbc',\n" " 'url' = 'jdbc:mysql://hadoop102:3306/gmall',\n" " 'table-name' = 'score_rank',\n" " 'username' = 'root',\n" " 'password' = '123456' \n" ")" ); tev.executeSql( "SELECT\n" " gender,\n" " name,\n" " score_amt,\n" " rn\n" "from(\n" "SELECT\n" " gender,\n" " name,\n" " score_amt,\n" " row_number() over(partition by gender order by score_amt desc) as rn\n" "from \n" "(\n" "SELECT\n" "gender,\n" "name,\n" "sum(score) as score_amt\n" "from flink_score\n" "group by gender,name\n" ") o1\n" ") o2\n" "where rn<=2").print(); /*tev.executeSql("insert into flink_rank \n" "SELECT\n" " gender,\n" " name,\n" " score_amt,\n" " rn\n" "from(\n" "SELECT\n" " gender,\n" " name,\n" " score_amt,\n" " row_number() over(partition by gender order by score_amt desc) as rn\n" "from \n" "(\n" "SELECT\n" "gender,\n" "name,\n" "sum(score) as score_amt\n" "from flink_score\n" "group by gender,name\n" ") o1\n" ") o2\n" "where rn<=2");*/ } }
-
How does this fit a real-time data warehouse?
Real-time metric requirements are developed with Flink: CDC connects to MySQL with no load on the business database, and results are written back to MySQL. In practice a real-time warehouse is built with Hudi: Flink ingests via CDC in real time, does the ETL, writes DWD results into Hudi (Hudi supports changelog streams and version numbers for the changes of a dynamic result table), computes DWS batch results there, or exposes DWD to an OLAP engine connected to Hudi.
6 Stream-Table Conversion
-
How to convert?
A stream can be converted into a Table object or a SQL table/view; a table can only be converted to a stream via a Table object, which you can obtain e.g. from sqlQuery("select ..."). When converting a table to a stream, mind the change semantics of the dynamic table.
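A minimal sketch of the main conversion entry points (assuming a StreamTableEnvironment tev and a POJO stream named stream, as in the earlier examples):
// Sketch: stream <-> table conversion entry points (assumes tev and a POJO stream `stream`)
Table t1 = tev.fromDataStream(stream);                        // stream -> Table object (insert-only)
tev.createTemporaryView("t_view", stream);                    // stream -> SQL view
Table t2 = tev.sqlQuery("select * from t_view");              // query result as a Table object
DataStream<Row> appendStream = tev.toDataStream(t2);          // table -> stream, append-only semantics
DataStream<Row> changelogStream = tev.toChangelogStream(t2);  // table -> stream, changelog semantics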
-
What about subqueries?
You can wrap a subquery result in a Table object and register it as a SQL view to query further, or directly use CREATE TEMPORARY VIEW ... AS to wrap the subquery as a SQL view.
-
What about the retract-stream error with complex logic?
A top-N query produces a -U/+U dynamic intermediate table; doing table -> stream -> table and then running max/group by on that table throws an error (without the two intermediate conversions it does not). Solution: store the dynamic intermediate result (including -U/+U) in Kafka; Kafka ends up with four messages, e.g. 1,male / null / 2,male / 1,male; the downstream upsert-kafka source table reads them back as a +I/-D stream, and the subsequent group by yields the correct result.
//流表互转测试 public class Demo15_StreamFromToTable { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint"); StreamTableEnvironment tev = StreamTableEnvironment.create(env); SingleOutputStreamOperator<Person> stream = env.socketTextStream("hadoop102", 9998) .map(new MapFunction<String, Person>() { @Override public Person map(String s) throws Exception { String[] fields = s.split(","); return new Person(Integer.parseInt(fields[0]), fields[1], fields[2], Integer.parseInt(fields[3])); } }); //流转表 tev.createTemporaryView("abc", stream); //查询嵌套方式一 String sql = "select\n" "\t*\n" "from\n" "(\n" "\tselect\n" "\t\t*\n" "\t\t,row_number() over (partition by gender order by age desc) as rn\n" "\tfrom abc\n" ")\n" "t1\n" "where rn<=3"; Table tmp = tev.sqlQuery(sql); tev.createTemporaryView("tmp", tmp); tev.executeSql("select * from tmp where age%2=1")/*.print()*/; //查询嵌套方式二 tev.executeSql("create temporary view topn as select\n" "\t*\n" "from\n" "(\n" "\tselect\n" "\t\t*\n" "\t\t,row_number() over (partition by gender order by age desc) as rn\n" "\tfrom abc\n" ")\n" "t1\n" "where rn<=3"); tev.executeSql("create temporary view topn_filter as select * from topn where age%2=1"); tev.executeSql("desc topn_filter").print(); // tev.executeSql("select * from topn_filter").print();// I,-U, U, I 4条 // tev.executeSql("select gender,max(age) as big_age from topn_filter group by gender").print();// I,-U, U 3条 /*DataStream<Row> changelogStream = tev.toChangelogStream(tev.from("topn_filter")); tev.createTemporaryView("t_edu", changelogStream);*/ tev.executeSql( "create table t_nick_kafka( " " id int" " ,name string " " ,gender string " " ,age int " " ,rn bigint " " ,primary key(gender,rn) not enforced " " ) WITH ( " " 'connector' = 'upsert-kafka', " " 'topic' = 'doit0205', " " 'properties.bootstrap.servers' = 'hadoop102:9092', " " 'key.format' = 'json' ," " 'value.format' = 'json' " " ) "); tev.executeSql("insert into t_nick_kafka select * from topn_filter"); // tev.executeSql("select * from t_nick_kafka ").print();// I, I,-D, I 4条 tev.executeSql("select gender,max(age) as big_age from t_nick_kafka group by gender").print();// I,-U, U 3条 env.execute(); } @Data @NoArgsConstructor @AllArgsConstructor public static class Person{ private int id; private String name; private String gender; private int age; } }
-
But how did the -U turn into a -D here?
The -U rows of the top-N dynamic result table, when written to Kafka, actually produce null messages, i.e. -D records. Could it be because the declared sink primary key differs from the automatically inferred one? (open question in these notes)
-
How do you tell which two records an update in the stream corresponds to?
There are two kinds of updates (primary key changed / unchanged). First, at the source: if a modification changes the primary key, the source table emits -D and +I rows downstream (a change that keeps the key emits -U and +U). Downstream, different SQL logic produces different row kinds in its dynamic result table (group by emits -U/+U; left join can emit all kinds; row_number can emit all kinds, e.g. for a grouped top-N a modification in the source produces -D/+I in the result while an insert produces -U/+U). Finally, if the primary key declared on the sink table (upsert-kafka) differs from the key automatically inferred for the dynamic result table, the correspondence can get scrambled (open question in these notes).
-
How can a table be output?
Call executeInsert on a Table object (with a SQL table name or a TableDescriptor) / convert the Table to a stream and attach a sink operator / use a SQL INSERT statement.
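A minimal sketch of the three output paths (t_src and t_sink are illustrative, assumed to be registered):
// Sketch: three ways to emit a Table (illustrative table names)
Table result = tev.sqlQuery("select * from t_src");
result.executeInsert("t_sink");                            // 1) Table API insert into a registered sink table (or a TableDescriptor)
tev.toChangelogStream(result).print();                     // 2) convert to a stream and attach any DataStream sink
tev.executeSql("insert into t_sink select * from t_src");  // 3) plain SQL INSERT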
-
Multi-dimensional aggregation?
Hive syntax: GROUP BY ... WITH CUBE; in Flink: GROUP BY CUBE(dimensions).
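A minimal sketch (table and columns are illustrative); GROUPING SETS and ROLLUP follow the same pattern:
// Sketch: multi-dimensional aggregation with CUBE (illustrative table/columns)
tev.executeSql(
        "select province, city, count(distinct user_id) as uv "
      + "from t_orders "
      + "group by cube (province, city)"   // expands to (province,city), (province), (city), ()
).print();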
7 Time Window TVFs
-
How to understand the syntax?
The TABLE(window_function(...)) call is written after FROM and produces a table whose rows are the data of each window. It must be aggregated with GROUP BY on the window start/end times (a windowed table cannot be queried like an ordinary table, it must be aggregated; this is one of the three usage constraints, and the grouping granularity must be no finer than window_start/window_end, though it can be coarsened further), which groups the data at window granularity. Currently only tumbling and hopping (sliding) windows are available this way; there is no session window.
-
What are the constraints on the TVF?
The keys after GROUP BY / OVER PARTITION BY / JOIN ON may only include the window's start/end pair, and the windowed table must be used as a source together with one of these three keywords.
-
The cumulate window?
Think of it as one big tumbling window that emits its current state once per step; when the window reaches its maximum size it tumbles once.
// Window table-valued functions
//2020-04-15 08:05:00.000,4.00,C
//2020-04-15 08:07:00.000,2.00,A
//2020-04-15 08:09:00.000,5.00,D
//2020-04-15 08:11:00.000,3.00,B
//2020-04-15 08:13:00.000,1.00,E
//2020-04-16 08:17:00.000,6.00,F
//2020-04-16 08:19:00.000,6.00,F
public class Demo16_TimeWindowDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint");
StreamTableEnvironment tev = StreamTableEnvironment.create(env);
SingleOutputStreamOperator<Bid> stream = env.socketTextStream("hadoop102", 9998)
.map(new MapFunction<String, Bid>() {
@Override
public Bid map(String s) throws Exception {
String[] fields = s.split(",");
return new Bid(fields[0],Double.parseDouble(fields[1]),fields[2] );
}
});
// convert the stream to a table
tev.createTemporaryView("t_bid", stream, Schema.newBuilder()
.column("bidtime", DataTypes.STRING())
.column("price", DataTypes.DOUBLE())
.column("item", DataTypes.STRING())
.columnByExpression("rt",$("bidtime").toTimestamp())
.watermark("rt","rt - interval '1' second")
.build());
// inspect the test data
// tev.executeSql("select bidtime,price,item,current_watermark(rt) from t_bid").print();
// every minute, compute the total transaction amount of the last 5 minutes (hop/sliding window)
String sql = "select\n"
+ "\twindow_start\n"
+ "\t,window_end\n"
+ "\t,sum(price)\n"
+ "from table(\n"
+ "\thop(table t_bid,descriptor(rt),interval '1' minutes,interval '5' minutes))\n"
+ "group by window_start,window_end";
// every 2 minutes, compute the total transaction amount of the last 2 minutes (tumbling window)
sql = "select\n"
+ "\twindow_start\n"
+ "\t,window_end\n"
+ "\t,sum(price)\n"
+ "from table(\n"
+ "\ttumble(table t_bid,descriptor(rt),interval '2' minutes))\n"
+ "group by window_start,window_end";
// every 2 minutes, compute the cumulative transaction amount since the start of the day (cumulate window)
sql = "select\n"
+ "\twindow_start\n"
+ "\t,window_end\n"
+ "\t,sum(price)\n"
+ "from table(\n"
+ "\tcumulate(table t_bid,descriptor(rt),interval '2' minutes,interval '24' hour))\n"
+ "group by window_start,window_end";
// every 10 minutes, the top 2 items by total transaction amount within the last 10 minutes, with their order counts
sql = "select\n"
+ "\t*\n"
+ "from\n"
+ "(\n"
+ "\tselect\n"
+ "\t\twindow_start\n"
+ "\t\t,window_end\n"
+ "\t\t,item\n"
+ "\t\t,sum(price) as price_amt\n"
+ "\t\t,count(*) as cnt\n"
+ "\t\t,row_number() over(partition by window_start,window_end order by sum(price) desc) rn\n"
+ "\tfrom table(\n"
+ "\t\ttumble(table t_bid,descriptor(rt),interval '10' minutes))\n"
+ "\tgroup by window_start,window_end,item\n"
+ ")t1\n"
+ "where rn<=2";
// every 10 minutes, the 2 orders with the largest amounts within the last 10 minutes
sql = "select\n"
+ "\t*\n"
+ "from\n"
+ "(\n"
+ "\tselect\n"
+ "\t\twindow_start\n"
+ "\t\t,window_end\n"
+ "\t\t,bidtime\n"
+ "\t\t,item\n"
+ "\t\t,price\n"
+ "\t\t,row_number() over(partition by window_start,window_end order by price desc) rn\n"
+ "\tfrom table(\n"
+ "\t\ttumble(table t_bid,descriptor(rt),interval '10' minutes))\n"
+ ")t1\n"
+ "where rn<=2";
tev.executeSql(sql).print();
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Bid{
private String bidtime; // "2020-04-15 08:05:00.000"
private double price;
private String item;
}
}
8 Join
-
window join?
① From the results: when a window closes, the rows of the two tables inside the same window are joined; fields that did not join show null (just like an ordinary inner/left join); rows not in the same window cannot be joined.
② Constraints? The window definitions of the left and right tables must be identical. Although each side is a windowed-table subquery that would normally violate the TVF usage constraints, window join is an allowed special case. Also, the join result may carry only one event-time-related field, so you pick either a's or b's rt; in other words only one of the time-related fields (rt / watermark / window_start / window_end) can be kept.
//表值函数之windowjoin public class Demo17_TimeWindowJoin { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint"); env.setParallelism(1); StreamTableEnvironment tev = StreamTableEnvironment.create(env); //流 SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = env.socketTextStream("hadoop102", 9997) .map(new MapFunction<String, Tuple3<String, String, Long>>() { @Override public Tuple3<String, String, Long> map(String s) throws Exception { String[] fields = s.split(","); return Tuple3.of(fields[0], fields[1], Long.parseLong(fields[2])); } }); SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = env.socketTextStream("hadoop102", 9998) .map(new MapFunction<String, Tuple3<String, String, Long>>() { @Override public Tuple3<String, String, Long> map(String s) throws Exception { String[] fields = s.split(","); return Tuple3.of(fields[0], fields[1], Long.parseLong(fields[2])); } }); /** * 1,a,1000 * 2,b,2000 * 3,c,2500 * 4,d,3000 * 5,e,12000 */ /** * 1,bj,1000 * 2,sh,2000 * 4,xa,2600 * 5,yn,12000 */ //流转左右两表 tev.createTemporaryView("t1",stream1, Schema.newBuilder() .column("f0", DataTypes.STRING()) .column("f1", DataTypes.STRING()) .column("f2", DataTypes.BIGINT()) .columnByExpression("rt","to_timestamp_ltz(f2,3)") .watermark("rt","rt - interval '0' second") .build()); tev.createTemporaryView("t2",stream2, Schema.newBuilder() .column("f0", DataTypes.STRING()) .column("f1", DataTypes.STRING()) .column("f2", DataTypes.BIGINT()) .columnByExpression("rt","to_timestamp_ltz(f2,3)") .watermark("rt","rt - interval '0' second") .build()); //window join //inner / left /right /full String sql = "select\n" "\ta.f0\n" "\t,a.f1\n" "\t,a.f2\n" "\t,a.window_start\n" "\t,b.f0\n" "\t,b.f1\n" "from\n" "(select * from table(tumble(table t1,descriptor(rt),interval '10' second))) a\n" "join\n" "(select * from table(tumble(table t2,descriptor(rt),interval '10' second))) b\n" "on a.window_start=b.window_start \n" "and a.window_end=b.window_end\n" "and a.f0=b.f0"; //semi join ==>where ... in ... //anti join ==>where ... not in ... sql = "select\n" "\ta.f0\n" "\t,a.f1\n" "\t,a.f2\n" "\t,rt\n" "\t,current_watermark(rt)\n" "from\n" "(select * from table(tumble(table t1,descriptor(rt),interval '10' second))) a\n" "where \n" "\ta.f0 \n" "in\n" "(\n" "\tselect \n" "\t\tb.f0 \n" "\tfrom \n" "\t(\n" "\t\tselect * from table(tumble(table t2,descriptor(rt),interval '10' second))\n" "\t) b \n" "\twhere a.window_start=b.window_start \n" "\tand a.window_end=b.window_end\n" ")"; tev.executeSql(sql).print(); env.execute(); } }
-
Regular join?
Both input streams are kept in state, which by default never expires and can easily grow large and cause backpressure, so you should configure a TTL (see the SQL config page on the Flink website): set the idle-state retention time (the default of 0 means no cleanup; either form of the setting works).
//常规join设置ttl public class Demo18_RegularJoin { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint"); env.setParallelism(1); StreamTableEnvironment tev = StreamTableEnvironment.create(env); // tev.getConfig().getConfiguration().setLong("table.exec.state.ttl",10*1000);//ms tev.getConfig().setIdleStateRetention(Duration.ofSeconds(10)); //流 SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = env.socketTextStream("hadoop102", 9997) .map(new MapFunction<String, Tuple3<String, String, Long>>() { @Override public Tuple3<String, String, Long> map(String s) throws Exception { String[] fields = s.split(","); return Tuple3.of(fields[0], fields[1], Long.parseLong(fields[2])); } }); SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = env.socketTextStream("hadoop102", 9998) .map(new MapFunction<String, Tuple3<String, String, Long>>() { @Override public Tuple3<String, String, Long> map(String s) throws Exception { String[] fields = s.split(","); return Tuple3.of(fields[0], fields[1], Long.parseLong(fields[2])); } }); /** * 1,a,1000 * 2,b,2000 * 3,c,2500 * 4,d,3000 * 5,e,12000 */ /** * 1,bj,1000 * 2,sh,2000 * 4,xa,2600 * 5,yn,12000 */ //流转左右两表 tev.createTemporaryView("t1",stream1, Schema.newBuilder() .column("f0", DataTypes.STRING()) .column("f1", DataTypes.STRING()) .column("f2", DataTypes.BIGINT()) .columnByExpression("rt","to_timestamp_ltz(f2,3)") .watermark("rt","rt - interval '0' second") .build()); tev.createTemporaryView("t2",stream2, Schema.newBuilder() .column("f0", DataTypes.STRING()) .column("f1", DataTypes.STRING()) .column("f2", DataTypes.BIGINT()) .columnByExpression("rt","to_timestamp_ltz(f2,3)") .watermark("rt","rt - interval '0' second") .build()); //inner / left /right /full join String sql = "select\n" "\tt1.f0\n" "\t,t1.f1\n" "\t,t1.f2\n" "\t,t2.f0\n" "\t,t2.f1\n" "from\n" "t1 join t2\n" "on t1.f0=t2.f0"; tev.executeSql(sql).print(); env.execute(); } }
-
interval join?
It is a regular join plus a WHERE/ON condition that restricts the time attributes; you can use BETWEEN or <= / >=. Typical use case: joining an ad-impression stream with an ad-view stream.
//间隔join public class Demo18_IntervalJoin { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint"); env.setParallelism(1); StreamTableEnvironment tev = StreamTableEnvironment.create(env); // tev.getConfig().getConfiguration().setLong("table.exec.state.ttl",10*1000);//ms // tev.getConfig().setIdleStateRetention(Duration.ofSeconds(10)); //流 SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = env.socketTextStream("hadoop102", 9997) .map(new MapFunction<String, Tuple3<String, String, Long>>() { @Override public Tuple3<String, String, Long> map(String s) throws Exception { String[] fields = s.split(","); return Tuple3.of(fields[0], fields[1], Long.parseLong(fields[2])); } }); SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = env.socketTextStream("hadoop102", 9998) .map(new MapFunction<String, Tuple3<String, String, Long>>() { @Override public Tuple3<String, String, Long> map(String s) throws Exception { String[] fields = s.split(","); return Tuple3.of(fields[0], fields[1], Long.parseLong(fields[2])); } }); /** * 1,a,1000 * 2,b,2000 * 3,c,2500 * 4,d,3000 * 5,e,12000 */ /** * 1,bj,1000 * 2,sh,2000 * 4,xa,2600 * 5,yn,12000 */ //流转左右两表 tev.createTemporaryView("t1",stream1, Schema.newBuilder() .column("f0", DataTypes.STRING()) .column("f1", DataTypes.STRING()) .column("f2", DataTypes.BIGINT()) .columnByExpression("rt","to_timestamp_ltz(f2,3)") .watermark("rt","rt - interval '0' second") .build()); tev.createTemporaryView("t2",stream2, Schema.newBuilder() .column("f0", DataTypes.STRING()) .column("f1", DataTypes.STRING()) .column("f2", DataTypes.BIGINT()) .columnByExpression("rt","to_timestamp_ltz(f2,3)") .watermark("rt","rt - interval '0' second") .build()); //interval join String sql = "select\n" "\tt1.f0\n" "\t,t1.f1\n" "\t,t1.f2\n" "\t,t1.rt\n" "\t,t2.f0\n" "\t,t2.f1\n" "from\n" "t1 join t2\n" "on t1.f0=t2.f0 and t1.rt between t2.rt - interval '2' second and t2.rt"; tev.executeSql(sql).print(); env.execute(); } }
-
Temporal join?
When the left table joins, it only takes rows of the right table with the same key, and only the right-table version that corresponds to the left row's time. The versioned (right) table must have a primary key; the different versions are based on the right table's own time (the left row's time decides which version to pick, while the right table's time also determines the latest, possibly late-arriving version). mysql-cdc provides the op_ts metadata field (operation time). In summary: however the right table's rows with the same key are updated, one left row always takes exactly the version matching its own time and is emitted only once; unless the right table's versions have not yet reached the left row's time, the number of output rows equals the number of left rows.
//时态join public class Demo20_TemporalJoin { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint"); env.setParallelism(1); StreamTableEnvironment tev = StreamTableEnvironment.create(env); // tev.getConfig().getConfiguration().setLong("table.exec.state.ttl",10*1000);//ms // tev.getConfig().setIdleStateRetention(Duration.ofSeconds(10)); //流 SingleOutputStreamOperator<Order> stream = env.socketTextStream("hadoop102", 9997) .map(new MapFunction<String, Order>() { @Override public Order map(String s) throws Exception { String[] fields = s.split(","); return new Order(Integer.parseInt(fields[0]), fields[1], Double.parseDouble(fields[2]), Long.parseLong(fields[3])); } }); /** * 1,bj,1000 * 2,sh,2000 * 4,xa,2600 * 5,yn,12000 */ //流转左右两表 tev.createTemporaryView("orders",stream, Schema.newBuilder() .column("orderId", DataTypes.INT()) .column("currency", DataTypes.STRING()) .column("price", DataTypes.DOUBLE()) .column("orderTime", DataTypes.BIGINT()) .columnByExpression("rt","to_timestamp_ltz(orderTime,3)") .watermark("rt","rt - interval '0' second") .build()); //flinkcdc source tev.executeSql( "CREATE TABLE currency_rate (\n" " currency STRING, \n" " rate double , \n" " update_time bigint , \n" " rt as to_timestamp_ltz(update_time,3) ," " watermark for rt as rt - interval '0' second ," " PRIMARY KEY(currency) NOT ENFORCED\n" " ) WITH ( \n" " 'connector' = 'mysql-cdc',\n" " 'hostname' = 'hadoop102',\n" " 'port' = '3306',\n" " 'username' = 'root',\n" " 'password' = '123456',\n" " 'database-name' = 'gmall',\n" " 'table-name' = 'currency_rate'\n" ")"); //temporal join String sql = "select\n" "\torders.* \n" "\t,c.rate\n" "\t,c.update_time\n" "from\n" "orders join currency_rate FOR SYSTEM_TIME AS OF orders.rt as c\n" "on orders.currency=c.currency"; tev.executeSql(sql).print(); env.execute(); } @Data @NoArgsConstructor @AllArgsConstructor public static class Order { // 订单Id,币种,金额,订单时间 public int orderId; public String currency; public double price; public long orderTime; } }
-
lookup join?
Unlike a regular two-stream join, here the right table is better seen as a JDBC-backed lookup source (the other mode besides the JDBC scan source), i.e. a dimension-table join. Requirements: the left table must have a processing-time (proctime) field, and the right table is created normally but its connector must support being a lookup source. Because querying the external store per record is expensive, rows are cached in Flink (the max cached rows and TTL are configurable); it is a single-stream point lookup into the database, and updates to the right table do not produce retractions in the result.
//lookupjoin public class Demo19_LookupJoin { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint"); env.setParallelism(1); StreamTableEnvironment tev = StreamTableEnvironment.create(env); // tev.getConfig().getConfiguration().setLong("table.exec.state.ttl",10*1000);//ms tev.getConfig().setIdleStateRetention(Duration.ofSeconds(10)); //流 SingleOutputStreamOperator<Tuple3<Integer, String, Long>> stream1 = env.socketTextStream("hadoop102", 9997) .map(new MapFunction<String, Tuple3<Integer, String, Long>>() { @Override public Tuple3<Integer, String, Long> map(String s) throws Exception { String[] fields = s.split(","); return Tuple3.of(Integer.parseInt(fields[0]), fields[1], Long.parseLong(fields[2])); } }); /** * 1,bj,1000 * 2,sh,2000 * 4,xa,2600 * 5,yn,12000 */ //流转左右两表 tev.createTemporaryView("t1",stream1, Schema.newBuilder() .column("f0", DataTypes.INT()) .column("f1", DataTypes.STRING()) .column("f2", DataTypes.BIGINT()) .columnByExpression("pt","proctime()") .build()); //lookup source tev.executeSql( " create table t2( " " id int primary key" " ,name string " " ,gender string " " ,score double " " ) WITH ( " " 'connector' = 'jdbc', " " 'url' = 'jdbc:mysql://hadoop102:3306/gmall', " " 'table-name' = 'score', " " 'username' = 'root', " " 'password' = '123456' " " ) "); //lookup join String sql = "select\n" "\tt1.f0 as id\n" "\t,t1.f1 as city\n" "\t,t1.f2 as ts\n" "\t,c.name\n" "\t,c.gender\n" "\t,c.score\n" "from\n" "t1 join t2 FOR SYSTEM_TIME AS OF t1.pt as c\n" "on t1.f0=c.id"; tev.executeSql(sql).print(); env.execute(); } }
-
Over window aggregation?
Besides the Hive-style row range (ROWS BETWEEN), Flink also supports a time range with RANGE BETWEEN, and you can define a named WINDOW clause to avoid repeating the same OVER definition.
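A minimal sketch reusing the t_bid table from the TVF section (assumed to be registered with the event-time attribute rt): a time-range over aggregation factored out into a named WINDOW clause:
// Sketch: over aggregation with a time RANGE and a named window clause (reuses t_bid/rt from above)
tev.executeSql(
        "select bidtime, item, price, "
      + "  sum(price) over w as amt_last_10min, "
      + "  count(*) over w as cnt_last_10min "
      + "from t_bid "
      + "window w as ( "
      + "  partition by item "
      + "  order by rt "
      + "  range between interval '10' minute preceding and current row)"
).print();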
9 User-Defined Functions
-
Custom scalar function?
Define a class extending the ScalarFunction class, implement eval, register it, then call it.
// Custom scalar function
public class Demo21_CustomScalarFunction {
    public static void main(String[] args) {
        TableEnvironment tev = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        Table table = tev.fromValues(DataTypes.ROW(DataTypes.FIELD("name", DataTypes.STRING()))
                , Row.of("aaa")
                , Row.of("bbb")
                , Row.of("ccc"));
        tev.createTemporaryView("t", table);
        tev.createTemporarySystemFunction("my_upper", MyUpper.class);
        tev.executeSql("select my_upper(name) as big_str from t").print();
    }

    public static class MyUpper extends ScalarFunction {
        public String eval(String str) {
            return str.toUpperCase();
        }
    }
}
-
Custom aggregate function?
You need to implement three methods: getValue, accumulate, and createAccumulator.
// Custom (group) aggregate function
public class Demo22_CustomAggregateFunction {
    public static void main(String[] args) {
        TableEnvironment tev = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        Table table = tev.fromValues(DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("gender", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.DOUBLE()))
                , Row.of(1, "male", 80), Row.of(2, "male", 100), Row.of(3, "female", 90));
        tev.createTemporaryView("t", table);
        tev.createTemporarySystemFunction("my_avg", MyAvg.class);
        tev.executeSql("select gender,my_avg(score) as avg_score from t group by gender").print();
    }

    public static class MyAccumulator {
        public int count;
        public double sum;
    }

    public static class MyAvg extends AggregateFunction<Double, MyAccumulator> {
        @Override
        public Double getValue(MyAccumulator myAccumulator) {
            return myAccumulator.sum / myAccumulator.count;
        }

        @Override
        public MyAccumulator createAccumulator() {
            return new MyAccumulator();
        }

        public void accumulate(MyAccumulator acc, Double score) {
            acc.count += 1;
            acc.sum += score;
        }
    }
}
-
Custom table-generating function?
① Recap: a table-generating function explodes an array into multiple rows that are joined back with the original row's other fields (lateral view explode in Hive). Flink has a similar array-expansion join using the CROSS JOIN UNNEST keywords, where UNNEST is the function that splits the array.
② Code template: define a function class (implement eval, emit results with the parent class's collect method, and declare the row type with an annotation); the table obtained by applying the function to a column (LATERAL TABLE) is then joined with the original table in the ordinary way; you can also rename the lateral table and its fields with AS.
//表生成函数 public class Demo23_TableFunction { public static void main(String[] args) { TableEnvironment tev = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); //数组展开(系统函数join) /*Table table = tev.fromValues(DataTypes.ROW( DataTypes.FIELD("id", DataTypes.INT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("phone_numbers", DataTypes.ARRAY(DataTypes.STRING()))) , Row.of(1, "zs", Expressions.array("138","139","135")),//这里没有字符串切割函数只能用数组 Row.of(2, "bb", Expressions.array("135","136"))); tev.createTemporaryView("t", table); //该join语法仅对数组展开有效 tev.executeSql("select t.id,t.name,t2.number from t cross join unnest(phone_numbers) as t2(number)").print();*/ //自定义表生成函数 Table table = tev.fromValues(DataTypes.ROW( DataTypes.FIELD("id", DataTypes.INT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("phone_numbers", DataTypes.STRING())) , Row.of(1, "zs", "13888,137,1354455"),//这里没有字符串切割函数只能用数组 Row.of(2, "bb", "1366688,1374,132224455")); tev.createTemporaryView("t", table); tev.createTemporarySystemFunction("my_split", MySplit.class); //普通,写法 // tev.executeSql("select id,name,word,length from t,lateral table(my_split(phone_numbers))").print(); //join写法,需带上关联条件 // tev.executeSql("select id,name,word,length from t left join lateral table(my_split(phone_numbers)) on true").print(); //重命名 tev.executeSql("select id,name,p,l from t,lateral table(my_split(phone_numbers)) as t2(p,l)").print(); } //由字符串生成表 @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>")) public static class MySplit extends TableFunction<Row> { public void eval(String str) { String[] strings = str.split(","); for (String string : strings) { collect(Row.of(string,string.length())); } } } }
-
Custom table aggregate function?
① Scalar: one row, one column -> one row, one column; aggregate: one column, many rows -> one column, one row; table-generating: one column, one row -> many rows and columns; table aggregate: the aggregation result is still a table (many rows and columns).
② Custom table aggregation: the aggregate result only contains the aggregated fields; to keep more fields per row (like an over window) you have to pack the multiple fields into a composite value.
③ Caveats: this kind of function is not fully supported yet, so it can only be called from the Table API; moreover, when the source table comes from fromValues, the group that gets processed first may be arbitrary.
// Table aggregate function
public class Demo24_TableAggregateFunction {
    public static void main(String[] args) {
        TableEnvironment tev = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // build a test table
        Table table = tev.fromValues(DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("gender", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.DOUBLE()))
                , Row.of(1, "male", 67), Row.of(2, "male", 88), Row.of(3, "male", 98),
                  Row.of(4, "female", 99), Row.of(5, "female", 84), Row.of(6, "female", 89));
        // tev.createTemporarySystemFunction("top2", MyTopN.class);
        table.groupBy($("gender"))
                .flatAggregate(call(MyTopN.class, $("score")))
                // .flatAggregate("top2(score)")
                .select($("gender"), $("score_top"), $("rank_no"))
                .execute().print();
    }

    public static class MyAccumulator {
        public double first;
        public double second;
    }

    // emits the top 2 scores per group together with their rank
    @FunctionHint(output = @DataTypeHint("ROW<score_top DOUBLE, rank_no INT>"))
    public static class MyTopN extends TableAggregateFunction<Row, MyAccumulator> {
        @Override
        public MyAccumulator createAccumulator() {
            MyAccumulator acc = new MyAccumulator();
            acc.first = Double.MIN_VALUE;
            acc.second = Double.MIN_VALUE;
            return acc;
        }

        public void accumulate(MyAccumulator acc, Double score) {
            if (score > acc.first) {
                acc.second = acc.first;
                acc.first = score;
            } else if (score > acc.second) {
                acc.second = score;
            }
        }

        public void merge(MyAccumulator acc, Iterable<MyAccumulator> iterable) {
            for (MyAccumulator other : iterable) {
                accumulate(acc, other.first);
                accumulate(acc, other.second);
            }
        }

        public void emitValue(MyAccumulator acc, Collector<Row> out) {
            if (acc.first != Double.MIN_VALUE) {
                out.collect(Row.of(acc.first, 1));
            }
            if (acc.second != Double.MIN_VALUE) {
                out.collect(Row.of(acc.second, 2));
            }
        }
    }
}