etcd分布式锁实现 博客分类: 分布式
程序员文章站
2024-03-22 20:39:22
...
引入maven依赖:
<dependency> <groupId>com.coreos</groupId> <artifactId>jetcd-core</artifactId> <version>0.0.2</version> </dependency>
分布式锁实现:
import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.coreos.jetcd.Client; import com.coreos.jetcd.Lease; import com.coreos.jetcd.Lock; import com.coreos.jetcd.data.ByteSequence; /** * Etcd Java客户端 Jetcd提供的Lock客户端实现分布式锁 */ public class EtcdDistributedLock { private static EtcdDistributedLock lock = null; private static Object mutex = new Object(); private Client client; // etcd客户端 private Lock lockClient; // etcd分布式锁客户端 private Lease leaseClient; // etcd租约客户端 private EtcdDistributedLock() { super(); // 创建Etcd客户端,本例中Etcd集群只有一个节点 this.client = Client.builder().endpoints("http://localhost:2379").build(); this.lockClient = client.getLockClient(); this.leaseClient = client.getLeaseClient(); } /** * 单例 */ public static EtcdDistributedLock getInstance() { synchronized (mutex) { // 互斥锁 if (null == lock) { lock = new EtcdDistributedLock(); } } return lock; } /** * 加锁操作,需要注意的是,本例中没有加入重试机制,加锁失败将直接返回。 * @param lockName: 针对某一共享资源(数据、文件等)制定的锁名 * @param TTL: Time To Live,租约有效期,一旦客户端崩溃,可在租约到期后自动释放锁 * @return LockResult */ public LockResult lock(String lockName, long TTL) { LockResult lockResult = new LockResult(); /* 1.准备阶段 */ // 创建一个定时任务作为“心跳”,保证等待锁释放期间,租约不失效; // 同时,一旦客户端发生故障,心跳便会停止,锁也会因租约过期而被动释放,避免死锁 ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); // 初始化返回值lockResult lockResult.setIsLockSuccess(false); lockResult.setService(service); // 记录租约ID,初始值设为 0L Long leaseId = 0L; /* 2.创建租约 */ try { // 创建一个租约,租约有效期为TTL,实际应用中根据具体业务确定 leaseId = leaseClient.grant(TTL).get().getID(); lockResult.setLeaseId(leaseId); // 启动定时任务续约,心跳周期和初次启动延时计算公式如下,可根据实际业务制定 long period = TTL - TTL / 5; service.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), period, period, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException e) { System.err.println("[error]: Create lease failed:" + e); return lockResult; } System.out.println(System.currentTimeMillis() + "|[ lock]: "+Thread.currentThread().getName()+" start to lock."); /* 3.加锁操作 */ // 执行加锁操作,并为锁对应的key绑定租约 try { lockClient.lock(ByteSequence.fromString(lockName), leaseId).get(); } catch (InterruptedException | ExecutionException e1) { System.err.println("[error]: lock failed:" + e1); return lockResult; } System.out.println(System.currentTimeMillis() + "|[ lock]: "+Thread.currentThread().getName()+" lock successfully."); lockResult.setIsLockSuccess(true); return lockResult; } /** * 解锁操作,释放锁、关闭定时任务、解除租约 * * @param lockName:锁名 * @param lockResult:加锁操作返回的结果 */ public void unLock(String lockName, LockResult lockResult) { System.err.println(System.currentTimeMillis() + "|[unlock]: "+Thread.currentThread().getName()+" start to unlock."); try { // 释放锁 lockClient.unlock(ByteSequence.fromString(lockName)).get(); // 关闭定时任务 lockResult.getService().shutdown(); // 删除租约 if (lockResult.getLeaseId() != 0L) { leaseClient.revoke(lockResult.getLeaseId()); } } catch (InterruptedException | ExecutionException e) { System.err.println("[error]: unlock failed: " + e); } System.err.println(System.currentTimeMillis() + "|[unlock]: "+Thread.currentThread().getName()+" unlock successfully."); } /** * 在等待其它客户端释放锁期间,通过心跳续约,保证自己的锁对应租约不会失效 */ static class KeepAliveTask implements Runnable { private Lease leaseClient; private long leaseId; KeepAliveTask(Lease leaseClient, long leaseId) { this.leaseClient = leaseClient; this.leaseId = leaseId; } @Override public void run() { // 续约一次 leaseClient.keepAliveOnce(leaseId); } } /** * 该class用于描述加锁的结果,同时携带解锁操作所需参数 */ static class LockResult { private boolean isLockSuccess; private long leaseId; private ScheduledExecutorService service; LockResult() { super(); } public void setIsLockSuccess(boolean isLockSuccess) { this.isLockSuccess = isLockSuccess; } public void setLeaseId(long leaseId) { this.leaseId = leaseId; } public void setService(ScheduledExecutorService service) { this.service = service; } public boolean getIsLockSuccess() { return this.isLockSuccess; } public long getLeaseId() { return this.leaseId; } public ScheduledExecutorService getService() { return this.service; } } /** * 测试分布式锁 * @param args */ public static void main(String[] args) { // 模拟分布式场景下,多个进程 “抢锁” for (int i = 0; i < 10; i++) { new MyThread().start(); } } static class MyThread extends Thread { @Override public void run() { String lockName = "/lock/mylock"; // 分布式锁名称 // 1. 加锁 LockResult lockResult = getInstance().lock(lockName, 30); if (lockResult.getIsLockSuccess()) { // 获得了锁 try { Thread.sleep(10000); // sleep 10秒,模拟执行相关业务 } catch (InterruptedException e) { System.out.println("[error]:" + e); } } // 2. 解锁 getInstance().unLock(lockName, lockResult); } } }
推荐阅读
-
etcd分布式锁实现 博客分类: 分布式
-
etcd分布式锁实现 博客分类: 分布式
-
【并发控制】并发控制与分布式锁(redis/zookeeper)实现【图文教程】_ 第3章
-
spark1.1.0部署standalone分布式集群 博客分类: spark spark1.10
-
hadoop 1.2.1 完全分布式安装注意要点 博客分类: hadoop学习hadoop环境搭建 hadoop 完全分布式安装
-
JAVA线程池管理及分布式HADOOP调度框架搭建 博客分类: java架构hadoopjeetask 分布式线程池java架构hadoop
-
分布式架构下的会话追踪实践【基于Cookie和Redis实现】 博客分类: NoSQL/Redis/MongoDB session共享rediscookie分布式架构session
-
Apache Kafka 集群环境搭建 博客分类: Apache Kafka kafkaapache分布式消息系统
-
Apache Kafka 集群环境搭建 博客分类: Apache Kafka kafkaapache分布式消息系统
-
分布式SQL查询引擎Presto原理介绍 博客分类: python&nodejs presto