欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

基于zookeeper实现分布式锁

程序员文章站 2022-05-07 14:30:30
...

什么是分布式锁?

可以这么理解:分布式锁和线程锁类似,区别在于分布式锁是对于不同机器上的,而线程锁是对于同一机器不同线程的。分布式锁用于在分布式系统中,不同服务器对同一条数据进行修改时运用到。

基于zookeeper实现分布式锁

获取到分布式锁的服务器将可进行修改,例如服务器1得到锁之后将可进行修改,该分布式锁将锁定,则服务器2获取不到该锁,进行等待服务器1释放锁后才可进行获取锁。

为什么要用zookeeper来做?

比如用数据库来做好像也可以,但是存在问题的是:如果服务器1获取到锁之后宕机了,那么该锁将一直没释放,那么其他服务器进行获取锁将永远获取不到,然而zookeeper则可以做到这一点,也就是服务器一旦宕机,锁节点将自行释放掉;还有另一个问题是:提供分布式锁的服务器宕机了那么就永远获取不到该锁,那么这时就需要用到集群,而如果采用数据库集群那么成本也太高了,而如果zookeeper进行集群则好多了


下面粗略采用zookeeper来实现一个分布式锁

安装zookeeper我就不说了,在这一篇文章中前面有具体介绍到

下面直接贴具体的实现:

package com.cwh.zk.util;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ZkdistributeLock {
	static volatile Object obj = new Object();
	private static ZkdistributeLock zkLock = null;
    private static final int SESSION_TIMEOUT = 5000;  // 超时时间  
    private String hosts = "192.168.27.129:2181,192.168.27.130:2181";  // zookeeper server列表  
    private ZooKeeper zk;  
    private CountDownLatch latch = new CountDownLatch(1);  
  
	public static ZkdistributeLock getIntance() throws Exception{
		if(zkLock==null){
			synchronized (obj) {
				if(zkLock==null){
					zkLock = new ZkdistributeLock();
				}
			}
		}
		return zkLock;
	}
	
	ZkdistributeLock() throws Exception{
		zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new ZkLockWatcher());
		latch.await();
	}

	/**
	 * 获取分布式锁
	 * @return
	 * @throws InterruptedException 
	 */
	public String getLock() throws InterruptedException{
		String node = null;
		int count = 1;
		System.out.println("获取锁...("+count+")");
		try {
			node = zk.create("/zkDistributeLock", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
		} catch (Exception e) {
			while(true){
				try {
					node = zk.create("/zkDistributeLock", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
				} catch (Exception e1) {
					Thread.sleep(1000);
					if(count==100)break;//超过100次失败不再请求连接
					count++;
					System.out.println("获取锁...("+count+")");
					continue;
				}
				System.out.println("获取锁成功...");
				break;
			}
		} 
		System.out.println("===============");
		return node;
		
	}

	/**
	 * 释放分布式锁
	 * @param id
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	public void rmLock(String id) throws Exception {
		zk.delete(id, -1);
		System.out.println("释放锁...");
	}
	
	public class ZkLockWatcher implements Watcher {
		@Override
		public void process(WatchedEvent event) {
			try {  
	            // 连接建立时, 打开latch, 唤醒wait在该latch上的线程  
	            if (event.getState() == KeeperState.SyncConnected) {  
	                latch.countDown();  
	            }  
	        } catch (Exception e) {  
	            e.printStackTrace();  
	        }  
		}

	}

}

测试类:

package com.cwh.zk.util;


public class ZkdistributeLockTest {
	public static void main(String[] args) throws Exception {

	ZkdistributeLock zklock = ZkdistributeLock.getIntance();
	//获取分布式锁
	String id = zklock.getLock();
	//dosomething
	Thread.sleep(10000);
	//释放锁
	zklock.rmLock(id);
	}
}
运行测试:

直接run 3次吧,也就是启动了三个;

基于zookeeper实现分布式锁基于zookeeper实现分布式锁基于zookeeper实现分布式锁

ok!这样就算是粗略实现了一个分布式锁啦!如有不当地方望指正