博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
flink之Timer定时器
阅读量:3958 次
发布时间:2019-05-24

本文共 4177 字,大约阅读时间需要 13 分钟。

一、Timer简介

Timer(定时器)是Flink Streaming API提供的用于感知并利用处理时间/事件时间变化的机制。

最常见的使用Timer的地方就是KeyedProcessFunction。我们在其processElement()方法中注册Timer,然后覆写其onTimer()方法作为Timer触发时的回调逻辑。根据时间特征的不同:

(1)处理时间——调用Context.timerService().registerProcessingTimeTimer()注册;onTimer()在系统时间戳达到Timer设定的时间戳时触发。

(2)事件时间——调用Context.timerService().registerEventTimeTimer()注册;onTimer()在Flink内部水印达到或超过Timer设定的时间戳时触发。

二、使用demo

demo功能为利用mapState进行wordCount,利用Timer定期对每2秒对state清除

(1)程序入口

package testOntimer;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/** * @程序功能:读取文件,进行wordCount,利用state测试onTimer定时器 * @author gaoj * @Created_in 2020-12-08 */public class Driver {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env            .addSource(new Source())            .flatMap(new Map())            .keyBy(t -> t._1)            .process(new Process())            .print();        env.execute();    }}

(2)程序数据源

package testOntimer;import org.apache.commons.lang3.StringUtils;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import java.io.BufferedReader;import java.io.FileReader;import java.util.concurrent.TimeUnit;public class Source extends RichSourceFunction
{ private Boolean isRunning = true; @Override public void run(SourceContext
ctx) throws Exception { BufferedReader bufferedReader = new BufferedReader(new FileReader("D:\\test.txt")); while (isRunning){ String line = bufferedReader.readLine(); if(StringUtils.isNotBlank(line)){ ctx.collect(line); } TimeUnit.SECONDS.sleep(10); } } @Override public void cancel() { isRunning = false; }}

(3)数据源转换

package testOntimer;import org.apache.flink.api.common.functions.RichFlatMapFunction;import org.apache.flink.util.Collector;import scala.Tuple2;public class Map extends RichFlatMapFunction
> { @Override public void flatMap(String value, Collector
> out) throws Exception { String[] split = value.split(","); for (String s : split) { out.collect(new Tuple2<>(s,1)); } }}

(4)进行wordCount

package testOntimer;import org.apache.flink.api.common.state.MapState;import org.apache.flink.api.common.state.MapStateDescriptor;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.common.typeinfo.TypeHint;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.KeyedProcessFunction;import org.apache.flink.util.Collector;import scala.Tuple2;import java.util.Calendar;public class Process extends KeyedProcessFunction
,Tuple2
> { private transient MapState
mapState; private transient ValueState
valueState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); MapStateDescriptor
mapStateDescriptor = new MapStateDescriptor<>("valueStateDesc", TypeInformation.of(new TypeHint
() {}), TypeInformation.of(new TypeHint
() {})); mapState = getRuntimeContext().getMapState(mapStateDescriptor); ValueStateDescriptor
ValueStateDescriptor = new ValueStateDescriptor<>("phoneCount", TypeInformation.of(new TypeHint
() { })); valueState=getRuntimeContext().getState(ValueStateDescriptor ); } @Override public void processElement(Tuple2
value, Context ctx, Collector
> out) throws Exception { if (valueState.value()==null){ ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis()+2000); } int i = mapState.contains(value._1) ? mapState.get(value._1) : 0; int i1 = i + 1; mapState.put(value._1,i1); out.collect(new Tuple2<>(value._1,i1)); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector
> out) throws Exception { mapState.clear(); valueState.clear(); } @Override public void close() throws Exception { super.close(); }}

转载地址:http://timzi.baihongyu.com/

你可能感兴趣的文章
ctfhub 投稿彩蛋
查看>>
【Shiro_exploit】PYTHON报错解决:ModuleNotFoundError: No module named 'requests'
查看>>
一次很折腾的扩容,记录一下之后再整理
查看>>
VirtualBox虚拟机网络配置
查看>>
oracle vm virtualbox虚拟机下,CentOS7系统网络配置
查看>>
Windows 10下Docker使用经验谈
查看>>
centos下nmap安装和基础命令
查看>>
ubuntu出现有线已连接却无法上网
查看>>
一句话命令
查看>>
解决Linux CentOS中cp -f 复制强制覆盖的命令无效的方法
查看>>
wdcpv3升级到v3.2后,多PHP版本共存的安装方法
查看>>
centos tar压缩与解压缩
查看>>
Centos 7防火墙firewalld/iptables开放80端口
查看>>
centos 7 yum源文件配置详解及163 yum源更换
查看>>
PHP统计当前网站的访问人数,访问信息,被多少次访问。
查看>>
Windows10远程报错CredSSP加密oracle修正
查看>>
Windows server 2016 设置多用户登陆
查看>>
偶然发现的面包屑
查看>>
每天自动升级你的Centos
查看>>
WDCP v3版本的小工具集
查看>>