李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
32.Flink Checkpoint配置
Leefs
2022-02-13 PM
2234℃
0条
### 前言 Flink 中的每个方法或算子都能够是**有状态的**。状态化的方法在处理单个`元素/事件`的时候存储数据,让状态成为使各个类型的算子更加精细的重要部分。为了让状态容错,Flink 需要为状态添加 **checkpoint(检查点)**。Checkpoint 使得 Flink 能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。 ### 一、开启与配置 Checkpoint 默认情况下 checkpoint 是禁用的。通过调用 `StreamExecutionEnvironment` 的 `enableCheckpointing(n)` 来启用 checkpoint,里面的 *n* 是进行 checkpoint 的间隔,单位毫秒。 **Checkpoint 其他的属性包括:** - **精确一次(exactly-once)对比至少一次(at-least-once)**:你可以选择向 `enableCheckpointing(long interval, CheckpointingMode mode)` 方法中传入一个模式来选择使用两种保证等级中的哪一种。 对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。 - **checkpoint 超时**:如果 checkpoint 执行的时间超过了该配置的阈值,还在进行中的 checkpoint 操作就会被抛弃。 + **checkpoints 之间的最小时间**:该属性定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 *5000*, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。 往往使用“checkpoints 之间的最小时间”来配置应用会比 checkpoint 间隔容易很多,因为“checkpoints 之间的最小时间”在 checkpoint 的执行时间超过平均值时不会受到影响(例如如果目标的存储系统忽然变得很慢)。 注意这个值也意味着并发 checkpoint 的数目是*一*。 + **checkpoint 可容忍连续失败次数**:该属性定义可容忍多少次连续的 checkpoint 失败。超过这个阈值之后会触发作业错误 fail over。 默认次数为“0”,这意味着不容忍 checkpoint 失败,作业将在第一次 checkpoint 失败时fail over。 + **并发 checkpoint 的数目**: 默认情况下,在上一个 checkpoint 未完成(失败或者成功)的情况下,系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程。 不过允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说,是有意义的。 该选项不能和 “checkpoints 间的最小时间"同时使用。 + **externalized checkpoints**: 你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候*不会*被自动删除。 这种方式下,如果你的 job 失败,你将会有一个现有的 checkpoint 去恢复。 ### 二、Checkpoint 参数详解 ```java StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每 60s 开始一次 checkpoint env.enableCheckpointing(60000); // 高级选项: // checkpoint 语义设置为 EXACTLY_ONCE(精确一次),这是默认语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 两次 checkpoint 的间隔时间至少为 1 s,默认是 0,立即进行下一次 checkpoint env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // checkpoint 必须在 60s 内结束,否则被丢弃,默认是 10 分钟 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只能允许有一个 checkpoint env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 最多允许 checkpoint 失败 3 次 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // 当 Flink 任务取消时,保留外部保存的 checkpoint 信息 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 当有较新的 Savepoint 时,作业也会从 Checkpoint 处恢复 env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 允许实验性的功能:非对齐的 checkpoint,以提升性能 env.getCheckpointConfig().enableUnalignedCheckpoints(); ``` **说明** + **env.enableCheckpointing(60000)**:1 分钟触发一次 checkpoint; + **setCheckpointTimeout**:checkpoint 超时时间,默认是 10 分钟超时,超过了超时时间就会被丢弃; + **setCheckpointingMode**:设置 checkpoint 语义,可以设置为 EXACTLY_ONCE,表示既不重复消费也不丢数据;AT_LEAST_ONCE,表示至少消费一次,可能会重复消费; + **setMinPauseBetweenCheckpoints**:两次 checkpoint 之间的间隔时间。 假如设置每分钟进行一次 checkpoint,两次 checkpoint 间隔时间为 30s。假设某一次 checkpoint 耗时 40s,那么理论上20s 后就要进行一次 checkpoint,但是设置了两次 checkpoint 之间的间隔时间为 30s,所以是 30s 之后才会进行 checkpoint。另外,如果配置了该参数,那么同时进行的 checkpoint 数量只能为 1; + **enableExternalizedCheckpoints**:Flink 任务取消后,外部 checkpoint 信息是否被清理。 + **DELETE_ON_CANCELLATION**:任务取消后,所有的 checkpoint 都将会被清理。只有在任务失败后,才会被保留; + **RETAIN_ON_CANCELLATION**:任务取消后,所有的 checkpoint 都将会被保留,需要手工清理。 + **setPreferCheckpointForRecovery**:恢复任务时,是否从最近一个比较新的 savepoint 处恢复,默认是 false; + **enableUnalignedCheckpoints**:是否开启试验性的非对齐的 checkpoint,可以在反压情况下极大减少 checkpoint 的次数; *附参考文章链接* *https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/fault-tolerance/checkpointing/*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/1892.html
上一篇
31.Flink容错机制介绍
下一篇
33.Flink重启策略
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
MySQL
微服务
散列
DataWarehouse
Hive
Spark
Eclipse
Java工具类
Azkaban
Stream流
Flume
哈希表
Filter
gorm
Python
随笔
Kibana
稀疏数组
Elasticsearch
Scala
Quartz
CentOS
查找
Elastisearch
Jenkins
Kafka
Linux
Hbase
MyBatis
队列
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞