欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Storm Trident 实战:分区聚合

程序员文章站 2022-07-02 09:54:43
...

一、前言

      Trident 内部是基于 batch 实现的,所以先有 batch,然后才有 partition;分区之后再分配并发度,之后才能进行并发处理。并发度通过 parallelismHint 来指定。

二、实战

   main方法

public static void main(String[] args) throws AlreadyAliveException,
		InvalidTopologyException, AuthorizationException, IOException {
	// Source: a fixed spout with a single "sentence" field, at most 3 tuples per batch.
	Fields sentenceFields = new Fields("sentence");
	FixedBatchSpout source = new FixedBatchSpout(sentenceFields, 3,
			new Values("a"), new Values("b"), new Values("a"), new Values("c"));
	// Replay the values forever so the spout never runs dry.
	source.setCycle(true);

	Fields sumFields = new Fields("sum");
	TridentTopology trident = new TridentTopology();
	trident.newStream("spout", source)
			// Redistribute tuples randomly across the downstream partitions.
			.shuffle()
			.partitionAggregate(sentenceFields, new SumWord(), sumFields)
			// Run the aggregation with 3 parallel tasks, i.e. 3 partitions.
			.parallelismHint(3)
			.each(sumFields, new PrintFilter_partition());

	Config conf = new Config();
	conf.setNumWorkers(2);
	conf.setNumAckers(1);
	conf.setDebug(false);
	StormSubmitter.submitTopology("trident__partition_aggregate", conf, trident.build());
}

   SumWord:

package com.storm.trident.partitionAggregate.分区聚合;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.collections.MapUtils;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

/**
 * Trident aggregator that counts word occurrences per batch within one partition.
 *
 * <p>Intended for {@code partitionAggregate}: {@link #init} starts each batch with an empty
 * count map, {@link #aggregate} increments the count for the word in field 0 of every tuple,
 * and {@link #complete} emits the finished map as a single-field tuple.
 */
public class SumWord extends BaseAggregator<Map<String,Integer>> {

	private static final long serialVersionUID = 1L;

	/**
	 * Id of the batch most recently passed to {@link #init}; only used in debug output.
	 * NOTE(review): if several batches are in flight at once, the printed id may not be the
	 * batch of the tuple being printed.
	 */
	private Object batchId;

	/** Index of the partition this aggregator instance runs in. */
	private int partitionId;

	/** Total number of partitions of this operation. */
	private int numPartitions;

	/**
	 * Records which partition this instance runs in, for debug output.
	 */
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		partitionId = context.getPartitionIndex();
		numPartitions = context.numPartitions();
	}

	/**
	 * Starts a new batch.
	 *
	 * @param batchId   id of the batch being started
	 * @param collector unused here
	 * @return a new, empty word-count map for this batch
	 */
	@Override
	public Map<String, Integer> init(Object batchId, TridentCollector collector) {
		this.batchId = batchId;
		// A fresh map per batch keeps counts from leaking into later batches. It also
		// ensures a map already emitted by complete() is never mutated afterwards.
		return new HashMap<String, Integer>();
	}

	/**
	 * Adds one occurrence of the word in field 0 of {@code tuple} to {@code val}.
	 */
	@Override
	public void aggregate(Map<String, Integer> val, TridentTuple tuple,
			TridentCollector collector) {
		System.out.println(tuple+";partitionId="+partitionId+";partitions="+numPartitions
				+",batchId:" + batchId);
		String word = tuple.getString(0);
		val.put(word, MapUtils.getInteger(val, word, 0)+1);
		System.out.println("sumWord:" + val);
	}

	/**
	 * Emits the batch's word counts as a single-field tuple.
	 */
	@Override
	public void complete(Map<String, Integer> val, TridentCollector collector) {
		collector.emit(new Values(val));
	}
}

   打印方法

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Trident filter that logs every tuple it receives and keeps all of them.
 *
 * <p>Placed at the end of the stream only to make the aggregation results visible.
 */
public class PrintFilter_partition extends BaseFilter {

	private static final long serialVersionUID = 1L;

	private static final Logger LOGGER = LoggerFactory.getLogger(PrintFilter_partition.class);

	/**
	 * Logs {@code tuple} at INFO level.
	 *
	 * @param tuple the incoming tuple
	 * @return always {@code true}, so no tuple is filtered out
	 */
	@Override
	public boolean isKeep(TridentTuple tuple) {
		// Parameterized form: the message is only built when INFO logging is enabled.
		LOGGER.info("打印出来的tuple:{}", tuple);
		return true;
	}
}

   测试效果(注:以下日志中 partitions=5,与上面代码中的 parallelismHint(3) 不一致,应是在并发度设为 5 时运行得到的):

 

2016-12-22 18:39:26.060 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257308:0
2016-12-22 18:39:26.062 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{}]
2016-12-22 18:39:26.066 STDIO [INFO] [b];partitionId=0;partitions=5,batchId:257308:0
2016-12-22 18:39:26.115 STDIO [INFO] sumWord:{a=1}
2016-12-22 18:39:26.116 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257308:0
2016-12-22 18:39:26.117 STDIO [INFO] sumWord:{a=2}
2016-12-22 18:39:26.120 STDIO [INFO] sumWord:{b=1}
2016-12-22 18:39:26.121 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{a=2}]
2016-12-22 18:39:26.121 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1}]
2016-12-22 18:39:26.196 STDIO [INFO] [c];partitionId=4;partitions=5,batchId:257309:0
2016-12-22 18:39:26.197 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{}]
2016-12-22 18:39:26.198 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1}]
2016-12-22 18:39:26.197 STDIO [INFO] sumWord:{c=1, a=2}
2016-12-22 18:39:26.205 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=2}]
2016-12-22 18:39:26.683 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257310:0
2016-12-22 18:39:26.684 STDIO [INFO] sumWord:{c=1, a=3}
2016-12-22 18:39:26.685 STDIO [INFO] [b];partitionId=0;partitions=5,batchId:257310:0
2016-12-22 18:39:26.687 STDIO [INFO] [a];partitionId=2;partitions=5,batchId:257310:0
2016-12-22 18:39:26.689 STDIO [INFO] sumWord:{a=1}
2016-12-22 18:39:26.691 STDIO [INFO] sumWord:{b=2}
2016-12-22 18:39:26.692 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2}]
2016-12-22 18:39:26.693 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=3}]
2016-12-22 18:39:26.690 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{a=1}]
2016-12-22 18:39:27.188 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{a=1}]
2016-12-22 18:39:27.190 STDIO [INFO] [c];partitionId=0;partitions=5,batchId:257311:0
2016-12-22 18:39:27.192 STDIO [INFO] sumWord:{b=2, c=1}
2016-12-22 18:39:27.199 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=3}]
2016-12-22 18:39:27.203 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1}]
2016-12-22 18:39:27.673 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257312:0
2016-12-22 18:39:27.675 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1}]
2016-12-22 18:39:27.674 STDIO [INFO] sumWord:{c=1, a=4}
2016-12-22 18:39:27.677 STDIO [INFO] [b];partitionId=2;partitions=5,batchId:257312:0
2016-12-22 18:39:27.678 STDIO [INFO] sumWord:{b=1, a=1}
2016-12-22 18:39:27.680 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257312:0
2016-12-22 18:39:27.680 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}]
2016-12-22 18:39:27.681 STDIO [INFO] sumWord:{c=1, a=5}
2016-12-22 18:39:27.683 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=5}]
2016-12-22 18:39:28.227 STDIO [INFO] [c];partitionId=4;partitions=5,batchId:257313:0
2016-12-22 18:39:28.232 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}]
2016-12-22 18:39:28.236 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1}]
2016-12-22 18:39:28.253 STDIO [INFO] sumWord:{c=2, a=5}
2016-12-22 18:39:28.256 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=2, a=5}]
2016-12-22 18:39:28.741 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257314:0
2016-12-22 18:39:28.744 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}]
2016-12-22 18:39:28.743 STDIO [INFO] sumWord:{c=2, a=6}
2016-12-22 18:39:28.748 STDIO [INFO] [a];partitionId=0;partitions=5,batchId:257314:0
2016-12-22 18:39:28.749 STDIO [INFO] sumWord:{b=2, c=1, a=1}
2016-12-22 18:39:28.756 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1, a=1}]
2016-12-22 18:39:28.755 STDIO [INFO] [b];partitionId=4;partitions=5,batchId:257314:0
2016-12-22 18:39:28.763 STDIO [INFO] sumWord:{b=1, c=2, a=6}
2016-12-22 18:39:28.769 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}]
2016-12-22 18:39:29.218 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}]
2016-12-22 18:39:29.219 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}]
2016-12-22 18:39:29.221 STDIO [INFO] [c];partitionId=0;partitions=5,batchId:257315:0
2016-12-22 18:39:29.228 STDIO [INFO] sumWord:{b=2, c=2, a=1}
2016-12-22 18:39:29.229 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=2, a=1}]
2016-12-22 18:39:29.689 STDIO [INFO] [b];partitionId=2;partitions=5,batchId:257316:0
2016-12-22 18:39:29.693 STDIO [INFO] sumWord:{b=2, a=1}
2016-12-22 18:39:29.694 STDIO [INFO] [a];partitionId=0;partitions=5,batchId:257316:0
2016-12-22 18:39:29.697 STDIO [INFO] sumWord:{b=2, c=2, a=2}
2016-12-22 18:39:29.704 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}]
2016-12-22 18:39:29.723 STDIO [INFO] [a];partitionId=2;partitions=5,batchId:257316:0
2016-12-22 18:39:29.706 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=2, a=2}]
2016-12-22 18:39:29.740 STDIO [INFO] sumWord:{b=2, a=2}
2016-12-22 18:39:29.746 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, a=2}]
2016-12-22 18:39:30.197 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}]
2016-12-22 18:39:30.199 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, a=2}]