• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Flink部署——参数配置

武飞扬头像
京河小蚁
帮助2

所有的配置都在conf/flink-conf.yaml里面设置,预计将是YAML 键值对格式的平面集合key: value。

当 Flink 进程启动时,配置会被解析和评估。更改配置文件需要重新启动相关进程。

开箱即用的配置将使用您的默认 Java 安装。如果要手动覆盖要使用的 Java 运行时,可以手动设置环境变量JAVA_HOME或配置键env.java.home。conf/flink-conf.yaml

FLINK_CONF_DIR您可以通过定义环境变量来指定不同的配置目录位置。对于提供非会话部署的资源提供者,您可以通过这种方式指定每个作业的配置。从 Flink 发行版复制conf目录并根据每个作业修改设置。请注意,这在 Docker 或独立 Kubernetes 部署中不受支持。在基于 Docker 的部署中,您可以使用FLINK_PROPERTIES环境变量来传递配置值。

在会话集群上,提供的配置将仅用于配置执行参数,例如影响作业的配置参数,而不是底层集群。

基本设置

默认配置支持在不做任何更改的情况下启动单节点 Flink 会话集群。本节中的选项是基本分布式 Flink 设置最常用的选项。

主机名/端口

这些选项仅对standalone应用程序或session集群部署(简单独立或Kubernetes)是必需的。

如果您将 Flink 与Yarn或活动的Kubernetes 集成一起使用,则会自动发现主机名和端口。

  • rest.address, rest.port: 这些是客户端用来连接 Flink 的。将此设置为 JobManager 运行的主机名,或者设置为 JobManager 的 REST 接口前面的 (Kubernetes) 服务的主机名。

  • TaskManager使用jobmanager.rpc.address(默认为“localhost”)和jobmanager.rpc.port(默认为6123)配置条目连接到 JobManager/ResourceManager。将此设置为 JobManager 运行的主机名,或 JobManager 的(Kubernetes 内部)服务的主机名。在使用领导者选举机制自动发现这一点的高可用性设置中,此选项将被忽略。

内存大小

默认内存大小支持简单的流/批处理应用程序,但太低而无法为更复杂的应用程序产生良好的性能。

  • jobmanager.memory.process.size: JobManager(JobMaster / ResourceManager / Dispatcher)进程的总大小。
  • taskmanager.memory.process.size:TaskManager 进程的总大小。

总大小包括一切。Flink 会为 JVM 自己的内存需求(元空间和其他)减去一些内存,并在其组件(JVM 堆、堆外、任务管理器以及网络、托管内存等)之间自动划分和配置其余部分。

这些值配置为内存大小,例如1536m或2g。

并行性

  • taskmanager.numberOfTaskSlots:TaskManager 提供的槽数(默认值:1)。每个插槽可以接受一个任务或管道。在 TaskManager 中拥有多个插槽有助于在并行任务或管道之间分摊某些恒定开销(JVM、应用程序库或网络连接)。有关详细信息,请参阅任务槽和资源概念部分。
    运行更多较小的 TaskManager,每个都有一个插槽是一个很好的起点,并导致任务之间的最佳隔离。将相同的资源专用于具有更多插槽的更少的大型 TaskManager 有助于提高资源利用率,但代价是任务之间的隔离较弱(更多任务共享同一个 JVM)。

  • parallelism.default:在任何地方都没有指定并行度时使用的默认并行度(默认值:1)。

检查点

您可以直接在 Flink 作业或应用程序的代码中配置检查点。将这些值放在配置中将它们定义为默认值,以防应用程序未配置任何内容。

  • state.backend:要使用的状态后端。这定义了用于拍摄快照的数据结构机制。常见值为filesystem或rocksdb。
  • state.checkpoints.dir:要写入检查点的目录。这需要一个路径 URI,如s3://mybucket/flink-app/checkpoints或hdfs://namenode:port/flink/checkpoints。
  • state.savepoints.dir:保存点的默认目录。采用路径 URI,类似于 state.checkpoints.dir.
  • execution.checkpointing.interval:基本间隔设置。要启用检查点,您需要将此值设置为大于 0。

网页界面

  • web.submit.enable:启用通过 Flink UI 上传和启动作业(默认为 true)。请注意,即使禁用此功能,会话集群仍会通过 REST 请求(HTTP 调用)接受作业。此标志仅保护在 UI 中上传作业的功能。
  • web.cancel.enable:允许通过 Flink UI 取消作业(默认为 true)。请注意,即使禁用此功能,会话集群仍会通过 REST 请求(HTTP 调用)取消作业。此标志仅保护取消 UI 中的作业的功能。
  • web.upload.dir:存储上传作业的目录。仅在web.submit.enable为真时使用。

其他

  • io.tmp.dirs:Flink 存放本地数据的目录,默认为系统临时目录(java.io.tmpdir属性)。如果配置了目录列表,Flink 将在目录之间轮换文件。

默认情况下,放在这些目录中的数据包括 RocksDB 创建的文件、溢出的中间结果(批处理算法)和缓存的 jar 文件。

此数据不依赖于持久性/恢复,但如果此数据被删除,通常会导致重量级恢复操作。因此,建议将其设置为不会自动定期清除的目录。

默认情况下,Yarn 和 Kubernetes 设置会自动将此值配置到本地工作目录。

常用设置选项

配置 Flink 应用程序或集群的常用选项。

主机和端口

为不同的 Flink 组件配置主机名和端口的选项。

JobManager 主机名和端口仅与没有高可用性的独立设置相关。在该设置中,TaskManager 使用配置值来查找(并连接到)JobManager。在所有高可用设置中,TaskManagers 通过 High-Availability-Service(例如 ZooKeeper)发现 JobManager。

使用资源编排框架(K8s、Yarn)的设置通常使用框架的服务发现工具。

您不需要配置任何 TaskManager 主机和端口,除非设置需要使用特定的端口范围或特定的网络接口来绑定。

默认值 类型 描述
jobmanager.rpc.address (none) String 配置参数定义要连接到的网络地址,以便与作业管理器进行通信。此值仅在存在具有静态名称或地址的单个 JobManager 的设置中解释(简单的独立设置,或具有动态服务名称解析的容器设置)。当领导者选举服务(如 ZooKeeper)用于从潜在的多个备用 JobManager 中选举和发现 JobManager 领导者时,它不会在许多高可用性设置中使用。
jobmanager.rpc.port 6123 Integer 配置参数定义要连接到的网络端口,以便与作业管理器进行通信。与 jobmanager.rpc.address 一样,此值仅在存在具有静态名称/地址和端口的单个 JobManager 的设置中解释(简单的独立设置,或具有动态服务名称解析的容器设置)。当使用领导者选举服务(如 ZooKeeper)从潜在的多个备用 JobManager 中选举和发现 JobManager 领导时,许多高可用性设置中不使用此配置选项。
metrics.internal.query-service.port “0” String Flink 内部指标查询服务使用的端口范围。接受端口列表(“50100,50101”)、范围(“50100-50200”)或两者的组合。建议设置一个端口范围,以避免在同一台机器上运行多个 Flink 组件时发生冲突。默认情况下,Flink 会选择一个随机端口。
rest.address (none) String 客户端应该用来连接到服务器的地址。注意:仅当高可用性配置为 NONE 时才考虑此选项。
rest.bind-address (none) String 服务器自己绑定的地址。
rest.bind-port “8081” String 服务器自己绑定的端口。接受端口列表(“50100,50101”)、范围(“50100-50200”)或两者的组合。建议设置一个端口范围,以避免在同一台机器上运行多个 Rest 服务器时发生冲突。
rest.port 8081 Integer 客户端连接的端口。如果未指定 rest.bind-port,则 REST 服务器将绑定到此端口。注意:仅当高可用性配置为 NONE 时才考虑此选项。
taskmanager.data.port 0 Integer 用于数据交换操作的任务管理器的外部端口。
taskmanager.host (none) String TaskManager 暴露的网络接口的外部地址。因为不同的 TaskManager 需要不同的值来设置这个选项,所以通常在一个额外的非共享 TaskManager 特定的配置文件中指定。
taskmanager.rpc.port “0” String TaskManager 暴露的外部 RPC 端口。接受端口列表(“50100,50101”)、范围(“50100-50200”)或两者的组合。建议设置一个端口范围,以避免在同一台机器上运行多个 TaskManager 时发生冲突。

容错

这些配置选项控制 Flink 在执行过程中出现故障时的重启行为。通过在flink-conf.yaml 中配置这些选项,您可以定义集群的默认重启策略。

默认重启策略只有在没有通过ExecutionConfig.

默认值 类型 描述
restart-strategy (none) String 定义在作业失败时使用的重新启动策略。可接受的值为:1. none, off, disable: 无重启策略。2. fixeddelay, fixed-delay: 固定延迟重启策略。3. failurerate, failure-rate: 故障率重启策略。4. exponentialdelay, exponential-delay: 指数延迟重启策略。如果禁用检查点,则默认值为none. 如果启用了检查点,则默认值为fixed-delay重新Integer.MAX_VALUE启动尝试和“ 1 s”延迟。

固定延迟重启策略

默认值 类型 描述
restart-strategy.fixed-delay.attempts 1 Integer restart-strategy如果设置为,则在作业被声明为失败之前 Flink 重试执行的次数fixed-delay。
restart-strategy.fixed-delay.delay 1s Duration restart-strategy如果设置为 ,则两次连续重新启动尝试之间的延迟fixed-delay。当程序与外部系统交互时,延迟重试会很有帮助,例如连接或挂起的事务应该在尝试重新执行之前达到超时。可以使用符号来指定:“1 min”、“20 s”

故障率重启策略

默认值 类型 描述
restart-strategy.failure-rate.delay 1s Duration restart-strategy如果设置为 ,则两次连续重新启动尝试之间的延迟failure-rate。可以使用符号来指定:“1 min”、“20 s”
restart-strategy.failure-rate.failure-rate-interval 1min Duration 测量故障率的时间间隔(如果restart-strategy已设置为 )failure-rate。可以使用符号来指定:“1 min”、“20 s”
restart-strategy.failure-rate.max-failures-per-interval 1 Integer restart-strategy如果已设置为 ,则在作业失败之前的给定时间间隔内的最大重新启动次数failure-rate。

可重试清理

在作业达到全局终端状态后,将执行所有相关资源的清理。如果失败,可以重试此清理。可以配置不同的重试策略来改变这种行为:

默认值 类型 描述
cleanup-strategy “exponential-delay” String 定义在清理失败时使用的清理策略。
可接受的值为:
1. none, disable, off: 清理只执行一次。如果失败,将不会启动重试。作业工件(以及作业的 JobResultStore 条目)必须手动清理,以防万一发生故障。
2. fixed-delay, fixeddelay: 清理尝试将按固定间隔分开,直到清理被认为成功或达到设定的重试次数。达到配置的限制意味着可能需要手动清理作业工件(以及作业的 JobResultStore 条目)。
3. exponential-delay, exponentialdelay: 指数延迟重启策略触发清除,延迟呈指数增长,直到清除成功或达到设定的重试次数。达到配置的限制意味着可能需要手动清理作业工件(以及作业的 JobResultStore 条目)。
默认配置依赖于具有给定默认值的指数延迟重试策略。

固定延迟清理重试策略

默认值 类型 描述
cleanup-strategy.fixed-delay.attempts infinite Integer Flink 在放弃之前重试清理的次数 ifcleanup-strategy已设置为fixed-delay。达到配置的限制意味着可能需要手动清理作业工件(以及作业的 JobResultStore 条目)。
cleanup-strategy.fixed-delay.delay 1 min Duration cleanup-strategy如果设置为 ,则在尝试失败后重新触发清理之前 Flink 等待的时间量fixed-delay。可以使用以下符号指定:“1 min”、“20 s”

指数延迟清理重试策略

默认值 类型 描述
cleanup-strategy.exponential-delay.attempts infinite Integer cleanup-strategy如果设置为 ,则重试失败的清理的次数exponential-delay。达到配置的限制意味着可能需要手动清理作业工件(以及作业的 JobResultStore 条目)。
cleanup-strategy.exponential-delay.initial-backoff 1 s Duration cleanup-strategy如果已设置为 ,则清理重试之间的开始持续时间exponential-delay。可以使用以下符号指定:“1 min”、“20 s”
cleanup-strategy.exponential-delay.max-backoff 1 h Duration cleanup-strategy如果已设置为 ,则清理重试之间的最长可能持续时间exponential-delay。可以使用以下符号指定:“1 min”、“20 s”

检查点和状态后端

这些选项控制状态后端和检查点行为的基本设置。

这些选项仅与以连续流方式执行的作业/应用程序相关。以批处理方式执行的作业/应用程序不使用状态后端和检查点,而是使用针对批处理进行优化的不同内部数据结构。

默认值 类型 描述
state.backend (none) String 用于存储状态的状态后端。
可以通过快捷方式名称指定实现,也可以通过StateBackendFactory的类名指定实现。如果指定了一个工厂,它会通过它的零参数构造函数实例化,并调用它的StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)方法。可识别的快捷方式名称是’hashmap’和’rocksdb’。
state.checkpoint-storage (none) String 用于检查点状态的检查点存储实现。可以通过它们的快捷方式名称或通过 a 的类名来指定实现CheckpointStorageFactory。如果指定了工厂,则通过其零参数构造函数对其进行实例化,并CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader) 调用其方法。公认的快捷方式名称是“jobmanager”和“filesystem”。
state.checkpoints.dir (none) String Flink 支持的文件系统中用于存储检查点的数据文件和元数据的默认目录。存储路径必须可以从所有参与的进程/节点(即所有 TaskManager 和 JobManager)访问。
state.savepoints.dir (none) String 保存点的默认目录。由将保存点写入文件系统的状态后端(HashMapStateBackend、EmbeddedRocksDBStateBackend)使用。
state.backend.incremental false Boolean 如果可能,选择状态后端是否应创建增量检查点。对于增量检查点,仅存储与前一个检查点的差异,而不是完整的检查点状态。启用后,Web UI 中显示的状态大小或从 REST API 获取的状态大小仅表示增量检查点大小,而不是完整检查点大小。一些状态后端可能不支持增量检查点并忽略此选项。
state.backend.local-recovery false Boolean 此选项为此状态后端配置本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复目前仅涵盖键控状态后端。目前,MemoryStateBackend 不支持本地恢复并忽略此选项。
state.checkpoints.num-retained 1 Integer 要保留的已完成检查点的最大数量。
taskmanager.state.local.root-dirs (none) String config 参数定义用于存储基于文件的状态以进行本地恢复的根目录。本地恢复目前仅涵盖键控状态后端。目前,MemoryStateBackend 不支持本地恢复并忽略此选项。如果未配置,它将默认为 <WORKING_DIR>/localState。<WORKING_DIR> 可以通过配置process.taskmanager.working-dir

高可用性

这里的高可用是指 JobManager 进程从故障中恢复的能力。

JobManager 确保跨 TaskManager 恢复期间的一致性。为了让 JobManager 自身持续恢复,外部服务必须存储最少量的恢复元数据(例如“最后提交的检查点的 ID”),并帮助选举和锁定哪个 JobManager 是领导者(以避免脑裂情况)。

默认值 类型 描述
high-availability “NONE” String 定义用于集群执行的高可用性模式。要启用高可用性,请将此模式设置为“ZOOKEEPER”或指定工厂类的 FQN
high-availability.cluster-id “/default” String Flink 集群的 ID,用于分隔多个 Flink 集群。需要为独立集群设置,但会在 YARN 中自动推断。
high-availability.storageDir (none) String Flink 在高可用性设置中保存元数据的文件系统路径 (URI)。

高可用性设置中的 JobResultStore 选项

默认值 类型 描述
job-result-store.delete-on-commit true Boolean 确定当相应实体转换为干净状态时是否应自动从底层作业结果存储中删除作业结果。如果为 false,则将清理后的作业结果标记为清理以指示其状态。这种情况下,Flink 不再拥有所有权,资源需要用户自己清理。
job-result-store.storage-path (none) String 定义作业结果的存储位置。这应该是一个提供写后读一致性的底层文件系统。默认情况下,这是{high-availability.storageDir}/job-result-store/{high-availability.cluster-id}.

ZooKeeper 的高可用性设置选项

默认值 类型 描述
high-availability.zookeeper.path.root “/flink” String Flink 在 ZooKeeper 中存储其条目的根路径。
high-availability.zookeeper.quorum (none) String 使用 ZooKeeper 以高可用性模式运行 Flink 时使用的 ZooKeeper 仲裁。

内存配置

这些配置值控制 TaskManager 和 JobManager 使用内存的方式。

Flink 试图尽可能地为用户屏蔽为数据密集型处理配置 JVM 的复杂性。在大多数情况下,用户只需要设置值taskmanager.memory.process.size或taskmanager.memory.flink.size(取决于设置方式),并可能通过taskmanager.memory.managed.fraction. 下面的其他选项可用于性能调整和修复与内存相关的错误。

默认值 类型 描述
jobmanager.memory.enable-jvm-direct-memory-limit false Boolean 是否开启JobManager进程的JVM直接内存限制(-XX:MaxDirectMemorySize)。限制将设置为“jobmanager.memory.off-heap.size”选项的值。
jobmanager.memory.flink.size (none) MemorySize JobManager 的总 Flink 内存大小。这包括 JobManager 消耗的所有内存,但 JVM Metaspace 和 JVM Overhead 除外。它由JVM堆内存和堆外内存组成。有关总进程内存大小配置,另请参见“jobmanager.memory.process.size”。
jobmanager.memory.heap.size (none) MemorySize JobManager 的 JVM 堆内存大小。建议的最小 JVM 堆大小为 128.000mb(134217728 字节)。
jobmanager.memory.jvm-metaspace.size 256 mb MemorySize JobManager 的 JVM 元空间大小。
jobmanager.memory.jvm-overhead.fraction 0.1 Float 为 JVM 开销保留的总进程内存的一部分。这是为 JVM 开销保留的堆外内存,例如线程堆栈空间、编译缓存等。这包括本机内存但不包括直接内存,并且在 Flink 计算 JVM 最大直接内存大小参数时不会计算在内。导出 JVM 开销的大小以构成总进程内存的配置部分。如果派生的大小小于或大于配置的最小或最大大小,将使用最小或最大大小。可以通过将最小和最大大小设置为相同的值来明确指定 JVM 开销的确切大小。
jobmanager.memory.jvm-overhead.max 1 gb MemorySize JobManager 的最大 JVM 开销大小。这是为 JVM 开销保留的堆外内存,例如线程堆栈空间、编译缓存等。这包括本机内存但不包括直接内存,并且在 Flink 计算 JVM 最大直接内存大小参数时不会计算在内。导出 JVM 开销的大小以构成总进程内存的配置部分。如果派生的大小小于或大于配置的最小或最大大小,将使用最小或最大大小。可以通过将最小和最大大小设置为相同的值来明确指定 JVM 开销的确切大小。
jobmanager.memory.jvm-overhead.min 192 mb MemorySize JobManager 的最小 JVM 开销大小。这是为 JVM 开销保留的堆外内存,例如线程堆栈空间、编译缓存等。这包括本机内存但不包括直接内存,并且在 Flink 计算 JVM 最大直接内存大小参数时不会计算在内。导出 JVM 开销的大小以构成总进程内存的配置部分。如果派生的大小小于或大于配置的最小或最大大小,将使用最小或最大大小。可以通过将最小和最大大小设置为相同的值来明确指定 JVM 开销的确切大小。
jobmanager.memory.off-heap.size 128 mb MemorySize JobManager 的堆外内存大小。此选项涵盖所有堆外内存使用,包括直接和本机内存分配。如果通过“jobmanager.memory.enable-jvm-direct-memory-limit”启用限制,则 JobManager 进程的 JVM 直接内存限制 (-XX:MaxDirectMemorySize) 将设置为此值。
jobmanager.memory.process.size (none) MemorySize JobManager 的总进程内存大小。这包括 JobManager JVM 进程消耗的所有内存,包括 Total Flink Memory、JVM Metaspace 和 JVM Overhead。在容器化设置中,这应该设置为容器内存。有关总 Flink 内存大小配置,另请参阅“jobmanager.memory.flink.size”。
taskmanager.memory.flink.size (none) MemorySize TaskExecutors 的总 Flink 内存大小。这包括 TaskExecutor 消耗的所有内存,但 JVM Metaspace 和 JVM Overhead 除外。它由框架堆内存、任务堆内存、任务堆外内存、托管内存和网络内存组成。有关总进程内存大小配置,另请参见“taskmanager.memory.process.size”。
taskmanager.memory.framework.heap.size 128 mb MemorySize TaskExecutors 的框架堆内存大小。这是为 TaskExecutor 框架保留的 JVM 堆内存大小,不会分配给任务槽。
taskmanager.memory.framework.off-heap.batch-shuffle.size 64 mb MemorySize 阻塞 shuffle 用于读取 shuffle 数据的内存大小(当前仅由 sort-shuffle 使用)。注意: 1) 内存是从 ‘taskmanager.memory.framework.off-heap.size’ 中截取的,所以必须小于那个,这意味着你可能还需要增加 'taskmanager.memory.framework.off-heap.size’增加此配置值后;2) 此内存大小会影响 shuffle 性能,您可以为大规模批处理作业增加此配置值(例如,增加到 128M 或 256M)。
taskmanager.memory.framework.off-heap.size 128 mb MemorySize TaskExecutors 的框架堆外内存大小。这是为 TaskExecutor 框架保留的堆外内存(JVM 直接内存和本机内存)的大小,不会分配给任务槽。当 Flink 计算 JVM 最大直接内存大小参数时,配置的值将被完全计算在内。
taskmanager.memory.jvm-metaspace.size 256 mb MemorySize 任务执行器的 JVM 元空间大小。
taskmanager.memory.jvm-overhead.fraction 0.1 Float 为 JVM 开销保留的总进程内存的一部分。这是为 JVM 开销保留的堆外内存,例如线程堆栈空间、编译缓存等。这包括本机内存但不包括直接内存,并且在 Flink 计算 JVM 最大直接内存大小参数时不会计算在内。导出 JVM 开销的大小以构成总进程内存的配置部分。如果派生的大小小于/大于配置的最小/最大大小,将使用最小/最大大小。通过将最小/最大大小设置为相同的值,可以明确指定 JVM 开销的确切大小。
taskmanager.memory.jvm-overhead.max 1 gb MemorySize TaskExecutors 的最大 JVM 开销大小。这是为 JVM 开销保留的堆外内存,例如线程堆栈空间、编译缓存等。这包括本机内存但不包括直接内存,并且在 Flink 计算 JVM 最大直接内存大小参数时不会计算在内。导出 JVM 开销的大小以构成总进程内存的配置部分。如果派生的大小小于/大于配置的最小/最大大小,将使用最小/最大大小。通过将最小/最大大小设置为相同的值,可以明确指定 JVM 开销的确切大小。
taskmanager.memory.jvm-overhead.min 192 mb MemorySize TaskExecutors 的最小 JVM 开销大小。这是为 JVM 开销保留的堆外内存,例如线程堆栈空间、编译缓存等。这包括本机内存但不包括直接内存,并且在 Flink 计算 JVM 最大直接内存大小参数时不会计算在内。导出 JVM 开销的大小以构成总进程内存的配置部分。如果派生的大小小于/大于配置的最小/最大大小,将使用最小/最大大小。通过将最小/最大大小设置为相同的值,可以明确指定 JVM 开销的确切大小。
taskmanager.memory.managed.consumer-weights OPERATOR:70,STATE_BACKEND:70,PYTHON:30 Map 为不同类型的消费者管理内存权重。插槽的托管内存由它包含的所有类型的消费者共享,与类型的权重成比例,并且与每种类型的消费者数量无关。目前支持的消费者类型是 OPERATOR(用于内置算法)、STATE_BACKEND(用于 RocksDB 状态后端)和 PYTHON(用于 Python 进程)。
taskmanager.memory.managed.fraction 0.4 Float 如果未明确指定托管内存大小,则用作托管内存的总 Flink 内存的分数。
taskmanager.memory.managed.size (none) MemorySize 任务执行器的托管内存大小。这是由内存管理器管理的堆外内存的大小,保留用于排序、哈希表、中间结果缓存和 RocksDB 状态后端。内存使用者可以以 MemorySegments 的形式从内存管理器分配内存,或者从内存管理器保留字节并将其内存使用量保持在该边界内。如果未指定,它将被派生以构成总 Flink 内存的配置部分。
taskmanager.memory.network.fraction 0.1 Float 用作网络内存的总 Flink 内存的分数。网络内存是为 ShuffleEnvironment 保留的堆外内存(例如,网络缓冲区)。派生网络内存大小以构成总 Flink 内存的配置部分。如果派生的大小小于/大于配置的最小/最大大小,将使用最小/最大大小。通过将最小/最大大小设置为相同的值,可以明确指定网络内存的确切大小。
taskmanager.memory.network.max 1 gb MemorySize 任务执行器的最大网络内存大小。网络内存是为 ShuffleEnvironment 保留的堆外内存(例如,网络缓冲区)。派生网络内存大小以构成总 Flink 内存的配置部分。如果派生的大小小于/大于配置的最小/最大大小,将使用最小/最大大小。可以通过将 min/max 设置为相同的值来明确指定网络内存的确切大小。
taskmanager.memory.network.min 64 mb MemorySize 任务执行器的最小网络内存大小。网络内存是为 ShuffleEnvironment 保留的堆外内存(例如,网络缓冲区)。派生网络内存大小以构成总 Flink 内存的配置部分。如果派生的大小小于/大于配置的最小/最大大小,将使用最小/最大大小。可以通过将 min/max 设置为相同的值来明确指定网络内存的确切大小。
taskmanager.memory.process.size (none) MemorySize TaskExecutors 的总进程内存大小。这包括 TaskExecutor 消耗的所有内存,包括 Total Flink Memory、JVM Metaspace 和 JVM Overhead。在容器化设置中,这应该设置为容器内存。另请参阅“taskmanager.memory.flink.size”以了解 Flink 总内存大小配置。
taskmanager.memory.task.heap.size (none) MemorySize 任务执行器的任务堆内存大小。这是为任务保留的 JVM 堆内存大小。如果未指定,它将得出 Total Flink Memory 减去 Framework Heap Memory、Framework Off-Heap Memory、Task Off-Heap Memory、Managed Memory 和 Network Memory。
taskmanager.memory.task.off-heap.size 0 bytes MemorySize TaskExecutors 的任务堆外内存大小。这是为任务保留的堆外内存(JVM 直接内存和本机内存)的大小。当 Flink 计算 JVM 最大直接内存大小参数时,配置的值将被完全计算在内。

其他选项

默认值 类型 描述
fs.allowed-fallback-filesystems (none) String 文件scheme的(分号分隔)列表,可以使用 Hadoop 代替适当的 Flink 插件。(例如:s3;wasb)
fs.default-scheme (none) String 默认文件系统scheme,用于未明确声明scheme的路径。可能包含一个权限,例如在 HDFS NameNode 的情况下的 host:port。
io.tmp.dirs ‘LOCAL_DIRS’ on Yarn. System.getProperty(“java.io.tmpdir”) in standalone. String 临时文件的目录,用“,”,“|”或系统的 java.io.File.pathSeparator 分隔。

安全(Security)

用于配置 Flink 的安全性以及与外部系统的安全交互的选项。

SSL

Flink 的网络连接可以通过 SSL 保护。有关详细的设置指南和背景,请参阅SSL 设置文档

默认值 类型 描述
security.ssl.algorithms “TLS_RSA_WITH_AES_128_CBC_SHA” String 要支持的标准 SSL 算法的逗号分隔列表。在这里阅读更多
security.ssl.internal.cert.fingerprint (none) String 内部证书的 sha1 指纹。这进一步保护了内部通信以提供 Flink 使用的确切证书。这在无法使用私有 CA(自签名)或需要内部公司范围的 CA 时是必要的
security.ssl.internal.enabled FALSE Boolean 为内部网络通信打开 SSL。可选地,特定组件可以通过它们自己的设置(rpc、数据传输、REST 等)覆盖它。
security.ssl.internal.key-password (none) String 用于解密 Flink 内部端点(rpc、数据传输、blob 服务器)的密钥库中的密钥的秘密
security.ssl.internal.keystore (none) String 带有 SSL 密钥和证书的 Java 密钥库文件,用于 Flink 的内部端点(rpc、数据传输、blob 服务器)。
security.ssl.internal.keystore-password (none) String 为 Flink 的内部端点(rpc、数据传输、blob 服务器)解密 Flink 的密钥库文件的秘密。
security.ssl.internal.truststore (none) String 包含公共 CA 证书的信任库文件,用于验证 Flink 内部端点(rpc、数据传输、blob 服务器)的对等点。
security.ssl.internal.truststore-password (none) String 用于解密 Flink 内部端点(rpc、数据传输、blob 服务器)的信任库的密码。
security.ssl.protocol “TLSv1.2” String ssl 传输支持的 SSL 协议版本。请注意,它不支持逗号分隔列表。
security.ssl.rest.authentication-enabled FALSE Boolean 通过 REST 端点为外部通信打开相互 SSL 身份验证。
security.ssl.rest.cert.fingerprint (none) String 其余证书的 sha1 指纹。这进一步保护了其余 REST 端点以提供仅由代理服务器使用的证书这在曾经使用公共 CA 或内部公司范围的 CA 的情况下是必要的
security.ssl.rest.enabled FALSE Boolean 通过 REST 端点为外部通信打开 SSL。
security.ssl.rest.key-password (none) String 用于解密 Flink 外部 REST 端点的密钥库中的密钥的秘密。
security.ssl.rest.keystore (none) String 带有 SSL 密钥和证书的 Java 密钥库文件,用于 Flink 的外部 REST 端点。
security.ssl.rest.keystore-password (none) String 为 Flink 的外部 REST 端点解密 Flink 的密钥库文件的秘密。
security.ssl.rest.truststore (none) String 包含公共 CA 证书的信任库文件,用于验证 Flink 外部 REST 端点的对等点。
security.ssl.rest.truststore-password (none) String 解密 Flink 外部 REST 端点的信任库的密码。
security.ssl.verify-hostname TRUE Boolean 标志以在 ssl 握手期间启用对等方的主机名验证。

使用外部系统进行身份验证

ZooKeeper 身份验证/授权

当连接到安全的 ZooKeeper quorum时,这些选项是必需的。

默认值 类型 描述
zookeeper.sasl.disable false Boolean  
zookeeper.sasl.login-context-name “Client” String  
zookeeper.sasl.service-name “zookeeper” String  

基于 Kerberos 的身份验证/授权

请参阅Flink 和 Kerberos 文档以获取设置指南和 Flink 可以通过 Kerberos 对其进行身份验证的外部系统列表。

默认值 类型 描述
security.kerberos.fetch.delegation-token true Boolean 指示是否为 Flink 作业需要联系的外部服务获取委托令牌。仅支持 HDFS 和 HBase。它用于 Yarn 部署。如果为 true,Flink 将获取 HDFS 和 HBase 委托令牌并将它们注入 Yarn AM 容器。如果为 false,Flink 将假定委托令牌在 Flink 之外进行管理。因此,它不会为 HDFS 和 HBase 获取委托令牌。如果您依赖提交机制(例如 Apache Oozie)来处理委托令牌,您可能需要禁用此选项。
security.kerberos.login.contexts (none) String 以逗号分隔的登录上下文列表,用于提供 Kerberos 凭据(例如,Client,KafkaClient 以使用凭据进行 ZooKeeper 身份验证和 Kafka 身份验证)
security.kerberos.login.keytab (none) String 包含用户凭据的 Kerberos 密钥表文件的绝对路径。
security.kerberos.login.principal (none) String 与密钥表关联的 Kerberos 主体名称。
security.kerberos.login.use-ticket-cache true Boolean 指示是否从您的 Kerberos 票证缓存中读取。

资源编排框架

本节包含与将 Flink 与资源编排框架(如 Kubernetes、Yarn 等)集成相关的选项。

请注意,将 Flink 与资源编排框架集成并不总是必要的。例如,您可以轻松地在 Kubernetes 上部署 Flink 应用程序,而无需 Flink 知道它在 Kubernetes 上运行(并且无需在此处指定任何 Kubernetes 配置选项。)请参阅此设置指南以获取示例。

本节中的选项对于 Flink 本身主动从编排器请求和释放资源的设置是必需的。

YARN

默认值 类型 描述
external-resource.<resource_name>.yarn.config-key (none) String 如果配置了,Flink 会将这个 key 添加到对 Yarn 的容器请求的资源配置文件中。该值将设置为 external-resource.<resource_name>.amount 的值。
flink.hadoop.<key> (none) String 通过前缀’flink.hadoop.'探测Hadoop配置的一般选项。Flink 将删除要获取的前缀(从core-site.xmlhdfs-default.xml),然后将 and 值设置为 Hadoop 配置。例如,Flink 配置中的 flink.hadoop.dfs.replication=5,在 Hadoop 配置中转换为 dfs.replication=5。
flink.yarn.<key> (none) String 通过前缀’flink.yarn’探测Yarn配置的一般选项。Flink 将删除前缀 “flink”以获取yarn。 (从yarn-default.xml)然后设置Yarn。 和yarn配置的值。例如,在 Flink 配置中,flink.yarn.resourcemanager.container.liveness-monitor.interval-ms=300000,并在 Yarn 配置中转换为 yarn.resourcemanager.container.liveness-monitor.interval-ms=300000。
yarn.application-attempt-failures-validity-interval 10000 Long 以毫秒为单位的时间窗口,它定义了重新启动 AM 时应用程序尝试失败的次数。不考虑超出此窗口范围的故障。将此值设置为 -1 以便全局计数。有关详细信息,请参阅此处
yarn.application-attempts (none) String 应用程序主机重新启动的次数。默认情况下,该值将设置为 1。如果启用了高可用性,则默认值将为 2。重新启动次数也受 YARN 限制(通过 yarn.resourcemanager.am.max-trys 配置)。请注意,整个 Flink 集群将重新启动,YARN 客户端将失去连接。
yarn.application-master.port “”“0"”" String 使用此配置选项,用户可以为 Application Master(和 JobManager)RPC 端口指定端口、端口范围或端口列表。默认情况下,我们建议使用默认值 (0) 让操作系统选择合适的端口。特别是当多个 AM 在同一物理主机上运行时,固定端口分配会阻止 AM 启动。例如,在具有限制性防火墙的环境中在 YARN 上运行 Flink 时,此选项允许指定允许的端口范围。
yarn.application.id (none) String 正在运行的 YARN 集群的 YARN 应用程序 ID。这是将要执行管道的 YARN 集群。
yarn.application.name (none) String YARN 应用程序的自定义名称。
yarn.application.node-label (none) String 为 YARN 应用程序指定 YARN 节点标签。
yarn.application.priority -1 Integer 一个非负整数,表示提交 Flink YARN 应用程序的优先级。只有启用 YARN 优先级调度设置后才会生效。较大的整数对应于较高的优先级。如果优先级为负数或设置为’-1’(默认),Flink 将取消设置yarn优先级设置并使用集群默认优先级。请参阅 YARN 的官方文档,了解为目标 YARN 版本启用优先级调度所需的具体设置。
yarn.application.queue (none) String 放置当前管道的 YARN 队列。
yarn.application.type (none) String 您的 YARN 应用程序的自定义类型…
yarn.appmaster.vcores 1 Integer YARN 应用程序主服务器使用的虚拟核心 (vcore) 的数量。
yarn.classpath.include-user-jar ORDER Enum 定义 user-jars 是否包含在系统类路径中以及它们在路径中的位置。
可能的值:
1. “禁用”:从系统类路径中排除用户 jar
2. “FIRST”:开头的位置
3. “LAST”:最后的位置
4. “ORDER”:基于jar名称的位置
yarn.containers.vcores -1 Integer 每个 YARN 容器的虚拟核心 (vcore) 数。默认情况下,vcore 的数量设置为每个 TaskManager 的插槽数(如果设置),否则设置为 1。为了使用此参数,您的集群必须启用 CPU 调度。您可以通过设置org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.
yarn.file-replication -1 Integer 每个本地资源文件的文件复制数。如果未配置,Flink 将使用 hadoop 配置中的默认复制值。
yarn.flink-dist-jar (none) String Flink dist jar 的位置。
yarn.heartbeat.container-request-interval 500 Integer 如果 Flink 请求容器,则与 ResourceManager 的心跳之间的时间(以毫秒为单位):
1. 该值越低,Flink 越快收到有关容器分配的通知,因为请求和分配是通过心跳传输的。
2. 该值越低,可能分配的过多容器就越多,这些容器最终将被释放,但会对 Yarn 施加压力。
如果您在 ResourceManager 上观察到过多的容器分配,则建议增加此值。有关更多信息,请参阅此链接
yarn.heartbeat.interval 5 Integer 与 ResourceManager 的心跳之间的时间(以秒为单位)。
yarn.properties-file.location (none) String 当 Flink 作业提交给 YARN 时,JobManager 的主机和可用处理槽的数量被写入属性文件,以便 Flink 客户端能够获取这些详细信息。此配置参数允许更改该文件的默认位置(例如,对于在用户之间共享 Flink 安装的环境)。
yarn.provided.lib.dirs (none) List<String> 提供的 lib 目录的分号分隔列表。它们应该是预先上传的并且是世界可读的。Flink 将使用它们来排除本地 Flink jars(例如 flink-dist、lib/、plugins/)的上传,以加速作业提交过程。此外,YARN 会将它们缓存在节点上,这样就不需要为每个应用程序每次都下载它们。例如 hdfs://$namenode_address/path/of/flink/lib
yarn.security.kerberos.additionalFileSystems (none) List<String> Flink 将要访问的其他受 Kerberos 保护的 Hadoop 文件系统的逗号分隔列表。例如,yarn.security.kerberos.additionalFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003。提交给 YARN 的客户端需要有权访问这些文件系统才能检索安全令牌。
yarn.security.kerberos.localized-keytab-path “”“krb5.keytab”“” String kerberos 密钥表文件将本地化到的本地(在 NodeManager 上)路径。如果 yarn.security.kerberos.ship-local-keytab 设置为 true,Flink 会将 keytab 文件作为 YARN 本地资源发送。在这种情况下,路径是相对于本地资源目录的。如果设置为 false,Flink 将尝试直接从路径本身定位 keytab。
yarn.security.kerberos.ship-local-keytab TRUE Boolean 如果这是真的,Flink 会将通过 security.kerberos.login.keytab 配置的 keytab 文件作为本地化的 YARN 资源发送。
yarn.ship-archives (none) List<String> 要发送到 YARN 集群的archives的分号分隔列表。这些archives在本地化时将被解压缩,它们可以是以下任何一种类型:“.tar.gz”、“.tar”、“.tgz”、“.dst”、“.jar”、“.zip” .
yarn.ship-files (none) List<String> 要传送到 YARN 集群的文件和/或目录的分号分隔列表。
yarn.staging-directory (none) String 提交应用程序时用于存储 YARN 文件的暂存目录。默认情况下,它使用配置文件系统的主目录。
yarn.tags (none) String 应用于 Flink YARN 应用程序的以逗号分隔的标签列表。
yarn.taskmanager.node-label (none) String 为 Flink TaskManagers 指定 YARN 节点标签,如果两者都设置,它将覆盖 TaskManagers 的 yarn.application.node-label。

Kubernetes

参照官网地址

状态后端

有关状态后端的背景信息,请参阅状态后端文档

RocksDB 状态后端

这些是配置 RocksDB 状态后端通常需要的选项。有关高级低级配置和故障排除所需的选项,请参阅高级 RocksDB 后端部分

默认值 类型 描述
state.backend.rocksdb.memory.fixed-per-slot (none) MemorySize 每个插槽的所有 RocksDB 实例共享的固定总内存量。此选项在配置时会覆盖“state.backend.rocksdb.memory.managed”选项。如果这个选项和 ‘state.backend.rocksdb.memory.managed’ 选项都没有设置,那么每个 RocksDB 列族状态都有自己的内存缓存(由列族选项控制)。
state.backend.rocksdb.memory.high-prio-pool-ratio 0.1 Double 为索引、过滤器和压缩字典块等高优先级数据保留的高速缓存内存部分。此选项仅在配置 ‘state.backend.rocksdb.memory.managed’ 或 ‘state.backend.rocksdb.memory.fixed-per-slot’ 时有效。
state.backend.rocksdb.memory.managed TRUE Boolean 如果设置,RocksDB 状态后端将自动配置自身以使用任务槽的托管内存预算,并将内存划分为写入缓冲区、索引、块缓存等。这样,RocksDB内存的三个主要用途将被限制。
state.backend.rocksdb.memory.partitioned-index-filters FALSE Boolean 通过分区,SST 文件的索引/过滤器块被分割成更小的块,上面有一个额外的顶级索引。读取索引/过滤器时,仅将顶级索引加载到内存中。然后,分区索引/过滤器使用顶级索引将执行索引/过滤器查询所需的分区按需加载到块缓存中。此选项仅在配置 ‘state.backend.rocksdb.memory.managed’ 或 ‘state.backend.rocksdb.memory.fixed-per-slot’ 时有效。
state.backend.rocksdb.memory.write-buffer-ratio 0.5 Double 写入缓冲区可能占用的最大内存量,占总共享内存的一小部分。此选项仅在配置 ‘state.backend.rocksdb.memory.managed’ 或 ‘state.backend.rocksdb.memory.fixed-per-slot’ 时有效。
state.backend.rocksdb.timer-service.factory ROCKSDB Enum 这决定了定时器服务状态实现的工厂。可能的值:
“HEAP”:基于堆的
“ROCKSDB”:基于RocksDB的实现

指标

有关 Flink 指标基础设施的背景信息,请参阅指标系统文档

默认值 类型 描述
metrics.fetcher.update-interval 10000 Long Web UI 使用的指标获取器的更新间隔(以毫秒为单位)。减小此值可以更快地更新指标。如果指标提取器导致负载过多,请增加此值。将此值设置为 0 将完全禁用指标获取。
metrics.internal.query-service.port “0” String Flink 内部指标查询服务使用的端口范围。接受端口列表(“50100,50101”)、范围(“50100-50200”)或两者的组合。建议设置一个端口范围,以避免在同一台机器上运行多个 Flink 组件时发生冲突。默认情况下,Flink 会选择一个随机端口。
metrics.internal.query-service.thread-priority 1 Integer 用于 Flink 内部 metric 查询服务的线程优先级。该线程由 Akka 的线程池执行器创建。优先级的范围是从 1 (MIN_PRIORITY) 到 10 (MAX_PRIORITY)。警告,增加这个值可能会导致主要的 Flink 组件停机。
metrics.job.status.enable CURRENT_TIME List<Enum> 应报告的作业状态指标的选择。可能的值:
1. “STATE”:对于给定的状态,如果作业当前处于该状态,则返回 1,否则返回 0。
2. “CURRENT_TIME”:对于给定状态,如果作业当前处于该状态,则返回作业转换到该状态以来的时间,否则返回 0。
3. “TOTAL_TIME”:对于给定状态,返回作业在该状态中总共花费的时间。
metrics.latency.granularity “operator” String 定义延迟指标的粒度。可接受的值为:
1. single - 在不区分源和子任务的情况下跟踪延迟。
2. operator - 在区分源而不是子任务时跟踪延迟。
3. subtask(子任务) - 在区分源和子任务时跟踪延迟。
metrics.latency.history-size 128 Integer 定义每个算子要保持的测量延迟数。
metrics.latency.interval 0 Long 定义从源发出延迟跟踪标记的时间间隔。如果设置为 0 或负值,则禁用延迟跟踪。启用此功能会显着影响集群的性能。
metrics.reporter.<name>.<parameter> (none) String 为名为<name> 的报告器配置参数 <parameter>。
metrics.reporter.<name>.class (none) String 用于名为 <name> 的报告器的报告器类。
metrics.reporter.<name>.interval 10 s Duration 用于名为 <name> 的报告者的报告者间隔。
metrics.reporters (none) String 报告者姓名的可选列表。如果已配置,则仅启动名称与列表中任何名称匹配的报告器。否则,将启动配置中可以找到的所有报告器。
metrics.scope.delimiter “.” String 用于组合度量标识符的分隔符。
metrics.scope.jm “<host>.jobmanager” String 定义适用于 JobManager 范围内的所有指标的范围格式字符串。
metrics.scope.jm.job “<host>.jobmanager.<job_name>” String 定义适用于 JobManager 上作业范围的所有指标的范围格式字符串。
metrics.scope.operator “<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>” String 定义适用于算子范围内的所有指标的范围格式字符串。
metrics.scope.task “<host>.taskmanager.<tm_id>.<job_name>.<task_name>.v<subtask_index>” String 定义适用于任务范围的所有指标的范围格式字符串。
metrics.scope.tm “<host>.taskmanager.<tm_id>” String 定义适用于 TaskManager 范围内的所有指标的范围格式字符串。
metrics.scope.tm.job “<host>.taskmanager.<tm_id>.<job_name>” String 定义适用于 TaskManager 上作业范围的所有指标的范围格式字符串。
metrics.system-resource FALSE Boolean 指示 Flink 是否应报告系统资源指标的标志,例如机器的 CPU、内存或网络使用情况。
metrics.system-resource-probing-interval 5000 Long 以毫秒为单位指定的系统资源指标探测之间的间隔。仅在启用 ‘metrics.system-resource’ 时有效。

RocksDB 原生指标

Flink 可以为使用 RocksDB 状态后端的应用程序报告来自 RocksDB 原生代码的指标。这里的指标范围是算子,然后按列族进一步细分;值报告为无符号长整数。
参考官网地址

History Server

历史服务器保存已完成作业的信息(图表、运行时、统计信息)。要启用它,您必须在 JobManager ( jobmanager.archive.fs.dir) 中启用“作业归档”。

更详细请查看

默认值 类型 描述
historyserver.archive.clean-expired-jobs FALSE Boolean HistoryServer 是否应该清理不再存在的作业 historyserver.archive.fs.dir
historyserver.archive.fs.dir (none) String 逗号分隔的目录列表,用于从中获取归档作业。历史服务器将监视这些目录中的归档作业。您可以配置 JobManager 以通过 jobmanager.archive.fs.dir 将作业归档到目录。
historyserver.archive.fs.refresh-interval 10000 Long 刷新归档作业目录的时间间隔(毫秒)。
historyserver.archive.retained-jobs -1 Integer historyserver.archive.fs.dir 定义的每个归档目录中保留的最大作业数。如果设置为 -1(默认),则存档数量没有限制。如果设置为“0”或小于“-1”,HistoryServer 将抛出一个IllegalConfigurationException
historyserver.web.address (none) String HistoryServer 的 Web 界面的地址。
historyserver.web.port 8082 Integer HistoryServers Web 界面的端口。
historyserver.web.refresh-interval 10000 Long HistoryServer Web 前端的刷新间隔(以毫秒为单位)。
historyserver.web.ssl.enabled FALSE Boolean 启用对 HistoryServer Web 前端的 HTTPs 访问。这仅在全局 SSL 标志 security.ssl.enabled 设置为 true 时适用。
historyserver.web.tmpdir (none) String 历史服务器 REST API 用于临时文件的本地目录。

Experimental(实验性功能)

Flink 中实验性功能的选项。

可查询状态

Queryable State是一项实验性功能,可让您像键/值存储一样访问 Flink 的内部状态。有关详细信息,请参阅可查询状态文档

默认值 类型 描述
queryable-state.client.network-threads 0 Integer 可查询状态客户端的网络(Netty 的事件循环)线程数。
queryable-state.enable FALSE Boolean 选项是否应在可能和可配置的情况下启用可查询状态代理和服务器。
queryable-state.proxy.network-threads 0 Integer 可查询状态代理的网络(Netty 的事件循环)线程数。
queryable-state.proxy.ports “9069” String 可查询状态代理的端口范围。指定的范围可以是单个端口:“9123”,端口范围:“50100-50200”,或范围和端口列表:“50100-50200,50300-50400,51234”。
queryable-state.proxy.query-threads 0 Integer 可查询状态代理的查询线程数。如果设置为 0,则使用插槽数。
queryable-state.server.network-threads 0 Integer 可查询状态服务器的网络(Netty 的事件循环)线程数。
queryable-state.server.ports “9067” String 可查询状态服务器的端口范围。指定的范围可以是单个端口:“9123”,端口范围:“50100-50200”,或范围和端口列表:“50100-50200,50300-50400,51234”。
queryable-state.server.query-threads 0 Integer 可查询状态服务器的查询线程数。如果设置为 0,则使用插槽数。

Client(客户端)

默认值 类型 描述
client.retry-period 2 s Duration 通过 CLI 或 Flink 的客户端执行命令失败的连续重试之间的间隔(以毫秒为单位),只要支持重试(默认为 2 秒)。
client.timeout 1 min Duration 客户端超时。

Execution (执行)

默认值 类型 描述
execution.allow-client-job-configurations TRUE Boolean 确定是否允许用户程序中的配置。根据您的部署模式,失败的作业可能会产生不同的影响。尝试将作业提交到外部集群(会话集群部署)的客户端会引发异常或作业管理器(应用程序模式部署)。
execution.attached FALSE Boolean 指定管道是以附加模式还是分离模式提交。
execution.job-listeners (none) List<String> 要在执行环境中注册的自定义 JobListener。注册的侦听器不能有带参数的构造函数。
execution.shutdown-on-application-finish TRUE Boolean Flink 应用程序集群是否应该在其应用程序完成后自动关闭(成功或失败)。对其他部署模式无效。
execution.shutdown-on-attached-exit FALSE Boolean 如果作业以附加模式提交,则在 CLI 突然终止时执行尽力而为的集群关闭,例如,响应用户中断,例如键入 Ctrl C。
execution.submit-failed-job-on-application-error FALSE Boolean 如果应用程序驱动程序在实际提交作业之前出现错误,则应提交失败的作业(在应用程序模式下)。这旨在提供一种向用户报告故障的干净方式,并且与“execution.shutdown-on-application-finish”结合使用特别有用。此选项仅在强制提交单个作业时有效(启用“高可用性”)。请注意,这是一个实验性选项,将来可能会更改。
execution.target (none) String 执行的部署目标。调用时可以采用以下值之一bin/flink run:
remote
local
yarn-per-job (deprecated)
yarn-session
kubernetes-session
以及调用时的以下值之一bin/flink run-application:
yarn-application
kubernetes-application

其它参数

官网地址

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhiafgib
系列文章
更多 icon
同类精品
更多 icon
继续加载