Storm Trident in practice: using filter and function
程序员文章站
2022-03-05 14:15:24
1. Storm Trident filter
A filter returns true or false for each tuple to decide whether it stays in the stream: true keeps the tuple, false drops it.
1.1 Code
public static void main(String[] args) throws InterruptedException, AlreadyAliveException,
        InvalidTopologyException, AuthorizationException {
    // Test spout that emits three (a, b) tuples, one tuple per batch.
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b"), 1,
            new Values(1, 2), new Values(4, 1), new Values(3, 0));
    spout.setCycle(false); // emit the data once instead of looping

    TridentTopology topology = new TridentTopology();
    topology.newStream("spout", spout)
            // MyFilter sees only field "a" and keeps the tuples where a == 1.
            .each(new Fields("a"), new MyFilter())
            // PrintFilterBolt prints the surviving tuples and emits nothing,
            // so its output field is only a placeholder.
            .each(new Fields("a", "b"), new PrintFilterBolt(), new Fields(""));

    Config config = new Config();
    config.setNumWorkers(2);
    config.setNumAckers(1);
    config.setDebug(false);

    StormSubmitter.submitTopology("trident_filter", config, topology.build());
}
MyFilter:
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

public class MyFilter extends BaseFilter {

    private static final long serialVersionUID = 1L;

    // Index 0 is the first field selected in each(), here field "a".
    @Override
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getInteger(0) == 1;
    }
}
PrintFilterBolt:
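import java.util.ArrayList;
import java.util.List;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

public class PrintFilterBolt extends BaseFunction {

    private static final long serialVersionUID = 1L;

    // Prints fields "a" and "b" of every tuple that passed the filter; emits nothing.
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        int firstIndex = tuple.getInteger(0);
        int secondIndex = tuple.getInteger(1);
        List<Integer> list = new ArrayList<Integer>();
        list.add(firstIndex);
        list.add(secondIndex);
        System.out.println("after storm filter opertition change is : " + list.toString());
    }
}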
Output:
2016-12-22 13:16:09.079 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2016-12-22 13:16:09.088 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)
2016-12-22 13:16:09.736 STDIO [INFO] after storm filter opertition change is : [1, 2]
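Only [1, 2] is printed because it is the only tuple whose field "a" equals 1. The same keep-or-drop logic can be checked without a cluster. The following is a minimal plain-Java sketch, not Storm code: the class FilterSketch and the helper applyFilter are hypothetical names introduced for illustration, and the predicate receives the whole tuple rather than only the selected field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java stand-in for the Trident filter step; no Storm classes are used.
public class FilterSketch {

    // Hypothetical helper: keeps a tuple only when the predicate returns true,
    // which mirrors the contract of isKeep in MyFilter.
    public static List<List<Integer>> applyFilter(List<List<Integer>> tuples,
                                                  Predicate<List<Integer>> isKeep) {
        List<List<Integer>> kept = new ArrayList<>();
        for (List<Integer> tuple : tuples) {
            if (isKeep.test(tuple)) {
                kept.add(tuple);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Same (a, b) values that the FixedBatchSpout emits in the topology.
        List<List<Integer>> input = Arrays.asList(
                Arrays.asList(1, 2),
                Arrays.asList(4, 1),
                Arrays.asList(3, 0));

        // Same condition as MyFilter: keep the tuple when field "a" equals 1.
        List<List<Integer>> output = applyFilter(input, t -> t.get(0) == 1);

        System.out.println(output); // prints [[1, 2]]
    }
}
```

Swapping the lambda for another condition, for example t -> t.get(1) > 0, is a quick way to predict what a different isKeep implementation would let through.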
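2. Storm Trident function
A function takes a tuple as input (you must specify which of the tuple's fields it receives) and emits zero or more tuples. The fields it emits are appended to the original input tuple; if it emits several tuples, the input tuple is repeated once per emitted tuple. If a function emits nothing for a given tuple, that tuple is filtered out.
2.1 Code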
public static void main(String[] args) throws InterruptedException, AlreadyAliveException,
        InvalidTopologyException, AuthorizationException {
    // Test spout that emits three (a, b, c) tuples, one tuple per batch.
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b", "c"), 1,
            new Values(1, 2, 3), new Values(4, 1, 6), new Values(3, 0, 8));
    spout.setCycle(false); // emit the data once instead of looping

    TridentTopology topology = new TridentTopology();
    topology.newStream("spout", spout)
            // MyFunction reads field "b" and appends its output as field "d".
            .each(new Fields("b"), new MyFunction(), new Fields("d"))
            // PrintFunctionBolt prints all four fields and emits nothing,
            // so its output field is only a placeholder.
            .each(new Fields("a", "b", "c", "d"), new PrintFunctionBolt(), new Fields(""));

    Config config = new Config();
    config.setNumWorkers(2);
    config.setNumAckers(1);
    config.setDebug(false);

    StormSubmitter.submitTopology("trident_function", config, topology.build());
}
MyFunction:
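import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class MyFunction extends BaseFunction {

    private static final long serialVersionUID = 1L;

    // Emits 0, 1, ..., b - 1 for the input field "b"; emits nothing when b is 0.
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        for (int i = 0; i < tuple.getInteger(0); i++) {
            collector.emit(new Values(i));
        }
    }
}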
PrintFunctionBolt:
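import java.util.ArrayList;
import java.util.List;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

public class PrintFunctionBolt extends BaseFunction {

    private static final long serialVersionUID = 1L;

    // Prints fields "a", "b", "c" and the appended field "d"; emits nothing.
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        int firstIndex = tuple.getInteger(0);
        int secondIndex = tuple.getInteger(1);
        int threeIndex = tuple.getInteger(2);
        int fourIndex = tuple.getInteger(3);
        List<Integer> list = new ArrayList<Integer>();
        list.add(firstIndex);
        list.add(secondIndex);
        list.add(threeIndex);
        list.add(fourIndex);
        System.out.println("after storm function opertition change is : " + list.toString());
    }
}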
Output:
2016-12-22 13:22:34.365 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server 192.168.80.130/192.168.80.130:2181, sessionid = 0x159285f1109000c, negotiated timeout = 20000
2016-12-22 13:22:34.366 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2016-12-22 13:22:34.374 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)
2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 0]
2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 1]
2016-12-22 13:22:34.442 STDIO [INFO] after storm function opertition change is : [4, 1, 6, 0]
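The three printed tuples follow from the values of field "b": b = 2 yields two output tuples, b = 1 yields one, and b = 0 yields none, so (3, 0, 8) disappears. A rough plain-Java sketch of that append-and-expand behavior is shown here. It does not use Storm, and the class FunctionSketch and the helpers emitFor and applyFunction are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for a Trident function step; no Storm classes are used.
public class FunctionSketch {

    // Hypothetical helper mirroring MyFunction: emits 0, 1, ..., b - 1.
    public static List<Integer> emitFor(int b) {
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i < b; i++) {
            emitted.add(i);
        }
        return emitted;
    }

    // Hypothetical helper: for every emitted value, outputs a copy of the input
    // tuple with that value appended. A tuple with no emitted values is dropped.
    public static List<List<Integer>> applyFunction(List<List<Integer>> tuples, int inputIndex) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> tuple : tuples) {
            for (Integer d : emitFor(tuple.get(inputIndex))) {
                List<Integer> extended = new ArrayList<>(tuple);
                extended.add(d);
                out.add(extended);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same (a, b, c) values that the FixedBatchSpout emits in the topology.
        List<List<Integer>> input = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 1, 6),
                Arrays.asList(3, 0, 8));

        // Index 1 is field "b", the field passed to MyFunction.
        for (List<Integer> tuple : applyFunction(input, 1)) {
            System.out.println(tuple);
        }
        // prints:
        // [1, 2, 3, 0]
        // [1, 2, 3, 1]
        // [4, 1, 6, 0]
    }
}
```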