
Storm Trident in Practice: Grouped Aggregation

程序员文章站 2022-07-02 09:55:19

1. Introduction

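      groupBy groups the stream by the specified fields. When it is followed by aggregate(), the stream is first repartitioned with partitionBy on those fields, the tuples in each partition are then grouped by the same fields, and the aggregation runs once per group.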
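The three steps above (partition, group, aggregate) can be imitated in plain Java without Storm. This is only a sketch: the class and method names are made up for illustration, and the partition choice (hash of the single-element list of grouping values, modulo the partition count) is an assumption about how fields-based partitioning behaves, not code taken from Storm. With five partitions it places a, b, c and d on partitions 3, 4, 0 and 1, which happens to agree with the partition indices in the log output of the test section.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupByAggregateSketch {

    /** Assumed partitioning rule: hash of the list of grouping values, modulo the partition count. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(Collections.singletonList(key).hashCode(), numPartitions);
    }

    /** Step 1: route every tuple to a partition based on its grouping field. */
    static Map<Integer, List<String>> partitionBy(List<String> tuples, int numPartitions) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String tuple : tuples) {
            partitions.computeIfAbsent(partitionFor(tuple, numPartitions), k -> new ArrayList<>())
                      .add(tuple);
        }
        return partitions;
    }

    /** Steps 2 and 3: group the tuples of one partition by value, then count each group. */
    static Map<String, Integer> countPerGroup(List<String> partition) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String tuple : partition) {
            counts.merge(tuple, 1, Integer::sum);
        }
        return counts;
    }

    /** Runs every partition and collects the per-group counts into one map. */
    static Map<String, Integer> run(List<String> tuples, int numPartitions) {
        Map<String, Integer> all = new TreeMap<>();
        for (List<String> partition : partitionBy(tuples, numPartitions).values()) {
            all.putAll(countPerGroup(partition));
        }
        return all;
    }

    public static void main(String[] args) {
        List<String> tuples = Arrays.asList("a", "b", "a", "c", "c", "c", "d");
        partitionBy(tuples, 5).forEach((id, partition) ->
                System.out.println("partition " + id + " -> " + countPerGroup(partition)));
    }
}
```

Because all tuples with the same key land in the same partition, each group can be counted locally without any cross-partition merge.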

2. Hands-on example

   main:

public static void main(String[] args) throws Exception {

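		/**
		 * Set a higher parallelism than strictly needed: if grouping produces
		 * fewer groups than partitions, the extra partitions simply sit idle.
		 */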
		@SuppressWarnings("unchecked")
		FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
				new Values("a"), new Values("b"), new Values("a"),new Values("c"),
		new Values("c"),new Values("c"),new Values("d"));
		spout.setCycle(false);
		TridentTopology tridentTopology = new TridentTopology();
		tridentTopology
				.newStream("spout", spout)
				.parallelismHint(3)
				.shuffle()
				.groupBy(new Fields("sentence"))
				.aggregate(new Fields("sentence"), new MyAgg(),
						new Fields("Map"))
			    .parallelismHint(5)
				.each(new Fields("sentence","Map"), new MyBolt());
		Config config = new Config();
		config.setDebug(false);
		StormSubmitter.submitTopology("trident_groupby_aggregate_many", config,
				tridentTopology.build());
	}

   MyAgg:

package com.storm.trident.groupby.先分组后聚合;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class MyAgg extends BaseAggregator<Map<String, Integer>> {
	

	private static final long serialVersionUID = 1L;
	
	
	/**
	 * Index of the partition this instance runs in.
	 */
	private int partitionId;

	/**
	 * Total number of partitions.
	 */
	private int numPartitions;
	
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		partitionId = context.getPartitionIndex();
		numPartitions = context.numPartitions();
	}

	@Override
	public void aggregate(Map<String, Integer> val, TridentTuple tuple,
			TridentCollector collector) {
		String word = tuple.getString(0);
		Integer value = val.get(word);
		if (value == null) {
			value = 0;
		}
		value++;
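		// Store the running count in the map.
		val.put(word, value);
		// Also record the partition index; the key suffix "属于哪个分区"
		// means "belongs to which partition".
		val.put(word + "属于哪个分区", partitionId);
		// Logs the partition index, followed by the total number of partitions.
		System.out.println("I am partition [" + partitionId
				+ "] and I have kept a tweet by: " + numPartitions);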
	}

	@Override
	public void complete(Map<String, Integer> val, TridentCollector collector) {
		collector.emit(new Values(val));
	}

	@Override
	public Map<String, Integer> init(Object arg0, TridentCollector arg1) {
		return new HashMap<String, Integer>();
	}

}
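The order in which the callbacks of MyAgg are invoked can be sketched without Storm: one init, one aggregate per tuple, and one complete for each group in each batch. The SimpleAggregator interface and CountAgg class below are hypothetical stand-ins, not Storm types, and the batch boundaries in main assume that the spout cuts the seven input values into batches of at most three. Under that assumption state is not carried over between batches, so a key that appeared in two different batches would be counted separately in each.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AggregatorLifecycleSketch {

    /** Hypothetical stand-in for the three callbacks that MyAgg overrides. */
    interface SimpleAggregator<S> {
        S init();
        void aggregate(S state, String tuple);
        S complete(S state);
    }

    /** Counts words like MyAgg does, without the partition bookkeeping. */
    static class CountAgg implements SimpleAggregator<Map<String, Integer>> {
        public Map<String, Integer> init() {
            return new HashMap<>();
        }
        public void aggregate(Map<String, Integer> state, String tuple) {
            state.merge(tuple, 1, Integer::sum);
        }
        public Map<String, Integer> complete(Map<String, Integer> state) {
            return state;
        }
    }

    /** Runs one batch: a separate init/aggregate/complete cycle per distinct group key. */
    static List<Map<String, Integer>> runBatch(List<String> batch) {
        SimpleAggregator<Map<String, Integer>> agg = new CountAgg();
        Map<String, Map<String, Integer>> stateByGroup = new LinkedHashMap<>();
        for (String tuple : batch) {
            // init() is called the first time a group key is seen in this batch.
            Map<String, Integer> state = stateByGroup.computeIfAbsent(tuple, k -> agg.init());
            agg.aggregate(state, tuple);
        }
        List<Map<String, Integer>> emitted = new ArrayList<>();
        for (Map<String, Integer> state : stateByGroup.values()) {
            emitted.add(agg.complete(state));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Assumed batching: seven values, at most three per batch.
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b", "a"),
                Arrays.asList("c", "c", "c"),
                Arrays.asList("d"));
        for (List<String> batch : batches) {
            System.out.println(runBatch(batch));
        }
        // prints:
        // [{a=2}, {b=1}]
        // [{c=3}]
        // [{d=1}]
    }
}
```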

  MyBolt:

package com.storm.trident.groupby.先分组后聚合;

import java.util.Map;
import java.util.Map.Entry;

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

public class MyBolt extends BaseFilter {
	
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@SuppressWarnings("unchecked")
	@Override
	public boolean isKeep(TridentTuple tuple) {
		// The label "打印出来的tuple:" means "printed tuple:".
		System.out.println("打印出来的tuple:" + tuple);
		Map<String, Integer> value = ((Map<String,Integer>) tuple.getValue(1));
		for (Entry<String, Integer> entry : value.entrySet()) {
			System.out.println("key:"+ entry.getKey()+",value:" + entry.getValue());
		}
		// Always drop the tuple: this filter exists only to log the results.
		return false;
	}

}

 3. Testing

   Package the topology and run it on a Storm cluster.
  Check the logs; the relevant entries are as follows:

2016-12-22 18:36:11.293 STDIO [INFO] I am partition [3] and I have kept a tweet by: 5
2016-12-22 18:36:11.302 STDIO [INFO] I am partition [4] and I have kept a tweet by: 5
2016-12-22 18:36:11.304 STDIO [INFO] 打印出来的tuple:[b, {b属于哪个分区=4, b=1}]
2016-12-22 18:36:11.306 STDIO [INFO] key:b属于哪个分区,value:4
2016-12-22 18:36:11.317 STDIO [INFO] I am partition [3] and I have kept a tweet by: 5
2016-12-22 18:36:11.321 STDIO [INFO] key:b,value:1
2016-12-22 18:36:11.335 STDIO [INFO] 打印出来的tuple:[a, {a属于哪个分区=3, a=2}]
2016-12-22 18:36:11.341 STDIO [INFO] key:a属于哪个分区,value:3
2016-12-22 18:36:11.344 STDIO [INFO] key:a,value:2
2016-12-22 18:36:11.423 STDIO [INFO] I am partition [0] and I have kept a tweet by: 5
2016-12-22 18:36:11.424 STDIO [INFO] I am partition [0] and I have kept a tweet by: 5
2016-12-22 18:36:11.425 STDIO [INFO] I am partition [0] and I have kept a tweet by: 5
2016-12-22 18:36:11.433 STDIO [INFO] 打印出来的tuple:[c, {c=3, c属于哪个分区=0}]
2016-12-22 18:36:11.433 STDIO [INFO] key:c,value:3
2016-12-22 18:36:11.439 STDIO [INFO] key:c属于哪个分区,value:0
2016-12-22 18:36:11.923 STDIO [INFO] I am partition [1] and I have kept a tweet by: 5
2016-12-22 18:36:11.933 STDIO [INFO] 打印出来的tuple:[d, {d=1, d属于哪个分区=1}]
2016-12-22 18:36:11.939 STDIO [INFO] key:d,value:1
2016-12-22 18:36:11.942 STDIO [INFO] key:d属于哪个分区,value:1         
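To read the log more easily, the aggregate() calls can be tallied per partition. The sketch below is a small hypothetical helper (the class name and regular expression are mine, not part of the topology) that extracts the index from each "I am partition [N]" line copied from the log above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionLogTally {

    /** Matches the partition index printed by MyAgg.aggregate(). */
    private static final Pattern PARTITION = Pattern.compile("I am partition \\[(\\d+)\\]");

    /** Counts how many aggregate() calls each partition logged. */
    static Map<Integer, Integer> tally(List<String> logLines) {
        Map<Integer, Integer> calls = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = PARTITION.matcher(line);
            if (m.find()) {
                calls.merge(Integer.parseInt(m.group(1)), 1, Integer::sum);
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        // The seven aggregate() lines from the log above.
        List<String> lines = Arrays.asList(
                "2016-12-22 18:36:11.293 STDIO [INFO] I am partition [3] and I have kept a tweet by: 5",
                "2016-12-22 18:36:11.302 STDIO [INFO] I am partition [4] and I have kept a tweet by: 5",
                "2016-12-22 18:36:11.317 STDIO [INFO] I am partition [3] and I have kept a tweet by: 5",
                "2016-12-22 18:36:11.423 STDIO [INFO] I am partition [0] and I have kept a tweet by: 5",
                "2016-12-22 18:36:11.424 STDIO [INFO] I am partition [0] and I have kept a tweet by: 5",
                "2016-12-22 18:36:11.425 STDIO [INFO] I am partition [0] and I have kept a tweet by: 5",
                "2016-12-22 18:36:11.923 STDIO [INFO] I am partition [1] and I have kept a tweet by: 5");
        System.out.println(tally(lines)); // prints {0=3, 1=1, 3=2, 4=1}
    }
}
```

The tally lines up with the emitted counts (a=2 on partition 3, b=1 on partition 4, c=3 on partition 0, d=1 on partition 1). Partition 2 never appears: with four distinct keys and five partitions, one partition stays idle, as the comment in main anticipates.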