欢迎您访问程序员文章站!本站旨在为大家提供并分享程序员与计算机编程相关的知识。
您现在的位置是: 首页  >  IT编程

Java 使用 ZooKeeper 实现的分布式锁示例

程序员文章站 2024-02-29 07:58:40
使用 ZooKeeper 实现的分布式锁。该分布式锁实现了 Lock 接口,代码如下:package com.concurrent; import java.io.IOException; ...

使用 ZooKeeper 实现的分布式锁

分布式锁,实现了 java.util.concurrent.locks.Lock 接口

复制代码 代码如下:

package com.concurrent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
   distributedlock lock = null;
 try {
  lock = new distributedlock("127.0.0.1:2182","test");
  lock.lock();
  //do something...
 } catch (exception e) {
  e.printstacktrace();
 }
 finally {
  if(lock != null)
   lock.unlock();
 }
 * @author xueliang
 *
 */
public class distributedlock implements lock, watcher{
 private zookeeper zk;
 private string root = "/locks";//根
 private string lockname;//竞争资源的标志
 private string waitnode;//等待前一个锁
 private string myznode;//当前锁
 private countdownlatch latch;//计数器
 private int sessiontimeout = 30000;
 private list<exception> exception = new arraylist<exception>();

 /**
  * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
  * @param config 127.0.0.1:2181
  * @param lockname 竞争资源标志,lockname中不能包含单词lock
  */
 public distributedlock(string config, string lockname){
  this.lockname = lockname;
  // 创建一个与服务器的连接
   try {
   zk = new zookeeper(config, sessiontimeout, this);
   stat stat = zk.exists(root, false);
   if(stat == null){
    // 创建根节点
    zk.create(root, new byte[0], zoodefs.ids.open_acl_unsafe,createmode.persistent);
   }
  } catch (ioexception e) {
   exception.add(e);
  } catch (keeperexception e) {
   exception.add(e);
  } catch (interruptedexception e) {
   exception.add(e);
  }
 }

 /**
  * zookeeper节点的监视器
  */
 public void process(watchedevent event) {
  if(this.latch != null) { 
            this.latch.countdown(); 
        }
 }

 public void lock() {
  if(exception.size() > 0){
   throw new lockexception(exception.get(0));
  }
  try {
   if(this.trylock()){
    system.out.println("thread " + thread.currentthread().getid() + " " +myznode + " get lock true");
    return;
   }
   else{
    waitforlock(waitnode, sessiontimeout);//等待锁
   }
  } catch (keeperexception e) {
   throw new lockexception(e);
  } catch (interruptedexception e) {
   throw new lockexception(e);
  }
 }

 public boolean trylock() {
  try {
   string splitstr = "_lock_";
   if(lockname.contains(splitstr))
    throw new lockexception("lockname can not contains \\u000b");
   //创建临时子节点
   myznode = zk.create(root + "/" + lockname + splitstr, new byte[0], zoodefs.ids.open_acl_unsafe,createmode.ephemeral_sequential);
   system.out.println(myznode + " is created ");
   //取出所有子节点
   list<string> subnodes = zk.getchildren(root, false);
   //取出所有lockname的锁
   list<string> lockobjnodes = new arraylist<string>();
   for (string node : subnodes) {
    string _node = node.split(splitstr)[0];
    if(_node.equals(lockname)){
     lockobjnodes.add(node);
    }
   }
   collections.sort(lockobjnodes);
   system.out.println(myznode + "==" + lockobjnodes.get(0));
   if(myznode.equals(root+"/"+lockobjnodes.get(0))){
    //如果是最小的节点,则表示取得锁
             return true;
         }
   //如果不是最小的节点,找到比自己小1的节点
   string submyznode = myznode.substring(myznode.lastindexof("/") + 1);
   waitnode = lockobjnodes.get(collections.binarysearch(lockobjnodes, submyznode) - 1);
  } catch (keeperexception e) {
   throw new lockexception(e);
  } catch (interruptedexception e) {
   throw new lockexception(e);
  }
  return false;
 }

 public boolean trylock(long time, timeunit unit) {
  try {
   if(this.trylock()){
    return true;
   }
         return waitforlock(waitnode,time);
  } catch (exception e) {
   e.printstacktrace();
  }
  return false;
 }

 private boolean waitforlock(string lower, long waittime) throws interruptedexception, keeperexception {
        stat stat = zk.exists(root + "/" + lower,true);
        //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
        if(stat != null){
         system.out.println("thread " + thread.currentthread().getid() + " waiting for " + root + "/" + lower);
         this.latch = new countdownlatch(1);
         this.latch.await(waittime, timeunit.milliseconds);
         this.latch = null;
        }
        return true;
    }

 public void unlock() {
  try {
   system.out.println("unlock " + myznode);
   zk.delete(myznode,-1);
   myznode = null;
   zk.close();
  } catch (interruptedexception e) {
   e.printstacktrace();
  } catch (keeperexception e) {
   e.printstacktrace();
  }
 }

 public void lockinterruptibly() throws interruptedexception {
  this.lock();
 }

 public condition newcondition() {
  return null;
 }

 public class lockexception extends runtimeexception {
  private static final long serialversionuid = 1l;
  public lockexception(string e){
   super(e);
  }
  public lockexception(exception e){
   super(e);
  }
 }

}

并发测试工具

复制代码 代码如下:

package com.concurrent;

import java.util.arraylist;
import java.util.collections;
import java.util.list;
import java.util.concurrent.copyonwritearraylist;
import java.util.concurrent.countdownlatch;
import java.util.concurrent.atomic.atomicinteger;

/**
 * Small harness that starts one thread per task, releases all of them at the same moment,
 * waits for them to finish and prints the minimum, maximum and average run time in
 * milliseconds together with the number of tasks that threw an exception.
 *
 * <p>The whole run happens inside the constructor, which returns after the report is printed.
 * JDK types are written fully qualified so the class does not depend on the surrounding
 * import lines.
 *
 * <p>Usage:
 * <pre>
 * concurrenttask[] task = new concurrenttask[5];
 * for (int i = 0; i &lt; task.length; i++) {
 *     task[i] = new concurrenttask() {
 *         public void run() {
 *             System.out.println("==============");
 *         }
 *     };
 * }
 * new concurrenttest(task);
 * </pre>
 *
 * @author xueliang
 */
public class concurrenttest {
    /** Start gate: every worker waits on it until all workers have been created. */
    private final java.util.concurrent.CountDownLatch startsignal =
            new java.util.concurrent.CountDownLatch(1);
    /** End gate: counted down once per worker. */
    private final java.util.concurrent.CountDownLatch donesignal;
    /** Run time in milliseconds of every task that completed without an exception. */
    private final java.util.concurrent.CopyOnWriteArrayList<Long> list =
            new java.util.concurrent.CopyOnWriteArrayList<Long>();
    /** Number of tasks that threw an exception. */
    private final java.util.concurrent.atomic.AtomicInteger err =
            new java.util.concurrent.atomic.AtomicInteger();
    private final concurrenttask[] task;

    /**
     * Runs the given tasks concurrently and prints the timing report.
     *
     * @param task tasks to run, one thread each; may be empty
     * @throws IllegalArgumentException if {@code task} is {@code null}
     */
    public concurrenttest(concurrenttask... task) {
        if (task == null) {
            throw new IllegalArgumentException("task can not null");
        }
        this.task = task;
        donesignal = new java.util.concurrent.CountDownLatch(task.length);
        start();
    }

    /** Creates the workers, opens the start gate, waits for completion and prints the report. */
    private void start() {
        createthread();
        startsignal.countDown();
        try {
            donesignal.await();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller; the report covers what finished so far.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        getexetime();
    }

    /** Starts one thread per task; each thread parks at the start gate before running. */
    private void createthread() {
        for (int i = 0; i < task.length; i++) {
            final int j = i;
            new Thread(new Runnable() {
                public void run() {
                    try {
                        startsignal.await();
                        long start = System.currentTimeMillis();
                        task[j].run();
                        list.add(System.currentTimeMillis() - start);
                    } catch (Exception e) {
                        err.getAndIncrement();
                    } finally {
                        // Always release the end gate, even if the task dies with an Error,
                        // otherwise start() would wait forever.
                        donesignal.countDown();
                    }
                }
            }).start();
        }
    }

    /**
     * Prints min/max/avg of the recorded run times followed by the error count. When no task
     * completed successfully only the error count is printed.
     */
    private void getexetime() {
        java.util.List<Long> sorted = new java.util.ArrayList<Long>(list);
        if (!sorted.isEmpty()) {
            java.util.Collections.sort(sorted);
            long sum = 0L;
            for (long t : sorted) {
                sum += t;
            }
            System.out.println("min: " + sorted.get(0));
            System.out.println("max: " + sorted.get(sorted.size() - 1));
            System.out.println("avg: " + (sum / sorted.size()));
        }
        System.out.println("err: " + err.get());
    }

    /** A unit of work executed on its own thread by {@link concurrenttest}. */
    public interface concurrenttask {
        void run();
    }

}

测试

复制代码 代码如下:

package com.concurrent;

import com.concurrent.concurrenttest.concurrenttask;

public class zktest {
 public static void main(string[] args) {
  runnable task1 = new runnable(){
   public void run() {
    distributedlock lock = null;
    try {
     lock = new distributedlock("127.0.0.1:2182","test1");
     //lock = new distributedlock("127.0.0.1:2182","test2");
     lock.lock();
     thread.sleep(3000);
     system.out.println("===thread " + thread.currentthread().getid() + " running");
    } catch (exception e) {
     e.printstacktrace();
    }
    finally {
     if(lock != null)
      lock.unlock();
    }

   }

  };
  new thread(task1).start();
  try {
   thread.sleep(1000);
  } catch (interruptedexception e1) {
   e1.printstacktrace();
  }
  concurrenttask[] tasks = new concurrenttask[60];
  for(int i=0;i<tasks.length;i++){
   concurrenttask task3 = new concurrenttask(){
    public void run() {
     distributedlock lock = null;
     try {
      lock = new distributedlock("127.0.0.1:2183","test2");
      lock.lock();
      system.out.println("thread " + thread.currentthread().getid() + " running");
     } catch (exception e) {
      e.printstacktrace();
     }
     finally {
      lock.unlock();
     }

    }
   };
   tasks[i] = task3;
  }
  new concurrenttest(tasks);
 }
}