Storm Trident in practice: Trident state


1. Understanding Storm Trident
    Trident can be thought of as a high-level batch-processing abstraction on top of Storm. It provides grouping, partitioning, aggregation, and function operations, with consistency and exactly-once processing semantics.
    1) Tuples are processed as batches.
    2) Every batch of tuples is assigned a unique transaction id; if a batch is replayed because processing failed, it is guaranteed to carry the same transaction id as before.
    3) State updates are strictly ordered across batches: batch 1 must be processed successfully before batch 2, and if batch 1 fails, the batches after it fail as well.
    For example, if batch 1 covers tuples 1-20 and batch 2 covers tuples 21-40, a failure of batch 1 fails batch 2 too.
    Although state updates are strictly ordered, the processing itself can still run in parallel; only the final persistence step has to be ordered.
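
    To see why replaying a batch with the same transaction id is safe, consider how an opaque state stores each value: alongside the value it keeps the transaction id that last wrote it and the previous value. The following is a conceptual sketch of the update rule (illustrative only, not the actual Storm source; OpaqueValue is the real wrapper class that shows up in the logs at the end of this post):

    import org.apache.storm.trident.state.OpaqueValue;

    public class OpaqueUpdateSketch {
        // Applies a batch's aggregated count for one key exactly once,
        // even if the batch is replayed with the same transaction id.
        static OpaqueValue<Long> apply(OpaqueValue<Long> stored, long batchTxid, long batchCount) {
            if (stored == null) {
                // First write for this key.
                return new OpaqueValue<Long>(batchTxid, batchCount);
            }
            if (stored.getCurrTxid() != null && stored.getCurrTxid() == batchTxid) {
                // Replayed batch: rebuild from "prev" so the count is not applied twice.
                long base = stored.getPrev() == null ? 0L : stored.getPrev();
                return new OpaqueValue<Long>(batchTxid, base + batchCount, stored.getPrev());
            }
            // New batch: the old "curr" becomes the new "prev".
            return new OpaqueValue<Long>(batchTxid, stored.getCurr() + batchCount, stored.getCurr());
        }
    }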
  1.1 Trident state
     Trident state offers exactly-once, continuously-aggregating semantics. Achieving exactly-once with Trident does not require the developer to handle the transactional plumbing themselves, because Trident state encapsulates it; code like the following is all that is needed:

  topology.newStream("sentencestream", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MyHbaseState.HbaseFactory(options), new Count(), new Fields("count"))
                .parallelismHint(3);
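
   Count here is Trident's built-in counting aggregator; Split is a user-defined function that the original listing does not show. A typical implementation (in the style of the storm-starter examples) looks like this:

  import org.apache.storm.trident.operation.BaseFunction;
  import org.apache.storm.trident.operation.TridentCollector;
  import org.apache.storm.trident.tuple.TridentTuple;
  import org.apache.storm.tuple.Values;

  // Emits one "word" tuple for every whitespace-separated token in the sentence.
  public class Split extends BaseFunction {
      @Override
      public void execute(TridentTuple tuple, TridentCollector collector) {
          for (String word : tuple.getString(0).split(" ")) {
              if (!word.isEmpty()) {
                  collector.emit(new Values(word));
              }
          }
      }
  }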

   All of the transactional logic is handled inside MyHbaseState.HbaseFactory (this one is my own implementation, shown below; Trident also ships off-the-shelf state implementations such as MemcachedState.opaque).
   Trident provides a StateFactory for creating State instances, of the form:

   public final class XFactory implements StateFactory {
       @Override
       public State makeState(Map conf, IMetricsContext metrics,
               int partitionIndex, int numPartitions) {
           return new XState(); // your State implementation
       }
   }

  1.2 persistentAggregate
     persistentAggregate is Trident's operation for updating a source of state. If the preceding stream is grouped, Trident expects the state you provide to implement the MapState interface, where the grouping fields become the keys and the aggregation results become the values.
  1.3 Implementing MapState
     Implementing a MapState in Trident is simple: you only need to supply an implementation of the IBackingMap interface, shown below.
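
  For reference, IBackingMap consists of just two bulk operations; Trident wraps your implementation (with OpaqueMap, TransactionalMap, or NonTransactionalMap) to layer the transactional semantics on top:

  // Shape of org.apache.storm.trident.state.map.IBackingMap, as implemented
  // by the MyHbaseState class in section 2. T is the (wrapped) value type.
  public interface IBackingMap<T> {
      List<T> multiGet(List<List<Object>> keys);
      void multiPut(List<List<Object>> keys, List<T> vals);
  }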
2. Hands-on practice

    First, set up a distributed environment with ZooKeeper, Storm, Hadoop, and HBase:

    master: [screenshot of the master node]

    slave1: [screenshot of the slave1 node]

    slave2: [screenshot of the slave2 node]
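
    The topology below writes to an HBase table that must already exist. Assuming it has not been created yet, a one-off setup using the HBase 1.x client API (matching the table and column-family names used later) could look like this:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    // One-off table setup; equivalent to `create 'storm_trident_state', 'colum1'`
    // in the hbase shell.
    public class CreateStateTable {
        public static void main(String[] args) throws Exception {
            try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
                 Admin admin = conn.getAdmin()) {
                TableName name = TableName.valueOf("storm_trident_state");
                if (!admin.tableExists(name)) {
                    HTableDescriptor desc = new HTableDescriptor(name);
                    desc.addFamily(new HColumnDescriptor("colum1"));
                    admin.createTable(desc);
                }
            }
        }
    }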
    The main method:

     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        TridentTopology topology = new TridentTopology();
        // Batch size 3; setCycle(false) means the fixed sentences are emitted once.
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("tanjie is a good man"),
                new Values("what is your name"),
                new Values("how old are you"),
                new Values("my name is tanjie"),
                new Values("i am 18"));
        spout.setCycle(false);
        tridentStreamToHbase(topology, spout);
        Config config = new Config();
        config.setDebug(false);
        StormSubmitter.submitTopologyWithProgressBar("word_count_trident_state_HbaseState", config, topology.build());
    }
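
   For quicker iteration, the cluster submission can be swapped for local mode (a sketch; assumes storm-core's LocalCluster is on the classpath):

  // Local-mode alternative to StormSubmitter, handy for debugging:
  LocalCluster cluster = new LocalCluster();
  cluster.submitTopology("word_count_trident_state_HbaseState", config, topology.build());
  // ...wait while the topology runs, then:
  cluster.shutdown();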

   The tridentStreamToHbase method:

  private static TridentState tridentStreamToHbase(TridentTopology topology,
            FixedBatchSpout spout) {
        MyHbaseState.Options options = new MyHbaseState.Options();
        options.setTableName("storm_trident_state");
        options.setColumFamily("colum1");
        options.setQualifier("q1");
        /**
         * After the source sentences are split into words, the stream is
         * partitioned; within each partition the words are grouped (by hash),
         * and aggregation runs per group. So there may be several partitions,
         * each holding several groups, and each group is aggregated
         * independently. The grouping field is stored in the State as the key,
         * and the aggregated result is stored as the value.
         */
        return topology.newStream("sentencestream", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MyHbaseState.HbaseFactory(options), new Count(), new Fields("count"))
                .parallelismHint(3);
    }
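
    The TridentState returned by this method is not used further in the post, but it could be queried in-topology via DRPC. A sketch using the built-in MapGet (the DRPC function name "words" is illustrative):

    // Hypothetical DRPC query against the returned state: for each word in
    // the request, look up its current count.
    TridentState wordCounts = tridentStreamToHbase(topology, spout);
    topology.newDRPCStream("words")
            .each(new Fields("args"), new Split(), new Fields("word"))
            .groupBy(new Fields("word"))
            .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"));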
    

    The MyHbaseState implementation:

package com.storm.trident.state.hbase;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.JSONNonTransactionalSerializer;
import org.apache.storm.trident.state.JSONOpaqueSerializer;
import org.apache.storm.trident.state.JSONTransactionalSerializer;
import org.apache.storm.trident.state.Serializer;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.StateType;
import org.apache.storm.trident.state.map.IBackingMap;
import org.apache.storm.trident.state.map.MapState;
import org.apache.storm.trident.state.map.OpaqueMap;
import org.apache.storm.trident.state.map.SnapshottableMap;
import org.apache.storm.tuple.Values;

import com.google.common.collect.Maps;

@SuppressWarnings({ "unchecked", "rawtypes" })
public class MyHbaseState<T> implements IBackingMap<T> {

    private static final Map<StateType, Serializer> DEFAULT_SERIALIZERS = Maps
            .newHashMap();

    private int partitionNum;

    private Options<T> options;

    private Serializer<T> serializer;

    private Connection connection;

    private Table table;

    static {
        DEFAULT_SERIALIZERS.put(StateType.NON_TRANSACTIONAL,
                new JSONNonTransactionalSerializer());
        DEFAULT_SERIALIZERS.put(StateType.TRANSACTIONAL,
                new JSONTransactionalSerializer());
        DEFAULT_SERIALIZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer());
    }

    public MyHbaseState(final Options<T> options, Map conf, int partitionNum) {
        this.options = options;
        this.serializer = options.serializer;
        this.partitionNum = partitionNum;
        try {
            // HBase configuration is read from hbase-site.xml on the classpath.
            connection = ConnectionFactory.createConnection(HBaseConfiguration
                    .create());
            table = connection.getTable(TableName.valueOf(options.tableName));
        } catch (IOException e) {
            // Fail fast: a null table would only surface later as an NPE.
            throw new RuntimeException("Failed to open HBase table "
                    + options.tableName, e);
        }
    }

    public static class Options<T> implements Serializable {

        /**
         *
         */
        private static final long serialVersionUID = 1L;

        public Serializer<T> serializer = null;

        public String globalkey = "$HBASE_STATE_GLOBAL$";

        /**
         * Table name
         */
        public String tableName;

        /**
         * Column family
         */
        public String columFamily;

        /**
         * Column qualifier
         */
        public String qualifier;

        public String getTableName() {
            return tableName;
        }

        public void setTableName(String tableName) {
            this.tableName = tableName;
        }

        public String getColumFamily() {
            return columFamily;
        }

        public void setColumFamily(String columFamily) {
            this.columFamily = columFamily;
        }

        public String getQualifier() {
            return qualifier;
        }

        public void setQualifier(String qualifier) {
            this.qualifier = qualifier;
        }

    }

    public static class HbaseFactory<T> implements StateFactory {

        private static final long serialVersionUID = 1L;
        private Options<T> options;

        public HbaseFactory(Options<T> options) {
            this.options = options;
            if (this.options.serializer == null) {
                // Default to the opaque serializer, matching OpaqueMap below.
                this.options.serializer = DEFAULT_SERIALIZERS
                        .get(StateType.OPAQUE);
            }
        }

        @Override
        public State makeState(Map conf, IMetricsContext metrics,
                int partitionIndex, int numPartitions) {
            System.out.println("partitionIndex:" + partitionIndex
                    + ",numPartitions:" + numPartitions);
            IBackingMap state = new MyHbaseState(options, conf, partitionIndex);
            MapState mapState = OpaqueMap.build(state);
            return new SnapshottableMap(mapState, new Values(options.globalkey));
        }

    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> values) {
        List<Put> puts = new ArrayList<Put>(keys.size());
        for (int i = 0; i < keys.size(); i++) {
            Put put = new Put(toRowKey(keys.get(i)));
            T val = values.get(i);
            System.out.println("partitionIndex: " + this.partitionNum
                    + ",key.get(i):" + keys.get(i) + "value:" + val);
            put.addColumn(this.options.columFamily.getBytes(),
                    this.options.qualifier.getBytes(),
                    this.options.serializer.serialize(val));
            puts.add(put);
        }
        try {
            this.table.put(puts);
        } catch (IOException e) {
            throw new RuntimeException("HBase multiPut failed.", e);
        }
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        List<Get> gets = new ArrayList<Get>();
        for (final List<Object> key : keys) {
            // LOG.info("Partition: {}, GET: {}", this.partitionNum, key);
            Get get = new Get(toRowKey(key));
            get.addColumn(this.options.columFamily.getBytes(),
                    this.options.qualifier.getBytes());
            gets.add(get);
        }
        List<T> retval = new ArrayList<T>();
        try {
            // Batch-get the stored values for all row keys
            Result[] results = this.table.get(gets);
            for (final Result result : results) {
                byte[] value = result.getValue(
                        this.options.columFamily.getBytes(),
                        this.options.qualifier.getBytes());
                if (value != null) {
                    retval.add(this.serializer.deserialize(value));
                } else {
                    retval.add(null);
                }
            }
        } catch (IOException e) {
            // Rethrow: returning a partial list would violate the multiGet
            // contract of one result entry per key.
            throw new RuntimeException("HBase multiGet failed.", e);
        }
        return retval;
    }

    private byte[] toRowKey(List<Object> keys) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            for (Object key : keys) {
                bos.write(String.valueOf(key).getBytes());
            }
            bos.close();
        } catch (IOException e) {
            throw new RuntimeException("IOException creating HBase row key.", e);
        }
        return bos.toByteArray();
    }

}
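
   One caveat in toRowKey: the grouping fields are concatenated with no delimiter, so multi-field keys such as ["ab","c"] and ["a","bc"] would map to the same row. That is harmless for the single-field grouping in this word count, but for composite keys a delimiter-based variant is safer, for example:

    // Hypothetical alternative: join the key parts with a separator byte so
    // that composite keys cannot collide.
    private byte[] toRowKey(List<Object> keys) {
        StringBuilder sb = new StringBuilder();
        for (Object key : keys) {
            if (sb.length() > 0) {
                sb.append('\0'); // separator unlikely to appear in the data
            }
            sb.append(key);
        }
        return sb.toString().getBytes();
    }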

   Results of running the topology:

   Inspecting the supervisor logs:

2016-12-23 11:34:25.576 STDIO [INFO] partitionIndex: 0,key.get(i):[good]value:org.apache.storm.trident.state.OpaqueValue@6498fd6a[currTxid=1,prev=<null>,curr=1]
2016-12-23 11:34:25.582 STDIO [INFO] partitionIndex: 1,key.get(i):[name]value:org.apache.storm.trident.state.OpaqueValue@81e227f[currTxid=1,prev=<null>,curr=1]
2016-12-23 11:34:25.582 STDIO [INFO] partitionIndex: 1,key.get(i):[are]value:org.apache.storm.trident.state.OpaqueValue@726ac402[currTxid=1,prev=<null>,curr=1]
2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[what]value:org.apache.storm.trident.state.OpaqueValue@2667735e[currTxid=1,prev=<null>,curr=1]
2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[your]value:org.apache.storm.trident.state.OpaqueValue@51c73404[currTxid=1,prev=<null>,curr=1]
2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[tanjie]value:org.apache.storm.trident.state.OpaqueValue@6d281c8d[currTxid=1,prev=<null>,curr=1]
2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[old]value:org.apache.storm.trident.state.OpaqueValue@646aa4f7[currTxid=1,prev=<null>,curr=1]
2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[is]value:org.apache.storm.trident.state.OpaqueValue@157487a2[currTxid=1,prev=<null>,curr=2]
2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[a]value:org.apache.storm.trident.state.OpaqueValue@1574a7af[currTxid=1,prev=<null>,curr=1]
2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[how]value:org.apache.storm.trident.state.OpaqueValue@1dacdd2a[currTxid=1,prev=<null>,curr=1]
2016-12-23 11:34:25.587 STDIO [INFO] partitionIndex: 2,key.get(i):[you]value:org.apache.storm.trident.state.OpaqueValue@3febff9e[currTxid=1,prev=<null>,curr=1]
2016-12-23 11:34:25.587 STDIO [INFO] partitionIndex: 2,key.get(i):[man]value:org.apache.storm.trident.state.OpaqueValue@1edafedb[currTxid=1,prev=<null>,curr=1]
2016-12-23 11:34:25.812 STDIO [INFO] partitionIndex: 2,key.get(i):[tanjie]value:org.apache.storm.trident.state.OpaqueValue@38a106df[currTxid=2,prev=1,curr=2]
2016-12-23 11:34:25.812 STDIO [INFO] partitionIndex: 2,key.get(i):[is]value:org.apache.storm.trident.state.OpaqueValue@53ca3784[currTxid=2,prev=2,curr=3]
2016-12-23 11:34:25.815 STDIO [INFO] partitionIndex: 0,key.get(i):[am]value:org.apache.storm.trident.state.OpaqueValue@5261a4c8[currTxid=2,prev=<null>,curr=1]
2016-12-23 11:34:25.815 STDIO [INFO] partitionIndex: 0,key.get(i):[my]value:org.apache.storm.trident.state.OpaqueValue@88970b9[currTxid=2,prev=<null>,curr=1]
2016-12-23 11:34:25.826 STDIO [INFO] partitionIndex: 1,key.get(i):[i]value:org.apache.storm.trident.state.OpaqueValue@78b27ff6[currTxid=2,prev=<null>,curr=1]
2016-12-23 11:34:25.827 STDIO [INFO] partitionIndex: 1,key.get(i):[name]value:org.apache.storm.trident.state.OpaqueValue@eef2d62[currTxid=2,prev=1,curr=2]
2016-12-23 11:34:25.828 STDIO [INFO] partitionIndex: 1,key.get(i):[18]value:org.apache.storm.trident.state.OpaqueValue@788c8496[currTxid=2,prev=<null>,curr=1]

   Note how the opaque values evolve across batches: "is" is stored with curr=2 under txid 1, and when batch 2 adds another occurrence it becomes prev=2, curr=3 under txid 2. The prev/curr pair is exactly what lets the state reapply a replayed batch without double counting.

   Inspecting the HBase table:

   [screenshot: contents of the storm_trident_state table]
