storm trident实战 trident state
一、认识storm trident
trident可以理解为storm批处理的高级抽象,提供了分组、分区、聚合、函数等操作,提供一致性和恰好一次处理的语义。
1)元祖被作为batch处理
2)每个batch的元祖都被指定唯一的一个事物id,如果因为处理失败导致batch重发,也和保证和重发前一样的事物id
3)数据更新操作严格有序,比如batch1必须在batch2之前被成功处理,且如果batch1失败了,后面的处理也会失败。
假如: batch1处理1--20
batch2处理21--40
batch1处理失败,那么batch2也会失败
虽然数据更新操作严格有序,但是数据处理阶段也可以并行的,只是最后的持久化操作必须有序。
1.1 trident state
trident的状态具有仅仅处理一次,持续聚合的语义,使用trident来实现恰好一次的语义不需要开发人员去处理事务相关的工作,因为trident state已经帮我们封装好了,只需要编写类似于如下的代码:
topology.newStream("sentencestream", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MyHbaseState.HbaseFactory(options), new Count(), new Fields("count")) .parallelismHint(3);
所有处理事务逻辑都在MyHbaseState.HbaseFactory中处理了(这个是我自己定义的,trident支持在内存里面处理,类似于MemachedState.opaque)。
trident提供了一个StateFactory用来创建State对象的实例,行如:
public final class XFactory implements StateFactory{ public State makeState(Map conf,int partitonIndex,int numPartitions){ return new State(); } }
1.2 persistentAggregate
persistentAggregate是trident中用来更新来源的状态,如果前面是一个分好组的流,trident希望你提供的状态实现MapState接口,其中key是分组的字段,
而聚合结果是状态的值。
1.3 实现MapStates
trident中实现MapState非常简单,只需要为这个类提供一个IBackingMap的接口实现接口。
二、实战
首先搭建好zk,storm,hadoop,hbase的分布式环境
master:
slave1:
slave2:
main方法:
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { TridentTopology topology = new TridentTopology(); FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("tanjie is a good man"), new Values( "what is your name"), new Values("how old are you"), new Values("my name is tanjie"), new Values("i am 18")); spout.setCycle(false); tridentStreamToHbase(topology,spout); Config config = new Config(); config.setDebug(false); StormSubmitter.submitTopologyWithProgressBar("word_count_trident_state_HbaseState", config, topology.build()); }
tridentStreamToHbase方法:
private static TridentState tridentStreamToHbase(TridentTopology topology, FixedBatchSpout spout) { MyHbaseState.Options options = new MyHbaseState.Options(); options.setTableName("storm_trident_state"); options.setColumFamily("colum1"); options.setQualifier("q1"); /** * 根据数据源拆分单词后,然后分区操作,在每个分区上又进行分组(hash算法),然后在每个分组上进行聚合 * 所以这里可能有多个分区,每个分区有多个分组,然后在多个分组上进行聚合 * 用来进行group的字段会以key的形式存在于State当中,聚合后的结果会以value的形式存储在State当中 */ return topology.newStream("sentencestream", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MyHbaseState.HbaseFactory(options), new Count(), new Fields("count")) .parallelismHint(3); }
MyHbaseState实现:
package com.storm.trident.state.hbase; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.storm.task.IMetricsContext; import org.apache.storm.trident.state.JSONNonTransactionalSerializer; import org.apache.storm.trident.state.JSONOpaqueSerializer; import org.apache.storm.trident.state.JSONTransactionalSerializer; import org.apache.storm.trident.state.Serializer; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; import org.apache.storm.trident.state.StateType; import org.apache.storm.trident.state.map.IBackingMap; import org.apache.storm.trident.state.map.MapState; import org.apache.storm.trident.state.map.OpaqueMap; import org.apache.storm.trident.state.map.SnapshottableMap; import org.apache.storm.tuple.Values; import com.google.common.collect.Maps; @SuppressWarnings({ "unchecked", "rawtypes" }) public class MyHbaseState<T> implements IBackingMap<T> { private static final Map<StateType, Serializer> DEFAULT_SERIALZERS = Maps .newHashMap(); private int partitionNum; private Options<T> options; private Serializer<T> serializer; private Connection connection; private Table table; static { DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer()); DEFAULT_SERIALZERS.put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer()); DEFAULT_SERIALZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer()); } public MyHbaseState(final Options<T> options, Map conf, int partitionNum) { this.options = options; this.serializer = options.serializer; this.partitionNum = partitionNum; try { connection = ConnectionFactory.createConnection(HBaseConfiguration .create()); table = connection.getTable(TableName.valueOf(options.tableName)); } catch (IOException e) { e.printStackTrace(); } } public static class Options<T> implements Serializable { /** * */ private static final long serialVersionUID = 1L; public Serializer<T> serializer = null; public String globalkey = "$HBASE_STATE_GLOBAL$"; /** * 表名 */ public String tableName; /** * 列族 */ public String columFamily; /** * */ public String qualifier; public String getTableName() { return tableName; } public void setTableName(String tableName) { this.tableName = tableName; } public String getColumFamily() { return columFamily; } public void setColumFamily(String columFamily) { this.columFamily = columFamily; } public String getQualifier() { return qualifier; } public void setQualifier(String qualifier) { this.qualifier = qualifier; } } protected static class HbaseFactory<T> implements StateFactory { private static final long serialVersionUID = 1L; private Options<T> options; public HbaseFactory(Options<T> options) { this.options = options; if (this.options.serializer == null) { this.options.serializer = DEFAULT_SERIALZERS .get(StateType.OPAQUE); } } @Override public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { System.out.println("partitionIndex:" + partitionIndex + ",numPartitions:" + numPartitions); IBackingMap state = new MyHbaseState(options, conf, partitionIndex); MapState mapState = OpaqueMap.build(state); return new SnapshottableMap(mapState, new Values(options.globalkey)); } } @Override public void multiPut(List<List<Object>> keys, List<T> values) { List<Put> puts = new ArrayList<Put>(keys.size()); for (int i = 0; i < keys.size(); i++) { Put put = new Put(toRowKey(keys.get(i))); T val = values.get(i); System.out.println("partitionIndex: " + this.partitionNum + ",key.get(i):" + keys.get(i) + "value值:" + val); put.addColumn(this.options.columFamily.getBytes(), this.options.qualifier.getBytes(), this.options.serializer.serialize(val)); puts.add(put); } try { this.table.put(puts); } catch (IOException e) { e.printStackTrace(); } } @Override public List<T> multiGet(List<List<Object>> keys) { List<Get> gets = new ArrayList<Get>(); for (final List<Object> key : keys) { // LOG.info("Partition: {}, GET: {}", this.partitionNum, key); Get get = new Get(toRowKey(key)); get.addColumn(this.options.columFamily.getBytes(), this.options.qualifier.getBytes()); gets.add(get); } List<T> retval = new ArrayList<T>(); try { // 批量获取所有rowKey的数据 Result[] results = this.table.get(gets); for (final Result result : results) { byte[] value = result.getValue( this.options.columFamily.getBytes(), this.options.qualifier.getBytes()); if (value != null) { retval.add(this.serializer.deserialize(value)); } else { retval.add(null); } } } catch (IOException e) { e.printStackTrace(); } return retval; } private byte[] toRowKey(List<Object> keys) { ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { for (Object key : keys) { bos.write(String.valueOf(key).getBytes()); } bos.close(); } catch (IOException e) { throw new RuntimeException("IOException creating HBase row key.", e); } return bos.toByteArray(); } }
运行结果:
查看supervisor日志:
2016-12-23 11:34:25.576 STDIO [INFO] partitionIndex: 0,key.get(i):[good]value值:org.apache.storm.trident.state.OpaqueValue@6498fd6a[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.582 STDIO [INFO] partitionIndex: 1,key.get(i):[name]value值:org.apache.storm.trident.state.OpaqueValue@81e227f[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.582 STDIO [INFO] partitionIndex: 1,key.get(i):[are]value值:org.apache.storm.trident.state.OpaqueValue@726ac402[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[what]value值:org.apache.storm.trident.state.OpaqueValue@2667735e[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[your]value值:org.apache.storm.trident.state.OpaqueValue@51c73404[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[tanjie]value值:org.apache.storm.trident.state.OpaqueValue@6d281c8d[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[old]value值:org.apache.storm.trident.state.OpaqueValue@646aa4f7[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[is]value值:org.apache.storm.trident.state.OpaqueValue@157487a2[currTxid=1,prev=<null>,curr=2] 2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[a]value值:org.apache.storm.trident.state.OpaqueValue@1574a7af[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[how]value值:org.apache.storm.trident.state.OpaqueValue@1dacdd2a[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.587 STDIO [INFO] partitionIndex: 2,key.get(i):[you]value值:org.apache.storm.trident.state.OpaqueValue@3febff9e[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.587 STDIO [INFO] partitionIndex: 2,key.get(i):[man]value值:org.apache.storm.trident.state.OpaqueValue@1edafedb[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.812 STDIO [INFO] partitionIndex: 2,key.get(i):[tanjie]value值:org.apache.storm.trident.state.OpaqueValue@38a106df[currTxid=2,prev=1,curr=2] 2016-12-23 11:34:25.812 STDIO [INFO] partitionIndex: 2,key.get(i):[is]value值:org.apache.storm.trident.state.OpaqueValue@53ca3784[currTxid=2,prev=2,curr=3] 2016-12-23 11:34:25.815 STDIO [INFO] partitionIndex: 0,key.get(i):[am]value值:org.apache.storm.trident.state.OpaqueValue@5261a4c8[currTxid=2,prev=<null>,curr=1] 2016-12-23 11:34:25.815 STDIO [INFO] partitionIndex: 0,key.get(i):[my]value值:org.apache.storm.trident.state.OpaqueValue@88970b9[currTxid=2,prev=<null>,curr=1] 2016-12-23 11:34:25.826 STDIO [INFO] partitionIndex: 1,key.get(i):[i]value值:org.apache.storm.trident.state.OpaqueValue@78b27ff6[currTxid=2,prev=<null>,curr=1] 2016-12-23 11:34:25.827 STDIO [INFO] partitionIndex: 1,key.get(i):[name]value值:org.apache.storm.trident.state.OpaqueValue@eef2d62[currTxid=2,prev=1,curr=2] 2016-12-23 11:34:25.828 STDIO [INFO] partitionIndex: 1,key.get(i):[18]value值:org.apache.storm.trident.state.OpaqueValue@788c8496[currTxid=2,prev=<null>,curr=1]
查看hbase表