• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Flink读取文件目录问题解决方案,目录下的文件在上传产生的临时文件报错等问题

武飞扬头像
`FUTURE`
帮助2

Flink读取文件目录:
因为目录下的文件可能会不断新增,在新增过程中文件处于传输阶段
会出现比如01.data文件正在上传,在hdfs中显示的是01.data._COPYING_文件,只有真正上传完成后才能读取,而不设置过滤器的话就会报错,会提示._COPYING_文件不存在,所以像这样的临时文件需要我们过滤掉, 目前默认过滤器已经满足了我们的需求:方案如下

/**
     * 2.流处理: 监听并读取hdfs文件夹目录下的所有文件
     *
     * @throws Exception
     */
    @Test
    public void flink_read_hdfs_dir_stream() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        try {
   
            //source,这里读取CSV文件,并转换为对应的Class
            //DataStreamSource<String> dataSource = env.readTextFile("hdfs://node01:8020/FUTURE/mlamp/01.data").setParallelism(1);
            //DataStreamSource<String> dataSource = env.readTextFile("C:\\temp\\01.data").setParallelism(1);
            //String filePath = "hdfs://node1:8020/FUTURE/mlamp/01.data";
            String filePath = "hdfs://node1:8020/FUTURE/mlamp/";
            TextInputFormat format = new TextInputFormat(new Path(filePath));
            //过滤开始
            //String fileNameTag=".data";
            //Configuration configuration = new Configuration();
            //configuration.setBoolean("recursive.file.enumeration", true);
            // 设置递归获取文件
            //includePatterns – 要包含的文件的全局模式
			//excludePatterns – 要排除的文件的全局模式
            /*GlobFilePathFilter filesFilter = new GlobFilePathFilter(
                    Collections.singletonList("**"),
                    Arrays.asList("**._COPYING_")
            );*/
            //format.configure(configuration);
            //format.setFilesFilter(filesFilter);
            //调用默认过滤器会过滤".", "_", "_COPYING_"三种开头的文件,一般为临时文件,没有完成写入的文件,再加上自定的过滤条件即可
            format.setFilesFilter(FilePathFilter.createDefaultFilter());
            /*format.setFilesFilter(new FilePathFilter() {
                @Override
                public boolean filterPath(Path filePath) {
                    return FilePathFilter.createDefaultFilter().filterPath(filePath)
                            ||!filePath.getPath().contains(fileNameTag);

                }
            });*/
            //过滤结束
            //interval:间隔时间,毫秒单位
            DataStreamSource<String> dataSource = env.readFile(format, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1L).setParallelism(1);
            //MapFunction(){},是转换成单条数据,
            //FlatMapFunction(){},是将数据放到一个Collection集合中进行返回
            SingleOutputStreamOperator<UserDO> map = dataSource.map(new MapFunction<String, UserDO>() {
   
                @Override
                public UserDO map(String s) throws Exception {
   
                    String[] arr = s.split("\\|\\ \\|");
                    System.out.println(arr[1]);
                    UserDO user = new UserDO();
                    user.setId(arr[0]);
                    user.setName(arr[1]);
                    user.setSex(arr[2]);
                    user.setPhone(arr[3]);
                    user.setCd(arr[4]);
                    user.setAddress(arr[5]);
                    return user;
                }
            });

            //4. 将DataStream转换成Table(可以将DataStream转换成Table)
            Table table = tEnv.fromDataStream(map, "id, name, sex, phone, cd, address");
            //5. 将DataStream注册成Table(也可以将DataStream注册成Table)
            tEnv.createTemporaryView("future", map, "id, name, sex, phone, cd, address");
//            String sql = "select * from future";
            String sql = "select * from "   table;
            Table resultTable = tEnv.sqlQuery(sql);
            System.out.println("结果表约束:");
            resultTable.printSchema();
            System.out.println("表名:"   resultTable);
            //6. 将DataSet转换成Table对象
            //7. 打印输出结果
            tEnv.toAppendStream(resultTable, UserDO.class).print().setParallelism(1);
            //8. 执行任务
            env.execute();
        } catch (ExecutionException e) {
   
            e.printStackTrace();
            //判断异常类型文件有可能在上传状态,不可读
            //System.out.println("文件正在上传中,不可读!!");
        } catch (Exception e) {
   
            e.printStackTrace();
        }
    }
学新通

查看默认过滤器可以看到学新通
默认过滤器已经过滤出来了这些
“.”, “_”, "COPYING"三种开头的文件,一般为临时文件,没有完成写入的文件

拓展一下

Java TextInputFormat.setFilesFilter方法代码示例

本文整理汇总了Java中org.apache.flink.api.java.io.TextInputFormat.setFilesFilter方法的典型用法代码示例。如果您正苦于以下问题:Java TextInputFormat.setFilesFilter方法的具体用法?Java TextInputFormat.setFilesFilter怎么用?Java TextInputFormat.setFilesFilter使用的例子?那么恭喜您, 这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在类org.apache.flink.api.java.io.TextInputFormat的用法示例。

在下文中一共展示了TextInputFormat.setFilesFilter方法的7个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: main
import org.apache.flink.api.java.io.TextInputFormat; //导入方法依赖的package包/类
public static void main(String... args) throws  

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhiafech
系列文章
更多 icon
同类精品
更多 icon
继续加载