Ch11-Flink 之 Watermark

Ch11-Flink 之 Watermark

January 9, 2021
Apache Flink
flink

Flink 之 watermark

1. Flink 中的时间 #

在做实时计算的时候,首先就需要搞清楚一个问题,这个实时到底是怎么样的一个时间概念。在 Flink 中,总共有 3 种时间概念,分别是事件时间(Event time)处理时间(Processing time)接入时间(Ingestion time).

flink-time

  • 事件时间 (Event time) 就是真实的用户发生操作的时候所产生的时间,对应到 flink 中,需要用户显式的告诉 flink 到底每个输入中的哪一个字段代表这个事件时间。
  • 接入时间 (Ingestion time) 和处理时间 (Processing time) 是不需要用户去指定的,flink 自己会去处理这个时间。接入时间的代表的是一个事件通过 source Operator 的时间,相比于 event time, ingestion time 不能处理乱序事件,因此也就不用生成对应的 watermark. 处理时间是指事件在操作算子计算过程中获取到的所在主机的时间。processing time 适合用于时间计算精度要求不是特别高的计算场景,例如统计某些延时非常高的日志数据。

2. watermark 分类 #

2.1 单分区 watermark #

watermark 这个概念在 flink 中是与 event time 这个时间概念相互依存的,其目的是为了解决数据乱序到达和系统延迟的问题。flink 会把读取进系统的最新事件时间减去固定的时间间隔作为 watermark。还是用一张图来解释 watermark 的作用。

flink-watermark

当事件进入 flink 中的时候,根据提取的 event time 产生 watermark 时间戳,记为 X, 进入 flink 中的 event time 记为 Y. 当窗口的 end time < X 的时候,则触发窗口计算结果并输出。只要 X < end time, 那么 事件就可以 一直进入到当前窗口中,这样的话即便发生乱序,也可以在窗口中调整。调整的方法就是按照 Y.

用伪代码描述大概如下

if (x > windowEndtime) {
    compute();
} else {
    pushToWindow();
}

2.2 多分区 watermark #

flink-multi-watermark

具有两个或多个输入流(如 Union 或 CoFlatMap)的算子任务也会以所有分区 watermark 的最小值作为事件时间时钟。它们并不区分不同输入流的分区 watermark,所以两个输入流的数据都是基于相同的事件时间时钟进行处理的。

3. 使用 watermark #

3.1 在 Source Function 中 直接指定 Timestamps 和 Watermark #

DataStream<Tuple3<Long, String, Integer>> stream = env.addSource(
    new SourceFunction<Tuple3<Long, String, Integer>>() {
        @Override
        public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
            for (Tuple3<Long, String, Integer> item : list) {
                long timestamp = item.f0;

                ctx.collectWithTimestamp(item, timestamp);
                ctx.emitWatermark(new Watermark(timestamp - 3));

            }
        }

        @Override
        public void cancel() {

        }
    }
);

3.2.1 BoundedOutOfOrdernessTimestampExtractor #

DataStream<Tuple3<Long, String, Integer>> stream = env.fromCollection(list)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long, String, Integer>>(Time.seconds(3)) {
            @Override
            public long extractTimestamp(Tuple3<Long, String, Integer> element) {
                return element.f0;
            }
        }
    );

3.2.2 AscendingTimestampExtractor #

DataStream<Tuple3<Long, String, Integer>> stream = env.fromCollection(list)
    .assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
                return element.f0;
            }
        }

    );

3.2.3 自定义 Timestamp Assigner 和 Watermark Generator #

注意: 下述两种 watermark 在 Flink 1.11 中已经废弃

class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Tuple3<Long, String, Integer>> {
    private long maxOutOfOrderness;

    private long currentMaxTimestamp;

    BoundedOutOfOrdernessGenerator(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    @Override
    public long extractTimestamp(Tuple3<Long, String, Integer> element, long previousElementTimestamp) {
        long timestamp = element.f0;
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<Tuple3<Long, String, Integer>> {
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(Tuple3<Long, String, Integer> lastElement, long extractedTimestamp) {
        return new Watermark(extractedTimestamp);
    }

    @Override
    public long extractTimestamp(Tuple3<Long, String, Integer> element, long previousElementTimestamp) {
        return element.f0;
    }
}

4. 参考文献 #