基于ZooKeeper实现队列源码
实现原理
先进先出队列是最常用的队列,使用zookeeper实现先进先出队列就是在特定的目录下创建persistent_equential节点,创建成功时watcher通知等待的队列,队列删除序列号最小的节点用以消费。此场景下zookeeper的znode用于消息存储,znode存储的数据就是消息队列中的消息内容,sequential序列号就是消息的编号,按序取出即可。由于创建的节点是持久化的,所以不必担心队列消息的丢失问题。
队列(queue)
分布式队列是通用的数据结构,为了在 zookeeper 中实现分布式队列,首先需要指定一个 znode 节点作为队列节点(queue node), 各个分布式客户端通过调用 create() 函数向队列中放入数据,调用create()时节点路径名带"qn-"结尾,并设置顺序(sequence)节点标志。 由于设置了节点的顺序标志,新的路径名具有以下字符串模式:"_path-to-queue-node_/qn-x",x 是唯一自增号。需要从队列中获取数据/移除数据的客户端首先调用 getchildren() 函数,有数据则获取(获取数据后可以删除也可以不删),没有则在队列节点(queue node)上将 watch 设置为 true,等待触发并处理最小序号的节点(即从序号最小的节点中取数据)。
应用场景
zookeeper队列不太适合要求高性能的场合,但可以在数据量不大的情况下考虑使用。比如已在项目中使用zookeeper又需要小规模的队列应用,这时可以使用zookeeper实现的队列;毕竟引进一个消息中间件会增加系统的复杂性和运维的压力。
详细代码
zookeeperclient工具类
package org.massive.common; import org.apache.zookeeper.watchedevent; import org.apache.zookeeper.watcher; import org.apache.zookeeper.zookeeper; import java.io.ioexception; import java.util.concurrent.countdownlatch; import java.util.concurrent.timeunit; /** * created by massive on 2016/12/18. */ public class zookeeperclient { private static string connectionstring = "localhost:2181"; private static int sessiontimeout = 10000; public static zookeeper getinstance() throws ioexception, interruptedexception { //-------------------------------------------------------------- // 为避免连接还未完成就执行zookeeper的get/create/exists操作引起的(keepererrorcode = connectionloss) // 这里等zookeeper的连接完成才返回实例 //-------------------------------------------------------------- final countdownlatch connectedsignal = new countdownlatch(1); zookeeper zk = new zookeeper(connectionstring, sessiontimeout, new watcher() { @override public void process(watchedevent event) { if (event.getstate() == event.keeperstate.syncconnected) { connectedsignal.countdown(); } else if (event.getstate() == event.keeperstate.expired) { } } }); connectedsignal.await(sessiontimeout, timeunit.milliseconds); return zk; } public static int getsessiontimeout() { return sessiontimeout; } public static void setsessiontimeout(int sessiontimeout) { zookeeperclient.sessiontimeout = sessiontimeout; } }
zookeeperqueue
package org.massive.queue; import org.apache.commons.lang3.randomutils; import org.apache.zookeeper.*; import org.apache.zookeeper.data.stat; import org.massive.common.zookeeperclient; import java.io.ioexception; import java.io.unsupportedencodingexception; import java.util.list; import java.util.sortedset; import java.util.treeset; /** * created by allen on 2016/12/22. */ public class zookeeperqueue { private zookeeper zk; private int sessiontimeout; private static byte[] root_queue_data = {0x12,0x34}; private static string queue_root = "/queue"; private string queuename; private string queuepath; private object mutex = new object(); public zookeeperqueue(string queuename) throws ioexception, keeperexception, interruptedexception { this.queuename = queuename; this.queuepath = queue_root + "/" + queuename; this.zk = zookeeperclient.getinstance(); this.sessiontimeout = zk.getsessiontimeout(); //---------------------------------------------------- // 确保队列根目录/queue和当前队列的目录的存在 //---------------------------------------------------- ensureexists(queue_root); ensureexists(queuepath); } public byte[] consume() throws interruptedexception, keeperexception, unsupportedencodingexception { list<string> nodes = null; byte[] returnval = null; stat stat = null; do { synchronized (mutex) { nodes = zk.getchildren(queuepath, new producewatcher()); //---------------------------------------------------- // 如果没有消息节点,等待生产者的通知 //---------------------------------------------------- if (nodes == null || nodes.size() == 0) { mutex.wait(); } else { sortedset<string> sortednode = new treeset<string>(); for (string node : nodes) { sortednode.add(queuepath + "/" + node); } //---------------------------------------------------- // 消费队列里序列号最小的消息 //---------------------------------------------------- string first = sortednode.first(); returnval = zk.getdata(first, false, stat); zk.delete(first, -1); system.out.print(thread.currentthread().getname() + " "); system.out.print("consume a message from queue:" + first); system.out.println(", message data is: " + new string(returnval,"utf-8")); return returnval; } } } while (true); } class producewatcher implements watcher { @override public void process(watchedevent event) { //---------------------------------------------------- // 生产一条消息成功后通知一个等待线程 //---------------------------------------------------- synchronized (mutex) { mutex.notify(); } } } public void produce(byte[] data) throws keeperexception, interruptedexception, unsupportedencodingexception { //---------------------------------------------------- // 确保当前队列目录存在 // example: /queue/queuename //---------------------------------------------------- ensureexists(queuepath); string node = zk.create(queuepath + "/", data, zoodefs.ids.open_acl_unsafe, createmode.persistent_sequential); system.out.print(thread.currentthread().getname() + " "); system.out.print("produce a message to queue:" + node); system.out.println(" , message data is: " + new string(data,"utf-8")); } public void ensureexists(string path) { try { stat stat = zk.exists(path, false); if (stat == null) { zk.create(path, root_queue_data, zoodefs.ids.open_acl_unsafe, createmode.persistent); } } catch (keeperexception e) { e.printstacktrace(); } catch (interruptedexception e) { e.printstacktrace(); } } public static void main(string[] args) throws ioexception, interruptedexception, keeperexception { string queuename = "test"; final zookeeperqueue queue = new zookeeperqueue(queuename); for (int i = 0; i < 10; i++) { new thread(new runnable() { @override public void run() { try { queue.consume(); system.out.println("--------------------------------------------------------"); system.out.println(); } catch (interruptedexception e) { e.printstacktrace(); } catch (keeperexception e) { e.printstacktrace(); } catch (unsupportedencodingexception e) { e.printstacktrace(); } } }).start(); } new thread(new runnable() { @override public void run() { for (int i = 0; i < 10; i++) { try { thread.sleep(randomutils.nextint(100 * i, 200 * i)); queue.produce(("massive" + i).getbytes()); } catch (interruptedexception e) { e.printstacktrace(); } catch (keeperexception e) { e.printstacktrace(); } catch (unsupportedencodingexception e) { e.printstacktrace(); } } } },"produce-thread").start(); } }
测试
运行main方法,本机器的某次输出结果
produce-thread produce a message to queue:/queue/test/0000000000 , message data is: massive0 thread-8 consume a message from queue:/queue/test/0000000000, message data is: massive0 -------------------------------------------------------- produce-thread produce a message to queue:/queue/test/0000000001 , message data is: massive1 thread-6 consume a message from queue:/queue/test/0000000001, message data is: massive1 -------------------------------------------------------- produce-thread produce a message to queue:/queue/test/0000000002 , message data is: massive2 thread-3 consume a message from queue:/queue/test/0000000002, message data is: massive2 -------------------------------------------------------- produce-thread produce a message to queue:/queue/test/0000000003 , message data is: massive3 thread-0 consume a message from queue:/queue/test/0000000003, message data is: massive3 -------------------------------------------------------- produce-thread produce a message to queue:/queue/test/0000000004 , message data is: massive4 thread-5 consume a message from queue:/queue/test/0000000004, message data is: massive4 -------------------------------------------------------- produce-thread produce a message to queue:/queue/test/0000000005 , message data is: massive5 thread-2 consume a message from queue:/queue/test/0000000005, message data is: massive5 -------------------------------------------------------- produce-thread produce a message to queue:/queue/test/0000000006 , message data is: massive6 thread-4 consume a message from queue:/queue/test/0000000006, message data is: massive6 -------------------------------------------------------- produce-thread produce a message to queue:/queue/test/0000000007 , message data is: massive7 thread-9 consume a message from queue:/queue/test/0000000007, message data is: massive7 -------------------------------------------------------- produce-thread produce a message to queue:/queue/test/0000000008 , message data is: massive8 thread-7 consume a message from queue:/queue/test/0000000008, message data is: massive8 -------------------------------------------------------- produce-thread produce a message to queue:/queue/test/0000000009 , message data is: massive9 thread-1 consume a message from queue:/queue/test/0000000009, message data is: massive9
总结
以上就是本文有关于队列和基于zookeeper实现队列源码介绍的全部内容,希望对大家有所帮助。
感谢朋友们对本站的支持!
上一篇: Android ViewPager制作新手导航页(动态加载)
下一篇: Java实现队列