美文网首页apache iceberg
iceberg源码-StreamingMonitorFunction

iceberg源码-StreamingMonitorFunction

作者: 神奇的考拉 | 来源:发表于2021-09-23 16:58 被阅读0次

一、概述

StreamingMonitorFunction是为了支持iceberg在flink实时读取场景下的需求而实现的,主要用来:

  • 监控iceberg snapshots
  • 对新增的data files生成对应的FlinkInputSplit
  • 把生成的split交由下游进行处理
    不过需要注意的是:StreamingMonitorFunction并行度=1,主要用来监控iceberg table的新增snapshot

二、实现

定义关键属性
// Sentinel used as the initial value of lastSnapshotId, i.e. no snapshot has been forwarded yet.
private static final long INIT_LAST_SNAPSHOT_ID = -1L;
// Loader used to open and load the Iceberg table (closed again in cancel()).
private final TableLoader tableLoader;
// Scan context supplied for reading the table; provides the monitor interval and the optional start snapshot id.
private final ScanContext scanContext;
// Loop flag for run(): stays true until cancel() sets it to false.
private volatile boolean isRunning = true;

// The checkpoint thread is not the same thread that running the function for SourceStreamTask now. It's necessary to
// mark this as volatile.
// Id of the last snapshot whose splits were forwarded downstream; this is the value stored in checkpoint state.
private volatile long lastSnapshotId = INIT_LAST_SNAPSHOT_ID;
// Source context handed to run(); used to emit splits and to obtain the checkpoint lock.
private transient SourceContext<FlinkInputSplit> sourceContext;
// The Iceberg table, loaded in initializeState().
private transient Table table;
// List state that holds lastSnapshotId across checkpoints.
private transient ListState<Long> lastSnapshotIdState;

iceberg流式读取

/**
 * Main loop of the source: while the function is running, periodically checks the table for a new
 * snapshot and forwards the resulting splits downstream.
 *
 * @param ctx source context used to emit splits and to obtain the checkpoint lock
 * @throws Exception if monitoring fails or the sleep between rounds is interrupted
 */
public void run(SourceContext<FlinkInputSplit> ctx) throws Exception {
  this.sourceContext = ctx;
  while (isRunning) {
    // Discover and emit splits under the checkpoint lock.
    final Object checkpointLock = sourceContext.getCheckpointLock();
    synchronized (checkpointLock) {
      // Re-check inside the lock: cancel() may have flipped the flag while we were waiting for it.
      if (isRunning) {
        monitorAndForwardSplits();
      }
    }

    // Wait one monitor interval before looking for the next snapshot.
    final long pauseMillis = scanContext.monitorInterval().toMillis();
    Thread.sleep(pauseMillis);
  }
}

/**
 * Refreshes the table and, if its current snapshot differs from the last forwarded one, plans the
 * corresponding input splits, emits them through the source context and records the new snapshot id.
 */
private void monitorAndForwardSplits() {
  // Refresh the table to get the latest committed snapshot.
  table.refresh();

  final Snapshot latest = table.currentSnapshot();
  if (latest == null) {
    // The table has no snapshot yet, so there is nothing to read.
    return;
  }

  final long latestId = latest.snapshotId();
  final long previousId = lastSnapshotId;
  if (latestId == previousId) {
    // No new snapshot since the previous round.
    return;
  }

  // First round: scan the latest snapshot itself. Later rounds: scan only what was appended
  // between the previously forwarded snapshot and the latest one.
  final ScanContext planContext = previousId == INIT_LAST_SNAPSHOT_ID
      ? scanContext.copyWithSnapshotId(latestId)
      : scanContext.copyWithAppendsBetween(previousId, latestId);

  // Plan the splits and hand each one to the downstream operator.
  final FlinkInputSplit[] plannedSplits = FlinkSplitGenerator.createInputSplits(table, planContext);
  for (FlinkInputSplit plannedSplit : plannedSplits) {
    sourceContext.collect(plannedSplit);
  }

  // Remember how far we have read, but only after every split has been emitted.
  lastSnapshotId = latestId;
}
// 关于FlinkInputSplit部分见后续的源码分析

关于checkpoint部分

/**
 * Loads the Iceberg table, registers the list state that stores the last snapshot id, and decides
 * the starting position: the restored id when recovering from state, otherwise the configured
 * start snapshot id if one is set.
 *
 * @param context initialization context giving access to the operator state store
 * @throws Exception if the table cannot be loaded or the state cannot be accessed
 */
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
  // Load iceberg table from table loader.
  tableLoader.open();
  table = tableLoader.loadTable();

  // Initialize the flink state for last snapshot id.
  final ListStateDescriptor<Long> snapshotIdDescriptor =
      new ListStateDescriptor<>("snapshot-id-state", LongSerializer.INSTANCE);
  lastSnapshotIdState = context.getOperatorStateStore().getListState(snapshotIdDescriptor);

  // Restore the last-snapshot-id from flink's state if possible.
  if (context.isRestored()) {
    LOG.info("Restoring state for the {}.", getClass().getSimpleName());
    lastSnapshotId = lastSnapshotIdState.get().iterator().next();
    return;
  }

  // Fresh start: without a configured start snapshot the initial sentinel value is kept.
  final Long startSnapshotId = scanContext.startSnapshotId();
  if (startSnapshotId == null) {
    return;
  }

  Preconditions.checkNotNull(table.currentSnapshot(), "Don't have any available snapshot in table.");

  // The configured start snapshot must be an ancestor of the table's current snapshot.
  final long currentSnapshotId = table.currentSnapshot().snapshotId();
  final boolean startIsAncestor = SnapshotUtil.ancestorOf(table, currentSnapshotId, startSnapshotId);
  Preconditions.checkState(startIsAncestor,
      "The option start-snapshot-id %s is not an ancestor of the current snapshot.", startSnapshotId);

  lastSnapshotId = startSnapshotId;
}
/**
 * Replaces the contents of the list state with the id of the last snapshot that was forwarded.
 *
 * @param context snapshot context of the checkpoint being taken (unused)
 * @throws Exception if the state cannot be updated
 */
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
  final long snapshotIdToPersist = lastSnapshotId;
  // Drop the previously stored id so the state only ever holds a single element.
  lastSnapshotIdState.clear();
  lastSnapshotIdState.add(snapshotIdToPersist);
}
/**
 * Stops the monitoring loop and closes the table loader.
 *
 * @throws UncheckedIOException if closing the table loader fails
 */
@Override
public void cancel() {
  final SourceContext<FlinkInputSplit> ctx = sourceContext;
  if (ctx == null) {
    // this is to cover the case where cancel() is called before the run()
    isRunning = false;
  } else {
    // run() has started: flip the flag under the same checkpoint lock that run() holds while emitting.
    synchronized (ctx.getCheckpointLock()) {
      isRunning = false;
    }
  }

  // Release all the resources here.
  if (tableLoader == null) {
    return;
  }
  try {
    tableLoader.close();
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}

相关文章

网友评论

    本文标题:iceberg源码-StreamingMonitorFunction

    本文链接:https://www.haomeiwen.com/subject/tleogltx.html