• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Flink源码系列--StreamExecutionEnvironment类

武飞扬头像
打酱油的葫芦娃
帮助1

本文的Flink源码版本为: 1.15-SNAPSHOT,读者可自行从Github clone.

Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:

  • 获取一个执行环境(execution environment);
  • 加载/创建初始数据;
  • 指定数据相关的转换;
  • 指定计算结果的存储位置;
  • 触发程序执行。

StreamExecutionEnvironment 是所有 Flink 程序的基础。

创建

你可以使用 StreamExecutionEnvironment 的如下静态方法获取 StreamExecutionEnvironment:

getExecutionEnvironment();

createLocalEnvironment();

createRemoteEnvironment(String host, int port, String... jarFiles);

通常,你只需要使用 getExecutionEnvironment() 即可,因为该方法会根据上下文做正确的处理:如果你在 IDE 中执行你的程序或将其作为一般的 Java 程序执行,那么它将创建一个本地环境,该环境将在你的本地机器上执行你的程序。如果你基于程序创建了一个 JAR 文件,并通过命令行运行它,Flink 集群管理器将执行程序的 main 方法,同时 getExecutionEnvironment() 方法会返回一个执行环境以在集群上执行你的程序。

学新通
跟一下 getExecutionEnvironment() 方法:

public static StreamExecutionEnvironment getExecutionEnvironment() {
	return getExecutionEnvironment(new Configuration());
}

public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
	       // 首先检查当前上下文是否存在可用的 EnvironmentFactory
	return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
			// 若当前上下文存在可用的 EnvironmentFactory,则基于该工厂类创建 ExecutionEnvironment
			.map(factory -> factory.createExecutionEnvironment(configuration))
			// 若工厂类未能创建 ExecutionEnvironment ,则调用 createLocalEnvironment(configuration) 方法创建 LocalStreamEnvironment
			.orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}

public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration) {
	// 会判断是否有设置默认并行度
	if (configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).isPresent()) {
		// 若有设置,则基于配置中的并行度创建 LocalStreamEnvironment
		return new LocalStreamEnvironment(configuration);
	} else {
		// 否则将基于 defaultLocalParallelism 创建 LocalStreamEnvironment
		// 其中,defaultLocalParallelism 为程序运行节点的核数
		Configuration copyOfConfiguration = new Configuration();
		copyOfConfiguration.addAll(configuration);
		copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, defaultLocalParallelism);
		return new LocalStreamEnvironment(copyOfConfiguration);
	}
}

private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();

数据接入

addSource()

第1种数据接入的方法是 addSource()。

该方法的入参为 SourceFunction 接口的实现类。

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
	// 若未传入 Source 名称,则默认名称为 Custom Source
	// 若应用存在多个 Source,建议手动指定各个 Source 的名称
	return addSource(function, "Custom Source");
}

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
	return addSource(function, sourceName, null);
}

public <OUT> DataStreamSource<OUT> addSource(
		SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
		   // Boundedness 是标识数据源是否无界的枚举
		   // CONTINUOUS_UNBOUNDED 代表是连续无界的数据源
		   // BOUNDED 代表是有界的数据源
	return addSource(function, sourceName, typeInfo, Boundedness.CONTINUOUS_UNBOUNDED);
}

private <OUT> DataStreamSource<OUT> addSource(
		final SourceFunction<OUT> function,
		final String sourceName,
		@Nullable final TypeInformation<OUT> typeInfo,
		final Boundedness boundedness) {
	checkNotNull(function);
	checkNotNull(sourceName);
	checkNotNull(boundedness);
	// 会根据你传入的 SourceFunction 解析出源数据类型
	TypeInformation<OUT> resolvedTypeInfo =
			getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
	// 判断是否为 ParallelSourceFunction
	boolean isParallel = function instanceof ParallelSourceFunction;
	
	// 此处会检查传入的 SourceFunction 符合规范
	clean(function);
	
	// 没啥问题,就将该 SourceFunction 转化为 StreamSource
	final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
	return new DataStreamSource<>(
			this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
}

createInput()

第2种数据接入的方法是 createInput()。

该方法的入参为 InputFormat 接口的实现类。

@PublicEvolving
public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat) {
	// TypeExtractor.getInputFormatTypes(inputFormat) 提取 Source 的类型
	return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
}

@PublicEvolving
public <OUT> DataStreamSource<OUT> createInput(
		InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
	DataStreamSource<OUT> source;
	// 判断是否为文件读取的 InputFormat
	if (inputFormat instanceof FileInputFormat) {
		@SuppressWarnings("unchecked")
		FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat;

		source =
				createFileInput(
						format,
						typeInfo,
						"Custom File source",
						FileProcessingMode.PROCESS_ONCE,
						-1);
	} else {
		source = createInput(inputFormat, typeInfo, "Custom Source");
	}
	return source;
}

当检测到传入的 InputFormat 为 FileInputFormat 时,会走文件读取的方法,即 createFileInput。

createFileInput 的参数中有1个 FileProcessingMode:

@PublicEvolving
public enum FileProcessingMode {

	// 处理当前目录下现存的文件内容(一次性读取)
    PROCESS_ONCE,

    // 周期性扫描目录下的文件以读取新数据(持续性读取)
    PROCESS_CONTINUOUSLY
}

继续跟一下 createFileInput 方法:

private <OUT> DataStreamSource<OUT> createFileInput(
		FileInputFormat<OUT> inputFormat,
		TypeInformation<OUT> typeInfo,
		String sourceName,
		FileProcessingMode monitoringMode,
		long interval) {

	Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
	Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
	Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
	Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");

	Preconditions.checkArgument(
			monitoringMode.equals(FileProcessingMode.PROCESS_ONCE)
					|| interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
			"The path monitoring interval cannot be less than "
					  ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL
					  " ms.");
	// 创建1个周期性扫描目录文件内容的 Function
	ContinuousFileMonitoringFunction<OUT> monitoringFunction =
			new ContinuousFileMonitoringFunction<>(
					inputFormat, monitoringMode, getParallelism(), interval);

	// 定义文件读取工厂类
	ContinuousFileReaderOperatorFactory<OUT, TimestampedFileInputSplit> factory =
			new ContinuousFileReaderOperatorFactory<>(inputFormat);
	// 此处,会根据传入的 FileProcessingMode 来生成 Boundedness
	// FileProcessingMode.PROCESS_ONCE 对应 Boundedness.BOUNDED  有界
	// FileProcessingMode.PROCESS_CONTINUOUSLY 对应 Boundedness.CONTINUOUS_UNBOUNDED 无界
	final Boundedness boundedness =
			monitoringMode == FileProcessingMode.PROCESS_ONCE
					? Boundedness.BOUNDED
					: Boundedness.CONTINUOUS_UNBOUNDED;
	// 最后调用的还是 addSource() 方法
	// SingleOutputStreamOperator 是 DataStreamSource 的父类
	SingleOutputStreamOperator<OUT> source =
			addSource(monitoringFunction, sourceName, null, boundedness)
					.transform("Split Reader: "   sourceName, typeInfo, factory);

	return new DataStreamSource<>(source);
}

非文件的 InputFormat 的话,会走另外1个 createInput 方法:

private <OUT> DataStreamSource<OUT> createInput(
		InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo, String sourceName) {
	// 将传入的 InputFormat 转化为 InputFormatSourceFunction
	InputFormatSourceFunction<OUT> function =
			new InputFormatSourceFunction<>(inputFormat, typeInfo);
	// 最后调用的还是 addSource() 方法
	return addSource(function, sourceName, typeInfo);
}

InputFormatSourceFunction 为 SourceFunction 接口的实现类,其继承链路如下:

SourceFunction-->ParallelSourceFunction-->RichParallelSourceFunction-->InputFormatSourceFunction

所以,createInput() 本质上调用的还是 addSource() 方法。

fromSource()

第3种数据接入的方法是 fromSource()。

该方法的入参为 Source 接口的实现类。

Flink 1.12 及以后,社区基于 FLIP-27 的改进计划,实现了1种新的 Source 架构。

如果你对 Flink 新的 Source 架构不熟悉的话,可以参阅我之前写好的1篇博客:

Flink进阶系列–FLIP-27新的Source架构

如果你想基于新的 Source 接入数据,则需要通过 StreamExecutionEnvironment.fromSource()。

// 第1个参数为 Source 接口的实现类
// 第2个参数为水印生成策略
// 第3个参数为 Source 名称
@PublicEvolving
public <OUT> DataStreamSource<OUT> fromSource(
		Source<OUT, ?, ?> source,
		WatermarkStrategy<OUT> timestampsAndWatermarks,
		String sourceName) {
	return fromSource(source, timestampsAndWatermarks, sourceName, null);
}

@Experimental
public <OUT> DataStreamSource<OUT> fromSource(
		Source<OUT, ?, ?> source,
		WatermarkStrategy<OUT> timestampsAndWatermarks,
		String sourceName,
		TypeInformation<OUT> typeInfo) {
	// 基于传入的 Source 提取出数据源的类型
	final TypeInformation<OUT> resolvedTypeInfo =
			getTypeInfo(source, sourceName, Source.class, typeInfo);

	return new DataStreamSource<>(
			this,
			checkNotNull(source, "source"),
			checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"),
			checkNotNull(resolvedTypeInfo),
			checkNotNull(sourceName));
}

任务执行

一旦指定了完整的程序,需要调用 StreamExecutionEnvironment 的 execute() 方法来触发程序执行。根据 ExecutionEnvironment 的类型,执行会在你的本地机器上触发,或将你的程序提交到某个集群上执行。

execute() 方法将等待作业完成,然后返回一个 JobExecutionResult,其中包含执行时间和累加器结果。

如果不想等待作业完成,可以通过调用 StreamExecutionEnvironment 的 executeAsync() 方法来触发作业异步执行。它会返回一个 JobClient,你可以通过它与刚刚提交的作业进行通信。如下是使用 executeAsync() 实现 execute() 语义的示例。

final JobClient jobClient = env.executeAsync();

final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

关于程序执行的最后一部分对于理解何时以及如何执行 Flink 算子是至关重要的。所有 Flink 程序都是延迟执行的:当程序的 main 方法被执行时,数据加载和转换不会直接发生。相反,每个算子都被创建并添加到 dataflow 形成的有向图。当执行被执行环境的 execute() 方法显示地触发时,这些算子才会真正执行。程序是在本地执行还是在集群上执行取决于执行环境的类型。

紧接着看一下源码:

public JobExecutionResult execute() throws Exception {
	// getStreamGraph() 方法用于获取任务的 StreamGraph
	return execute(getStreamGraph());
}

@Internal
public StreamGraph getStreamGraph() {
	return getStreamGraph(true);
}

@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
	final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
	if (clearTransformations) {
		transformations.clear();
	}
	return streamGraph;
}

private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
	if (transformations.size() <= 0) {
		throw new IllegalStateException(
				"No operators defined in streaming topology. Cannot execute.");
	}

	// We copy the transformation so that newly added transformations cannot intervene with the
	// stream graph generation.
	return new StreamGraphGenerator(
					new ArrayList<>(transformations), config, checkpointCfg, configuration)
			.setStateBackend(defaultStateBackend)
			.setChangelogStateBackendEnabled(changelogStateBackendEnabled)
			.setSavepointDir(defaultSavepointDirectory)
			.setChaining(isChainingEnabled)
			.setUserArtifacts(cacheFile)
			.setTimeCharacteristic(timeCharacteristic)
			.setDefaultBufferTimeout(bufferTimeout)
			.setSlotSharingGroupResource(slotSharingGroupResources);
}

可以看出,execute() 提交的是任务的 StreamGraph,DataStreamEnvironment 持有整个任务的 transformations 列表,然后再基于 StreamGraphGenerator 类将 transformations 列表转化为 StreamGraph。

StreamGraphGenerator 类源码后面我们专门出1篇博客详细讲解,此处暂时不做讲解。

execute() 本质上调用的是 execute(StreamGraph streamGraph)。

@Internal
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
	// 最终还是调用的 executeAsync() 方法
	final JobClient jobClient = executeAsync(streamGraph);

	try {
		final JobExecutionResult jobExecutionResult;
		// 如果部署配置为 DeploymentOptions.ATTACHED 连接模式
		// 此时,客户端需要等待任务执行完毕,然后调用 getJobExecutionResult().get() 方法获取执行结果
		if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
			// jobClient.getJobExecutionResult() 返回的是1个 CompletableFuture<JobExecutionResult>
			jobExecutionResult = jobClient.getJobExecutionResult().get();
		// 若部署模式为非连接模式,则客户端无需关注执行结果,异步提交任务之后,直接构造1个 DetachedJobExecutionResult 返回即可
		} else {
			jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
		}
		
		// 获取到任务执行结果之后,依次执行各个任务监听器的 onJobExecuted() 方法
		jobListeners.forEach(
				jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

		return jobExecutionResult;
	} catch (Throwable t) {
		Throwable strippedException = ExceptionUtils.stripExecutionException(t);
		jobListeners.forEach(
				jobListener -> {
					jobListener.onJobExecuted(null, strippedException);
				});
		ExceptionUtils.rethrowException(strippedException);

		// never reached, only make javac happy
		return null;
	}
}

execute() 最终还是通过 executeAsync() 方法异步提交的任务。

@Internal
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
	checkNotNull(streamGraph, "StreamGraph cannot be null.");
	checkNotNull(
			configuration.get(DeploymentOptions.TARGET),
			"No execution.target specified in your configuration file.");

	final PipelineExecutorFactory executorFactory =
			executorServiceLoader.getExecutorFactory(configuration);

	checkNotNull(
			executorFactory,
			"Cannot find compatible factory for specified execution.target (=%s)",
			configuration.get(DeploymentOptions.TARGET));
	// 通过 PipelineExecutorFactory 提交 StreamGraph
	CompletableFuture<JobClient> jobClientFuture =
			executorFactory
					.getExecutor(configuration)
					.execute(streamGraph, configuration, userClassloader);

	try {
		JobClient jobClient = jobClientFuture.get();
		// 获取到任务提交结果之后,依次执行各个任务监听器的 onJobSubmitted() 方法
		jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
		return jobClient;
	} catch (ExecutionException executionException) {
		final Throwable strippedException =
				ExceptionUtils.stripExecutionException(executionException);
		jobListeners.forEach(
				jobListener -> jobListener.onJobSubmitted(null, strippedException));

		throw new FlinkException(
				String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
				strippedException);
	}
}

整个任务 execute() 过程包含2大步,而且均是异步的。

  • 第1步是异步提交任务,提交之后直接返回 CompletableFuture,然后通过 CompletableFuture 的 get() 方法拿到提交结果后,执行各个任务监听器的 onJobSubmitted() 方法;
  • 第2步是异步执行任务,调用之后直接返回 CompletableFuture,然后通过 CompletableFuture 的 get() 方法拿到任务执行结果后,执行各个任务监听器的 onJobExecuted() 方法。

任务监听器 JobListener:

@PublicEvolving
public interface JobListener {

    // 当任务被提交后调用
    void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable);

    // 当任务被执行完后调用
    void onJobExecuted(
            @Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable);
}

注册方法为:

@PublicEvolving
public void registerJobListener(JobListener jobListener) {
	checkNotNull(jobListener, "JobListener cannot be null");
	jobListeners.add(jobListener);
}

所以,如果你需要在任务提交之后及任务执行完之后执行某些操作(如日志埋点、回调接口等),可以通过注册 JobListener 的方式轻松实现。

本文到此结束,感谢阅读!

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhiagebg
系列文章
更多 icon
同类精品
更多 icon
继续加载