欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java分布式锁的三种实现方案

程序员文章站 2024-03-07 20:40:39
方案一:数据库乐观锁 乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonus),有一个字段(left_count)记录礼物的剩余个...

方案一:数据库乐观锁

乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonus),有一个字段(left_count)记录礼物的剩余个数,用户每领取一个奖品,对应的left_count减1,在并发的情况下如何要保证left_count不为负数,乐观锁的实现方式为在红包表上添加一个版本号字段(version),默认为0。

异常实现流程

-- 可能会发生的异常情况
-- 线程1查询,当前left_count为1,则有记录
select * from t_bonus where id = 10001 and left_count > 0

-- 线程2查询,当前left_count为1,也有记录
select * from t_bonus where id = 10001 and left_count > 0

-- 线程1完成领取记录,修改left_count为0,
update t_bonus set left_count = left_count - 1 where id = 10001

-- 线程2完成领取记录,修改left_count为-1,产生脏数据
update t_bonus set left_count = left_count - 1 where id = 10001

通过乐观锁实现

-- 添加版本号控制字段
alter table table add column version int default '0' not null after t_bonus;

-- 线程1查询,当前left_count为1,则有记录,当前版本号为1234
select left_count, version from t_bonus where id = 10001 and left_count > 0

-- 线程2查询,当前left_count为1,有记录,当前版本号为1234
select left_count, version from t_bonus where id = 10001 and left_count > 0

-- 线程1,更新完成后当前的version为1235,update状态为1,更新成功
update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

-- 线程2,更新由于当前的version为1235,udpate状态为0,更新失败,再针对相关业务做异常处理
update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

方案二:基于redis的分布式锁

setnx命令(set if not exists)\
语法:setnx key value\
功能:原子性操作,当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 setnx 不做任何动作,并返回0。\
expire命令\
语法:expire(key, expiretime)\
功能:key设置过期时间\
getset命令\
语法:getset key value\
功能:将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。\
get命令\
语法:get key\
功能:返回 key 所关联的字符串值,如果 key 不存在那么返回特殊值 nil 。\
del命令\
语法:del key [key …]\
功能:删除给定的一个或多个 key ,不存在的 key 会被忽略。

第一种:使用redis的setnx()、expire()方法,用于分布式锁

  • setnx(lockkey, 1) 如果返回0,则说明占位失败;如果返回1,则说明占位成功
  • expire()命令对lockkey设置超时时间,为的是避免死锁问题。
  • 执行完业务代码后,可以通过delete命令删除key。

这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,在expire()命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题

第二种:使用redis的setnx()、get()、getset()方法,用于分布式锁,解决死锁问题

  • setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。
  • get(lockkey)获取值oldexpiretime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。
  • 计算newexpiretime=当前时间+过期超时时间,然后getset(lockkey, newexpiretime) 会返回当前lockkey的值currentexpiretime。
  • 判断currentexpiretime与oldexpiretime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
  • 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。
import cn.com.tpig.cache.redis.redisservice;
import cn.com.tpig.utils.springutils;
/**
 * created by idea
 * user: shma1664
 * date: 2016-08-16 14:01
 * desc: redis分布式锁
 */
public final class redislockutil {
 private static final int defaultexpire = 60;
 private redislockutil() {
 //
 }
 /**
 * 加锁
 * @param key redis key
 * @param expire 过期时间,单位秒
 * @return true:加锁成功,false,加锁失败
 */
 public static boolean lock(string key, int expire) {
 redisservice redisservice = springutils.getbean(redisservice.class);
 long status = redisservice.setnx(key, "1");

 if(status == 1) {
 redisservice.expire(key, expire);
 return true;
 }
 return false;
 }
 public static boolean lock(string key) {
 return lock2(key, defaultexpire);
 }
 /**
 * 加锁
 * @param key redis key
 * @param expire 过期时间,单位秒
 * @return true:加锁成功,false,加锁失败
 */
 public static boolean lock2(string key, int expire) {
 redisservice redisservice = springutils.getbean(redisservice.class);
 long value = system.currenttimemillis() + expire;
 long status = redisservice.setnx(key, string.valueof(value));
 if(status == 1) {
 return true;
 }
 long oldexpiretime = long.parselong(redisservice.get(key, "0"));
 if(oldexpiretime < system.currenttimemillis()) {
 //超时
 long newexpiretime = system.currenttimemillis() + expire;
 long currentexpiretime = long.parselong(redisservice.getset(key, string.valueof(newexpiretime)));
 if(currentexpiretime == oldexpiretime) {
 return true;
 }
 }
 return false;
 }
 public static void unlock1(string key) {
 redisservice redisservice = springutils.getbean(redisservice.class);
 redisservice.del(key);
 }
 public static void unlock2(string key) { 
 redisservice redisservice = springutils.getbean(redisservice.class); 
 long oldexpiretime = long.parselong(redisservice.get(key, "0")); 
 if(oldexpiretime > system.currenttimemillis()) { 
 redisservice.del(key); 
 }
 }
}

public void drawredpacket(long userid) {
 string key = "draw.redpacket.userid:" + userid;
 boolean lock = redislockutil.lock2(key, 60);
 if(lock) {
 try {
 //领取操作
 } finally {
 //释放锁
 redislockutil.unlock(key);
 }
 } else {
 new runtimeexception("重复领取奖励");
 }
}

spring aop基于注解方式和spel实现开箱即用的redis分布式锁策略

import java.lang.annotation.elementtype;
import java.lang.annotation.retention;
import java.lang.annotation.retentionpolicy;
import java.lang.annotation.target;
/**
 * runtime
 * 定义注解
 * 编译器将把注释记录在类文件中,在运行时 vm 将保留注释,因此可以反射性地读取。
 * @author shma1664
 *
 */
@retention(retentionpolicy.runtime)
@target(elementtype.method)
public @interface redislockable {
 string[] key() default "";
 long expiration() default 60;
}
import javax.annotation.resource;
import java.lang.reflect.method;
import com.autohome.api.dealer.util.cache.redisclient;
import com.google.common.base.joiner;
import org.aspectj.lang.proceedingjoinpoint;
import org.aspectj.lang.signature;
import org.aspectj.lang.annotation.around;
import org.aspectj.lang.annotation.aspect;
import org.aspectj.lang.annotation.pointcut;
import org.aspectj.lang.reflect.methodsignature;
import org.springframework.expression.evaluationcontext;
import org.springframework.expression.expression;
import org.springframework.expression.expressionparser;
import org.springframework.expression.spel.standard.spelexpressionparser;
import org.springframework.expression.spel.support.standardevaluationcontext;
import org.springframework.stereotype.component;
/**
 * created by idea
 * user: mashaohua
 * date: 2016-09-28 18:08
 * desc:
 */
@aspect
@component
public class redislockaop {
 @resource
 private redisclient redisclient;
 @pointcut("execution(* com.autohome.api.dealer.tuan.service.*.*(..))")
 public void pointcut(){}
 @around("pointcut()")
 public object doaround(proceedingjoinpoint point) throws throwable{
 signature signature = point.getsignature();
 methodsignature methodsignature = (methodsignature) signature;
 method method = methodsignature.getmethod();
 string targetname = point.gettarget().getclass().getname();
 string methodname = point.getsignature().getname();
 object[] arguments = point.getargs();
 if (method != null && method.isannotationpresent(redislockable.class)) {
 redislockable redislock = method.getannotation(redislockable.class);
 long expire = redislock.expiration();
 string rediskey = getlockkey(targetname, methodname, redislock.key(), arguments);
 boolean islock = redislockutil.lock2(rediskey, expire);
 if(!islock) {
 try {
 return point.proceed();
 } finally {
 unlock2(rediskey);
 }
 } else {
 throw new runtimeexception("您的操作太频繁,请稍后再试");
 }
 }
 return point.proceed();
 }
 private string getlockkey(string targetname, string methodname, string[] keys, object[] arguments) {
 stringbuilder sb = new stringbuilder();
 sb.append("lock.").append(targetname).append(".").append(methodname);
 if(keys != null) {
 string keystr = joiner.on(".").skipnulls().join(keys);
 string[] parameters = reflectparamnames.getnames(targetname, methodname);
 expressionparser parser = new spelexpressionparser();
 expression expression = parser.parseexpression(keystr);
 evaluationcontext context = new standardevaluationcontext();
 int length = parameters.length;
 if (length > 0) {
 for (int i = 0; i < length; i++) {
 context.setvariable(parameters[i], arguments[i]);
 }
 }
 string keysvalue = expression.getvalue(context, string.class);
 sb.append("#").append(keysvalue);
 }
 return sb.tostring();
 }
<!-- https://mvnrepository.com/artifact/javassist/javassist -->
<dependency>
 <groupid>org.javassist</groupid>
 <artifactid>javassist</artifactid>
 <version>3.18.1-ga</version>
</dependency>
import javassist.*;
import javassist.bytecode.codeattribute;
import javassist.bytecode.localvariableattribute;
import javassist.bytecode.methodinfo;
import org.apache.log4j.logger;
/**
 * created by idea
 * user: mashaohua
 * date: 2016-09-28 18:39
 * desc:
 */
public class reflectparamnames {
 private static logger log = logger.getlogger(reflectparamnames.class);
 private static classpool pool = classpool.getdefault();
 static{
 classclasspath classpath = new classclasspath(reflectparamnames.class);
 pool.insertclasspath(classpath);
 }
 public static string[] getnames(string classname,string methodname) {
 ctclass cc = null;
 try {
 cc = pool.get(classname);
 ctmethod cm = cc.getdeclaredmethod(methodname);
 // 使用javaassist的反射方法获取方法的参数名
 methodinfo methodinfo = cm.getmethodinfo();
 codeattribute codeattribute = methodinfo.getcodeattribute();
 localvariableattribute attr = (localvariableattribute) codeattribute.getattribute(localvariableattribute.tag);
 if (attr == null) return new string[0];
 int begin = 0;
 string[] paramnames = new string[cm.getparametertypes().length];
 int count = 0;
 int pos = modifier.isstatic(cm.getmodifiers()) ? 0 : 1;
 for (int i = 0; i < attr.tablelength(); i++){
 // 为什么 加这个判断,发现在windows 跟linux执行时,参数顺序不一致,通过观察,实际的参数是从this后面开始的
 if (attr.variablename(i).equals("this")){
 begin = i;
 break;
 }
 }
 for (int i = begin+1; i <= begin+paramnames.length; i++){
 paramnames[count] = attr.variablename(i);
 count++;
 }
 return paramnames;
 } catch (exception e) {
 e.printstacktrace();
 }finally{
 try {
 if(cc != null) cc.detach();
 } catch (exception e2) {
 log.error(e2.getmessage());
 }
 }
 return new string[0];
 }
}

在需要使用分布式锁的地方添加注解

/**
 * 抽奖接口
 * 添加redis分布式锁保证一个订单只有一个请求处理,防止用户刷礼物,支持spel表达式
 * redislockkey:lock.com.autohome.api.dealer.tuan.service.impl.drawbonus#orderid
 * @param orderid 订单id
 * @return 抽中的奖品信息
 */
@redislockable(key = {"#orderid"}, expiration = 120)
@override
public bonusconvertbean drawbonus(integer orderid) throws bonusexception{
 // 业务逻辑
}

第三种方案:基于zookeeper的分布式锁

利用节点名称的唯一性来实现独占锁

zookeeper机制规定同一个目录下只能有一个唯一的文件名,zookeeper上的一个znode看作是一把锁,通过createznode的方式来实现。所有客户端都去创建/lock/${lock_name}_lock节点,最终成功创建的那个客户端也即拥有了这把锁,创建失败的可以选择监听继续等待,还是放弃抛出异常实现独占锁。
package com.shma.example.zookeeper.lock;

import java.io.ioexception;
import java.util.arraylist;
import java.util.collections;
import java.util.list;
import java.util.concurrent.countdownlatch;
import java.util.concurrent.timeunit;
import java.util.concurrent.locks.condition;
import java.util.concurrent.locks.lock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.stat;
/**
 * created by idea
 * user: mashaohua
 * date: 2016-09-30 16:09
 * desc:
 */
public class zookeeperlock implements lock, watcher {
 private zookeeper zk;
 private string root = "/locks";//根
 private string lockname;//竞争资源的标志
 private string myznode;//当前锁
 private int sessiontimeout = 30000;
 private list<exception> exception = new arraylist<exception>();
 /**
 * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
 * @param config 127.0.0.1:2181
 * @param lockname 竞争资源标志,lockname中不能包含单词lock
 */
 public zookeeperlock(string config, string lockname){
 this.lockname = lockname;
 // 创建一个与服务器的连接
 try {
 zk = new zookeeper(config, sessiontimeout, this);
 stat stat = zk.exists(root, false);
 if(stat == null){
 // 创建根节点
 zk.create(root, new byte[0], zoodefs.ids.open_acl_unsafe, createmode.persistent);
 }
 } catch (ioexception e) {
 exception.add(e);
 } catch (keeperexception e) {
 exception.add(e);
 } catch (interruptedexception e) {
 exception.add(e);
 }
 }
 @override
 public void lock() {
 if(exception.size() > 0){
 throw new lockexception(exception.get(0));
 }
 if(!trylock()) {
 throw new lockexception("您的操作太频繁,请稍后再试");
 }
 }
 @override
 public void lockinterruptibly() throws interruptedexception {
 this.lock();
 }
 @override
 public boolean trylock() {
 try {
 myznode = zk.create(root + "/" + lockname, new byte[0], zoodefs.ids.open_acl_unsafe, createmode.persistent);
 return true;
 } catch (keeperexception e) {
 e.printstacktrace();
 } catch (interruptedexception e) {
 e.printstacktrace();
 }
 return false;
 }
 @override
 public boolean trylock(long time, timeunit unit) throws interruptedexception {
 return trylock();
 }
 @override
 public void unlock() {
 try {
 zk.delete(myznode, -1);
 myznode = null;
 zk.close();
 } catch (interruptedexception e) {
 e.printstacktrace();
 } catch (keeperexception e) {
 e.printstacktrace();
 }
 }
 @override
 public condition newcondition() {
 return null;
 }
 @override
 public void process(watchedevent watchedevent) {
 //
 }
}
zookeeperlock lock = null;
try {
 lock = new zookeeperlock("127.0.0.1:2182","test1");
 lock.lock();
 //业务逻辑处理
} catch (lockexception e) {
 throw e;
} finally {
 if(lock != null)
 lock.unlock();
}

利用临时顺序节点控制时序实现

/lock已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,和选master一样,编号最小的获得锁,用完删除,依次方便。\

算法思路:对于加锁操作,可以让所有客户端都去/lock目录下创建临时顺序节点,如果创建的客户端发现自身创建节点序列号是/lock/目录下最小的节点,则获得锁。否则,监视比自己创建节点的序列号小的节点(比自己创建的节点小的最大节点),进入等待。

对于解锁操作,只需要将自身创建的节点删除即可。

package com.shma.example.zookeeper.lock;
import java.io.ioexception;
import java.util.arraylist;
import java.util.collections;
import java.util.list;
import java.util.concurrent.countdownlatch;
import java.util.concurrent.timeunit;
import java.util.concurrent.locks.condition;
import java.util.concurrent.locks.lock;
import org.apache.zookeeper.createmode;
import org.apache.zookeeper.keeperexception;
import org.apache.zookeeper.watchedevent;
import org.apache.zookeeper.watcher;
import org.apache.zookeeper.zoodefs;
import org.apache.zookeeper.zookeeper;
import org.apache.zookeeper.data.stat;
/**
 * created by idea
 * user: mashaohua
 * date: 2016-09-30 16:09
 * desc:
 */
public class distributedlock implements lock, watcher{
 private zookeeper zk;
 private string root = "/locks";//根
 private string lockname;//竞争资源的标志
 private string waitnode;//等待前一个锁
 private string myznode;//当前锁
 private countdownlatch latch;//计数器
 private int sessiontimeout = 30000;
 private list<exception> exception = new arraylist<exception>();
 /**
 * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
 * @param config 127.0.0.1:2181
 * @param lockname 竞争资源标志,lockname中不能包含单词lock
 */
 public distributedlock(string config, string lockname){
 this.lockname = lockname;
 // 创建一个与服务器的连接
 try {
 zk = new zookeeper(config, sessiontimeout, this);
 stat stat = zk.exists(root, false);
 if(stat == null){
 // 创建根节点
 zk.create(root, new byte[0], zoodefs.ids.open_acl_unsafe,createmode.persistent);
 }
 } catch (ioexception e) {
 exception.add(e);
 } catch (keeperexception e) {
 exception.add(e);
 } catch (interruptedexception e) {
 exception.add(e);
 }
 }
 /**
 * zookeeper节点的监视器
 */
 public void process(watchedevent event) {
 if(this.latch != null) {
 this.latch.countdown();
 }
 }
 public void lock() {
 if(exception.size() > 0){
 throw new lockexception(exception.get(0));
 }
 try {
 if(this.trylock()){
 system.out.println("thread " + thread.currentthread().getid() + " " +myznode + " get lock true");
 return;
 }
 else{
 waitforlock(waitnode, sessiontimeout);//等待锁
 }
 } catch (keeperexception e) {
 throw new lockexception(e);
 } catch (interruptedexception e) {
 throw new lockexception(e);
 }
 }
 public boolean trylock() {
 try {
 string splitstr = "_lock_";
 if(lockname.contains(splitstr))
 throw new lockexception("lockname can not contains \\u000b");
 //创建临时子节点
 myznode = zk.create(root + "/" + lockname + splitstr, new byte[0], zoodefs.ids.open_acl_unsafe,createmode.ephemeral_sequential);
 system.out.println(myznode + " is created ");
 //取出所有子节点
 list<string> subnodes = zk.getchildren(root, false);
 //取出所有lockname的锁
 list<string> lockobjnodes = new arraylist<string>();
 for (string node : subnodes) {
 string _node = node.split(splitstr)[0];
 if(_node.equals(lockname)){
 lockobjnodes.add(node);
 }
 }
 collections.sort(lockobjnodes);
 system.out.println(myznode + "==" + lockobjnodes.get(0));
 if(myznode.equals(root+"/"+lockobjnodes.get(0))){
 //如果是最小的节点,则表示取得锁
 return true;
 }
 //如果不是最小的节点,找到比自己小1的节点
 string submyznode = myznode.substring(myznode.lastindexof("/") + 1);
 waitnode = lockobjnodes.get(collections.binarysearch(lockobjnodes, submyznode) - 1);
 } catch (keeperexception e) {
 throw new lockexception(e);
 } catch (interruptedexception e) {
 throw new lockexception(e);
 }
 return false;
 }
 public boolean trylock(long time, timeunit unit) {
 try {
 if(this.trylock()){
 return true;
 }
 return waitforlock(waitnode,time);
 } catch (exception e) {
 e.printstacktrace();
 }
 return false;
 }
 private boolean waitforlock(string lower, long waittime) throws interruptedexception, keeperexception {
 stat stat = zk.exists(root + "/" + lower,true);
 //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
 if(stat != null){
 system.out.println("thread " + thread.currentthread().getid() + " waiting for " + root + "/" + lower);
 this.latch = new countdownlatch(1);
 this.latch.await(waittime, timeunit.milliseconds);
 this.latch = null;
 }
 return true;
 }
 public void unlock() {
 try {
 system.out.println("unlock " + myznode);
 zk.delete(myznode,-1);
 myznode = null;
 zk.close();
 } catch (interruptedexception e) {
 e.printstacktrace();
 } catch (keeperexception e) {
 e.printstacktrace();
 }
 }
 public void lockinterruptibly() throws interruptedexception {
 this.lock();
 }
 public condition newcondition() {
 return null;
 }
 public class lockexception extends runtimeexception {
 private static final long serialversionuid = 1l;
 public lockexception(string e){
 super(e);
 }
 public lockexception(exception e){
 super(e);
 }
 }
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持!