
Storm Trident in Practice: Using filter and function

程序员文章站 2022-07-02 09:38:20

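1. Storm Trident filter

      A filter decides, tuple by tuple, whether data stays in the stream: its isKeep method returns true to keep the tuple and false to drop it.

     1.1 Code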

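import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// The original post shows only main; the class name is illustrative.
public class TridentFilterTopology {
	public static void main(String[] args) throws InterruptedException,
			AlreadyAliveException, InvalidTopologyException,
			AuthorizationException {
		// Test spout emitting three (a, b) tuples, one tuple per batch.
		FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b"),
				1, new Values(1, 2), new Values(4, 1),
				new Values(3, 0));
		spout.setCycle(false); // emit the data once, do not loop
		TridentTopology topology = new TridentTopology();
		topology.newStream("spout", spout)
				// MyFilter only sees field "a"; surviving tuples keep all fields.
				.each(new Fields("a"), new MyFilter())
				// PrintFilterBolt prints each surviving tuple and emits nothing.
				.each(new Fields("a", "b"), new PrintFilterBolt(),new Fields(""));
		Config config = new Config();
		config.setNumWorkers(2);
		config.setNumAckers(1);
		config.setDebug(false);
		StormSubmitter.submitTopology("trident_filter", config,
				topology.build());
	}
}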

    MyFilter:

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

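public class MyFilter extends BaseFilter {

	private static final long serialVersionUID = 1L;

	// Keep the tuple only when the first selected field ("a") equals 1;
	// returning false drops the tuple from the stream.
	@Override
	public boolean isKeep(TridentTuple tuple) {
		return tuple.getInteger(0) == 1;
	}

}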

    PrintFilterBolt:

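import java.util.ArrayList;
import java.util.List;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

public class PrintFilterBolt extends BaseFunction {

	private static final long serialVersionUID = 1L;

	// Prints fields "a" and "b" of every tuple that passed the filter.
	// Nothing is emitted, so no tuple continues past this function.
	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		int firstIndex = tuple.getInteger(0);
		int secondIndex = tuple.getInteger(1);
		List<Integer> list = new ArrayList<Integer>();
		list.add(firstIndex);
		list.add(secondIndex);
		System.out.println("after storm filter opertition change is : "
				+ list.toString());
	}

}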

   Output:

2016-12-22 13:16:09.079 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2016-12-22 13:16:09.088 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)
2016-12-22 13:16:09.736 STDIO [INFO] after storm filter opertition change is : [1, 2]
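
To see why only [1, 2] is printed, the filter step can be mimicked without a cluster. What follows is a minimal plain-Java sketch, not Storm code: the class FilterSketch and the method applyFilter are hypothetical names introduced here, and tuples are modelled as List<Integer>. It assumes, as in the topology above, that the filter is handed only field "a" while the whole tuple (a, b) continues downstream when it is kept.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for the Trident stream above; no Storm classes are used.
class FilterSketch {

    // Mirrors MyFilter.isKeep: keep the tuple only when the first selected field equals 1.
    static boolean isKeep(List<Integer> selected) {
        return selected.get(0) == 1;
    }

    // Applies the filter to field "a" (index 0) and returns the full tuples that survive.
    static List<List<Integer>> applyFilter(List<List<Integer>> tuples) {
        List<List<Integer>> kept = new ArrayList<>();
        for (List<Integer> tuple : tuples) {
            // each(new Fields("a"), ...) hands the filter only field "a"
            List<Integer> selected = tuple.subList(0, 1);
            if (isKeep(selected)) {
                kept.add(tuple); // the whole tuple (a, b) stays in the stream
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Same three tuples as the FixedBatchSpout in the topology.
        List<List<Integer>> input = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(4, 1), Arrays.asList(3, 0));
        for (List<Integer> tuple : applyFilter(input)) {
            System.out.println(tuple); // prints [1, 2]
        }
    }
}
```

Of the three input tuples, only (1, 2) has a == 1, so (4, 1) and (3, 0) never reach the printing step.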

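2. Storm Trident function

       A function receives a tuple (you must specify which of the tuple's fields it takes as input) and emits zero or more tuples. The fields it emits are appended to the original input tuple. If a function emits nothing for an input tuple, that tuple is filtered out.

    2.1 Code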

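import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// The original post shows only main; the class name is illustrative.
public class TridentFunctionTopology {
	public static void main(String[] args) throws InterruptedException,
			AlreadyAliveException, InvalidTopologyException,
			AuthorizationException {

		// Test spout emitting three (a, b, c) tuples, one tuple per batch.
		FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b", "c"),
				1, new Values(1, 2, 3), new Values(4, 1, 6),
				new Values(3, 0, 8));
		spout.setCycle(false); // emit the data once, do not loop
		TridentTopology topology = new TridentTopology();
		topology.newStream("spout", spout)
				// MyFunction reads field "b"; each value it emits is appended as field "d".
				.each(new Fields("b"), new MyFunction(), new Fields("d"))
				// PrintFunctionBolt prints (a, b, c, d) and emits nothing.
				.each(new Fields("a", "b", "c", "d"), new PrintFunctionBolt(),
						new Fields(""));
		Config config = new Config();
		config.setNumWorkers(2);
		config.setNumAckers(1);
		config.setDebug(false);
		StormSubmitter.submitTopology("trident_function", config,
				topology.build());
	}
}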

   MyFunction:

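import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class MyFunction extends BaseFunction {

	private static final long serialVersionUID = 1L;

	// For an input value n (field "b"), emit 0, 1, ..., n-1.
	// When n is 0 nothing is emitted, so the input tuple is dropped.
	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		for (int i = 0; i < tuple.getInteger(0); i++) {
			collector.emit(new Values(i));
		}
	}

}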

   PrintFunctionBolt:

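import java.util.ArrayList;
import java.util.List;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

public class PrintFunctionBolt extends BaseFunction {

	private static final long serialVersionUID = 1L;

	// Prints fields "a", "b", "c" and the appended field "d"; emits nothing.
	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		int firstIndex = tuple.getInteger(0);
		int secondIndex = tuple.getInteger(1);
		int threeIndex = tuple.getInteger(2);
		int fourIndex = tuple.getInteger(3);
		List<Integer> list = new ArrayList<Integer>();
		list.add(firstIndex);
		list.add(secondIndex);
		list.add(threeIndex);
		list.add(fourIndex);
		System.out.println("after storm function opertition change is : " +list.toString());
	}

}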

    Output:

2016-12-22 13:22:34.365 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server 192.168.80.130/192.168.80.130:2181, sessionid = 0x159285f1109000c, negotiated timeout = 20000
2016-12-22 13:22:34.366 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2016-12-22 13:22:34.374 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)
2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 0]
2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 1]
2016-12-22 13:22:34.442 STDIO [INFO] after storm function opertition change is : [4, 1, 6, 0]
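
The three printed lines follow from the append-and-drop behaviour of functions, which can be mimicked without a cluster. What follows is a minimal plain-Java sketch, not Storm code: the class FunctionSketch and the methods emitted and applyFunction are hypothetical names introduced here, and tuples are modelled as List<Integer>. It assumes, as in the topology above, that the function reads field "b" and that each emitted value is appended to a copy of the input tuple as field "d".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for the Trident stream above; no Storm classes are used.
class FunctionSketch {

    // Mirrors MyFunction.execute: for input n, emit 0, 1, ..., n-1 (nothing when n <= 0).
    static List<Integer> emitted(int n) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(i);
        }
        return out;
    }

    // For each input tuple (a, b, c), produce one output tuple (a, b, c, d) per emitted d.
    static List<List<Integer>> applyFunction(List<List<Integer>> tuples) {
        List<List<Integer>> result = new ArrayList<>();
        for (List<Integer> tuple : tuples) {
            int b = tuple.get(1); // the function is given field "b" only
            for (int d : emitted(b)) {
                List<Integer> extended = new ArrayList<>(tuple);
                extended.add(d); // emitted value is appended as field "d"
                result.add(extended);
            }
            // If nothing was emitted, no output tuple exists: the input is dropped.
        }
        return result;
    }

    public static void main(String[] args) {
        // Same three tuples as the FixedBatchSpout in the topology.
        List<List<Integer>> input = Arrays.asList(
                Arrays.asList(1, 2, 3), Arrays.asList(4, 1, 6), Arrays.asList(3, 0, 8));
        for (List<Integer> tuple : applyFunction(input)) {
            System.out.println(tuple);
        }
        // prints [1, 2, 3, 0], [1, 2, 3, 1] and [4, 1, 6, 0], one per line
    }
}
```

The tuple (3, 0, 8) has b == 0, so the loop emits nothing for it and it never appears in the output, which matches the log.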