Apache FlinkCEP 实现超时状态监控的步骤详解
CEP(Complex Event Processing):复杂事件处理。
Apache FlinkCEP API
DataStream 一般由相同类型的事件或元素组成,一个 DataStream 可以通过 filter、map 等一系列转换操作转换为另一个 DataStream。
PatternStream 是对 CEP 模式匹配流的抽象:它把 DataStream 和 Pattern 组合在一起,然后对外提供 select、flatSelect 等方法。PatternStream 本身并不是 DataStream;它提供的方法会把匹配到的模式序列及其关联事件组成的映射(即 Map<模式名称, List<事件>>)发送出去,发到 SingleOutputStreamOperator 中,而 SingleOutputStreamOperator 是 DataStream 的子类。
public static <in, out> singleoutputstreamoperator <out> createpatternstream(...){...} public static <in, out1, out2> singleoutputstreamoperator <out1> createtimeoutpatternstream(...){...} final singleoutputstreamoperator <out> patternstream;
@public public class singleoutputstreamoperator <t> extends datastream <t> {...}
/**
 * Creates a pattern stream that applies {@code pattern} to {@code inputStream} without an event
 * comparator. Package-private: user code obtains instances through {@code CEP.pattern(...)}.
 *
 * @param inputStream the stream whose events are matched
 * @param pattern     the pattern to detect
 */
PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
    this.inputStream = inputStream;
    this.pattern = pattern;
    this.comparator = null;
}

/**
 * Creates a pattern stream that applies {@code pattern} to {@code inputStream}, using
 * {@code comparator} to order the input events.
 *
 * @param inputStream the stream whose events are matched
 * @param pattern     the pattern to detect
 * @param comparator  comparator for input events
 */
PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern, final EventComparator<T> comparator) {
    this.inputStream = inputStream;
    this.pattern = pattern;
    this.comparator = comparator;
}
Pattern 是模式定义的基类,采用 builder 模式构建;定义好的模式会被 NFACompiler 用来生成 NFA。
public class pattern <t, f extends t> { /** 模式名称 */ private final string name; /** 前面一个模式 */ private final pattern <t, ? extends t> previous; /** 一个事件如果要被当前模式匹配到,必须满足的约束条件 */ private iterativecondition <f> condition; /** 时间窗口长度,在时间长度内进行模式匹配 */ private time windowtime; /** 模式量词,意思是一个模式匹配几个事件等 默认是匹配到一个 */ private quantifier quantifier = quantifier .one( consumingstrategy .strict); /** 停止将事件收集到循环状态时,事件必须满足的条件 */ private iterativecondition <f> untilcondition; /** * 适用于{@code times}模式,用来维护模式里事件可以连续发生的次数 */ private times times; // 匹配到事件之后的跳过策略 private final aftermatchskipstrategy aftermatchskipstrategy; ... }
public class quantifier { ... /** * 5个属性,可以组合,但并非所有的组合都是有效的 */ public enum quantifierproperty { single, looping, times, optional, greedy } /** * 描述在此模式中匹配哪些事件的策略 */ public enum consumingstrategy { strict, skip_till_next, skip_till_any, not_follow, not_next } /** * 描述当前模式里事件可以连续发生的次数;举个例子,模式条件无非就是boolean,满足true条件的事件连续出现times次,或者一个次数范围,比如2~4次,2次,3次,4次都会被当前模式匹配出来,因此同一个事件会被重复匹配到 */ public static class times { private final int from; private final int to; private times ( int from, int to) { preconditions .checkargument(from > 0 , "the from should be a positive number greater than 0." ); preconditions .checkargument(to >= from, "the to should be a number greater than or equal to from: " + from + "." ); this .from = from; this .to = to; } public int getfrom() { return from; } public int getto() { return to; } // 次数范围 public static times of( int from, int to) { return new times (from, to); } // 指定具体次数 public static times of( int times) { return new times (times, times); } @override public boolean equals( object o) { if ( this == o) { return true ; } if (o == null || getclass() != o.getclass()) { return false ; } times times = ( times ) o; return from == times.from && to == times.to; } @override public int hashcode() { return objects .hash(from, to); } } ... }
/**
 * Serializable {@link Comparator} over input events. It can be passed to
 * {@code CEP.pattern(input, pattern, comparator)} and is handed on to the CEP operator, which is
 * why it must be {@link Serializable}.
 *
 * @param <T> type of the compared events
 */
public interface EventComparator<T> extends Comparator<T>, Serializable {
    long serialVersionUID = 1L;
}
public class nfacompiler { ... /** * nfafactory 创建nfa的接口 * * @param <t> type of the input events which are processed by the nfa */ public interface nfafactory <t> extends serializable { nfa<t> createnfa(); } /** * nfafactory的具体实现nfafactoryimpl * * <p>the implementation takes the input type serializer, the window time and the set of * states and their transitions to be able to create an nfa from them. * * @param <t> type of the input events which are processed by the nfa */ private static class nfafactoryimpl <t> implements nfafactory <t> { private static final long serialversionuid = 8939783698296714379l ; private final long windowtime; private final collection < state <t>> states; private final boolean timeouthandling; private nfafactoryimpl ( long windowtime, collection < state <t>> states, boolean timeouthandling) { this .windowtime = windowtime; this .states = states; this .timeouthandling = timeouthandling; } @override public nfa<t> createnfa() { // 一个nfa由状态集合、时间窗口的长度和是否处理超时组成 return new nfa<>(states, windowtime, timeouthandling); } } }
NFA:Non-deterministic Finite Automaton,非确定有限(状态)自动机。
public class nfa<t> { /** * nfacompiler返回的所有有效的nfa状态集合 * these are directly derived from the user-specified pattern. */ private final map < string , state <t>> states; /** * pattern.within(time)指定的时间窗口长度 */ private final long windowtime; /** * 一个超时匹配的标记 */ private final boolean handletimeout; ... }
public interface patternselectfunction <in, out> extends function , serializable { /** * 从给到的事件映射中生成一个结果。这些事件使用他们关联的模式名称作为唯一标识 */ out select( map < string , list <in>> pattern) throws exception ; }
PatternFlatSelectFunction 不是返回一个 OUT,而是使用 Collector 把结果(零个或多个)收集起来发往下游。
/**
 * Turns one complete pattern match into any number of results, emitted through a Collector
 * instead of being returned.
 *
 * @param <IN>  type of the input events
 * @param <OUT> type of the results
 */
public interface PatternFlatSelectFunction<IN, OUT> extends Function, Serializable {

    /**
     * Generates zero or more results from the given map of events.
     *
     * @param pattern map from pattern name to the events matched by that pattern
     * @param out     collector used to emit the results
     * @throws Exception may be thrown by user code
     */
    void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception;
}
public class selecttimeoutcepoperator <in, out1, out2, key> extends abstractkeyedceppatternoperator <in, key, out1, selecttimeoutcepoperator . selectwrapper <in, out1, out2>> { private outputtag <out2> timedoutoutputtag; public selecttimeoutcepoperator ( typeserializer <in> inputserializer, boolean isprocessingtime, nfacompiler . nfafactory <in> nfafactory, final eventcomparator <in> comparator, aftermatchskipstrategy skipstrategy, // 参数命名混淆了flat...包括selectwrapper类中的成员命名... patternselectfunction <in, out1> flatselectfunction, patterntimeoutfunction <in, out2> flattimeoutfunction, outputtag <out2> outputtag, outputtag <in> latedataoutputtag) { super ( inputserializer, isprocessingtime, nfafactory, comparator, skipstrategy, new selectwrapper <>(flatselectfunction, flattimeoutfunction), latedataoutputtag); this .timedoutoutputtag = outputtag; } ... } public interface patterntimeoutfunction <in, out> extends function , serializable { out timeout( map < string , list <in>> pattern, long timeouttimestamp) throws exception ; } public interface patternflattimeoutfunction <in, out> extends function , serializable { void timeout( map < string , list <in>> pattern, long timeouttimestamp, collector <out> out) throws exception ; }
/** Entry point of the CEP library: combines an input DataStream with a Pattern into a PatternStream. */
public class CEP {

    /**
     * Creates a PatternStream that detects {@code pattern} in {@code input}.
     *
     * @param input   the input events
     * @param pattern the pattern to detect
     * @param <T>     type of the events
     * @return the resulting PatternStream
     */
    public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
        return new PatternStream<>(input, pattern);
    }

    /**
     * Creates a PatternStream that detects {@code pattern} in {@code input}, additionally passing
     * an EventComparator for the input events.
     *
     * @param input      the input events
     * @param pattern    the pattern to detect
     * @param comparator comparator for the input events
     * @param <T>        type of the events
     * @return the resulting PatternStream
     */
    public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, EventComparator<T> comparator) {
        return new PatternStream<>(input, pattern, comparator);
    }
}
public class cepoperatorutils { ... private static <in, out, k> singleoutputstreamoperator <out> createpatternstream( final datastream <in> inputstream, final pattern <in, ?> pattern, final typeinformation <out> outtypeinfo, final boolean timeouthandling, final eventcomparator <in> comparator, final operatorbuilder <in, out> operatorbuilder) { final typeserializer <in> inputserializer = inputstream.gettype().createserializer(inputstream.getexecutionconfig()); // check whether we use processing time final boolean isprocessingtime = inputstream.getexecutionenvironment().getstreamtimecharacteristic() == timecharacteristic . processingtime ; // compile our pattern into a nfafactory to instantiate nfas later on final nfacompiler . nfafactory <in> nfafactory = nfacompiler .compilefactory(pattern, timeouthandling); final singleoutputstreamoperator <out> patternstream; if (inputstream instanceof keyedstream ) { keyedstream <in, k> keyedstream = ( keyedstream <in, k>) inputstream; patternstream = keyedstream.transform( operatorbuilder.getkeyedoperatorname(), outtypeinfo, operatorbuilder.build( inputserializer, isprocessingtime, nfafactory, comparator, pattern.getaftermatchskipstrategy())); } else { keyselector <in, byte > keyselector = new nullbytekeyselector <>(); patternstream = inputstream.keyby(keyselector).transform( operatorbuilder.getoperatorname(), outtypeinfo, operatorbuilder.build( inputserializer, isprocessingtime, nfafactory, comparator, pattern.getaftermatchskipstrategy() )).forcenonparallel(); } return patternstream; } ... }
- IN: DataSource -> DataStream -> Transformations -> DataStream
- Pattern: Pattern.begin.where.next.where...times...
- PatternStream: CEP.pattern(DataStream, Pattern)
- DataStream: PatternStream.select(PatternSelectFunction) 或 PatternStream.flatSelect(PatternFlatSelectFunction)
- OUT: DataStream -> Transformations -> DataStream -> DataSink
// Used by CEPOperatorUtils when the input is not a KeyedStream: the stream is keyed with a
// NullByteKeySelector (presumably mapping every event to the same Byte key), and the resulting
// operator is then made non-parallel via forceNonParallel().
KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
Pattern 最后调用 within 设置窗口时间。如果按主键进行分组,一个时间窗口内最多只会匹配出一个超时事件,使用 PatternStream.select(...) 就可以了。
- IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
- Pattern: Pattern.begin.where.next.where...within(Time windowTime)
- PatternStream: CEP.pattern(KeyedStream, Pattern)
- OutputTag: new OutputTag(...)
- SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
- DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
- OUT: DataStream -> Transformations -> DataStream -> DataSink
public class ceptimeouteventjob { private static final string local_kafka_broker = "localhost:9092" ; private static final string group_id = ceptimeouteventjob . class .getsimplename(); private static final string group_topic = group_id; public static void main( string [] args) throws exception { // 参数 parametertool params = parametertool .fromargs(args); streamexecutionenvironment env = streamexecutionenvironment .getexecutionenvironment(); // 使用事件时间 env.setstreamtimecharacteristic( timecharacteristic . eventtime ); env.enablecheckpointing( 5000 ); env.getcheckpointconfig().enableexternalizedcheckpoints( checkpointconfig . externalizedcheckpointcleanup .retain_on_cancellation); env.getconfig().disablesysoutlogging(); env.getconfig().setrestartstrategy( restartstrategies .fixeddelayrestart( 5 , 10000 )); // 不使用pojo的时间 final assignerwithperiodicwatermarks extractor = new ingestiontimeextractor <pojo>(); // 与kafka topic的partition保持一致 env.setparallelism( 3 ); properties kafkaprops = new properties (); kafkaprops.setproperty( "bootstrap.servers" , local_kafka_broker); kafkaprops.setproperty( "group.id" , group_id); // 接入kafka的消息 flinkkafkaconsumer011 <pojo> consumer = new flinkkafkaconsumer011 <>(group_topic, new pojoschema (), kafkaprops); datastream <pojo> pojodatastream = env.addsource(consumer) .assigntimestampsandwatermarks(extractor); pojodatastream.print(); // 根据主键aid分组 即对每一个pojo事件进行匹配检测【不同类型的pojo,可以采用不同的within时间】 // 1. datastream <pojo> keyedpojos = pojodatastream .keyby( "aid" ); // 从初始化到终态-一个完整的pojo事件序列 // 2. 
pattern <pojo, pojo> completedpojo = pattern .<pojo>begin( "init" ) .where( new simplecondition <pojo>() { private static final long serialversionuid = - 6847788055093903603l ; @override public boolean filter(pojo pojo) throws exception { return "02" .equals(pojo.getastatus()); } }) .followedby( "end" ) // .next("end") .where( new simplecondition <pojo>() { private static final long serialversionuid = - 2655089736460847552l ; @override public boolean filter(pojo pojo) throws exception { return "00" .equals(pojo.getastatus()) || "01" .equals(pojo.getastatus()); } }); // 找出1分钟内【便于测试】都没有到终态的事件aid // 如果针对不同类型有不同within时间,比如有的是超时1分钟,有的可能是超时1个小时 则生成多个patternstream // 3. patternstream <pojo> patternstream = cep.pattern(keyedpojos, completedpojo.within( time .minutes( 1 ))); // 定义侧面输出timedout // 4. outputtag <pojo> timedout = new outputtag <pojo>( "timedout" ) { private static final long serialversionuid = 773503794597666247l ; }; // outputtag<l> timeoutoutputtag, patternflattimeoutfunction<t, l> patternflattimeoutfunction, patternflatselectfunction<t, r> patternflatselectfunction // 5. singleoutputstreamoperator <pojo> timeoutpojos = patternstream.flatselect( timedout, new pojotimedout (), new flatselectnothing () ); // 打印输出超时的pojo // 6.7. timeoutpojos.getsideoutput(timedout).print(); timeoutpojos.print(); env.execute( ceptimeouteventjob . 
class .getsimplename()); } /** * 把超时的事件收集起来 */ public static class pojotimedout implements patternflattimeoutfunction <pojo, pojo> { private static final long serialversionuid = - 4214641891396057732l ; @override public void timeout( map < string , list <pojo>> map, long l, collector <pojo> collector) throws exception { if ( null != map.get( "init" )) { for (pojo pojoinit : map.get( "init" )) { system .out.println( "timeout init:" + pojoinit.getaid()); collector.collect(pojoinit); } } // 因为end超时了,还没收到end,所以这里是拿不到end的 system .out.println( "timeout end: " + map.get( "end" )); } } /** * 通常什么都不做,但也可以把所有匹配到的事件发往下游;如果是宽松临近,被忽略或穿透的事件就没办法选中发往下游了 * 一分钟时间内走完init和end的数据 * * @param <t> */ public static class flatselectnothing <t> implements patternflatselectfunction <t, t> { private static final long serialversionuid = - 3029589950677623844l ; @override public void flatselect( map < string , list <t>> pattern, collector <t> collector) { system .out.println( "flatselect: " + pattern); } } }
3 > pojo{aid= 'id000-0' , astyle= 'style000-0' , aname= 'name-0' , logtime= 1563419728242 , energy= 529.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createtime= null , updatetime= null } 3 > pojo{aid= 'id000-1' , astyle= 'style000-2' , aname= 'name-1' , logtime= 1563419728783 , energy= 348.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '02' , createtime= null , updatetime= null } 3 > pojo{aid= 'id000-0' , astyle= 'style000-0' , aname= 'name-0' , logtime= 1563419749259 , energy= 492.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '00' , createtime= null , updatetime= null } flatselect: {init=[pojo{aid= 'id000-0' , astyle= 'style000-0' , aname= 'name-0' , logtime= 1563419728242 , energy= 529.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createtime= null , updatetime= null }], end =[pojo{aid= 'id000-0' , astyle= 'style000-0' , aname= 'name-0' , logtime= 1563419749259 , energy= 492.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '00' , createtime= null , updatetime= null }]} timeout init:id000- 1 3 > pojo{aid= 'id000-1' , astyle= 'style000-2' , aname= 'name-1' , logtime= 1563419728783 , energy= 348.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '02' , createtime= null , updatetime= null } timeout end : null 3 > pojo{aid= 'id000-2' , astyle= 'style000-0' , aname= 'name-0' , logtime= 1563419829639 , energy= 467.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '03' , createtime= null , updatetime= null } 3 > pojo{aid= 'id000-2' , astyle= 'style000-0' , aname= 'name-0' , logtime= 1563419841394 , energy= 107.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '00' , createtime= null , updatetime= null } 3 > pojo{aid= 'id000-3' , astyle= 'style000-0' , aname= 'name-0' , logtime= 1563419967721 , energy= 431.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createtime= null , updatetime= null } 3 > pojo{aid= 'id000-3' , astyle= 'style000-2' , aname= 'name-0' , logtime= 1563419979567 , energy= 32.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '03' , createtime= null , updatetime= null } 3 > 
pojo{aid= 'id000-3' , astyle= 'style000-2' , aname= 'name-0' , logtime= 1563419993612 , energy= 542.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '01' , createtime= null , updatetime= null } flatselect: {init=[pojo{aid= 'id000-3' , astyle= 'style000-0' , aname= 'name-0' , logtime= 1563419967721 , energy= 431.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createtime= null , updatetime= null }], end =[pojo{aid= 'id000-3' , astyle= 'style000-2' , aname= 'name-0' , logtime= 1563419993612 , energy= 542.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '01' , createtime= null , updatetime= null }]} 3 > pojo{aid= 'id000-4' , astyle= 'style000-0' , aname= 'name-0' , logtime= 1563420063760 , energy= 122.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createtime= null , updatetime= null } 3 > pojo{aid= 'id000-4' , astyle= 'style000-0' , aname= 'name-0' , logtime= 1563420078008 , energy= 275.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '03' , createtime= null , updatetime= null } timeout init:id000- 4 3 > pojo{aid= 'id000-4' , astyle= 'style000-0' , aname= 'name-0' , logtime= 1563420063760 , energy= 122.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createtime= null , updatetime= null } timeout end : null
