欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

ZooKeeper实现生产-消费者队列

程序员文章站 2022-10-04 11:23:40
使用ZooKeeper实现一个生产-消费者队列,可用于多节点的分布式数据结构。生产者创建一个数据对象,并放到队列中;消费者从队列中取出一个数据对象并进行处理。 ......

【欢迎关注公众号:程序猿讲故事 (codestory),及时接收最新文章】

生产-消费者队列,用于多节点的分布式数据结构,生产和消费数据。生产者创建一个数据对象,并放到队列中;消费者从队列中取出一个数据对象并进行处理。在zookeeper中,队列可以使用一个容器节点下创建多个子节点来实现;创建子节点时,createmode使用 persistent_sequential,zookeeper会自动在节点名称后面添加唯一序列号。ephemeral_sequential也有同样的特点,区别在于会话结束后是否会自动删除。

敲小黑板:*_sequential是zookeeper的一个很重要的特性,分布式锁、选举制度都依靠这个特性实现的。

1      对前续代码的重构

之前的文章,我们已经用实现了watcher和barrier,创建zookeeper连接的代码已经复制了一遍。后续还需要类似的工作,因此先对原有代码做一下重构,让代码味道干净一点。

 ZooKeeper实现生产-消费者队列

以下是 process(watchedevent)的代码

final public void process(watchedevent event) {

  if (event.eventtype.none.equals(event.gettype())) {

    // 连接状态发生变化

    if (event.keeperstate.syncconnected.equals(event.getstate())) {

      // 连接建立成功

      connectedsemaphore.countdown();

    }

  } else if (event.eventtype.nodecreated.equals(event.gettype())) {

    processnodecreated(event);

  } else if (event.eventtype.nodedeleted.equals(event.gettype())) {

    processnodedeleted(event);

  } else if (event.eventtype.nodedatachanged.equals(event.gettype())) {

    processnodedatachanged(event);

  } else if (event.eventtype.nodechildrenchanged.equals(event.gettype())) {

    processnodechildrenchanged(event);

  }

}

 

以zookeeperbarrier为例,看看重构之后的构造函数和监听event的代码

zookeeperbarrier(string address, string tableserial, int tablecapacity, string customername)

    throws ioexception {

  super(address);

  this.tableserial = createrootnode(tableserial);

  this.tablecapacity = tablecapacity;

  this.customername = customername;

}

protected void processnodechildrenchanged(watchedevent event) {

  log.info("{} 接收到了通知 : {}", customername, event.gettype());

  // 子节点有变化

  synchronized (mutex) {

    mutex.notify();

  }

}

2      队列的生产者

生产者的关键代码

string elementname = queuename + "/element";

arraylist<acl> ids = zoodefs.ids.open_acl_unsafe;

createmode createmode = createmode.persistent_sequential;

getzookeeper().create(elementname, value, ids, createmode);

注意,重点是persistent_sequential,persistent是表示永久存储直到有命令删除,sequential表示自动在后面添加自增的唯一序列号。这样,尽管elementname都一样,但实际生成的znode名字在 “element”后面会添加格式为%010d的10个数字,如0000000001。如一个完整的znode名可能为/queue/element0000000021。

3      队列的消费者

消费者尝试从子节点列表获取znode名最小的一个子节点,如果队列为空则等待nodechildrenchanged事件。关键代码

/** 队列的同步信号 */

private static integer queuemutex = integer.valueof(1);

 

@override

protected void processnodechildrenchanged(watchedevent event) {

  synchronized (queuemutex) {

    queuemutex.notify();

  }

}

 

/**

 * 从队列中删除第一个对象

 *

 * @return

 * @throws keeperexception

 * @throws interruptedexception

 */

int consume() throws keeperexception, interruptedexception {

  while (true) {

    synchronized (queuemutex) {

      list<string> list = getzookeeper().getchildren(queuename, true);

      if (list.size() == 0) {

        queuemutex.wait();

      } else {

        // 获取第一个子节点的名称

        string firstnodename = getfirstelementname(list);

        // 删除节点,并返回节点的值

        return deletenodeandreturnvalue(firstnodename);

      }

    }

  }

}

4      测试日志

把测试结果放源码前面,免得大家被无聊的代码晃晕。

测试代码创建了两个线程,一个线程是生产者,按随机间隔往队列中添加对象;一个线程是消费者,随机间隔尝试从队列中取出第一个,如果当时队列为空,会等到直到新的数据。

两个进程都加上随机间隔,是为了模拟生产可能比消费更快的情况。以下是测试日志,为了更突出,生产和消费的日志我增加了不同的文字样式。

49:47.866 [info] zookeeperqueuetest.testqueue(29) 开始zookeeper队列测试,本次将测试 10 个数据

49:48.076 [debug] zookeeperqueue.log(201)

+ profiler [tech.codestory.zookeeper.queue.zookeeperqueue 连接到zookeeper]

|-- elapsed time                   [开始链接]   119.863 milliseconds.

|-- elapsed time           [等待连接成功的event]    40.039 milliseconds.

|-- total        [tech.codestory.zookeeper.queue.zookeeperqueue 连接到zookeeper]   159.911 milliseconds.

 

49:48.082 [debug] zookeeperqueue.log(201)

+ profiler [tech.codestory.zookeeper.queue.zookeeperqueue 连接到zookeeper]

|-- elapsed time                   [开始链接]   103.795 milliseconds.

|-- elapsed time           [等待连接成功的event]    65.899 milliseconds.

|-- total        [tech.codestory.zookeeper.queue.zookeeperqueue 连接到zookeeper]   170.263 milliseconds.

 

49:48.102 [info] zookeeperqueuetest.run(51) 生产对象 : 1 , 然后等待 1700 毫秒

49:48.134 [info] zookeeperqueuetest.run(80) 消费对象: 1 , 然后等待 4000 毫秒

49:49.814 [info] zookeeperqueuetest.run(51) 生产对象 : 2 , 然后等待 900 毫秒

49:50.717 [info] zookeeperqueuetest.run(51) 生产对象 : 3 , 然后等待 1300 毫秒

49:52.020 [info] zookeeperqueuetest.run(51) 生产对象 : 4 , 然后等待 3700 毫秒

49:52.139 [info] zookeeperqueuetest.run(80) 消费对象: 2 , 然后等待 2800 毫秒

49:54.947 [info] zookeeperqueuetest.run(80) 消费对象: 3 , 然后等待 4500 毫秒

49:55.724 [info] zookeeperqueuetest.run(51) 生产对象 : 5 , 然后等待 3500 毫秒

49:59.228 [info] zookeeperqueuetest.run(51) 生产对象 : 6 , 然后等待 4200 毫秒

49:59.454 [info] zookeeperqueuetest.run(80) 消费对象: 4 , 然后等待 2400 毫秒

50:01.870 [info] zookeeperqueuetest.run(80) 消费对象: 5 , 然后等待 4900 毫秒

50:03.435 [info] zookeeperqueuetest.run(51) 生产对象 : 7 , 然后等待 4500 毫秒

50:06.776 [info] zookeeperqueuetest.run(80) 消费对象: 6 , 然后等待 3600 毫秒

50:07.938 [info] zookeeperqueuetest.run(51) 生产对象 : 8 , 然后等待 1900 毫秒

50:09.846 [info] zookeeperqueuetest.run(51) 生产对象 : 9 , 然后等待 3200 毫秒

50:10.388 [info] zookeeperqueuetest.run(80) 消费对象: 7 , 然后等待 2900 毫秒

50:13.051 [info] zookeeperqueuetest.run(51) 生产对象 : 10 , 然后等待 4900 毫秒

50:13.294 [info] zookeeperqueuetest.run(80) 消费对象: 8 , 然后等待 300 毫秒

50:13.600 [info] zookeeperqueuetest.run(80) 消费对象: 9 , 然后等待 4800 毫秒

50:18.407 [info] zookeeperqueuetest.run(80) 消费对象: 10 , 然后等待 2400 毫秒

5      完整源码

5.1   zookeeperbase.java

package tech.codestory.zookeeper;

 

import java.io.ioexception;

import java.util.concurrent.countdownlatch;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.stat;

import org.slf4j.logger;

import org.slf4j.loggerfactory;

import org.slf4j.profiler.profiler;

 

/**

 * 为 zookeeper测试代码创建一个基类,封装建立连接的过程

 *

 * @author code story

 * @date 2019/8/16

 */

public class zookeeperbase implements watcher {

  /** 日志,不使用 @slf4j ,是要使用子类的log */

  logger log = null;

 

  /** 等待连接建立成功的信号 */

  private countdownlatch connectedsemaphore = new countdownlatch(1);

  /** zookeeper 客户端 */

  private zookeeper zookeeper = null;

  /** 避免重复根节点 */

  static integer rootnodeinitial = integer.valueof(1);

 

  /** 构造函数 */

  public zookeeperbase(string address) throws ioexception {

    log = loggerfactory.getlogger(getclass());

 

    profiler profiler = new profiler(this.getclass().getname() + " 连接到zookeeper");

    profiler.start("开始链接");

    zookeeper = new zookeeper(address, 3000, this);

    try {

      profiler.start("等待连接成功的event");

      connectedsemaphore.await();

    } catch (interruptedexception e) {

      log.error("interruptedexception", e);

    }

    profiler.stop();

    profiler.setlogger(log);

    profiler.log();

  }

 

  /**

   * 创建测试需要的根节点

   *

   * @param rootnodename

   * @return

   */

  public string createrootnode(string rootnodename) {

    synchronized (rootnodeinitial) {

      // 创建 tableserial 的znode

      try {

        stat existsstat = getzookeeper().exists(rootnodename, false);

        if (existsstat == null) {

          rootnodename = getzookeeper().create(rootnodename, new byte[0],

              zoodefs.ids.open_acl_unsafe, createmode.persistent);

        }

      } catch (keeperexception e) {

        log.error("keeperexception", e);

      } catch (interruptedexception e) {

        log.error("interruptedexception", e);

      }

    }

    return rootnodename;

  }

 

  /** 读取zookeeper对象,供子类调用 */

  protected zookeeper getzookeeper() {

    return zookeeper;

  }

 

  @override

  final public void process(watchedevent event) {

    if (event.eventtype.none.equals(event.gettype())) {

      // 连接状态发生变化

      if (event.keeperstate.syncconnected.equals(event.getstate())) {

        // 连接建立成功

        connectedsemaphore.countdown();

      }

    } else if (event.eventtype.nodecreated.equals(event.gettype())) {

      processnodecreated(event);

    } else if (event.eventtype.nodedeleted.equals(event.gettype())) {

      processnodedeleted(event);

    } else if (event.eventtype.nodedatachanged.equals(event.gettype())) {

      processnodedatachanged(event);

    } else if (event.eventtype.nodechildrenchanged.equals(event.gettype())) {

      processnodechildrenchanged(event);

    }

  }

 

  /**

   * 处理事件: nodecreated

   *

   * @param event

   */

  protected void processnodecreated(watchedevent event) {}

 

  /**

   * 处理事件: nodedeleted

   *

   * @param event

   */

  protected void processnodedeleted(watchedevent event) {}

 

  /**

   * 处理事件: nodedatachanged

   *

   * @param event

   */

  protected void processnodedatachanged(watchedevent event) {}

 

  /**

   * 处理事件: nodechildrenchanged

   *

   * @param event

   */

  protected void processnodechildrenchanged(watchedevent event) {}

}

5.2   zookeeperqueue.java

package tech.codestory.zookeeper.queue;

 

import java.io.ioexception;

import java.nio.bytebuffer;

import java.util.arraylist;

import java.util.list;

import org.apache.zookeeper.createmode;

import org.apache.zookeeper.keeperexception;

import org.apache.zookeeper.watchedevent;

import org.apache.zookeeper.zoodefs;

import org.apache.zookeeper.data.acl;

import org.apache.zookeeper.data.stat;

import lombok.extern.slf4j.slf4j;

import tech.codestory.zookeeper.zookeeperbase;

 

/**

 * zookeeper实现queue

 *

 * @author code story

 * @date 2019/8/16

 */

@slf4j

public class zookeeperqueue extends zookeeperbase {

  /** 队列名称 */

  private string queuename;

 

  /** 队列的同步信号 */

  private static integer queuemutex = integer.valueof(1);

 

  /**

   * 构造函数

   *

   * @param address

   * @param queuename

   * @throws ioexception

   */

  public zookeeperqueue(string address, string queuename) throws ioexception {

    super(address);

 

    this.queuename = createrootnode(queuename);

  }

 

  @override

  protected void processnodechildrenchanged(watchedevent event) {

    synchronized (queuemutex) {

      queuemutex.notify();

    }

  }

 

  /**

   * 将对象添加到队列中

   *

   * @param i

   * @return

   */

  boolean produce(int i) throws keeperexception, interruptedexception {

    bytebuffer b = bytebuffer.allocate(4);

    byte[] value;

 

    // add child with value i

    b.putint(i);

    value = b.array();

    string elementname = queuename + "/element";

    arraylist<acl> ids = zoodefs.ids.open_acl_unsafe;

    createmode createmode = createmode.persistent_sequential;

    getzookeeper().create(elementname, value, ids, createmode);

 

    return true;

  }

 

  /**

   * 从队列中删除第一个对象

   *

   * @return

   * @throws keeperexception

   * @throws interruptedexception

   */

  int consume() throws keeperexception, interruptedexception {

    while (true) {

      synchronized (queuemutex) {

        list<string> list = getzookeeper().getchildren(queuename, true);

        if (list.size() == 0) {

          queuemutex.wait();

        } else {

          // 获取第一个子节点的名称

          string firstnodename = getfirstelementname(list);

          // 删除节点,并返回节点的值

          return deletenodeandreturnvalue(firstnodename);

        }

      }

    }

  }

 

  /**

   * 获取第一个子节点的名称

   *

   * @param list

   * @return

   */

  private string getfirstelementname(list<string> list) {

    integer min = integer.max_value;

    string minnode = null;

    for (string s : list) {

      integer tempvalue = integer.valueof(s.substring(7));

      if (tempvalue < min) {

        min = tempvalue;

        minnode = s;

      }

    }

    return minnode;

  }

 

  /**

   * 删除节点,并返回节点的值

   *

   * @param minnode

   * @return

   * @throws keeperexception

   * @throws interruptedexception

   */

  private int deletenodeandreturnvalue(string minnode)

      throws keeperexception, interruptedexception {

    string fullnodename = queuename + "/" + minnode;

    stat stat = new stat();

    byte[] b = getzookeeper().getdata(fullnodename, false, stat);

    getzookeeper().delete(fullnodename, stat.getversion());

    bytebuffer buffer = bytebuffer.wrap(b);

    return buffer.getint();

  }

}

5.3   zookeeperqueuetest.java

package tech.codestory.zookeeper.queue;

 

import java.io.ioexception;

import java.security.securerandom;

import java.util.random;

import java.util.concurrent.countdownlatch;

import org.apache.zookeeper.keeperexception;

import org.testng.annotations.test;

import lombok.extern.slf4j.slf4j;

 

/**

 * zookeeperqueue测试

 *

 * @author code story

 * @date 2019/8/16

 */

@slf4j

public class zookeeperqueuetest {

  final string address = "192.168.5.128:2181";

  final string queuename = "/queue";

  final random random = new securerandom();

  // 随机生成10-20之间的个数

  final int count = 10 + random.nextint(10);

  /** 等待生产者和消费者线程都结束 */

  private countdownlatch connectedsemaphore = new countdownlatch(2);

 

  @test

  public void testqueue() {

    log.info("开始zookeeper队列测试,本次将测试 {} 个数据", count);

    new queueproducer().start();

    new queueconsumer().start();

    try {

      connectedsemaphore.await();

    } catch (interruptedexception e) {

      log.error("interruptedexception", e);

    }

  }

 

  /**

   * 队列的生产者

   */

  class queueproducer extends thread {

    @override

    public void run() {

      try {

        zookeeperqueue queue = new zookeeperqueue(address, queuename);

        for (int i = 0; i < count; i++) {

          int elementvalue = i + 1;

 

          long waittime = random.nextint(50) * 100;

          log.info("生产对象 : {} , 然后等待 {} 毫秒", elementvalue, waittime);

          queue.produce(elementvalue);

          thread.sleep(waittime);

        }

      } catch (ioexception e) {

        log.error("ioexception", e);

      } catch (interruptedexception e) {

        log.error("interruptedexception", e);

      } catch (keeperexception e) {

        log.error("keeperexception", e);

      }

      connectedsemaphore.countdown();

    }

  }

 

  /**

   * 队列的消费者

   */

  class queueconsumer extends thread {

    @override

    public void run() {

      try {

        zookeeperqueue queue = new zookeeperqueue(address, queuename);

 

        for (int i = 0; i < count; i++) {

          try {

            int elementvalue = queue.consume();

 

            long waittime = random.nextint(50) * 100;

            log.info("消费对象: {} , 然后等待 {} 毫秒", elementvalue, waittime);

            thread.sleep(waittime);

          } catch (keeperexception e) {

            i--;

            log.error("keeperexception", e);

          } catch (interruptedexception e) {

            log.error("interruptedexception", e);

          }

        }

        connectedsemaphore.countdown();

      } catch (ioexception e) {

        log.error("ioexception", e);

      }

    }

  }

}