ZooKeeper实现生产-消费者队列
【欢迎关注公众号:程序猿讲故事 (codestory),及时接收最新文章】
生产-消费者队列,用于多节点的分布式数据结构,生产和消费数据。生产者创建一个数据对象,并放到队列中;消费者从队列中取出一个数据对象并进行处理。在zookeeper中,队列可以使用一个容器节点下创建多个子节点来实现;创建子节点时,createmode使用 persistent_sequential,zookeeper会自动在节点名称后面添加唯一序列号。ephemeral_sequential也有同样的特点,区别在于会话结束后是否会自动删除。
敲小黑板:*_sequential是zookeeper的一个很重要的特性,分布式锁、选举制度都依靠这个特性实现的。
1 对前续代码的重构
之前的文章,我们已经用实现了watcher和barrier,创建zookeeper连接的代码已经复制了一遍。后续还需要类似的工作,因此先对原有代码做一下重构,让代码味道干净一点。
以下是 process(watchedevent)的代码
final public void process(watchedevent event) { if (event.eventtype.none.equals(event.gettype())) { // 连接状态发生变化 if (event.keeperstate.syncconnected.equals(event.getstate())) { // 连接建立成功 connectedsemaphore.countdown(); } } else if (event.eventtype.nodecreated.equals(event.gettype())) { processnodecreated(event); } else if (event.eventtype.nodedeleted.equals(event.gettype())) { processnodedeleted(event); } else if (event.eventtype.nodedatachanged.equals(event.gettype())) { processnodedatachanged(event); } else if (event.eventtype.nodechildrenchanged.equals(event.gettype())) { processnodechildrenchanged(event); } } |
以zookeeperbarrier为例,看看重构之后的构造函数和监听event的代码
zookeeperbarrier(string address, string tableserial, int tablecapacity, string customername) throws ioexception { super(address); this.tableserial = createrootnode(tableserial); this.tablecapacity = tablecapacity; this.customername = customername; } protected void processnodechildrenchanged(watchedevent event) { log.info("{} 接收到了通知 : {}", customername, event.gettype()); // 子节点有变化 synchronized (mutex) { mutex.notify(); } } |
2 队列的生产者
生产者的关键代码
string elementname = queuename + "/element"; arraylist<acl> ids = zoodefs.ids.open_acl_unsafe; createmode createmode = createmode.persistent_sequential; getzookeeper().create(elementname, value, ids, createmode); |
注意,重点是persistent_sequential,persistent是表示永久存储直到有命令删除,sequential表示自动在后面添加自增的唯一序列号。这样,尽管elementname都一样,但实际生成的znode名字在 “element”后面会添加格式为%010d的10个数字,如0000000001。如一个完整的znode名可能为/queue/element0000000021。
3 队列的消费者
消费者尝试从子节点列表获取znode名最小的一个子节点,如果队列为空则等待nodechildrenchanged事件。关键代码
/** 队列的同步信号 */ private static integer queuemutex = integer.valueof(1);
@override protected void processnodechildrenchanged(watchedevent event) { synchronized (queuemutex) { queuemutex.notify(); } }
/** * 从队列中删除第一个对象 * * @return * @throws keeperexception * @throws interruptedexception */ int consume() throws keeperexception, interruptedexception { while (true) { synchronized (queuemutex) { list<string> list = getzookeeper().getchildren(queuename, true); if (list.size() == 0) { queuemutex.wait(); } else { // 获取第一个子节点的名称 string firstnodename = getfirstelementname(list); // 删除节点,并返回节点的值 return deletenodeandreturnvalue(firstnodename); } } } } |
4 测试日志
把测试结果放源码前面,免得大家被无聊的代码晃晕。
测试代码创建了两个线程,一个线程是生产者,按随机间隔往队列中添加对象;一个线程是消费者,随机间隔尝试从队列中取出第一个,如果当时队列为空,会等到直到新的数据。
两个进程都加上随机间隔,是为了模拟生产可能比消费更快的情况。以下是测试日志,为了更突出,生产和消费的日志我增加了不同的文字样式。
49:47.866 [info] zookeeperqueuetest.testqueue(29) 开始zookeeper队列测试,本次将测试 10 个数据 49:48.076 [debug] zookeeperqueue.log(201) + profiler [tech.codestory.zookeeper.queue.zookeeperqueue 连接到zookeeper] |-- elapsed time [开始链接] 119.863 milliseconds. |-- elapsed time [等待连接成功的event] 40.039 milliseconds. |-- total [tech.codestory.zookeeper.queue.zookeeperqueue 连接到zookeeper] 159.911 milliseconds.
49:48.082 [debug] zookeeperqueue.log(201) + profiler [tech.codestory.zookeeper.queue.zookeeperqueue 连接到zookeeper] |-- elapsed time [开始链接] 103.795 milliseconds. |-- elapsed time [等待连接成功的event] 65.899 milliseconds. |-- total [tech.codestory.zookeeper.queue.zookeeperqueue 连接到zookeeper] 170.263 milliseconds.
49:48.102 [info] zookeeperqueuetest.run(51) 生产对象 : 1 , 然后等待 1700 毫秒 49:48.134 [info] zookeeperqueuetest.run(80) 消费对象: 1 , 然后等待 4000 毫秒 49:49.814 [info] zookeeperqueuetest.run(51) 生产对象 : 2 , 然后等待 900 毫秒 49:50.717 [info] zookeeperqueuetest.run(51) 生产对象 : 3 , 然后等待 1300 毫秒 49:52.020 [info] zookeeperqueuetest.run(51) 生产对象 : 4 , 然后等待 3700 毫秒 49:52.139 [info] zookeeperqueuetest.run(80) 消费对象: 2 , 然后等待 2800 毫秒 49:54.947 [info] zookeeperqueuetest.run(80) 消费对象: 3 , 然后等待 4500 毫秒 49:55.724 [info] zookeeperqueuetest.run(51) 生产对象 : 5 , 然后等待 3500 毫秒 49:59.228 [info] zookeeperqueuetest.run(51) 生产对象 : 6 , 然后等待 4200 毫秒 49:59.454 [info] zookeeperqueuetest.run(80) 消费对象: 4 , 然后等待 2400 毫秒 50:01.870 [info] zookeeperqueuetest.run(80) 消费对象: 5 , 然后等待 4900 毫秒 50:03.435 [info] zookeeperqueuetest.run(51) 生产对象 : 7 , 然后等待 4500 毫秒 50:06.776 [info] zookeeperqueuetest.run(80) 消费对象: 6 , 然后等待 3600 毫秒 50:07.938 [info] zookeeperqueuetest.run(51) 生产对象 : 8 , 然后等待 1900 毫秒 50:09.846 [info] zookeeperqueuetest.run(51) 生产对象 : 9 , 然后等待 3200 毫秒 50:10.388 [info] zookeeperqueuetest.run(80) 消费对象: 7 , 然后等待 2900 毫秒 50:13.051 [info] zookeeperqueuetest.run(51) 生产对象 : 10 , 然后等待 4900 毫秒 50:13.294 [info] zookeeperqueuetest.run(80) 消费对象: 8 , 然后等待 300 毫秒 50:13.600 [info] zookeeperqueuetest.run(80) 消费对象: 9 , 然后等待 4800 毫秒 50:18.407 [info] zookeeperqueuetest.run(80) 消费对象: 10 , 然后等待 2400 毫秒 |
5 完整源码
5.1 zookeeperbase.java
package tech.codestory.zookeeper;
import java.io.ioexception; import java.util.concurrent.countdownlatch; import org.apache.zookeeper.*; import org.apache.zookeeper.data.stat; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.slf4j.profiler.profiler;
/** * 为 zookeeper测试代码创建一个基类,封装建立连接的过程 * * @author code story * @date 2019/8/16 */ public class zookeeperbase implements watcher { /** 日志,不使用 @slf4j ,是要使用子类的log */ logger log = null;
/** 等待连接建立成功的信号 */ private countdownlatch connectedsemaphore = new countdownlatch(1); /** zookeeper 客户端 */ private zookeeper zookeeper = null; /** 避免重复根节点 */ static integer rootnodeinitial = integer.valueof(1);
/** 构造函数 */ public zookeeperbase(string address) throws ioexception { log = loggerfactory.getlogger(getclass());
profiler profiler = new profiler(this.getclass().getname() + " 连接到zookeeper"); profiler.start("开始链接"); zookeeper = new zookeeper(address, 3000, this); try { profiler.start("等待连接成功的event"); connectedsemaphore.await(); } catch (interruptedexception e) { log.error("interruptedexception", e); } profiler.stop(); profiler.setlogger(log); profiler.log(); }
/** * 创建测试需要的根节点 * * @param rootnodename * @return */ public string createrootnode(string rootnodename) { synchronized (rootnodeinitial) { // 创建 tableserial 的znode try { stat existsstat = getzookeeper().exists(rootnodename, false); if (existsstat == null) { rootnodename = getzookeeper().create(rootnodename, new byte[0], zoodefs.ids.open_acl_unsafe, createmode.persistent); } } catch (keeperexception e) { log.error("keeperexception", e); } catch (interruptedexception e) { log.error("interruptedexception", e); } } return rootnodename; }
/** 读取zookeeper对象,供子类调用 */ protected zookeeper getzookeeper() { return zookeeper; }
@override final public void process(watchedevent event) { if (event.eventtype.none.equals(event.gettype())) { // 连接状态发生变化 if (event.keeperstate.syncconnected.equals(event.getstate())) { // 连接建立成功 connectedsemaphore.countdown(); } } else if (event.eventtype.nodecreated.equals(event.gettype())) { processnodecreated(event); } else if (event.eventtype.nodedeleted.equals(event.gettype())) { processnodedeleted(event); } else if (event.eventtype.nodedatachanged.equals(event.gettype())) { processnodedatachanged(event); } else if (event.eventtype.nodechildrenchanged.equals(event.gettype())) { processnodechildrenchanged(event); } }
/** * 处理事件: nodecreated * * @param event */ protected void processnodecreated(watchedevent event) {}
/** * 处理事件: nodedeleted * * @param event */ protected void processnodedeleted(watchedevent event) {}
/** * 处理事件: nodedatachanged * * @param event */ protected void processnodedatachanged(watchedevent event) {}
/** * 处理事件: nodechildrenchanged * * @param event */ protected void processnodechildrenchanged(watchedevent event) {} } |
5.2 zookeeperqueue.java
package tech.codestory.zookeeper.queue;
import java.io.ioexception; import java.nio.bytebuffer; import java.util.arraylist; import java.util.list; import org.apache.zookeeper.createmode; import org.apache.zookeeper.keeperexception; import org.apache.zookeeper.watchedevent; import org.apache.zookeeper.zoodefs; import org.apache.zookeeper.data.acl; import org.apache.zookeeper.data.stat; import lombok.extern.slf4j.slf4j; import tech.codestory.zookeeper.zookeeperbase;
/** * zookeeper实现queue * * @author code story * @date 2019/8/16 */ @slf4j public class zookeeperqueue extends zookeeperbase { /** 队列名称 */ private string queuename;
/** 队列的同步信号 */ private static integer queuemutex = integer.valueof(1);
/** * 构造函数 * * @param address * @param queuename * @throws ioexception */ public zookeeperqueue(string address, string queuename) throws ioexception { super(address);
this.queuename = createrootnode(queuename); }
@override protected void processnodechildrenchanged(watchedevent event) { synchronized (queuemutex) { queuemutex.notify(); } }
/** * 将对象添加到队列中 * * @param i * @return */ boolean produce(int i) throws keeperexception, interruptedexception { bytebuffer b = bytebuffer.allocate(4); byte[] value;
// add child with value i b.putint(i); value = b.array(); string elementname = queuename + "/element"; arraylist<acl> ids = zoodefs.ids.open_acl_unsafe; createmode createmode = createmode.persistent_sequential; getzookeeper().create(elementname, value, ids, createmode);
return true; }
/** * 从队列中删除第一个对象 * * @return * @throws keeperexception * @throws interruptedexception */ int consume() throws keeperexception, interruptedexception { while (true) { synchronized (queuemutex) { list<string> list = getzookeeper().getchildren(queuename, true); if (list.size() == 0) { queuemutex.wait(); } else { // 获取第一个子节点的名称 string firstnodename = getfirstelementname(list); // 删除节点,并返回节点的值 return deletenodeandreturnvalue(firstnodename); } } } }
/** * 获取第一个子节点的名称 * * @param list * @return */ private string getfirstelementname(list<string> list) { integer min = integer.max_value; string minnode = null; for (string s : list) { integer tempvalue = integer.valueof(s.substring(7)); if (tempvalue < min) { min = tempvalue; minnode = s; } } return minnode; }
/** * 删除节点,并返回节点的值 * * @param minnode * @return * @throws keeperexception * @throws interruptedexception */ private int deletenodeandreturnvalue(string minnode) throws keeperexception, interruptedexception { string fullnodename = queuename + "/" + minnode; stat stat = new stat(); byte[] b = getzookeeper().getdata(fullnodename, false, stat); getzookeeper().delete(fullnodename, stat.getversion()); bytebuffer buffer = bytebuffer.wrap(b); return buffer.getint(); } } |
5.3 zookeeperqueuetest.java
package tech.codestory.zookeeper.queue;
import java.io.ioexception; import java.security.securerandom; import java.util.random; import java.util.concurrent.countdownlatch; import org.apache.zookeeper.keeperexception; import org.testng.annotations.test; import lombok.extern.slf4j.slf4j;
/** * zookeeperqueue测试 * * @author code story * @date 2019/8/16 */ @slf4j public class zookeeperqueuetest { final string address = "192.168.5.128:2181"; final string queuename = "/queue"; final random random = new securerandom(); // 随机生成10-20之间的个数 final int count = 10 + random.nextint(10); /** 等待生产者和消费者线程都结束 */ private countdownlatch connectedsemaphore = new countdownlatch(2);
@test public void testqueue() { log.info("开始zookeeper队列测试,本次将测试 {} 个数据", count); new queueproducer().start(); new queueconsumer().start(); try { connectedsemaphore.await(); } catch (interruptedexception e) { log.error("interruptedexception", e); } }
/** * 队列的生产者 */ class queueproducer extends thread { @override public void run() { try { zookeeperqueue queue = new zookeeperqueue(address, queuename); for (int i = 0; i < count; i++) { int elementvalue = i + 1;
long waittime = random.nextint(50) * 100; log.info("生产对象 : {} , 然后等待 {} 毫秒", elementvalue, waittime); queue.produce(elementvalue); thread.sleep(waittime); } } catch (ioexception e) { log.error("ioexception", e); } catch (interruptedexception e) { log.error("interruptedexception", e); } catch (keeperexception e) { log.error("keeperexception", e); } connectedsemaphore.countdown(); } }
/** * 队列的消费者 */ class queueconsumer extends thread { @override public void run() { try { zookeeperqueue queue = new zookeeperqueue(address, queuename);
for (int i = 0; i < count; i++) { try { int elementvalue = queue.consume();
long waittime = random.nextint(50) * 100; log.info("消费对象: {} , 然后等待 {} 毫秒", elementvalue, waittime); thread.sleep(waittime); } catch (keeperexception e) { i--; log.error("keeperexception", e); } catch (interruptedexception e) { log.error("interruptedexception", e); } } connectedsemaphore.countdown(); } catch (ioexception e) { log.error("ioexception", e); } } } } |