Drools Fusion(CEP)定义及使用方法讲解
从 drools 统一行为建模平台的视野看,drools fusion 是负责启用事件处理行 为的一个模块。
定义
支持复杂事件处理,是比简单的理解事件是什么要更多得多,cep场景具有几个共同而明显的特点:
- 通常需要处理巨量的事件,但是只有少部分事件是真正关心的。
- 事件通常是不变的,因为它们是状态改变的一条记录。
- 通常有关事件的规则和查询必须是运行在被动模式(reactive modes),即,对事件模式(patterns)的检测作出反应。
- 通常在相关的事件之间有强烈的时间关系。
- 个别事件通常是不重要的。系统关心相关事件的模式(patterns)和它们的关系
- 通常,要求系统执行组合和聚合的事件。
用fusion,要把插入drools的数据声明为事件。
drools处理数据有两种方式,云模式和流模式,默认是云模式,用fusion,需要设置为流模式。流模式,插入的数据叫事件,有时间顺序,云模式没有,
流(stream)支持
大部分 cep 用例必须处理事件流(stream)。
流的特性:
- 在流中的事件通过时间戳被排序。
- 事件的数量(volumes)总是很高的。
- 原子事件自己是很少有用的。通常根据多个事件之间的相关性或流或其他来源提取含义。
- 流可以是相似的,即包含单一类型的事件;或者是异类的,即包含多种类型的事件。
声明流模式
在kmodule.xml 中添加配置 eventprocessingmode=“stream” 为流模式
<kbase name="fusionage" eventprocessingmode="stream" packages="com.us.fusion"> <ksession name="fusionageks" type="stateful"/> </kbase>
事件声明
用fusion,要把插入drools的数据声明为事件,声明事件使用@role标签
@role
把@role元数据标签指派给该事实类行
例如:
person 为java bean 也就是一个事实类型
declare person @role(event) end
person 的属性如下:
public class person { private string name; private integer age; private string like; private string sex; private string desc; private string address; private date createtime; // getter setter 省略
@timestamp
每一个事件都要有一个关联的时间戳指派给它。默认时,一个给定事件的时间戳是在事件被插入到工作内存时,从 session clock 读取,并且分配给该事件。有些时候,事件用时间戳作为它自己的一个属性。在这情况下,用户可以用@timestamp 标记用户属性为时间戳
例如:用person的 createtime 属性为时间戳
declare person @role(event) @timestamp( createtime ) end
@expires
重要:这个标签只有引擎运行在流(stream)模式之下才会被考虑.
该标签显示定义 一个事件在什么时候应该到期,事件到期,事件可能不再匹配和激活任何规则时。
使用如下
@expires( 1h35m )
在person 例子中假设过期时间为20s
declare person @role(event) @timestamp( createtime ) @expires(20s) end
滑动时间窗口
滑动时间窗口允许用户编写规则,其将仅匹配在最近的 x 时间单元内发生的事件
rule "boy" when $p : person(age < 25) over window:time(3s) then $p.setdesc("少年"); retract($p); end
例如:只匹配最近3秒内,年龄小于25的人
调用代码如下:
package com.us.fusion; import com.us.model.person; import org.kie.api.kieservices; import org.kie.api.runtime.kiecontainer; import org.kie.api.runtime.kiesession; import java.util.date; /** * created by yangyibo on 17/1/3. * @author yangyibo */ public class application { private static kiesession getsession() { kieservices ks = kieservices.factory.get(); kiecontainer kc = ks.getkieclasspathcontainer(); return kc.newkiesession("fusionageks"); } public static void run() { kiesession ks = getsession(); person p1 = new person("白展堂", 2,new date()); person p2 = new person("佟湘玉", 7,new date()); try { thread.sleep(4000); } catch (interruptedexception e) { system.out.println(e); } person p3 = new person("李大嘴", 16,new date()); ks.insert(p1); ks.insert(p2); ks.insert(p3); int count = ks.fireallrules(); system.out.println("总执行了" + count + "条规则------------------------------"); // ks.dispose(); } public static void main(string[] args) { run(); } }
规则代码如下:
package com.us.fusion7 import com.us.model.person function void printname(string streamname,string name,int age,string desc) { system.out.println("streamname:"+streamname+" name:"+name+" age:"+age+" desc:"+ desc); } declare person @role(event) @timestamp( createtime ) @expires(20s) end rule "boy" when $p : person(age > 0) over window:time(3s) then $p.setdesc("少年"); retract($p); printname("boy",$p.getname(),$p.getage(),$p.getdesc()); end
由于thread.sleep(4000);所以最近3秒内只有李大嘴一条记录所以
结果如下:
streamname:boy name:李大嘴 age:16 desc:少年
总执行了1条规则------------------------------
范例2 10s 内的平均年龄
滑动长度窗口
和滑动时间窗口很类似,其将仅匹配最近几次发生的事件,用法如图,只匹配最近1次发生的事件。
rule "old" when $p : person(age > 49) over window:length(2) then $p.setdesc("老年"); retract($p); end
例如年领大于49岁的最近两条记录
调用代码:
public class application { private static kiesession getsession() { kieservices ks = kieservices.factory.get(); kiecontainer kc = ks.getkieclasspathcontainer(); return kc.newkiesession("fusionageks"); } public static void run() { kiesession ks = getsession(); person p1 = new person("白展堂", 52,new date()); person p2 = new person("佟湘玉", 57,new date()); try { thread.sleep(4000); } catch (interruptedexception e) { system.out.println(e); } person p3 = new person("李大嘴", 56,new date()); ks.insert(p1); ks.insert(p2); ks.insert(p3); int count = ks.fireallrules(); system.out.println("总执行了" + count + "条规则------------------------------"); ks.dispose(); } public static void main(string[] args) { run(); } }
规则代码
package com.us.fusion7 import com.us.model.person function void printname(string streamname,string name,int age,string desc) { system.out.println("streamname:"+streamname+" name:"+name+" age:"+age+" desc:"+ desc); } declare person @role(event) @timestamp( createtime ) @expires(20s) end rule "old" when $p : person(age > 49) over window:length(2) then $p.setdesc("老年"); retract($p); printname("boy",$p.getname(),$p.getage(),$p.getdesc()); end
只匹配符合规则的最近的两条记录,所以舍弃“白展堂记录”
执行结果
streamname:boy name:李大嘴 age:56 desc:老年
streamname:boy name:佟湘玉 age:57 desc:老年
总执行了2条规则------------------------------
本文所有测试例子的pom 依赖
<dependency> <groupid>org.kie</groupid> <artifactid>kie-api</artifactid> <version>6.5.0.final</version> </dependency> <dependency> <groupid>org.drools</groupid> <artifactid>drools-core</artifactid> <version>6.5.0.final</version> </dependency> <dependency> <groupid>org.drools</groupid> <artifactid>drools-compiler</artifactid> <version>6.5.0.final</version> </dependency> <dependency> <groupid>org.drools</groupid> <artifactid>drools-decisiontables</artifactid> <version>6.5.0.final</version> </dependency> <dependency> <groupid>org.drools</groupid> <artifactid>drools-templates</artifactid> <version>6.5.0.final</version> </dependency>
本文所有测试例子的kmodule.xml
配置
<kbase name="fusionage" eventprocessingmode="stream" packages="com.us.fusion"> <ksession name="fusionageks" type="stateful"/> </kbase>
其他关键字: after, before, during, meet 等关键字 都是用于比较两个事件的发生时间顺序,用法待以后再叙
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。如果你想了解更多相关内容请查看下面相关链接