关于flink的状态, checkpoint,exactly once 消费
这一篇文章写的特别的好。
https://www.jianshu.com/p/4d31d6cddc99
个人用自己的语言在捋一遍。
假设有如下一个程序:
kafka->source-> keybyUser->sink(统计PV)
首先简单起见,假设只有一个并行度。
第一步是要开启 checkpoint机制,设置checkpoint的时间间隔,可以当作是某种形式的备份状态数据。
既然要备份,那么就可以选择需要备份的地方,可以是内存,也可以使外存,比如hdfs,rocketdb等。
以上面的例子为例,假设10分钟做一个checkpoint,我们看看是如何实现的。
首先,jobManager 为整个job指定一个 checkpointer coordinator管理者(cdr),有他负责整个备份流程。
cdr 每10分钟发送一个事件,叫做barrier到流数据中。
从source 开始,在我们的例子中,source 备份了什么,主要就是记住,我已经消费了的kafka 的offset,比如
记录下来(partion-1, 1000)
然后,把barrier 转发给下游的算子,下游统计pv的程序,比如此时此刻,统计到
(site1,1000), 在收到 barrier之后,就会停止目前的流计算,然后进行state备份。
备份完之后,发给下游sink,sink 看自己需求是否是无状态,决定是否需要备份。
等这三个阶段都完成之后,cdr 就会决定这个程序完成了一次checkpoint机制。
加入下一个轮回中,有相关的算子出现了异常,整个jobmanager可以从最新的checkpoint中进行恢复。
在上面的例子中,就是从kafka 最最新的offset 读取数据,然后,统计pv的算子,也可以从已有的
(site1,1000)继续统计。这就是整个checkpoint 和异常恢复的机制。
这里涉及到一点,就是在处理快照的时候,整个处理程序是要倍阻塞停顿的,比如(site1,1000)触发快照,
如果不停段,在写入state的时候可能就是(site1,1001)了,造成不准确,exactly-once无法保障。
现在让我们提高复杂度?假设在多个并行度的情况下如何做处理?
这里有一个类似与join的概念, 如果 一个 task,他的上游输入是有多个流的,那么
对于 sub1,sub2 的barrier,task 需要等待这两个barrier都到达之后做一个checkpoint 并且向下游发送barrier。
这个操作在flink里面叫做barrier对齐。 先到barrier,对于后续的流数据,通常会存在缓冲里面,并不做处理。
这样做通常会影响部分性能,但是Exactly Once时必须barrier对齐,如果barrier不对齐就变成了At Least Once;
那么exactly once 的case 我们也就大概明白。
对于多个并行度,只有做barrier对齐才能达到exactly once。
网友评论