欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

storm笔记:Trident状态

程序员文章站 2022-05-30 23:38:43
...

storm笔记:Trident应用中说了下Trident的使用,这里说下Trident几种状态的变化及其对应API的使用。

本文内容来自Trident State,部分内容根据实际情况做出修改。

Trident中有对状态数据进行读取和写入操作的一流抽象工具。状态既可以保存在拓扑内部,比如保存在内容中并由HDFS存储,也可以通过外部存储(比如Memcached或Cassandra)存储在数据库中。而对于Trident的API而言,这两种机制没有任何区别。

Trident以容错的方式管理状态,以便在重试或失败时的状态更新是幂等的。在大数据处理中,数据处理的幂等性是非常重要的一个指标,这样能够保证每个消息即使处理了多次,结果也像是只处理了一次一样。

在进行状态更新时可能需要各种级别的容错能力,在这之前,我们来看一个例子说明实现“恰好一次”语义所需的技巧。比如,正在对流中的数据进行计数聚合操作,每次处理新的元组时,都会将运行的计数结果存储在数据库中。

如果发生故障时,元组将重新执行计数操作。这就会在执行状态更新时出现问题,因为这个时候不知道是不是已经更新过该元组状态。也许还没有处理该元组数据,这个时候就需要增加计数。也许已经处理该元组,并成功增加计数,但是在下一步的时候出现问题,这种情况下,就不应该增加计数。也有可能是处理元组正常,更新计数是异常,这个时候就需要更新计数。

所以说,如果只是在数据库中存储计数信息,就不知道元组是否已经处理过。因此,就需要更多的信息作为辅助。Trident提供了下面三个性质,来实现“恰好一次”的处理:

  1. 元组都是以小批次处理
  2. 每批元组都会给出一个唯一ID,称为事务ID(transaction id,txid)。如果批次重复处理,txid也会相同。
  3. 状态的更新操作是按照元组批次的顺序执行的。也就是说,在批次2状态更新成功之前,不会进行批次3的状态更新。

根据这些特性,就可以通过检查到该元组的批次是否已被处理,并根据检测结果采取适当的操作更新状态了。采取的具体操作取决于Spout的类型。Spout有三种类型:“非事务型(non-transactional)”,“事务型(transactional)”和“不透明事务型(opaque transactional)”。对应的容错能力也是三种:“非事务”,“事务”和“不透明事务”。下面来看看Spout的各个类型及对应的容错能力。

事务型Spout

Trident是按照批次发送元组进行处理的,每个批次的元组被赋予唯一的事务ID。Spout的特性根据他们所提供容错性保证机制来决定的,而且这种机制也会对每个批次发生作用。事务型Spout有如下特性:

  1. 每个批次的txid不变,对于一个特定的txid,重复执行时,它所包含的元组数据与第一次完全相同。
  2. 元组只会在一个批次出现,不会重复(某个元组只会出现在一个批次中,不会出现在多个批次中)。
  3. 每个元组都会出现一次(不会遗漏任何的元组数据)

这是最简单最容易理解的一种Spout类型,数据流被分割成固定的批次。storm中有与Kafka集成的事务型Spout的扩展,代码在这里

既然事务型Spout这么简单易懂,为什么不在Trident中完全使用事务型Spout呢?其实就在于它的容错能力。比如,TransactionalTridentKafkaSpout的工作方式是,同一个txid的批次中将包含kafka所有分区的元组。一旦某个批次发出后,出现异常,需要重新发出,就需要完全相同的元组集合才能满足事务型Spout要求的语义。但是这个时候,kafka某个节点异常(节点关闭或分区不可用),就无法获取完全相同的的一批元组,那整个拓扑就会应为第3条语义(批次按顺序执行)停止。

这就是要有“不透明事务型”Spout的原因了,它能够容忍数据源节点丢失,而且又能保证数据恰好被操作一次。

注:对kafka比较熟悉的应该会想到,如果某一个topic支持复制,那即使一个节点不可用,还会有其他复制节点顶上,那TransactionalTridentKafkaSpout也能够避免上面的问题。

下面继续看看如何设计一个支持恰好一次特性的“事务型”Spout语义(简单的说就是同一个txid对应的批次元组数据完全一致)的状态实现,这种状态称为“事务型状态”。

比如,现在有一个单词计数的拓扑,需要将单词计数存储在key/value数据库中。key是单词,value中包含单词数量。另外,为了确定同一批次元组是否已经被执行,需要将txid也一同存储在value中。这样,当需要更新单词数量的时候,先比较txid是否相同,如果相同,就跳过更新。如果不同,就更新计数。

考虑这个为什么它工作的例子。 假设您正在处理由以下批次元组组成的txid 3:

比如,要处理一个txid是3的一批元组:

["man"]
["man"]
["dog"]

目前数据库中存储的数据为:

man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

在这个时候,发现“man”对应的txid是1,当前的txid是3,就可以更新了。然后“dog”对应的txid是3,说明同一批次的元组数据已经发送过了,就不需要更新。从这点可以看出,txid是3的批次元组是重复发送的,在更新“dog”数量后,在更新“man”数量前,出现了错误。最后的结果就是:

man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

不透明事务型Spout

前面已经提过,不透明事务型Spout不能保证相同txid对应的批次中的元组数据完全一致。其特点如下:

  1. 每个元组都会在有且仅有一个批次中处理成功。

[OpaqueTridentKafkaSpout](http://github.com/apache/storm/tree/v1.1.0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java)具有这种特性,同时对kafka节点异常有很好的容错性。OpaqueTridentKafkaSpout在发送一个批次元组的时候,会从上次成功之后的位置开始发送,这样就能够保证元组不会漏发或重发。

基于上面的特点,不透明事务型Spout就不同通过txid来直接判断是否可以跳过状态更新,因为具有相同txid的批次中元组可能发生了变化。

这就需要存储更多的状态信息了,而不仅仅是一个结果和一个txid了,还需要存储前一个结果值。

比如,当前批次的计数是2,需要进行一次状态更新,数据库中的数据如下:

{
  "value": 4,
  "prevValue": 1,
  "txid": 2
}

如果当前的txid是3,与数据库中的不同。在这种情况下,需要将prevValue的值该为value的值,value的值增加2,更新txid为3,最后的结果就是:

{
  "value": 6,
  "prevValue": 4,
  "txid": 3
}

如果当前的txid是2,等于数据库中的txid。因为txid相同,说明上一次txid为2的批次处理失败,但是本次的元组可能与上一次不同了。这个时候,就需要使用本次数据覆盖上次处理结果。也就是说,prevValue值不变,value的值改为prevValue加2,txid不变,最后的结果如下:

{
  "value": 3,
  "prevValue": 1,
  "txid": 2
}

这种方式的可行性依赖于Trident的强顺序性。也就是说,一旦开始处理一个新的批次,就不会重复执行上一个批次。不透明事务型Spout保证了不同批次之间没有重复的情况,也就是每个元组只会在一个批次中处理成功,所以就可以放心的使用前一个值与当前值覆盖已存数据了。

非事务型Spout

非事务型Spout不能为批次提供任何保证。所以可能出现”至多一次”的处理,即在某个批次处理过程中失败了,但是不会在重新处理;也可能提供“至少一次”的处理,即可能会有多个批次分别处理某个元组。也就是没有办法实现“恰好一次”的语义。

不同类型spout和状态总结

下面是不同的spout/状态组合是否支持“恰好一次”处理语义:

storm笔记:Trident状态

不透明事务状态有最强的容错性,但是因为存储txid和两个结果带来更大的开销。事务型状态只需要存储一个状态结果,但是只对事务型Spout有效。非事务型状态要求存储的数据更少,但是不能实现“恰好一次”的处理语义。

所以在选择容错与存储空间中,需要根据具体的需要选择合适的组合。

状态API

根据前面来看,“恰好一次”语义的原理有些复杂,但是作为用户,并不需要了解这些txid对比、多值存储,因为Trident已经在State中封装了所有容错处理逻辑,只需要想下面着用携带码就行:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
        topology.newStream("spout1", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))
                .parallelismHint(6);

所有的不透明事务状态逻辑已经封装在MemcachedState.opaque中,另外,状态更新会自动调整为批次操作,这样可以减少与数据库之间反复交互带来的资源浪费。

基本的State接口只有两个方法:

public interface State {
    void beginCommit(Long txid); // 对于像DRPC流发生的partitionPersist这样的事情,可以是null
    void commit(Long txid);
}

前面已经说过,状态更新开始和结束时都会获取txid。Trident并不关心状态如何操作,使用哪种方式更新,使用哪种方式读取。

假如有一个包含用户地址信息的定制数据库,需要使用Trident与数据库交互,State扩展类中包含对于用户信息的getter和setter方法:

public class LocationDB implements State {
    public void beginCommit(Long txid) {
    }

    public void commit(Long txid) {
    }

    public void setLocation(long userId, String location) {
        // 向数据库设置地址信息
    }

    public String getLocation(long userId) {
        // 从数据库中获取地址信息
    }
}

然后就需要一个StateFactory来创建Trident所需的State对象,LocationDB所需的StateFactory大体结构如下:

public class LocationDBFactory implements StateFactory {
    public State makeState(Map conf, int partitionIndex, int numPartitions) {
        return new LocationDB();
    }
}

Trident提供了用于查询状态源的QueryFunction接口,以及更新状态源的StateUpdater接口。比如,查询LocationDB中用户信息的QueryLocation

TridentTopology topology = new TridentTopology();
TridentState locations = topology.newStaticState(new LocationDBFactory());
topology.newStream("myspout", spout)
        .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"));

QueryLocation的代码如下:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<String> ret = new ArrayList();
        for (TridentTuple input : inputs) {
            ret.add(state.getLocation(input.getLong(0)));
        }
        return ret;
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }
}

QueryFunction操作分为两步:首先,Trident会将收集到的数据放在一个批次中,发送给batchRetrieve方法。在这个例子中,batchRetrieve方法收到的是一些用户id。batchRetrieve会返回一组与输入元组长度相同的结果。输入元组与输出结果中各个元素是彼此对应的。

从这点来看,上面的LocationDB类并没有发挥Trident批处理优势,所以需要尽心改造:

public class LocationDB implements State {
    public void beginCommit(Long txid) {
    }

    public void commit(Long txid) {
    }

    public void setLocationsBulk(List<Long> userIds, List<String> locations) {
        // set locations in bulk
    }

    public List<String> bulkGetLocations(List<Long> userIds) {
        // get locations in bulk
    }
}

对应的QueryLocation类如下:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<Long> userIds = new ArrayList<Long>();
        for (TridentTuple input : inputs) {
            userIds.add(input.getLong(0));
        }
        return state.bulkGetLocations(userIds);
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }
}

这段代码大幅减少了数据库操作。

对于更新状态,可以使用StateUpdater接口。比如下面的更新操作:

public class LocationUpdater extends BaseStateUpdater<LocationDB> {
    public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
        List<Long> ids = new ArrayList<Long>();
        List<String> locations = new ArrayList<String>();
        for (TridentTuple t : tuples) {
            ids.add(t.getLong(0));
            locations.add(t.getString(1));
        }
        state.setLocationsBulk(ids, locations);
    }
}

对应的更新操作拓扑中就可以是这样:

TridentTopology topology = new TridentTopology();
TridentState locations =
        topology.newStream("locations", locationsSpout)
                .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater());

partitionPersist方法会更新状态,StateUpdater接口接收一批元组和状态信息,然后更新状态。上面的LocationUpdater类中仅仅是从元组中抓取用户id和地址信息,然后对状态执行批量处理。然后,partitionPersist会返回一个表示更新状态后的TridentState对象。随后就可以在拓扑的其他地方使用stateQuery方法查询状态。

StateUpdaterupdateState方法中有一个TridentCollector参数,这个对象是可以将发送进来的元组发送到一个新的数据流中。在这个例子中没有用到。如果需要进行比如向数据库更新计数值的后续操作,可以通过TridentState#newValuesStream方法获取新的数据流数据。

persistentAggregate

Trident使用一个名为persistentAggregate的方法更新状态。前面已经出现过,这里再写一遍:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
        topology.newStream("spout1", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

partitionPersist是一个接收Trident聚合器作为参数并对状态数据进行更新的方法,persistentAggregate就是构建于partitionPersist上层的一个编程抽象。在这个例子中,通过groupBy返回一个分组数据,Trident需要一个实现MapState接口的对象。分组字段是状态的key,聚合结果是状态的value。MapState接口如下:

public interface MapState<T> extends State {
    List<T> multiGet(List<List<Object>> keys);
    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

如果你需要在未分组的数据流上执行聚合操作时,Trident需要一个实现Snapshottable接口的对象:

public interface Snapshottable<T> extends State {
    T get();
    T update(ValueUpdater updater);
    void set(T o);
}

MemoryMapStateMemcachedState 都实现了这些接口.

实现MapState接口

实现MapState接口非常简单,Trident几乎把所有事都做完了。OpaqueMapTransactionalMapNonTransactionalMap都分别实现了各自的容错语义。只需要为这些类提供一个用于对不同key/value进行批量获取、批量修改的IBackingMap实现就行。IBackingMap接口如下:

public interface IBackingMap<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

OpaqueMap会使用OpaqueValue作为vals参数调用multiPut方法;TransactionalMap会使用TransactionalValue作为参数;NonTransactionalMaps会直接把拓扑对象传入。

Trident还提供了CachedMap类来实现key/value的自动LRU缓存操作。

最后,Trident还提供了SnapshottableMap类,该类通过将全局聚合的结果存入一个固定key中的方法将MapState对象转化为Snapshottable对象。

可以参考MemcachedState的实现来了解如何将这些工具结合在一起来提供一个高性能的MapState实现。MemcachedState支持不透明事务、事务和非事务语义。


个人主页: http://www.howardliu.cn

个人博文: storm笔记:Trident状态

CSDN主页: http://blog.csdn.net/liuxinghao

CSDN博文: storm笔记:Trident状态