欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

基于Zookeeper实现分布式锁

程序员文章站 2022-05-07 14:30:30
...

了解zk的分布式锁之前,就必须知道zk的原理,zk分永久顺序节点,临时顺序节点
架构介绍
在介绍使用Zookeeper实现分布式锁之前,首先看当前的系统架构图
基于Zookeeper实现分布式锁
解释: 左边的整个区域表示一个Zookeeper集群,locker是Zookeeper的一个持久节点,node_1、node_2、node_3是locker这个持久节点下面的临时顺序节点。client_1、client_2、client_n表示多个客户端,Service表示需要互斥访问的共享资源。

大致思想是
每个客户端对某个方法加锁时,在zookeeper上与该方法对应的指定根节点的目录下,生成一个唯一的临时顺序节点。判断是否获取锁的方式呢就是,只需要判断该节点是否是有序节点中最小一个。
如果不是最小一个顺序节点,则监听该节点的前一个节点。
释放锁:只需要将这个临时节点删除即可,这里客户端要是断开的话,临时节点会自动删除,可以避免服务宕机导致的锁无法释放,而产生的死锁问题。

  • 锁无法释放?使用Zookeeper可以有效的解决锁无法释放的问题,因为在创建锁的时候,客户端会在ZK中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。
  • 非阻塞锁?使用Zookeeper可以实现阻塞的锁,客户端可以通过在ZK中创建顺序节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁,便可以执行业务逻辑了。
  • 不可重入?使用Zookeeper也可以有效的解决不可重入的问题,客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就可以了。如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。
  • 单点问题?使用Zookeeper可以有效的解决单点问题,ZK是集群部署的,只要集群中有半数以上的机器存活,就可以对外提供服务。

下面放上基于curator实现分布式锁的demo

  • pom文件
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
  • application.yml
spring:
  curator:
    sleep-time-ms: 3000
    max-retries: 3
    session-time-out-ms: 1800000
    zookeeper-address: 127.0.0.1:2181
    lock-node:
      - ROOT-LOCK
    interface-server:
      share-node: INTER-NODE
package com.evan.utils;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.concurrent.CountDownLatch;

@Component
@ConfigurationProperties(prefix = "spring.curator")
public class ZkClientHelper implements CommandLineRunner {
    private static final Logger logger = LoggerFactory.getLogger(ZkClientHelper.class);
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    // 失败重试间隔时间 单位:毫秒
    private int sleepTimeMs;
    // 失败重试次数
    private int maxRetries;
    // 会话存活时间 单位:毫秒
    private int sessionTimeOutMs;
    // zookeeper 服务地址
    private String zookeeperAddress;

    private List<String> lockNode;

    public void setSleepTimeMs(int sleepTimeMs) {
        this.sleepTimeMs = sleepTimeMs;
    }

    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    public void setSessionTimeOutMs(int sessionTimeOutMs) {
        this.sessionTimeOutMs = sessionTimeOutMs;
    }

    public void setZookeeperAddress(String zookeeperAddress) {
        this.zookeeperAddress = zookeeperAddress;
    }

    public List<String> getLockNode() {
        return lockNode;
    }

    public void setLockNode(List<String> lockNode) {
        this.lockNode = lockNode;
    }

    @Bean
    @Order(-1)
    CuratorFramework build(){
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString(zookeeperAddress)
                .retryPolicy(new ExponentialBackoffRetry(sleepTimeMs,maxRetries))
                .namespace(ZK_CONSTANT.NAMESPACE.getValue())
                .sessionTimeoutMs(sessionTimeOutMs)
                .build();
        client.start();
        return client;
    }

    @Override
    public void run(String... args) throws Exception {
        CuratorFramework curatorFramework = build();
        curatorFramework.usingNamespace("zookeeper-lock");
        lockNode.forEach(node -> {
            String lockPath = ZK_CONSTANT.SPLIT.getValue() + node;
            // 检查锁节点是否存在
            Stat stat = null;
            try {
                stat = curatorFramework.checkExists()
                        .forPath(lockPath);
            if (StringUtils.isEmpty(stat)) {
                // 若不存在锁节点,则创建锁节点
                curatorFramework.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(lockPath);
            }
            // 实现zookeeper 节点监听事件
            setWatcher(lockPath);
            logger.error("锁节点:{}初始化创建完成",lockPath);
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("锁节点:{}初始化创建异常:{}",lockPath,e.getMessage());
            }
        });
    }

    private void setWatcher(String path) throws Exception {
        final PathChildrenCache cache = new PathChildrenCache(build(), path, false);
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener((client, event) -> {
            if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                String oldPath = event.getData().getPath();
                logger.error("success to release lock for path:{}" , oldPath);
                if (oldPath.contains(path)) {
                    //释放计数器,让当前的请求获取锁
                    countDownLatch.countDown();
                }
            }
        });
    }

    /**
     *  获取锁
     *
     * @param rootPath 锁节点
     * @param path 节点
     */
    public void acquireDistributedLock(String rootPath,String path) {
        String lockPath = ZK_CONSTANT.SPLIT.getValue() + rootPath + ZK_CONSTANT.SPLIT.getValue() + path;
        while (true) {
            try {
                build()
                        .create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(lockPath);
                break;
            } catch (Exception e) {
                logger.error("failed to acquire lock for path:{},exception:{}", lockPath,e.getMessage());
                try {
                    if (countDownLatch.getCount() <= 0) {
                        countDownLatch = new CountDownLatch(1);
                    }
                    countDownLatch.await();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    /**
     *  释放锁
     *
     * @param rootPath 锁节点
     * @param path 节点
     * @return
     */
    public void releaseDistributedLock(String rootPath,String path){
        String lockPath = ZK_CONSTANT.SPLIT.getValue() + rootPath + ZK_CONSTANT.SPLIT.getValue() + path;
        try {
            if (null != build().checkExists().forPath(lockPath)) {
                build().delete().forPath(lockPath);
            }
        } catch (Exception e) {
            logger.error("failed to release lock:{},exception:{}",lockPath,e.getMessage());
            try {
                Thread.sleep(3000);
                this.releaseDistributedLock(rootPath,path);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    }

    enum ZK_CONSTANT{
        SPLIT("/","分隔符"),NAMESPACE("SHARE_LOCK","锁域空间");
        String value ;
        String desc ;

        ZK_CONSTANT(String value, String desc) {
            this.value = value;
            this.desc = desc;
        }

        public String getValue() {
            return value;
        }

        public String getDesc() {
            return desc;
        }
    }
}

package com.evan.controller;

import com.evan.utils.ZkClientHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class LockController {

    private static final Logger logger = LoggerFactory.getLogger(LockController.class);

    @Autowired
    private ZkClientHelper zkClientHelper;

    @Value("${spring.curator.interface-server.share-node}")
    String shareNode;

    @RequestMapping("lock1")
    public void lock1() {
        zkClientHelper.acquireDistributedLock("ROOT-LOCK",shareNode);
        try {
            logger.info("I am lock1,i am updating resource……!!!");
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            zkClientHelper.releaseDistributedLock("ROOT-LOCK",shareNode);
        }
    }

    @RequestMapping("lock2")
    public void lock2() {
        zkClientHelper.acquireDistributedLock("ROOT-LOCK",shareNode);
        try {
            logger.info("I am lock2,i am updating resource……!!!");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            zkClientHelper.releaseDistributedLock("ROOT-LOCK",shareNode);
        }
    }


    @RequestMapping("lock3")
    public void lock3() {
        zkClientHelper.acquireDistributedLock("ROOT-LOCK",shareNode);
        try {
            logger.info("I am lock3,i am updating resource……!!!");
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            zkClientHelper.releaseDistributedLock("ROOT-LOCK",shareNode);
        }
    }

    @RequestMapping("lock4")
    public void lock4() {
        zkClientHelper.acquireDistributedLock("ROOT-LOCK",shareNode);
        try {
            logger.info("I am lock4,i am updating resource……!!!");
            Thread.sleep(15000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            zkClientHelper.releaseDistributedLock("ROOT-LOCK",shareNode);
        }
    }
}

github链接

相关标签: 分布式