博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊flink的Sliding Window
阅读量:6638 次
发布时间:2019-06-25

本文共 6573 字,大约阅读时间需要 21 分钟。

本文主要研究一下flink的Sliding Window

SlidingEventTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java

@PublicEvolvingpublic class SlidingEventTimeWindows extends WindowAssigner
{ private static final long serialVersionUID = 1L; private final long size; private final long slide; private final long offset; protected SlidingEventTimeWindows(long size, long slide, long offset) { if (offset < 0 || offset >= slide || size <= 0) { throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0"); } this.size = size; this.slide = slide; this.offset = offset; } @Override public Collection
assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { List
windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } } public long getSize() { return size; } public long getSlide() { return slide; } @Override public Trigger
getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } @Override public String toString() { return "SlidingEventTimeWindows(" + size + ", " + slide + ")"; } public static SlidingEventTimeWindows of(Time size, Time slide) { return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0); } public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) { return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds() % slide.toMilliseconds()); } @Override public TypeSerializer
getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return true; }}复制代码
  • SlidingEventTimeWindows继承了Window,其中元素类型为Object,而窗口类型为TimeWindow;它有三个参数,一个是size,一个是slide,一个是offset,其中offset必须大于等于0,offset必须大于slide,size必须大于0
  • assignWindows方法以slide作为size通过TimeWindow.getWindowStartWithOffset(timestamp, offset, slide)计算lastStart,然后以为start + size > timestamp为循环条件,每次对start减去slide,挨个计算TimeWindow(start, start + size);getDefaultTrigger方法返回的是EventTimeTrigger;getWindowSerializer方法返回的是TimeWindow.Serializer();isEventTime返回的为true
  • SlidingEventTimeWindows提供了of静态工厂方法,可以指定size、slide及offset参数,它对于传入的offset参数转为毫秒然后与slide.toMilliseconds()取余作为最后的offset值

SlidingProcessingTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java

public class SlidingProcessingTimeWindows extends WindowAssigner
{ private static final long serialVersionUID = 1L; private final long size; private final long offset; private final long slide; private SlidingProcessingTimeWindows(long size, long slide, long offset) { if (offset < 0 || offset >= slide || size <= 0) { throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset < slide and size > 0"); } this.size = size; this.slide = slide; this.offset = offset; } @Override public Collection
assignWindows(Object element, long timestamp, WindowAssignerContext context) { timestamp = context.getCurrentProcessingTime(); List
windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; } public long getSize() { return size; } public long getSlide() { return slide; } @Override public Trigger
getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } @Override public String toString() { return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")"; } public static SlidingProcessingTimeWindows of(Time size, Time slide) { return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0); } public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) { return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds() % slide.toMilliseconds()); } @Override public TypeSerializer
getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return false; }}复制代码
  • SlidingProcessingTimeWindows继承了Window,其中元素类型为Object,而窗口类型为TimeWindow;它有三个参数,一个是size,一个是slide,一个是offset,其中offset必须大于等于0,offset必须大于slide,size必须大于0
  • assignWindows方法以slide作为size通过TimeWindow.getWindowStartWithOffset(timestamp, offset, slide)计算lastStart(与SlidingEventTimeWindows不同的是SlidingProcessingTimeWindows的这个方法里头使用context.getCurrentProcessingTime()值重置了timestamp),然后以为start + size > timestamp为循环条件,每次对start减去slide,挨个计算TimeWindow(start, start + size);getDefaultTrigger方法返回的是ProcessingTimeTrigger;getWindowSerializer方法返回的是TimeWindow.Serializer();isEventTime返回的为false
  • SlidingEventTimeWindows提供了of静态工厂方法,可以指定size、slide及offset参数,它对于传入的offset参数转为毫秒然后与slide.toMilliseconds()取余作为最后的offset值

小结

  • flink的Sliding Window分为SlidingEventTimeWindows及SlidingProcessingTimeWindows,它们都继承了WindowAssigner,其中元素类型为Object,而窗口类型为TimeWindow;它有三个参数,一个是size,一个是slide,一个是offset,其中offset必须大于等于0,offset必须大于slide,size必须大于0
  • WindowAssigner定义了assignWindows、getDefaultTrigger、getWindowSerializer、isEventTime这几个抽象方法,同时定义了抽象静态类WindowAssignerContext;它有两个泛型,其中T为元素类型,而W为窗口类型;SlidingEventTimeWindows及SlidingProcessingTimeWindows的窗口类型为TimeWindow,它有start及end属性,其中start为inclusive,而end为exclusive,maxTimestamp返回的是end-1,它还提供了mergeWindows及getWindowStartWithOffset静态方法;前者用于合并重叠的时间窗口,后者用于获取指定timestamp、offset、windowSize的window start
  • SlidingEventTimeWindows及SlidingProcessingTimeWindows的不同在于assignWindows、getDefaultTrigger、isEventTime方法;前者assignWindows使用的是参数中的timestamp,而后者使用的是context.getCurrentProcessingTime();前者的getDefaultTrigger返回的是EventTimeTrigger,而后者返回的是ProcessingTimeTrigger;前者isEventTime方法返回的为true,而后者返回的为false

doc

转载地址:http://jxsvo.baihongyu.com/

你可能感兴趣的文章
CF Exam (数学)
查看>>
这家素食店的牙签能吃——天厨妙香
查看>>
在UnrealEngine中用Custom节点实现毛玻璃的效果
查看>>
冒泡排序分析
查看>>
ORACLE数据库创建用户名和表空间
查看>>
oracle的监听器启动后一会儿便停止 再启动报错
查看>>
v4l2API无法执行VIDIOC_DQBUF的问题
查看>>
jdbc连接sqlsever2017的第一步
查看>>
PHP实现文件下载
查看>>
Echarts X轴时间类型
查看>>
Leangoo到底好在哪里?
查看>>
Firefly自动售货机解决方案
查看>>
操作系统的内存对齐机制学习笔记
查看>>
5款最好的免费Linux缓存系统
查看>>
Web开发者必知的12款jQuery插件
查看>>
一起谈.NET技术,.NET简谈观察者模式
查看>>
艾伟也谈项目管理,我眼中的DevOps
查看>>
工作中的经验总结,也是教训
查看>>
Java并发编程:并发容器之CopyOnWriteArrayList
查看>>
20165232 测试1
查看>>