Flink读取文件目录问题解决方案,目录下的文件在上传产生的临时文件报错等问题
Flink读取文件目录:
因为目录下的文件可能会不断新增,在新增过程中文件处于传输阶段
会出现比如01.data
文件正在上传,在hdfs中显示的是01.data._COPYING_
文件,只有真正上传完成后才能读取,而不设置过滤器的话就会报错,会提示._COPYING_
文件不存在,所以像这样的临时文件需要我们过滤掉, 目前默认过滤器已经满足了我们的需求:方案如下
/**
* 2.流处理: 监听并读取hdfs文件夹目录下的所有文件
*
* @throws Exception
*/
@Test
public void flink_read_hdfs_dir_stream() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
try {
//source,这里读取CSV文件,并转换为对应的Class
//DataStreamSource<String> dataSource = env.readTextFile("hdfs://node01:8020/FUTURE/mlamp/01.data").setParallelism(1);
//DataStreamSource<String> dataSource = env.readTextFile("C:\\temp\\01.data").setParallelism(1);
//String filePath = "hdfs://node1:8020/FUTURE/mlamp/01.data";
String filePath = "hdfs://node1:8020/FUTURE/mlamp/";
TextInputFormat format = new TextInputFormat(new Path(filePath));
//过滤开始
//String fileNameTag=".data";
//Configuration configuration = new Configuration();
//configuration.setBoolean("recursive.file.enumeration", true);
// 设置递归获取文件
//includePatterns – 要包含的文件的全局模式
//excludePatterns – 要排除的文件的全局模式
/*GlobFilePathFilter filesFilter = new GlobFilePathFilter(
Collections.singletonList("**"),
Arrays.asList("**._COPYING_")
);*/
//format.configure(configuration);
//format.setFilesFilter(filesFilter);
//调用默认过滤器会过滤".", "_", "_COPYING_"三种开头的文件,一般为临时文件,没有完成写入的文件,再加上自定的过滤条件即可
format.setFilesFilter(FilePathFilter.createDefaultFilter());
/*format.setFilesFilter(new FilePathFilter() {
@Override
public boolean filterPath(Path filePath) {
return FilePathFilter.createDefaultFilter().filterPath(filePath)
||!filePath.getPath().contains(fileNameTag);
}
});*/
//过滤结束
//interval:间隔时间,毫秒单位
DataStreamSource<String> dataSource = env.readFile(format, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1L).setParallelism(1);
//MapFunction(){},是转换成单条数据,
//FlatMapFunction(){},是将数据放到一个Collection集合中进行返回
SingleOutputStreamOperator<UserDO> map = dataSource.map(new MapFunction<String, UserDO>() {
@Override
public UserDO map(String s) throws Exception {
String[] arr = s.split("\\|\\ \\|");
System.out.println(arr[1]);
UserDO user = new UserDO();
user.setId(arr[0]);
user.setName(arr[1]);
user.setSex(arr[2]);
user.setPhone(arr[3]);
user.setCd(arr[4]);
user.setAddress(arr[5]);
return user;
}
});
//4. 将DataStream转换成Table(可以将DataStream转换成Table)
Table table = tEnv.fromDataStream(map, "id, name, sex, phone, cd, address");
//5. 将DataStream注册成Table(也可以将DataStream注册成Table)
tEnv.createTemporaryView("future", map, "id, name, sex, phone, cd, address");
// String sql = "select * from future";
String sql = "select * from " table;
Table resultTable = tEnv.sqlQuery(sql);
System.out.println("结果表约束:");
resultTable.printSchema();
System.out.println("表名:" resultTable);
//6. 将DataSet转换成Table对象
//7. 打印输出结果
tEnv.toAppendStream(resultTable, UserDO.class).print().setParallelism(1);
//8. 执行任务
env.execute();
} catch (ExecutionException e) {
e.printStackTrace();
//判断异常类型文件有可能在上传状态,不可读
//System.out.println("文件正在上传中,不可读!!");
} catch (Exception e) {
e.printStackTrace();
}
}
查看默认过滤器可以看到
默认过滤器已经过滤出来了这些
“.”, “_”, "COPYING"三种开头的文件,一般为临时文件,没有完成写入的文件
拓展一下
Java TextInputFormat.setFilesFilter方法代码示例
本文整理汇总了Java中org.apache.flink.api.java.io.TextInputFormat.setFilesFilter方法的典型用法代码示例。如果您正苦于以下问题:Java TextInputFormat.setFilesFilter方法的具体用法?Java TextInputFormat.setFilesFilter怎么用?Java TextInputFormat.setFilesFilter使用的例子?那么恭喜您, 这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在类org.apache.flink.api.java.io.TextInputFormat的用法示例。
在下文中一共展示了TextInputFormat.setFilesFilter方法的7个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: main
import org.apache.flink.api.java.io.TextInputFormat; //导入方法依赖的package包/类
public static void main(String... args) throws
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhiafech
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
微信运动停用后别人还能看到步数吗
PHP中文网 07-22