Discuptor入门(二)-实例
前言:最近在项目中看到有人使用的discuptor框架,因为没有接触过所以网上找了些资料.但最终发现开荒者太少,好像没什么人用那.最后感觉还是官方入门文档靠谱点.所以自己翻译了下(翻译器~),希望能帮助到别人.后续如果有什么新理解,我会继续补充的.
discuptor简介:高并发无锁框架
原文地址:https://github.com/lmax-exchange/disruptor/wiki/getting-started
此为译文
愿所有码农能看的英文~
ps:阅读前你需要花几分钟时间去了解一下discuptor的基本概念
为了开始使用disruptor,我们将考虑一个非常简单和人为的例子,一个将生产者传递给消费者的单个长值,消费者只需打印出该值。 首先,我们将定义将携带数据的事件。
public class longevent { private long value; public void set(long value) { this.value = value; } }
为了让disruptor为我们预先分配这些事件,我们需要一个将执行构造的eventfactory
public class longeventfactory implements eventfactory<longevent> { public longevent newinstance() { return new longevent(); } }
一旦我们定义了事件,我们需要创建一个处理这些事件的消费者。 在我们的例子中,我们要做的就是从控制台中打印出值
public class longeventhandler implements eventhandler<longevent> { public void onevent(longevent event, long sequence, boolean endofbatch) { system.out.println("event: " + event); } }
我们将需要这些事件的来源,为了举例,我将假设数据来自某种i / o设备,例如, 网络或文件以bytebuffer的形式
使用disruptor的3.0版本,添加了更丰富的lambda风格的api,以帮助开发人员将此复杂性封装在ring buffer中,因此3.0之后发布消息的首选方法是通过api的event publisher / event translator部分。
例如:
public class longeventproducerwithtranslator { private final ringbuffer<longevent> ringbuffer; public longeventproducerwithtranslator(ringbuffer<longevent> ringbuffer) { this.ringbuffer = ringbuffer; } private static final eventtranslatoronearg<longevent, bytebuffer> translator = new eventtranslatoronearg<longevent, bytebuffer>() { public void translateto(longevent event, long sequence, bytebuffer bb) { event.set(bb.getlong(0)); } }; public void ondata(bytebuffer bb) { ringbuffer.publishevent(translator, bb); } }
这种方法的另一个优点是翻译器代码可以被拉入一个单独的类中,并可以轻松地单独进行单元测试。 disruptor提供了许多不同的接口(eventtranslator,eventtranslatoronearg,eventtranslatortwoarg等),可以实现这些接口以提供翻译。 原因是允许转换器表示为静态类或非捕获lambda(当java 8滚动时)作为转换方法的参数通过ring buffer上的调用传递给转换器。
使用旧版api发布
我们可以使用更“原始”的方法。
public class longeventproducer { private final ringbuffer<longevent> ringbuffer; public longeventproducer(ringbuffer<longevent> ringbuffer) { this.ringbuffer = ringbuffer; } public void ondata(bytebuffer bb) { long sequence = ringbuffer.next(); // grab the next sequence try { longevent event = ringbuffer.get(sequence); // get the entry in the disruptor // for the sequence event.set(bb.getlong(0)); // fill with data } finally { ringbuffer.publish(sequence); } } }
显而易见的是,事件发布变得比使用简单队列更复杂。 这是由于对事件预分配的需求。 它需要(在最低级别)消息发布的两阶段方法,即声明环形缓冲区中的时隙然后发布可用数据。 还必须将发布包装在try / finally块中。 如果我们在ring buffer中声明一个插槽(调用ringbuffer.next()),那么我们必须发布这个序列。 如果不这样做可能会导致干扰者状态的腐败。 具体而言,在多生产者的情况下,这将导致消费者停滞并且在没有重启的情况下无法恢复。 因此,建议使用eventtranslator api。
最后一步是将整个事物连接在一起。 可以手动连接所有组件,但是它可能有点复杂,因此提供dsl以简化构造。 一些更复杂的选项不能通过dsl获得,但它适用于大多数情况
public class longeventmain { public static void main(string[] args) throws exception { // the factory for the event longeventfactory factory = new longeventfactory(); // specify the size of the ring buffer, must be power of 2. int buffersize = 1024; // construct the disruptor disruptor<longevent> disruptor = new disruptor<>(factory, buffersize, daemonthreadfactory.instance); // connect the handler disruptor.handleeventswith(new longeventhandler()); // start the disruptor, starts all threads running disruptor.start(); // get the ring buffer from the disruptor to be used for publishing. ringbuffer<longevent> ringbuffer = disruptor.getringbuffer(); longeventproducer producer = new longeventproducer(ringbuffer); bytebuffer bb = bytebuffer.allocate(8); for (long l = 0; true; l++) { bb.putlong(0, l); producer.ondata(bb); thread.sleep(1000); } } }
using java 8
disruptor api的设计影响之一是java 8将依赖于功能接口的概念来充当java lambdas的类型声明。 disruptor api中的大多数接口定义符合功能接口的要求,因此可以使用lambda而不是自定义类,这可以减少所需的锅炉位置。
public class longeventmain { public static void main(string[] args) throws exception { // specify the size of the ring buffer, must be power of 2. int buffersize = 1024; // construct the disruptor disruptor<longevent> disruptor = new disruptor<>(longevent::new, buffersize, daemonthreadfactory.instance); // connect the handler disruptor.handleeventswith((event, sequence, endofbatch) -> system.out.println("event: " + event)); // start the disruptor, starts all threads running disruptor.start(); // get the ring buffer from the disruptor to be used for publishing. ringbuffer<longevent> ringbuffer = disruptor.getringbuffer(); bytebuffer bb = bytebuffer.allocate(8); for (long l = 0; true; l++) { bb.putlong(0, l); ringbuffer.publishevent((event, sequence, buffer) -> event.set(buffer.getlong(0)), bb); thread.sleep(1000); } } }
注意如何不再需要许多类(例如处理程序,翻译器)。 还要注意用于publishevent()的lambda如何仅引用传入的参数。如果我们将该代码编写为:
bytebuffer bb = bytebuffer.allocate(8); for (long l = 0; true; l++) { bb.putlong(0, l); ringbuffer.publishevent((event, sequence) -> event.set(bb.getlong(0))); thread.sleep(1000); }
这将创建一个捕获lambda,这意味着它需要实例化一个对象来保存bytebuffer bb变量,因为它将lambda传递给publishevent()调用。 这将产生额外的(不必要的)垃圾,因此如果要求低gc压力,则应首选将参数传递给lambda的调用。
给那个方法引用可以用来代替匿名lamdbas,可以用这种方式重写这个例子。
public class longeventmain { public static void handleevent(longevent event, long sequence, boolean endofbatch) { system.out.println(event); } public static void translate(longevent event, long sequence, bytebuffer buffer) { event.set(buffer.getlong(0)); } public static void main(string[] args) throws exception { // specify the size of the ring buffer, must be power of 2. int buffersize = 1024; // construct the disruptor disruptor<longevent> disruptor = new disruptor<>(longevent::new, buffersize, daemonthreadfactory.instance); // connect the handler disruptor.handleeventswith(longeventmain::handleevent); // start the disruptor, starts all threads running disruptor.start(); // get the ring buffer from the disruptor to be used for publishing. ringbuffer<longevent> ringbuffer = disruptor.getringbuffer(); bytebuffer bb = bytebuffer.allocate(8); for (long l = 0; true; l++) { bb.putlong(0, l); ringbuffer.publishevent(longeventmain::translate, bb); thread.sleep(1000); } } }
基本调整选项
使用上述方法将在最广泛的部署方案中起到功能。 但是,如果您能够对disruptor运行的硬件和软件环境做出某些假设,那么您可以利用许多调优选项来提高性能。 调整有两个主要选项,单个与多个生产者和替代等待策略。
单一与多生产者
提高并发系统性能的最佳方法之一是遵循single writer原则,这适用于disruptor。 如果您处于只有一个线程产生事件进入disruptor的情况下,那么您可以利用它来获得额外的性能。
public class longeventmain { public static void main(string[] args) throws exception { //..... // construct the disruptor with a singleproducersequencer disruptor<longevent> disruptor = new disruptor( factory, buffersize, producertype.single, new blockingwaitstrategy(), daemonthreadfactory.instance); //..... } }
为了说明通过这种技术可以实现多少性能优势,我们可以在onetoone性能测试中更改生产者类型。 测试在i7 sandy bridge macbook air上运行。
multiple producer
run 0, disruptor=26,553,372 ops/sec
run 1, disruptor=28,727,377 ops/sec
run 2, disruptor=29,806,259 ops/sec
run 3, disruptor=29,717,682 ops/sec
run 4, disruptor=28,818,443 ops/sec
run 5, disruptor=29,103,608 ops/sec
run 6, disruptor=29,239,766 ops/sec
single producer
run 0, disruptor=89,365,504 ops/sec
run 1, disruptor=77,579,519 ops/sec
run 2, disruptor=78,678,206 ops/sec
run 3, disruptor=80,840,743 ops/sec
run 4, disruptor=81,037,277 ops/sec
run 5, disruptor=81,168,831 ops/sec
run 6, disruptor=81,699,346 ops/sec
替代等待策略
disruptor使用的默认等待策略是blockingwaitstrategy。在内部,blockingwaitstrategy使用典型的锁和条件变量来处理线程唤醒。 blockingwaitstrategy是可用等待策略中最慢的,但对于cpu使用率而言是最保守的,并且将在最广泛的部署选项中提供最一致的行为。但是,再次了解已部署的系统可以提供额外的性能。
sleepingwaitstrategy
与blockingwaitstrategy一样,sleepwaitstrategy通过使用简单的忙等待循环尝试保守cpu使用率,但在循环中间使用对locksupport.parknanos(1)的调用。在典型的linux系统上,这将使线程暂停约60μs。然而,它具有以下好处:生产线程不需要采取任何其他增加适当计数器的动作,并且不需要发信号通知条件变量的成本。但是,在生产者和消费者线程之间移动事件的平均延迟会更高。它在不需要低延迟的情况下效果最好,但是对生产线程的影响很小。常见用例是异步日志记录。
yieldingwaitstrategy
yieldingwaitstrategy是可以在低延迟系统中使用的2种等待策略之一,其中可以选择燃烧cpu周期以改善延迟。 yieldingwaitstrategy将忙于等待序列增加到适当的值。在循环体内,将调用thread.yield(),允许其他排队的线程运行。当需要非常高的性能并且事件处理程序线程的数量小于逻辑核心的总数(例如,逻辑核心数)时,这是推荐的等待策略。你启用了超线程。
busyspinwaitstrategy
busyspinwaitstrategy是性能最高的等待策略,但对部署环境施加了最高限制。仅当事件处理程序线程的数量小于框中的物理核心数时,才应使用此等待策略。例如。应禁用超线程。
从环形缓冲区清除对象
通过disruptor传递数据时,对象的寿命可能超过预期。 为避免这种情况发生,可能需要在处理后清除事件。 如果您有一个事件处理程序清除同一个处理程序中的值就足够了。 如果您有一系列事件处理程序,那么您可能需要在链的末尾放置一个特定的处理程序来处理清除对象。
class objectevent<t> { t val; void clear() { val = null; } } public class clearingeventhandler<t> implements eventhandler<objectevent<t>> { public void onevent(objectevent<t> event, long sequence, boolean endofbatch) { // failing to call clear here will result in the // object associated with the event to live until // it is overwritten once the ring buffer has wrapped // around to the beginning. event.clear(); } } public static void main(string[] args) { disruptor<objectevent<string>> disruptor = new disruptor<>( () -> objectevent<string>(), buffersize, daemonthreadfactory.instance); disruptor .handleeventswith(new processingeventhandler()) .then(new clearingobjecthandler()); }