欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

storm消息可靠机制(ack)的原理和使用

程序员文章站 2022-07-14 17:56:45
...

关于storm的基础,参照我这篇文章:流式计算storm
关于并发和并行,参照我这篇文章:并发和并行
关于storm的并行度解释,参照我这篇文章:storm的并行度解释
关于storm的流分组策略,参照我这篇文章:storm的流分组策略
关于storm的消息可靠机制,参照我这篇文章:storm的消息可靠机制

storm的消息可靠机制可以确保spout发出的每条tuple消息都会被完整的处理;
主要是由spout和bolt共同完成的.
本文主要讨论storm的消息可靠机制的原理和使用

storm的可靠机制,是storm的一大亮点,那么他是如何实现的呢?
先看效果:
    1.spout每发一条消息,就新建一个唯一的msgId(比如UUID),
    然后将这条消息和这个唯一id存在map中;
    2.每个bolt在处理tuple后,emit的时候带上tulpe,
    成功,就调用ack方法,代表成功,
    失败就调用fail方法,代表失败;
    这样编写代码后,你会发现,失败的消息spout会重新发送,效果就出来了
实现原理:
    原理很简单,使用了异或的知识点.
    我们知道,任意两个相同的数字,异或的结果都是0.例如:1^1=0
    现在请跟着我的思路想:
    1.首先想象有个服务,叫ack,他的主要作用就是判断每条tuple信息是否都成功处理
    2.每个spout发送和接收成功,都要给ack发送一个数字,
    最后由ack计算,判断整条链路是否成功处理
    3.spout作为发送方,假设他要给3个bolt发送消息,分别是bolt1,bolt2,bolt3;
    4.假设这3个bolt最后都发给bolt4;
    5.假设本次要处理的消息叫做root_id;
    6.开始发送了;
    7.spout给bolt1发送消息<root_id,1>
    8.spout给bolt2发送消息<root_id,2>
    9.spout给bolt3发送消息<root_id,3>
    10.发送完spout再给ack发送1^2^3
    11.bolt1收到<root_id,1>,处理成功再给bolt4发送<root_id,4>;
    12.发送完bolt1再给ack发送1^4,处理不成功就不发送了;
    13.bolt2收到<root_id,2>,处理成功再给bolt4发送<root_id,5>;
    14.发送完bolt2再给ack发送2^5,处理不成功就不发送了;
    15.bolt3收到<root_id,3>,处理成功再给bolt4发送<root_id,6>;
    16.发送完bolt3再给ack发送3^6,处理不成功就不发送了;
    17.bolt4收到前3个bolt的消息,<root_id,4>,<root_id,5>,<root_id,6>,
    处理成功后分别给ack发送4,5,6,处理不成功就不发送了;
    18.我们站在ack的角度来看,对于root_id这条消息来说,如果所有spout和bolt都成功,
    那么应该会收到:1^2^3,1^4,2^5,3^6,4,5,6;
    19.将所有收到的数字异或操作,即:1^2^3^1^4^2^5^3^6^4^5^6,
    由于相同数字异或结果为0,即上面的式子的结果就是0,
    任意少收到哪个值,最终的结果都不会为0;
    20.如果ack最终计算的结果是0,那么就代表这个消息root_id处理成功了
    21.如果ack最终计算结果不为0,那么就代表这个消息root_id处理失败了

如何使用

举个项目中的例子:
spout中:

    这个类 extends BaseRichSpout
    private OutputCollector collector;
    private ConcurrentHashMap<UUID, Values> pending;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<>();
    }

    @Override
    public void nextTuple() {
        //具体业务...
    Values value = new Values("要传的业务数据");
        UUID msgId = UUID.randomUUID();
        this.pending.put(msgId, value);
        this.collector.emit(value, msgId);
    }

    @Override
    public void ack(Object msgId) {
    //收到成功消息,就删除这条msgId
        this.pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        //收到失败消息就重新发送一遍
        //一般成熟的做法是会再记录个失败次数,不会一直失败重发的
        this.collector.emit(this.pending.get(msgId), msgId);
    }

bolt中:

    这个类 extends BaseRichBolt
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        this.collector=collector;
    }
    @Override
    public void execute(Tuple tuple) {
    try {
        //具体业务...

        //注意,这里发送的时候,一定要带上tuple
        this.collector.emit(tuple,new Values("业务数据"));
            collector.ack(tuple);
    } catch (Exception e) {
            collector.fail(tuple);
            e.printStackTrace();
        }
    }