一、概述
StreamingMonitorFunction是为了支持iceberg在flink实时读取场景下的需求而实现的,主要用来
- 监控iceberg snapshots
- 对新增的data files生成对应的FlinkInputSplit
- 把生成的split交由下游进行处理
不过需要注意的是:StreamingMonitorFunction并行度=1,只负责监控iceberg table的snapshot变化
二、实现
定义关键属性
// Sentinel used as the initial value of lastSnapshotId, i.e. "no snapshot has been processed yet".
private static final long INIT_LAST_SNAPSHOT_ID = -1L;
// Loader used to open and load the Iceberg table; closed again in cancel().
private final TableLoader tableLoader;
// Scan settings supplied for reading the table (monitor interval, optional start snapshot id, ...).
private final ScanContext scanContext;
// Loop flag for run(): stays true while the function should keep monitoring; cancel() sets it to false.
private volatile boolean isRunning = true;
// The checkpoint thread is not the same thread that running the function for SourceStreamTask now. It's necessary to
// mark this as volatile.
// Id of the last snapshot whose splits were forwarded downstream; this is the value persisted by snapshotState().
private volatile long lastSnapshotId = INIT_LAST_SNAPSHOT_ID;
// Source context handed to run(); used to emit FlinkInputSplits and to obtain the checkpoint lock.
private transient SourceContext<FlinkInputSplit> sourceContext;
// The Iceberg table, loaded in initializeState() and refreshed on every monitoring round.
private transient Table table;
// Flink operator list state that stores lastSnapshotId across checkpoints.
private transient ListState<Long> lastSnapshotIdState;
iceberg流式读取
/**
 * Main loop of the monitor: while the function is running, performs one monitoring round under
 * the checkpoint lock and then sleeps for the configured monitor interval.
 *
 * @param ctx source context used to emit splits and to obtain the checkpoint lock
 * @throws Exception if a monitoring round fails or the sleep is interrupted
 */
public void run(SourceContext<FlinkInputSplit> ctx) throws Exception {
  sourceContext = ctx;
  while (isRunning) {
    final Object checkpointLock = sourceContext.getCheckpointLock();
    // Emit splits and advance lastSnapshotId under the checkpoint lock.
    synchronized (checkpointLock) {
      // Re-check the flag: cancel() may have flipped it before the lock was acquired.
      if (isRunning) {
        monitorAndForwardSplits();
      }
    }
    // Wait one monitor interval before looking for new snapshots again.
    final long pauseMillis = scanContext.monitorInterval().toMillis();
    Thread.sleep(pauseMillis);
  }
}
/**
 * Performs one monitoring round: refreshes the table and, if its current snapshot differs from
 * the last processed one, plans the input splits for the new data and emits them downstream.
 */
private void monitorAndForwardSplits() {
  // Refresh the table to get the latest committed snapshot.
  table.refresh();

  final Snapshot latest = table.currentSnapshot();
  // Nothing to do for an empty table or when no new snapshot has been committed.
  if (latest == null || latest.snapshotId() == lastSnapshotId) {
    return;
  }

  final long latestId = latest.snapshotId();
  // First round: scan the current snapshot itself. Later rounds: scan only the appends between
  // the last processed snapshot and the newest one.
  final ScanContext planningContext = lastSnapshotId == INIT_LAST_SNAPSHOT_ID
      ? scanContext.copyWithSnapshotId(latestId)
      : scanContext.copyWithAppendsBetween(lastSnapshotId, latestId);

  // Plan the splits and hand each one to the downstream operator.
  final FlinkInputSplit[] pending = FlinkSplitGenerator.createInputSplits(table, planningContext);
  for (int i = 0; i < pending.length; i++) {
    sourceContext.collect(pending[i]);
  }

  // Remember how far we have read.
  lastSnapshotId = latestId;
}
// 关于FlinkInputSplit部分见后续的源码分析
关于checkpoint部分
/**
 * Loads the Iceberg table, registers the operator list state that holds the last processed
 * snapshot id, and decides where monitoring starts:
 * <ul>
 *   <li>if the job is restored and the state holds a value, resume from that snapshot id;</li>
 *   <li>otherwise, if a start snapshot id is configured, validate it and start from it;</li>
 *   <li>otherwise keep the initial sentinel value.</li>
 * </ul>
 *
 * @param context Flink initialization context giving access to the operator state store
 * @throws Exception if the table cannot be loaded or the state cannot be accessed
 */
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
  // Load iceberg table from table loader.
  tableLoader.open();
  table = tableLoader.loadTable();

  // Initialize the flink state for last snapshot id.
  lastSnapshotIdState = context.getOperatorStateStore().getListState(
      new ListStateDescriptor<>(
          "snapshot-id-state",
          LongSerializer.INSTANCE));

  // Restore the last-snapshot-id from flink's state if possible. A restored job does not
  // guarantee that this operator's list state is non-empty, so take the first element only
  // if there is one instead of calling iterator().next() blindly.
  Long restoredSnapshotId = null;
  if (context.isRestored()) {
    LOG.info("Restoring state for the {}.", getClass().getSimpleName());
    Iterable<Long> restoredIds = lastSnapshotIdState.get();
    if (restoredIds != null) {
      for (Long id : restoredIds) {
        restoredSnapshotId = id;
        break;
      }
    }
  }

  if (restoredSnapshotId != null) {
    lastSnapshotId = restoredSnapshotId;
  } else if (scanContext.startSnapshotId() != null) {
    // Fresh start (or nothing to restore) with an explicit start snapshot: the table must have
    // a current snapshot, and the start snapshot must be one of its ancestors.
    Preconditions.checkNotNull(table.currentSnapshot(), "Don't have any available snapshot in table.");

    long currentSnapshotId = table.currentSnapshot().snapshotId();
    Preconditions.checkState(SnapshotUtil.ancestorOf(table, currentSnapshotId, scanContext.startSnapshotId()),
        "The option start-snapshot-id %s is not an ancestor of the current snapshot.", scanContext.startSnapshotId());

    lastSnapshotId = scanContext.startSnapshotId();
  }
}
/**
 * Checkpoint hook: replaces the content of the list state with the current last processed
 * snapshot id, so the state holds exactly one element.
 *
 * @param context Flink snapshot context (not used here)
 * @throws Exception if the state cannot be updated
 */
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
  // Drop whatever the previous checkpoint stored.
  lastSnapshotIdState.clear();
  // Store the snapshot id reached so far.
  final long snapshotIdToPersist = lastSnapshotId;
  lastSnapshotIdState.add(snapshotIdToPersist);
}
/**
 * Stops the monitoring loop and releases the table loader.
 *
 * <p>If run() has already stored a source context, the running flag is cleared while holding
 * the checkpoint lock; otherwise it is cleared directly.
 *
 * @throws UncheckedIOException if closing the table loader fails
 */
@Override
public void cancel() {
  if (sourceContext == null) {
    // this is to cover the case where cancel() is called before the run()
    isRunning = false;
  } else {
    // run() executes its monitoring round under this same lock.
    synchronized (sourceContext.getCheckpointLock()) {
      isRunning = false;
    }
  }

  // Release all the resources here.
  if (tableLoader == null) {
    return;
  }
  try {
    tableLoader.close();
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}
网友评论