欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Storm Spout nextTuple策略

程序员文章站 2022-06-26 18:12:59
...

 

      Storm从0.8.1之后,在Spout调用nextTuple方法时,如果没有emit tuple,那么默认需要休眠1ms,这个具体的策略是可配置的,因此可以根据自己的具体场景,进行设置,以达到合理利用cpu资源。

      

       ISpoutWaitStrategy是Spout没有emit时等待策略的接口,目的是合理利用Cpu,默认提供了2个实现,一个什么也没做,一个是sleep 1毫秒,我们可以自己来实现这个接口。

 

storm策略配置

topology.spout.wait.strategy "backtype.storm.spout.SleepSpoutWaitStrategy"

topology.sleep.spout.wait.strategy.time.ms       1   

 

 

ISpoutWaitStrategy接口

 

/**
 * The strategy a spout needs to use when its waiting. Waiting is
 * triggered in one of two conditions:
 * 
 * 1. nextTuple emits no tuples
 * 2. The spout has hit maxSpoutPending and can't emit any more tuples
 * 
 * The default strategy sleeps for one millisecond.
 */
public interface ISpoutWaitStrategy {
    void prepare(Map conf);
    void emptyEmit(long streak);
}
 

 

SleepSpoutWaitStrategy实现

 

public class SleepSpoutWaitStrategy implements ISpoutWaitStrategy {

    long sleepMillis;
    
    @Override
    public void prepare(Map conf) {
        sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue();
    }

    @Override
    public void emptyEmit(long streak) {
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
 

 

 

 NothingEmptyEmitStrategy实现

public class NothingEmptyEmitStrategy implements ISpoutWaitStrategy {
    @Override
    public void emptyEmit(long streak) {        
    }

    @Override
    public void prepare(Map conf) {
        throw new UnsupportedOperationException("Not supported yet.");
    }
}

 

 扩展阅读 http://www.cnblogs.com/fxjwind/p/3238648.html