Storm Spout nextTuple策略
程序员文章站
2022-06-26 18:12:59
...
Storm从0.8.1之后,在Spout调用nextTuple方法时,如果没有emit tuple,那么默认需要休眠1ms,这个具体的策略是可配置的,因此可以根据自己的具体场景,进行设置,以达到合理利用cpu资源。
ISpoutWaitStrategy是Spout没有emit时等待策略的接口,目的是合理利用Cpu,默认提供了2个实现,一个什么也没做,一个是sleep 1毫秒,我们可以自己来实现这个接口。
storm策略配置
topology.spout.wait.strategy "backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms 1
ISpoutWaitStrategy接口
/** * The strategy a spout needs to use when its waiting. Waiting is * triggered in one of two conditions: * * 1. nextTuple emits no tuples * 2. The spout has hit maxSpoutPending and can't emit any more tuples * * The default strategy sleeps for one millisecond. */ public interface ISpoutWaitStrategy { void prepare(Map conf); void emptyEmit(long streak); }
SleepSpoutWaitStrategy实现
public class SleepSpoutWaitStrategy implements ISpoutWaitStrategy { long sleepMillis; @Override public void prepare(Map conf) { sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue(); } @Override public void emptyEmit(long streak) { try { Thread.sleep(sleepMillis); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
NothingEmptyEmitStrategy实现
public class NothingEmptyEmitStrategy implements ISpoutWaitStrategy { @Override public void emptyEmit(long streak) { } @Override public void prepare(Map conf) { throw new UnsupportedOperationException("Not supported yet."); } }
扩展阅读 http://www.cnblogs.com/fxjwind/p/3238648.html