本文共 4177 字,大约阅读时间需要 13 分钟。
Timer(定时器)是Flink Streaming API提供的用于感知并利用处理时间/事件时间变化的机制。
最常见的使用Timer的地方就是KeyedProcessFunction。我们在其processElement()方法中注册Timer,然后覆写其onTimer()方法作为Timer触发时的回调逻辑。根据时间特征的不同:
(1)处理时间——调用Context.timerService().registerProcessingTimeTimer()注册;onTimer()在系统时间戳达到Timer设定的时间戳时触发。
(2)事件时间——调用Context.timerService().registerEventTimeTimer()注册;onTimer()在Flink内部水印达到或超过Timer设定的时间戳时触发。下面的demo利用MapState实现wordCount,并利用Timer每2秒定期清除state。
(1)程序入口package testOntimer;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/** * @程序功能:读取文件,进行wordCount,利用state测试onTimer定时器 * @author gaoj * @Created_in 2020-12-08 */public class Driver { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .addSource(new Source()) .flatMap(new Map()) .keyBy(t -> t._1) .process(new Process()) .print(); env.execute(); }}
(2)程序数据源
package testOntimer;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.concurrent.TimeUnit;

/**
 * Demo source that emits one non-blank line from a local text file every
 * 10 seconds until the job is cancelled.
 */
public class Source extends RichSourceFunction<String> {

    // volatile: cancel() is invoked by the framework from a different thread
    // than the one running run(), so the flag needs cross-thread visibility.
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // try-with-resources closes the reader on cancellation or failure
        // (the original leaked the BufferedReader).
        try (BufferedReader reader = new BufferedReader(new FileReader("D:\\test.txt"))) {
            while (isRunning) {
                String line = reader.readLine();
                // readLine() returns null at EOF; isNotBlank(null) is false,
                // so after EOF the loop idles, matching the original demo.
                if (StringUtils.isNotBlank(line)) {
                    ctx.collect(line);
                }
                TimeUnit.SECONDS.sleep(10);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
(3)数据源转换
package testOntimer;import org.apache.flink.api.common.functions.RichFlatMapFunction;import org.apache.flink.util.Collector;import scala.Tuple2;public class Map extends RichFlatMapFunction> { @Override public void flatMap(String value, Collector > out) throws Exception { String[] split = value.split(","); for (String s : split) { out.collect(new Tuple2<>(s,1)); } }}
(4)进行wordCount
package testOntimer;import org.apache.flink.api.common.state.MapState;import org.apache.flink.api.common.state.MapStateDescriptor;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.common.typeinfo.TypeHint;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.KeyedProcessFunction;import org.apache.flink.util.Collector;import scala.Tuple2;import java.util.Calendar;public class Process extends KeyedProcessFunction,Tuple2 > { private transient MapState mapState; private transient ValueState valueState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("valueStateDesc", TypeInformation.of(new TypeHint () {}), TypeInformation.of(new TypeHint () {})); mapState = getRuntimeContext().getMapState(mapStateDescriptor); ValueStateDescriptor ValueStateDescriptor = new ValueStateDescriptor<>("phoneCount", TypeInformation.of(new TypeHint () { })); valueState=getRuntimeContext().getState(ValueStateDescriptor ); } @Override public void processElement(Tuple2 value, Context ctx, Collector > out) throws Exception { if (valueState.value()==null){ ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis()+2000); } int i = mapState.contains(value._1) ? mapState.get(value._1) : 0; int i1 = i + 1; mapState.put(value._1,i1); out.collect(new Tuple2<>(value._1,i1)); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector > out) throws Exception { mapState.clear(); valueState.clear(); } @Override public void close() throws Exception { super.close(); }}
转载地址:http://timzi.baihongyu.com/