一致性检查点(Checkpoints)
-
Flink 故障恢复机制的核心,就是应用状态的一致性检查点
-
有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照);这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候(如5这个数据虽然进了奇数流但是偶数流也应该做快照,因为属于同一个相同数据,只是没有被他处理)
-
在JobManager中也有个Chechpoint的指针,指向了仓库的状态快照的一个拓扑图,为以后的数据故障恢复做准备
从检查点恢复状态
- 在执行流应用程序期间,Flink 会定期保存状态的一致检查点
- 如果发生故障, Flink 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程(如图中所示,7这个数据被source读到了,准备传给奇数流时,奇数流宕机了,数据传输发生中断)
- 遇到故障之后,第一步就是重启应用(重启后的流都是空的)
- 第二步是从 checkpoint 中读取状态,将状态重置(读取在远程仓库(Storage,这里的仓库指状态后端保存数据指定的三种方式之一)保存的状态),从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同
- 第三步:开始消费并处理检查点到发生故障之间的所有数据
- 这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置
Chandy-Lamport 算法
在上图所示的数据7,同样被Source读取后,在传向奇数流时,奇数流宕机了,那么这个数据7在开始已经Source读取了,但是由于宕机,奇数流又没有处理到这个数据7,那么当检查点恢复后,这个数据7是否还会重新从输入队列中读取,如果不重新读取则数据将发生丢失,为了防止这种情况Flink做了改进实现,这种实现叫Chandy-Lamport 算法的分布式快照,这样做的好处是将检查点的保存和数据处理分离开,不暂停整个应用,简单来说就是Source读取一个数据,自己就做一份CheckPoint保存,不用管其他数据流是否读取,其他数据流依然如此,读取一个数据保存一份数据
Flink 检查点算法
Flink 检查点算法 :
- Flink 的检查点算法用到了一种称为分界线(barrier)的特殊数据形式,用来把一条流上数据按照不同的检查点分开
- 分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所属的检查点中;而基于分界线之后的数据导致的所有更改,就会被包含在之后的检查点中
算法操作解析
- 现在是一个有两个输入流的应用程序,用并行的两个 Source 任务来读取
- 两条自然数数据流,蓝色数据流已经输出完
蓝3
了,黄色数据流输出完黄4
了 - 在Souce端 Source1 接收到了数据
蓝3
正在往下游发向一个数据蓝2 和 蓝3
; Source2 接受到了数据黄4
,且往下游发送数据黄4
- 偶数流已经处理完
黄2
所以后面显示为2, 奇数流处理完蓝1 和 黄1 黄3
所以为5 并分别往下游发送每次聚合后的结果给Sink
- JobManager 会向每个 source 任务发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点,这个带有新检查点ID的东西为barrier,图中三角型表示,2只是ID
- 在Source端接受到barrier后,将自己此身的3 和 4 的数据,将它们的状态写入检查点,且向JobManager发送checkpoint成功的消息(状态后端在状态存入检查点之后,会返回通知给 source 任务,source 任务就会向 JobManager 确认检查点完),然后向下游分别发出一个检查点 barrier
- 可以看出在Source接受barrier时,数据流也在不断的处理,不会进行中断,
- 此时的偶数流已经处理完
蓝2
变成了4,但是还没处理到黄4
,只是下游发送了一个次数的数据4,而奇数流已经处理完蓝3
变成了8,并向下游发送了8 - 此时barrier都还未到奇数流和偶数流
- 此时蓝色流的barrier先一步抵达了偶数流,黄色的barrier还未到,但是因为数据的不中断一直处理,此时的先到的蓝色的barrier会将此时的偶数流的数据4进行缓存处理,流接着处理接下来的数据等待着黄色的barrier的到来,而黄色barrier之前的数据将会对缓存的数据相加
- 这次处理的总结:分界线对齐:barrier 向下游传递,sum 任务会等待所有输入分区的 barrier 到达,对于barrier已经到达的分区,继续到达的数据会被缓存。而barrier尚未到达的分区,数据会被正常处理
- 当蓝色的barrier和黄色的barrier(所有分区的)都到达后,进行状态保存到远程仓库,然后对JobManager发送消息,说自己的检查点保存完毕了
- 此时的偶数流和奇数流都为8
- 当收到所有输入分区的 barrier 时,任务就将其状态保存到状态后端的检查点中,然后将 barrier 继续向下游转发
- 向下游转发检查点 barrier 后,任务继续正常的数据处理
- Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕
- 当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了
保存点(Savepoints)
简而言之,CheckPoint为自动保存,SavePoint为手动保存
CheckPoint配置
env.enableCheckpointing(1000L) // 开启 触发时间间隔为1000毫秒
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE) // 语义 默认EXACTLY_ONCE
env.getCheckpointConfig.setCheckpointTimeout(60000L) // 超时时间
env.getCheckpointConfig.setMaxConcurrentCheckpoints(2) // 最大允许同时出现几个CheckPoint
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500L) // 最小得间隔时间
env.getCheckpointConfig.setPreferCheckpointForRecovery(true) // 是否倾向于用CheckPoint做故障恢复
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3) // 容忍多少次CheckPoint失败
// 重启策略
// 固定时间重启
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
// 失败率重启
env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))