
Implementing Timeout State Monitoring with Apache FlinkCEP: A Step-by-Step Guide

Programmer Article Station (程序员文章站), 2022-03-01 20:11:06

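CEP stands for Complex Event Processing. Typical timeout scenarios it can monitor include:

An order is placed but payment is not confirmed within a certain time.

A ride-hailing order is created but pickup is not confirmed within a certain time.

A food delivery is not confirmed as delivered within a certain time after the scheduled delivery time.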

Apache FlinkCEP API

CEPTimeoutEventJob

FlinkCEP source code overview

DataStream and PatternStream

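A DataStream generally consists of events or elements of the same type. A DataStream can be turned into another DataStream through a series of transformations such as filter and map.

A PatternStream is the abstraction of a stream on which CEP pattern matching is performed. It combines a DataStream with a Pattern and exposes methods such as select and flatSelect. A PatternStream is not a DataStream. Its methods emit each matched pattern sequence as a map from pattern name to the associated events (that is, a Map<pattern name, List<event>>) into a SingleOutputStreamOperator, and SingleOutputStreamOperator is a DataStream.

Inside the CEPOperatorUtils utility class, methods and variables are nevertheless named "patternStream", for example: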

public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream(...) {...}

public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(...) {...}

final SingleOutputStreamOperator<OUT> patternStream;

SingleOutputStreamOperator:

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {...}

The constructors of PatternStream:

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
  this.inputStream = inputStream;
  this.pattern = pattern;
  this.comparator = null;
}

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern, final EventComparator<T> comparator) {
  this.inputStream = inputStream;
  this.pattern = pattern;
  this.comparator = comparator;
}

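Pattern, Quantifier, and EventComparator

Pattern is the base class for pattern definitions and follows the builder pattern. A defined pattern is used by NFACompiler to generate an NFA.

If you want to implement your own methods similar to next and followedBy, for example a timeEnd method, extending and overriding Pattern should be feasible.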

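public class Pattern<T, F extends T> {
  /** Name of the pattern. */
  private final String name;
  /** The previous pattern. */
  private final Pattern<T, ? extends T> previous;
  /** The constraint an event must satisfy to be matched by the current pattern. */
  private IterativeCondition<F> condition;
  /** Length of the time window within which the pattern must match. */
  private Time windowTime;
  /** Pattern quantifier, i.e. how many events a pattern matches; by default exactly one. */
  private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);
  /** The condition an event must satisfy to stop collecting events into a looping state. */
  private IterativeCondition<F> untilCondition;
  /**
   * Applies to a {@code times} pattern; maintains how many times events may occur consecutively in the pattern.
   */
  private Times times;
  // Skip strategy applied after a match
  private final AfterMatchSkipStrategy afterMatchSkipStrategy;
  ...
}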

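Quantifier describes the concrete matching behavior of a pattern. There are three main kinds:

single (matches exactly once), looping (matches repeatedly), and times (matches a fixed number of times or any count within a range).

Every Pattern can be optional (for single or looping matches) and can be given a ConsumingStrategy.

Looping and times quantifiers also carry an additional inner ConsumingStrategy that applies between the events accepted within the pattern.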

public class Quantifier {
  ...
  /**
   * Five properties that can be combined, although not every combination is valid.
   */
  public enum QuantifierProperty {
    SINGLE,
    LOOPING,
    TIMES,
    OPTIONAL,
    GREEDY
  }

  /**
   * Strategy describing which events are matched in this pattern.
   */
  public enum ConsumingStrategy {
    STRICT,
    SKIP_TILL_NEXT,
    SKIP_TILL_ANY,
    NOT_FOLLOW,
    NOT_NEXT
  }

  /**
   * Describes how many times events may occur consecutively in the current pattern.
   * A pattern condition is just a boolean predicate: events satisfying it must occur
   * consecutively `times` times, or within a range of counts. With a range of 2 to 4,
   * runs of 2, 3, and 4 events are all matched by the current pattern, so the same
   * event can be matched more than once.
   */
  public static class Times {
    private final int from;
    private final int to;

    private Times(int from, int to) {
      Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
      Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
      this.from = from;
      this.to = to;
    }

    public int getFrom() {
      return from;
    }

    public int getTo() {
      return to;
    }

    // A range of counts
    public static Times of(int from, int to) {
      return new Times(from, to);
    }

    // An exact count
    public static Times of(int times) {
      return new Times(times, times);
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      Times times = (Times) o;
      return from == times.from &&
        to == times.to;
    }

    @Override
    public int hashCode() {
      return Objects.hash(from, to);
    }
  }
  ...
}
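
To make the range semantics of Times concrete, here is a minimal plain-Java sketch. TimesSketch and matchLengths are hypothetical names introduced for illustration and are not part of Flink. It assumes a non-greedy quantifier and only counts the matches that start at the first event of a run of consecutive qualifying events.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink: mimics the from/to bounds of Quantifier.Times. */
final class TimesSketch {
    private final int from;
    private final int to;

    private TimesSketch(int from, int to) {
        if (from <= 0) {
            throw new IllegalArgumentException("from must be greater than 0");
        }
        if (to < from) {
            throw new IllegalArgumentException("to must be greater than or equal to from");
        }
        this.from = from;
        this.to = to;
    }

    /** A range of counts, like times(from, to). */
    static TimesSketch of(int from, int to) {
        return new TimesSketch(from, to);
    }

    /** An exact count, like times(n). */
    static TimesSketch of(int times) {
        return new TimesSketch(times, times);
    }

    /**
     * Given a run of `run` consecutive qualifying events, returns the lengths of
     * all matches that start at the first event of that run.
     */
    List<Integer> matchLengths(int run) {
        List<Integer> lengths = new ArrayList<>();
        for (int n = from; n <= Math.min(to, run); n++) {
            lengths.add(n);
        }
        return lengths;
    }

    public static void main(String[] args) {
        System.out.println(TimesSketch.of(2, 4).matchLengths(3)); // prints [2, 3]
        System.out.println(TimesSketch.of(3).matchLengths(5));    // prints [3]
    }
}
```

With a range of 2 to 4 and three qualifying events, the first event takes part in a match of length 2 and in a match of length 3, which is why the same event can show up in several results.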

EventComparator is a custom event comparator; to define one, implement the EventComparator interface.

public interface EventComparator<T> extends Comparator<T>, Serializable {
  long serialVersionUID = 1L;
}

NFACompiler and NFA

NFACompiler provides methods that compile a Pattern into an NFA or an NFAFactory; an NFAFactory can be used to create multiple NFAs.

public class NFACompiler {
  ...
  /**
   * NFAFactory: the interface for creating an NFA.
   *
   * @param <T> Type of the input events which are processed by the NFA
   */
  public interface NFAFactory<T> extends Serializable {
    NFA<T> createNFA();
  }

  /**
   * NFAFactoryImpl: the concrete implementation of NFAFactory.
   *
   * <p>The implementation takes the input type serializer, the window time and the set of
   * states and their transitions to be able to create an NFA from them.
   *
   * @param <T> Type of the input events which are processed by the NFA
   */
  private static class NFAFactoryImpl<T> implements NFAFactory<T> {

    private static final long serialVersionUID = 8939783698296714379L;

    private final long windowTime;
    private final Collection<State<T>> states;
    private final boolean timeoutHandling;

    private NFAFactoryImpl(
      long windowTime,
      Collection<State<T>> states,
      boolean timeoutHandling) {

      this.windowTime = windowTime;
      this.states = states;
      this.timeoutHandling = timeoutHandling;
    }

    @Override
    public NFA<T> createNFA() {
      // An NFA consists of a set of states, the window length, and a flag for timeout handling
      return new NFA<>(states, windowTime, timeoutHandling);
    }
  }
}

NFA: Non-deterministic Finite Automaton.

For more background, see the article on non-deterministic finite automata (非确定有限状态自动机).

public class NFA<T> {
  /**
   * The set of all valid NFA states returned by NFACompiler.
   * These are directly derived from the user-specified pattern.
   */
  private final Map<String, State<T>> states;

  /**
   * The window length specified by Pattern.within(Time).
   */
  private final long windowTime;

  /**
   * A flag indicating whether timed-out matches are handled.
   */
  private final boolean handleTimeout;
  ...
}
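
As a rough illustration of how states advance on incoming events, the following toy state machine walks an "init" then "end" pattern under strict contiguity (like next) and relaxed contiguity (like followedBy). TwoStepNfaSketch is a hypothetical class; it ignores the window time, quantifiers, and branching, and is far simpler than Flink's NFA.

```java
import java.util.Arrays;
import java.util.List;

/** Toy two-state machine; a hypothetical illustration, not Flink's NFA class. */
final class TwoStepNfaSketch {

    /**
     * Returns true if the events contain "init" followed by "end".
     * strict = true models next(): "end" must directly follow "init".
     * strict = false models followedBy(): unrelated events in between are skipped.
     */
    static boolean matches(List<String> events, boolean strict) {
        boolean waitingForEnd = false; // false = start state, true = "init" already seen
        for (String event : events) {
            if (waitingForEnd && event.equals("end")) {
                return true; // reached the final state
            }
            if (event.equals("init")) {
                waitingForEnd = true;
            } else if (strict) {
                waitingForEnd = false; // a non-matching event breaks strict contiguity
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("init", "other", "end");
        System.out.println(matches(events, true));  // prints false
        System.out.println(matches(events, false)); // prints true
    }
}
```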

 

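PatternSelectFunction and PatternFlatSelectFunction

The select() method of PatternSelectFunction is called with a map that contains the matched events, accessible by pattern name. The pattern names are the ones specified when the Pattern was defined. select() returns exactly one result; to return multiple results, implement PatternFlatSelectFunction instead.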

public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {

  /**
   * Generates one result from the given map of events. The events are identified by the name of their associated pattern.
   */
  OUT select(Map<String, List<IN>> pattern) throws Exception;
}

 

PatternFlatSelectFunction does not return a single OUT; instead it uses a Collector to collect the results produced from the matched events.

public interface PatternFlatSelectFunction<IN, OUT> extends Function, Serializable {

  /**
   * Generates one or more results.
   */
  void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception;
}
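
The difference between the two shapes can be sketched without Flink on the classpath. In the following plain-Java sketch, SelectFn and FlatSelectFn are hypothetical stand-ins that mirror the two interfaces above, a java.util.function.Consumer plays the role of the Collector, and the event strings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-ins for the two select interfaces; not Flink classes. */
final class SelectSketch {

    /** Mirrors PatternSelectFunction: exactly one result per match. */
    interface SelectFn<IN, OUT> {
        OUT select(Map<String, List<IN>> match);
    }

    /** Mirrors PatternFlatSelectFunction: zero or more results per match. */
    interface FlatSelectFn<IN, OUT> {
        void flatSelect(Map<String, List<IN>> match, Consumer<OUT> out);
    }

    /** A match keyed by pattern name, in the Map<String, List<IN>> shape. */
    static Map<String, List<String>> sampleMatch() {
        Map<String, List<String>> match = new LinkedHashMap<>();
        match.put("init", Arrays.asList("order-1:created"));
        match.put("end", Arrays.asList("order-1:paid"));
        return match;
    }

    /** Produces one summary string for the whole match. */
    static String selectOne(Map<String, List<String>> match) {
        SelectFn<String, String> fn = m -> m.get("init").get(0) + " -> " + m.get("end").get(0);
        return fn.select(match);
    }

    /** Emits every matched event as its own result. */
    static List<String> selectMany(Map<String, List<String>> match) {
        List<String> results = new ArrayList<>();
        FlatSelectFn<String, String> fn = (m, out) -> m.values().forEach(events -> events.forEach(out));
        fn.flatSelect(match, results::add);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(selectOne(sampleMatch()));  // prints order-1:created -> order-1:paid
        System.out.println(selectMany(sampleMatch())); // prints [order-1:created, order-1:paid]
    }
}
```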

SelectTimeoutCepOperator and PatternTimeoutFunction

SelectTimeoutCepOperator is created when createTimeoutPatternStream() is called in CEPOperatorUtils.

The methods of SelectTimeoutCepOperator that the operator invokes repeatedly are processMatchedSequences() and processTimedOutSequences().

This is the template method pattern: they correspond to the processEvent() and advanceTime() methods of the abstract class AbstractKeyedCEPPatternOperator.

There is also FlatSelectTimeoutCepOperator with its corresponding PatternFlatTimeoutFunction.

public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY>
  extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, SelectTimeoutCepOperator.SelectWrapper<IN, OUT1, OUT2>> {

  private OutputTag<OUT2> timedOutOutputTag;

  public SelectTimeoutCepOperator(
    TypeSerializer<IN> inputSerializer,
    boolean isProcessingTime,
    NFACompiler.NFAFactory<IN> nfaFactory,
    final EventComparator<IN> comparator,
    AfterMatchSkipStrategy skipStrategy,
    // The parameter names confusingly carry the "flat" prefix, as do the member names in the SelectWrapper class
    PatternSelectFunction<IN, OUT1> flatSelectFunction,
    PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction,
    OutputTag<OUT2> outputTag,
    OutputTag<IN> lateDataOutputTag) {
    super(
      inputSerializer,
      isProcessingTime,
      nfaFactory,
      comparator,
      skipStrategy,
      new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction),
      lateDataOutputTag);
    this.timedOutOutputTag = outputTag;
  }
  ...
}

public interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable {
  OUT timeout(Map<String, List<IN>> pattern, long timeoutTimestamp) throws Exception;
}

public interface PatternFlatTimeoutFunction<IN, OUT> extends Function, Serializable {
  void timeout(Map<String, List<IN>> pattern, long timeoutTimestamp, Collector<OUT> out) throws Exception;
}

 

CEP and CEPOperatorUtils

CEP is the utility class that creates a PatternStream; a PatternStream is merely the combination of a DataStream and a Pattern.

public class CEP {

  public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
    return new PatternStream<>(input, pattern);
  }

  public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, EventComparator<T> comparator) {
    return new PatternStream<>(input, pattern, comparator);
  }
}

 

CEPOperatorUtils creates the SingleOutputStreamOperator (a DataStream) when the select() or flatSelect() method of PatternStream is called.

public class CEPOperatorUtils {
  ...
  private static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
    final DataStream<IN> inputStream,
    final Pattern<IN, ?> pattern,
    final TypeInformation<OUT> outTypeInfo,
    final boolean timeoutHandling,
    final EventComparator<IN> comparator,
    final OperatorBuilder<IN, OUT> operatorBuilder) {

    final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

    // check whether we use processing time
    final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

    // compile our pattern into a NFAFactory to instantiate NFAs later on
    final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);

    final SingleOutputStreamOperator<OUT> patternStream;

    if (inputStream instanceof KeyedStream) {
      KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;
      patternStream = keyedStream.transform(
        operatorBuilder.getKeyedOperatorName(),
        outTypeInfo,
        operatorBuilder.build(
          inputSerializer,
          isProcessingTime,
          nfaFactory,
          comparator,
          pattern.getAfterMatchSkipStrategy()));
    } else {
      KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
      patternStream = inputStream.keyBy(keySelector).transform(
        operatorBuilder.getOperatorName(),
        outTypeInfo,
        operatorBuilder.build(
          inputSerializer,
          isProcessingTime,
          nfaFactory,
          comparator,
          pattern.getAfterMatchSkipStrategy()
        )).forceNonParallel();
    }

    return patternStream;
  }
  ...
}

FlinkCEP implementation steps

  1. IN: DataSource -> DataStream -> Transformations -> DataStream
  2. Pattern: Pattern.begin.where.next.where...times...
  3. PatternStream: CEP.pattern(DataStream, Pattern)
  4. DataStream: PatternStream.select(PatternSelectFunction) or PatternStream.flatSelect(PatternFlatSelectFunction)
  5. OUT: DataStream -> Transformations -> DataStream -> DataSink

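Steps for implementing match timeouts with FlinkCEP

A stream used for timeout CEP needs keyBy, that is, it should be a KeyedStream. If the inputStream is not a KeyedStream, a constant key of byte 0 is created for it (as seen in the CEPOperatorUtils source above).

KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();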
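
The effect of a constant key can be sketched in plain Java: every event lands in the same group, which is consistent with the non-keyed branch above being forced to run non-parallel. KeyingSketch, groupByKey, and nullByteKey are hypothetical names for illustration, and the event strings are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical illustration of keying; not Flink's keyBy implementation. */
final class KeyingSketch {

    /** Groups events by the key that the selector extracts from each event. */
    static <IN, K> Map<K, List<IN>> groupByKey(List<IN> events, Function<IN, K> keySelector) {
        Map<K, List<IN>> groups = new HashMap<>();
        for (IN event : events) {
            groups.computeIfAbsent(keySelector.apply(event), k -> new ArrayList<>()).add(event);
        }
        return groups;
    }

    /** Stand-in for a null-byte key selector: always returns the byte 0. */
    static <IN> Function<IN, Byte> nullByteKey() {
        return event -> (byte) 0;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a:1", "b:1", "a:2");
        Function<String, String> byPrefix = event -> event.substring(0, 1);

        // Keyed by prefix: one group per distinct key
        System.out.println(groupByKey(events, byPrefix).size()); // prints 2
        // Constant key: all events share one group
        System.out.println(groupByKey(events, KeyingSketch.<String>nullByteKey()).size()); // prints 1
    }
}
```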

Call within() at the end of the Pattern to set the window time. If the stream is grouped by primary key, at most one timeout event is matched per key within a time window, so PatternStream.select(...) is sufficient.

  1. IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
  2. Pattern: Pattern.begin.where.next.where...within(Time windowTime)
  3. PatternStream: CEP.pattern(KeyedStream, Pattern)
  4. OutputTag: new OutputTag(...)
  5. SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
  6. DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
  7. OUT: DataStream -> Transformations -> DataStream -> DataSink

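Limitations of FlinkCEP timeouts

As with Flink window aggregation, when event time is used and progress depends on watermarks generated from events, a subsequent event must arrive before the window is triggered to compute and emit results. A timed-out partial match is therefore only reported once a later event advances the watermark past the end of its window.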
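
This limitation can be reproduced with a small plain-Java model. WatermarkTimeoutSketch is a hypothetical class, not Flink's operator. It assumes that every event's timestamp directly becomes the new watermark and that a partial match expires once the watermark reaches its start time plus the window length.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of event-time timeouts; hypothetical, not Flink's CEP operator. */
final class WatermarkTimeoutSketch {
    private final long windowMillis;
    // key -> timestamp of the pending "init" event
    private final Map<String, Long> pendingInit = new LinkedHashMap<>();
    final List<String> matched = new ArrayList<>();
    final List<String> timedOut = new ArrayList<>();

    WatermarkTimeoutSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Processes one event; its timestamp also serves as the new watermark. */
    void onEvent(String key, String status, long timestamp) {
        advanceWatermark(timestamp);
        if (status.equals("init")) {
            pendingInit.put(key, timestamp);
        } else if (status.equals("end") && pendingInit.remove(key) != null) {
            matched.add(key);
        }
    }

    /** Expires every partial match whose window ended at or before the watermark. */
    void advanceWatermark(long watermark) {
        Iterator<Map.Entry<String, Long>> it = pendingInit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + windowMillis <= watermark) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        WatermarkTimeoutSketch sketch = new WatermarkTimeoutSketch(60_000L);
        sketch.onEvent("A", "init", 0L);
        sketch.onEvent("B", "init", 10_000L);
        sketch.onEvent("A", "end", 20_000L);
        // B is still pending: no later event has moved the watermark yet
        System.out.println(sketch.timedOut); // prints []
        sketch.onEvent("C", "init", 100_000L);
        System.out.println(sketch.timedOut); // prints [B]
    }
}
```

In this model the timeout for key B is only reported when the event for key C arrives. If no further event ever arrived, B would stay pending indefinitely.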

Complete FlinkCEP timeout demo

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// POJO and POJOSchema are the demo's own event class and deserialization schema; their source is not shown in this article.
public class CEPTimeoutEventJob {
  private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
  private static final String GROUP_ID = CEPTimeoutEventJob.class.getSimpleName();
  private static final String GROUP_TOPIC = GROUP_ID;

  public static void main(String[] args) throws Exception {
    // Parameters
    ParameterTool params = ParameterTool.fromArgs(args);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(5000);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 10000));

    // Do not use the timestamp carried in the POJO
    final AssignerWithPeriodicWatermarks<POJO> extractor = new IngestionTimeExtractor<POJO>();

    // Keep consistent with the number of partitions of the Kafka topic
    env.setParallelism(3);

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
    kafkaProps.setProperty("group.id", GROUP_ID);

    // Consume messages from Kafka
    FlinkKafkaConsumer011<POJO> consumer = new FlinkKafkaConsumer011<>(GROUP_TOPIC, new POJOSchema(), kafkaProps);
    DataStream<POJO> pojoDataStream = env.addSource(consumer)
        .assignTimestampsAndWatermarks(extractor);
    pojoDataStream.print();

    // Group by the primary key aid, so that matching is done per key
    // (different types of POJO could use different within times)
    // 1.
    DataStream<POJO> keyedPojos = pojoDataStream
        .keyBy("aid");

    // From the initial state to a final state: a complete POJO event sequence
    // 2.
    Pattern<POJO, POJO> completedPojo =
        Pattern.<POJO>begin("init")
            .where(new SimpleCondition<POJO>() {
              private static final long serialVersionUID = -6847788055093903603L;

              @Override
              public boolean filter(POJO pojo) throws Exception {
                return "02".equals(pojo.getAstatus());
              }
            })
            .followedBy("end")
//            .next("end")
            .where(new SimpleCondition<POJO>() {
              private static final long serialVersionUID = -2655089736460847552L;

              @Override
              public boolean filter(POJO pojo) throws Exception {
                return "00".equals(pojo.getAstatus()) || "01".equals(pojo.getAstatus());
              }
            });

    // Find the aids of events that did not reach a final state within 1 minute (kept short for testing)
    // If different types need different within times, e.g. some time out after 1 minute
    // and others after 1 hour, create multiple PatternStreams
    // 3.
    PatternStream<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(Time.minutes(1)));

    // Define the side output timedout
    // 4.
    OutputTag<POJO> timedout = new OutputTag<POJO>("timedout") {
      private static final long serialVersionUID = 773503794597666247L;
    };

    // OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction
    // 5.
    SingleOutputStreamOperator<POJO> timeoutPojos = patternStream.flatSelect(
        timedout,
        new POJOTimedOut(),
        new FlatSelectNothing<POJO>()
    );

    // Print the timed-out POJOs
    // 6.7.
    timeoutPojos.getSideOutput(timedout).print();
    timeoutPojos.print();
    env.execute(CEPTimeoutEventJob.class.getSimpleName());
  }

  /**
   * Collects the timed-out events.
   */
  public static class POJOTimedOut implements PatternFlatTimeoutFunction<POJO, POJO> {
    private static final long serialVersionUID = -4214641891396057732L;

    @Override
    public void timeout(Map<String, List<POJO>> map, long l, Collector<POJO> collector) throws Exception {
      if (null != map.get("init")) {
        for (POJO pojoInit : map.get("init")) {
          System.out.println("timeout init:" + pojoInit.getAid());
          collector.collect(pojoInit);
        }
      }
      // end timed out and was never received, so it is not available here
      System.out.println("timeout end: " + map.get("end"));
    }
  }

  /**
   * Usually does nothing, but could also forward all matched events downstream. With relaxed
   * contiguity, events that were ignored or skipped over cannot be selected and forwarded.
   * Handles data that went through both init and end within one minute.
   *
   * @param <T>
   */
  public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
    private static final long serialVersionUID = -3029589950677623844L;

    @Override
    public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
      System.out.println("flatselect: " + pattern);
    }
  }
}

Test results (followedBy):

3> pojo{aid='id000-0', astyle='style000-0', aname='name-0', logtime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createtime=null, updatetime=null}
3> pojo{aid='id000-1', astyle='style000-2', aname='name-1', logtime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createtime=null, updatetime=null}
3> pojo{aid='id000-0', astyle='style000-0', aname='name-0', logtime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createtime=null, updatetime=null}
flatselect: {init=[pojo{aid='id000-0', astyle='style000-0', aname='name-0', logtime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createtime=null, updatetime=null}], end=[pojo{aid='id000-0', astyle='style000-0', aname='name-0', logtime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createtime=null, updatetime=null}]}
timeout init:id000-1
3> pojo{aid='id000-1', astyle='style000-2', aname='name-1', logtime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createtime=null, updatetime=null}
timeout end: null
3> pojo{aid='id000-2', astyle='style000-0', aname='name-0', logtime=1563419829639, energy=467.00, age=0, tt=2019-07-18, astatus='03', createtime=null, updatetime=null}
3> pojo{aid='id000-2', astyle='style000-0', aname='name-0', logtime=1563419841394, energy=107.00, age=0, tt=2019-07-18, astatus='00', createtime=null, updatetime=null}
3> pojo{aid='id000-3', astyle='style000-0', aname='name-0', logtime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createtime=null, updatetime=null}
3> pojo{aid='id000-3', astyle='style000-2', aname='name-0', logtime=1563419979567, energy=32.00, age=26, tt=2019-07-18, astatus='03', createtime=null, updatetime=null}
3> pojo{aid='id000-3', astyle='style000-2', aname='name-0', logtime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createtime=null, updatetime=null}
flatselect: {init=[pojo{aid='id000-3', astyle='style000-0', aname='name-0', logtime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createtime=null, updatetime=null}], end=[pojo{aid='id000-3', astyle='style000-2', aname='name-0', logtime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createtime=null, updatetime=null}]}
3> pojo{aid='id000-4', astyle='style000-0', aname='name-0', logtime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createtime=null, updatetime=null}
3> pojo{aid='id000-4', astyle='style000-0', aname='name-0', logtime=1563420078008, energy=275.00, age=0, tt=2019-07-18, astatus='03', createtime=null, updatetime=null}
timeout init:id000-4
3> pojo{aid='id000-4', astyle='style000-0', aname='name-0', logtime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createtime=null, updatetime=null}
timeout end: null

Summary

The above walks through the steps for implementing timeout state monitoring with Apache FlinkCEP. We hope it is helpful.