Flink--Watermark: Understanding and Practice

Author: 李小李的路 | Published: 2019-08-06 18:20
  • Based on flink-1.8.1

What Watermarks Are For

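  • Watermarks are used to handle out-of-order events. Handling such events correctly usually means combining the watermark mechanism with windows.
  • In stream processing, an event is produced, passes through the source, and then reaches an operator, and each step takes time. In most cases data arrives at the operator in the order the events were produced, but network delays, backpressure, and similar causes can still deliver events out of order (out-of-order, or late, elements).
  • We cannot wait for late elements indefinitely, though. There must be a mechanism that guarantees the window is triggered and computed once a certain point in time has passed. That mechanism is the watermark.
  • For watermark basics, see: Flink--EventTime中WaterMark知识点扫盲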

How Windows Are Divided

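  • Window boundaries do not depend on the data itself; they are defined by the system.
  • A window is the basic unit Flink uses to slice a stream. The way windows are divided is fixed: by default they are aligned to natural time boundaries, and every window is left-closed and right-open.
  • Window examples:
Window size   First                  Second                 Third
3s            [00:00:00~00:00:03)    [00:00:03~00:00:06)    [00:00:06~00:00:09)
5s            [00:00:00~00:00:05)    [00:00:05~00:00:10)    [00:00:10~00:00:15)
10s           [00:00:00~00:00:10)    [00:00:10~00:00:20)    [00:00:20~00:00:30)
1min          [00:00:00~00:01:00)    [00:01:00~00:02:00)    [00:02:00~00:03:00)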
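
The alignment shown in the table can be reproduced with a little arithmetic. The following is a minimal sketch in plain Java, not Flink's own code: the class and method names are made up for illustration, and it assumes tumbling windows aligned to the epoch with no offset. The sample timestamp is the first one from the printed output later in this article.

```java
// Illustrative sketch only: TumblingWindowBounds is a hypothetical helper, not a Flink class.
class TumblingWindowBounds {

    /** Start (inclusive) of the epoch-aligned window that contains timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        // Math.floorMod keeps the result correct for negative timestamps as well.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** End (exclusive) of the same window. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long ts = 1565082033747L;   // printed as 2019-08-06 17:00:33.747 in the sample output
        long size = 10_000L;        // 10-second tumbling window
        // prints "[1565082030000, 1565082040000)"
        System.out.println("[" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
    }
}
```

The result corresponds to the window 17:00:30.000 to 17:00:40.000 that appears in the sample output, which shows that the bounds depend only on the window size and not on which elements arrive.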

Ways to Assign Watermarks

Default Watermark Update Interval

  • See the comment in the Flink source code below.
  • When the time characteristic is not processing time, the default interval is 200 ms.
// --------------------------------------------------------------------------------------------
    //  Time characteristic
    // --------------------------------------------------------------------------------------------

    /**
     * Sets the time characteristic for all streams create from this environment, e.g., processing
     * time, event time, or ingestion time.
     *
     * <p>If you set the characteristic to IngestionTime of EventTime this will set a default
     * watermark update interval of 200 ms. If this is not applicable for your application
     * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
     *
     * @param characteristic The time characteristic.
     */
    @PublicEvolving
    public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
        this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
        if (characteristic == TimeCharacteristic.ProcessingTime) {
            getConfig().setAutoWatermarkInterval(0);
        } else {
            getConfig().setAutoWatermarkInterval(200);
        }
    }

Tracing Periodic Watermarks

  • Periodic watermarks let you set a maximum out-of-orderness, so this is the most widely used approach.
package github.yahuili1128.watermark;

import github.yahuili1128.pojo.MockUpModel;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.TreeSet;

import static github.yahuili1128.connector.SourceKafka010.getMockUpkafka010;

/**
 * @Description : Read data from Kafka to practice with watermarks
 * @Author : LiYahui
 * @Date : 2019-08-06 11:45
 * @Version : V1.0
 */
public class PeriodicWatermarkTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        SingleOutputStreamOperator<MockUpModel> mockUpkafka010 = getMockUpkafka010(env).name("kafka source");
        SingleOutputStreamOperator<String> result = mockUpkafka010.filter(line -> line.gender.equals("male"))
                .assignTimestampsAndWatermarks(new GetWateramrk()).keyBy(line -> line.gender)
                .window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new WindowTest()).name("watermark print");

        result.print();

        env.execute(PeriodicWatermarkTest.class.getSimpleName());
    }

    public static class WindowTest implements WindowFunction<MockUpModel, String, String, TimeWindow> {

        @Override
        public void apply(String key, TimeWindow window, Iterable<MockUpModel> input, Collector<String> out)
                throws Exception {
            TreeSet<Long> set = new TreeSet<>();
            // number of elements in the window
            int size = Iterables.size(input);
            Iterator<MockUpModel> eles = input.iterator();
            while (eles.hasNext()) {
                set.add(eles.next().timestamp);
            }
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            // (key, element count, earliest element time, latest element time, window start time, window end time)
            String first = sdf.format(set.first());
            String last = sdf.format(set.last());
            String start = sdf.format(window.getStart());
            String end = sdf.format(window.getEnd());
            // for debugging
            out.collect("event.key:" + key + ",window中元素个数:" + size + ",window第一个元素时间戳:" + first + ",window最后一个元素时间戳:"
                    + last + ",window开始时间戳:" + start + ",window结束时间戳:" + end + ",窗口内所有的时间戳:" + set.toString());

        }
    }


    public static class GetWateramrk implements AssignerWithPeriodicWatermarks<MockUpModel> {
        // maximum allowed out-of-orderness: 5 s
        private final long maxOutOfOrderness = 5000L;
        private long currentMaxTimestamp;
        private Watermark watermark;

        // formats timestamps for debugging and study
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            watermark = new Watermark(this.currentMaxTimestamp - this.maxOutOfOrderness);
            return watermark;
        }

        @Override
        public long extractTimestamp(MockUpModel element, long previousElementTimestamp) {
            // read the timestamp carried by the event
            long timestamp = element.getTimestamp();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            // print all time-related values; plain concatenation stays null-safe
            // if getCurrentWatermark() has not been called yet
            System.out.println("--->>> event.key:" + element.gender + " | event中timestamp:" + timestamp + "| " + sdf
                    .format(timestamp) + "| currentMaxTimestamp:" + currentMaxTimestamp + "| " + sdf
                    .format(currentMaxTimestamp) + "| watermark" + watermark);
            // return the event's timestamp
            return timestamp;
        }
    }

}

  • Sample printed output:
--->>> event.key:male | event中timestamp:1565082033747| 2019-08-06 17:00:33.747| currentMaxTimestamp:1565082033747| 2019-08-06 17:00:33.747| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082036758| 2019-08-06 17:00:36.758| currentMaxTimestamp:1565082036758| 2019-08-06 17:00:36.758| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082038778| 2019-08-06 17:00:38.778| currentMaxTimestamp:1565082038778| 2019-08-06 17:00:38.778| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082037791| 2019-08-06 17:00:37.791| currentMaxTimestamp:1565082038778| 2019-08-06 17:00:38.778| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082038803| 2019-08-06 17:00:38.803| currentMaxTimestamp:1565082038803| 2019-08-06 17:00:38.803| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082039815| 2019-08-06 17:00:39.815| currentMaxTimestamp:1565082039815| 2019-08-06 17:00:39.815| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082041839| 2019-08-06 17:00:41.839| currentMaxTimestamp:1565082041839| 2019-08-06 17:00:41.839| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082044850| 2019-08-06 17:00:44.850| currentMaxTimestamp:1565082044850| 2019-08-06 17:00:44.850| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082046869| 2019-08-06 17:00:46.869| currentMaxTimestamp:1565082046869| 2019-08-06 17:00:46.869| watermarkWatermark @ 1565082039850
8> event.key:male,window中元素个数:6,window第一个元素时间戳:2019-08-06 17:00:33.747,window最后一个元素时间戳:2019-08-06 17:00:39.815,window开始时间戳:2019-08-06 17:00:30.000,window结束时间戳:2019-08-06 17:00:40.000,窗口内所有的时间戳:[1565082033747, 1565082036758, 1565082037791, 1565082038778, 1565082038803, 1565082039815]
--->>> event.key:male | event中timestamp:1565082047880| 2019-08-06 17:00:47.880| currentMaxTimestamp:1565082047880| 2019-08-06 17:00:47.880| watermarkWatermark @ 1565082041869
--->>> event.key:male | event中timestamp:1565082046890| 2019-08-06 17:00:46.890| currentMaxTimestamp:1565082047880| 2019-08-06 17:00:47.880| watermarkWatermark @ 1565082042880
--->>> event.key:male | event中timestamp:1565082049900| 2019-08-06 17:00:49.900| currentMaxTimestamp:1565082049900| 2019-08-06 17:00:49.900| watermarkWatermark @ 1565082042880
--->>> event.key:male | event中timestamp:1565082051921| 2019-08-06 17:00:51.921| currentMaxTimestamp:1565082051921| 2019-08-06 17:00:51.921| watermarkWatermark @ 1565082044900
--->>> event.key:male | event中timestamp:1565082050934| 2019-08-06 17:00:50.934| currentMaxTimestamp:1565082051921| 2019-08-06 17:00:51.921| watermarkWatermark @ 1565082046921
--->>> event.key:male | event中timestamp:1565082051945| 2019-08-06 17:00:51.945| currentMaxTimestamp:1565082051945| 2019-08-06 17:00:51.945| watermarkWatermark @ 1565082046921
--->>> event.key:male | event中timestamp:1565082052955| 2019-08-06 17:00:52.955| currentMaxTimestamp:1565082052955| 2019-08-06 17:00:52.955| watermarkWatermark @ 1565082046945
--->>> event.key:male | event中timestamp:1565082055965| 2019-08-06 17:00:55.965| currentMaxTimestamp:1565082055965| 2019-08-06 17:00:55.965| watermarkWatermark @ 1565082047955
8> event.key:male,window中元素个数:6,window第一个元素时间戳:2019-08-06 17:00:41.839,window最后一个元素时间戳:2019-08-06 17:00:49.900,window开始时间戳:2019-08-06 17:00:40.000,window结束时间戳:2019-08-06 17:00:50.000,窗口内所有的时间戳:[1565082041839, 1565082044850, 1565082046869, 1565082046890, 1565082047880, 1565082049900]
--->>> event.key:male | event中timestamp:1565082056983| 2019-08-06 17:00:56.983| currentMaxTimestamp:1565082056983| 2019-08-06 17:00:56.983| watermarkWatermark @ 1565082050965
--->>> event.key:male | event中timestamp:1565082055993| 2019-08-06 17:00:55.993| currentMaxTimestamp:1565082056983| 2019-08-06 17:00:56.983| watermarkWatermark @ 1565082051983
--->>> event.key:male | event中timestamp:1565082059005| 2019-08-06 17:00:59.005| currentMaxTimestamp:1565082059005| 2019-08-06 17:00:59.005| watermarkWatermark @ 1565082051983
--->>> event.key:male | event中timestamp:1565082061028| 2019-08-06 17:01:01.028| currentMaxTimestamp:1565082061028| 2019-08-06 17:01:01.028| watermarkWatermark @ 1565082054005
--->>> event.key:male | event中timestamp:1565082062036| 2019-08-06 17:01:02.036| currentMaxTimestamp:1565082062036| 2019-08-06 17:01:02.036| watermarkWatermark @ 1565082056028
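  • Why does the watermark show up as -5000?
    • getCurrentWatermark() of an AssignerWithPeriodicWatermarks implementation is invoked periodically, at the interval set by ExecutionConfig.setAutoWatermarkInterval (200 ms by default); it is not invoked after each call to extractTimestamp. Until an element has been seen, currentMaxTimestamp still holds its initial value 0, so the generated watermark is 0 - 5000 = -5000. The value printed inside extractTimestamp is whatever the most recent periodic call produced, so it stays at -5000 until a periodic call runs after data has arrived, and from then on it lags behind currentMaxTimestamp.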
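
The -5000 value can be reproduced without a Flink cluster. The sketch below is a plain-Java stand-in for the assigner above, under the assumption that only the two fields currentMaxTimestamp and maxOutOfOrderness matter; the class name is hypothetical and the methods take raw long values instead of Flink types.

```java
// Illustrative stand-in for the article's assigner; not a Flink class.
class PeriodicAssignerSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp; // defaults to 0, exactly like the field in the article's code

    PeriodicAssignerSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Called once per element: remembers the largest timestamp seen so far. */
    long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        return eventTimestamp;
    }

    /** Called periodically by the runtime, independently of extractTimestamp. */
    long getCurrentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        PeriodicAssignerSketch assigner = new PeriodicAssignerSketch(5000L);
        // No element seen yet: 0 - 5000
        System.out.println(assigner.getCurrentWatermark()); // prints -5000
        assigner.extractTimestamp(1565082044850L);
        System.out.println(assigner.getCurrentWatermark()); // prints 1565082039850
        // An element with a smaller timestamp does not move the watermark backwards.
        assigner.extractTimestamp(1565082037791L);
        System.out.println(assigner.getCurrentWatermark()); // prints 1565082039850
    }
}
```

The second value matches the first non-negative watermark in the sample output (1565082039850), which is the maximum timestamp seen at that moment minus the 5-second bound.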

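Conclusion

  • A window fires only when the following conditions hold:
    • watermark >= window_end_time (strictly speaking, Flink's built-in EventTimeTrigger fires as soon as the watermark reaches window_end_time - 1 ms, the largest timestamp that belongs to the window);
    • there is data within [window_start_time, window_end_time).
  • Only when both conditions are met does the window fire.
  • The watermark is not tracked per key. Each parallel operator instance keeps a single watermark shared by all keys, so data with a different key also advances it.
  • These points are worth working through carefully.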
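
The firing rule can be checked against the sample output with a small replay. This is a sketch under stated assumptions, not Flink's implementation: the class and method names are hypothetical, the watermark is recomputed after every element (in Flink it is emitted periodically, so the real firing can happen slightly later), and the comparison uses window end - 1 ms, which is how Flink's EventTimeTrigger compares against the window's maximum timestamp.

```java
// Illustrative replay of the first nine timestamps from the sample output; not Flink code.
class WindowFiringSketch {
    static final long WINDOW_SIZE_MS = 10_000L;
    static final long MAX_OUT_OF_ORDERNESS_MS = 5_000L;

    static final long[] SAMPLE = {
        1565082033747L, 1565082036758L, 1565082038778L, 1565082037791L, 1565082038803L,
        1565082039815L, 1565082041839L, 1565082044850L, 1565082046869L
    };

    /** Start of the epoch-aligned tumbling window that contains the first element. */
    static long firstWindowStart(long[] timestamps) {
        return timestamps[0] - Math.floorMod(timestamps[0], WINDOW_SIZE_MS);
    }

    /** Index of the element after which the first window may fire, or -1 if it never does. */
    static int firingIndex(long[] timestamps) {
        long windowEnd = firstWindowStart(timestamps) + WINDOW_SIZE_MS;
        long currentMax = 0L;
        for (int i = 0; i < timestamps.length; i++) {
            currentMax = Math.max(currentMax, timestamps[i]);
            long watermark = currentMax - MAX_OUT_OF_ORDERNESS_MS;
            // The window holds at least timestamps[0], so only the watermark condition is left.
            if (watermark >= windowEnd - 1) {
                return i;
            }
        }
        return -1;
    }

    /** Number of elements whose timestamp falls in [start, start + size) of the first window. */
    static int countInFirstWindow(long[] timestamps) {
        long start = firstWindowStart(timestamps);
        int count = 0;
        for (long ts : timestamps) {
            if (ts >= start && ts < start + WINDOW_SIZE_MS) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("fires after element index " + firingIndex(SAMPLE)); // index 8
        System.out.println("elements in window: " + countInFirstWindow(SAMPLE)); // 6
    }
}
```

In this replay the window [17:00:30, 17:00:40) can fire only after the ninth element (timestamp 1565082046869) pushes the watermark past the window end, and it contains six elements, which agrees with the first window result line in the sample output.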

    Article link: https://www.haomeiwen.com/subject/naavdctx.html