欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Storm - Trident

程序员文章站 2022-07-02 12:07:57
...
Trident


一、Storm 保证性

1.数据一定会发送
通过 ack / fail 方法确认,若失败,则提供重新发送的机制

2.数据一定只会统计一次
数据发送后有一个唯一性的标识,通过判断此标识,若存在,则不处理

3.数据一定会按照顺序进行处理
数据发送后有一个唯一性的标识,按照标识编号的顺序进行处理

二、Storm 保证性实现

1.逐个发送,逐个处理

如果这样处理,则原有的并行处理会变成穿行处理,不可取

2.批量发送,批量处理

如果这样处理,则如果当前这批数据处理完毕但未发送,则无法处理下一批数据,且这一批数据之间的处理顺序是并发的在进行的

3.分成两个步骤
一个处理数据,一个发送数据;
数据处理完毕,则继续处理下一批数据;数据是否发送到下一个缓解,由发送数据的步骤决定
采用此方式

三、Trident

1.Spout

package com.study.storm.trident.wordcount;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * @description
 * 数据来源
 * 模拟批量数据发送
 * <br/>
 * @remark
 * Storm 的保证及实现
 * 1.数据一定被发送
 * 通过 ack() 、 fail() 的确认机制,若发送失败,则重新发送
 * 2.数据只被处理一次
 * 数据发送时带有唯一的编号,判断此编号是否被处理过,若是,则忽略,不处理
 * 3.数据被按照一定的顺序处理
 * 数据发送时带有唯一的编号,按照编号的顺序进行处理,若数据不是按照顺序到达,则等待
 * 
 * <br/>
 * 
 * Trident 处理批量数据
 * 
 */
public class SentenceSpout extends BaseRichSpout {

	/**
	 * 
	 */
	private static final long serialVersionUID = 2122598284858356171L;

	private SpoutOutputCollector collector = null ;
	
	/**
	 * 模拟批量数据发送
	 * key : name 
	 * value : sentence 
	 */
	private Values [] valuesArray = new Values[] {
			new Values("a","111111111111"),
			new Values("b","222222222222"),
			new Values("c","333333333333"),
			new Values("d","444444444444"),
			new Values("e","555555555555"),
			new Values("f","666666666666"),
			new Values("g","777777777777"),
			new Values("h","888888888888")
	};
	
	@SuppressWarnings("rawtypes")
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector ;
	}

	// 发送的顺序,即数据组合的下标,标识数据发送到哪个位置
	private int index = 0 ;
	
	@Override
	public void nextTuple() {

		if(index >= valuesArray.length){
			return ;
		}
		index = index == valuesArray.length ? 0 : index++ ;
		this.collector.emit(valuesArray[index]);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("name","sentence"));
	}

}





简化实现
package com.study.storm.trident.wordcount;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import storm.trident.Stream;
import storm.trident.TridentTopology;
import storm.trident.testing.FixedBatchSpout;

public class TridentTopologyDemo {

	public static void main(String[] args) {

		// 相当于原有的 Spout 实现
		@SuppressWarnings("unchecked")
		FixedBatchSpout tridentSpout = new FixedBatchSpout(new Fields("name","sentence"),
				1,
				new Values("a","111111111111"),
				new Values("b","222222222222"),
				new Values("c","333333333333"),
				new Values("d","444444444444"),
				new Values("e","555555555555"),
				new Values("f","666666666666"),
				new Values("g","777777777777"),
				new Values("h","888888888888"));
		// 是否循环发送,false 不
		tridentSpout.setCycle(false);
		
		TridentTopology topology = new TridentTopology();
		/**
		 *  1.本地过滤器设置
		 */
		// 设置数据源
		Stream initStream = topology.newStream("tridentSpout", tridentSpout);
		// 设置过滤器  -- 过滤name : d 的数据  
		initStream = initStream.each(new Fields("name"),new RemovePartDataFilter());
		// 添加函数,输出字母对应的位置
		initStream = initStream.each(new Fields("name"),new NameIndexFunction() ,new Fields("indexNum"));

		// 设置过滤器  -- 拦截数据并打印
		Stream filterPrintStream = initStream.each(new Fields("name","sentence"), new PrintFilter());
		
		
		
		
		
		
		
		
		//--提交Topology给集群运行
		Config conf = new Config();
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("MyTopology", conf, topology.build());
		
		//--运行10秒钟后杀死Topology关闭集群
		Utils.sleep(1000 * 10);
		cluster.killTopology("MyTopology");
		cluster.shutdown();
	}

}




package com.study.storm.trident.wordcount;

import java.util.Iterator;

import backtype.storm.tuple.Fields;
import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;

/**
 * @description 
 * 打印:key 与 value ,fields 与  fields 对应传输的内容
 */
public class PrintFilter extends BaseFilter {

	/**
	 * 
	 */
	private static final long serialVersionUID = 4393484291178519442L;

	@Override
	public boolean isKeep(TridentTuple tuple) {
		Fields fields = tuple.getFields();
		Iterator<String> iterator = fields.iterator();
		while(iterator.hasNext()){
			String key = iterator.next();
			Object valueByField = tuple.getValueByField(key);
			System.out.println("fields : "+ key + " values : "+valueByField);
		}
		
		return true;
	}

}


package com.study.storm.trident.wordcount;

import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;

/**
 * 过滤name = d 的数据
 * return false 过滤
 * return true  继续传递
 */
public class RemovePartDataFilter extends BaseFilter {

	/**
	 * 
	 */
	private static final long serialVersionUID = 8639858690618579558L;

	@Override
	public boolean isKeep(TridentTuple tuple) {
		String stringByField = tuple.getStringByField("name");
		return !stringByField.equals("d");
	}

}


package com.study.storm.trident.wordcount;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class NameIndexFunction extends BaseFunction {

	/**
	 * 
	 */
	private static final long serialVersionUID = 9085021905838331812L;

	static Map<String,Integer> indexMap = new HashMap<String,Integer>();
	static {
		indexMap.put("a", 1);
		indexMap.put("b", 2);
		indexMap.put("c", 3);
		indexMap.put("d", 4);
		indexMap.put("e", 5);
		indexMap.put("f", 6);
		indexMap.put("g", 7);
		indexMap.put("h", 8);
		indexMap.put("i", 9);
	}
	
	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		String name = tuple.getStringByField("name");
		collector.emit(new Values(indexMap.get(name)));
	}

}


相关标签: storm trident