欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

JavaEE:单应用锁与分布式应用锁

程序员文章站 2022-07-12 16:43:07
...

一、单应用锁:

说明:只能在非集群Web应用(JVM环境)中使用。

1.update行锁,在...Mapper.xml中:

<mapper ...>
   <update id="方法名">
      update 表名 set ...,   #行锁,update语句执行时,其他操作等待
      where 条件
   </update>
</mapper>

2.synchronized锁:

(1)synchronized方法锁:

@Autowired
PlatformTransactionManager platformTransactionManager;
@Autowired TransactionDefinition transactionDefinition;
public String synchronized createServiceOrder(){ //1.方法添加synchronized
   //2.获取事务
   TransactionStatus t = platformTransactionManager.getTransaction(transactionDefinition);
   ...   //查询医院与医生信息
   if(判断当前医院与医生是否存在){//3.1不存在时回滚
      platformTransactionManager.rollback(t);
      throw new Exception("选择的医院或医生不存在");
   }
   if(判断当前用户剩余服务次数){//3.2次数为0时回滚
      platformTransactionManager.rollback(t);
      throw new Exception("选择的医院或医生不存在");
   }
   ...   //更新数据:减少用户的服务次数
   //4.提交事务
   platformTransactionManager.commit(t);
   ...   //5.生成订单
   return 订单id;
}

(2)synchronized块锁:

@Autowired
PlatformTransactionManager platformTransactionManager;
@Autowired TransactionDefinition transactionDefinition;
public String createServiceOrder(){
   synchronized(){ //1.添加synchronized
      //2.获取事务
      TransactionStatus t = platformTransactionManager.getTransaction(transactionDefinition);
       ...   //查询医院与医生信息
      if(判断当前医院与医生是否存在){//3.1不存在时回滚
         platformTransactionManager.rollback(t);
         throw new Exception("选择的医院或医生不存在");
      }
      if(判断当前用户剩余服务次数){//3.2次数为0时回滚
         platformTransactionManager.rollback(t);
         throw new Exception("选择的医院或医生不存在");
      }
      ...   //更新数据:减少用户的服务次数
      //4.提交事务
      platformTransactionManager.commit(t);
   }
   ...   //5.生成订单
   return 订单id;
}

3.ReentrantLock锁(可重入锁):

Lock lock = new ReentrantLock();
@Autowired
PlatformTransactionManager platformTransactionManager;
@Autowired TransactionDefinition transactionDefinition;
public String createServiceOrder(String id){
   //1.获取锁
   lock.lock();
   try {
      //2.获取事务
      TransactionStatus t = platformTransactionManager.getTransaction(transactionDefinition);
      ...   //查询医院与医生信息
      if(判断当前医院与医生是否存在){//3.1不存在时回滚
         platformTransactionManager.rollback(t);
         throw new Exception("选择的医院或医生不存在");
      }
      if(判断当前用户剩余服务次数){//3.2次数为0时回滚
         platformTransactionManager.rollback(t);
         throw new Exception("选择的医院或医生不存在");
      }
      ...   //更新数据:减少用户的服务次数
      //4.提交事务
      platformTransactionManager.commit(t);
   } finally {
      //5.释放锁
      lock.unlock();
   }
   ...   //6.生成订单
   return 订单id;
}

二、分布式应用锁:

说明:能在集群Web应用(JVM环境)中使用。

1.数据库悲观锁(对数据库压力大):

格式(操作同一条数据,使用for update锁定,其他线/进程进入等待):

select * from 表名 where 条件 for update;

mapper映射文件:

UserMapper.xml内容(sql语句后加for update):
<mapper ...>
   <select id="queryUser" resultType="com.yyh.domain.User">
      select * from user 
      where user_id = #{userId,jdbcType=VARCHAR} 
      for update
   </select>
</mapper>

Dao类:

public class UserMapper {
   ...
   User queryUser(@Param("userId")String userId);
}

Controller类:

@Resource
UserMapper userMapper;

@RequestMapping("getUser")
@Transactional(rollbackFor = Exception.class)   //对方法加事务
public String getUser(String userId){
   User user = userMapper.queryUser(userId);
   if(user == null) throw new Exception("分布式锁没了");
   return ...;
}

2.Redis分布式锁(不支持阻塞):

说明:

随机值:用于自动释放锁时校验
NX:原子性,利用此值获取锁,键存在时不设值,键不存在时设值并获得锁
PX:过期时间,单位为毫秒,过期自动释放锁
SET 键名 随机值 NX PX 毫秒值

(1)获取/释放Redis锁:

public class RedisDisLock implements AutoCloseable {  //继承
RedisTemplate redisTemplate;
String key;  //键名
String randomValue;  //随机值

public RedisDisLock(RedisTemplate redisTemplate, String key){
   this.redisTemplate = redisTemplate;
   this.key = key; //键名
   this.randomValue = UUID.randomUUID().toString();  //随机值
}

//获得Redis锁
public boolean getRedisLock(){
   RedisCallback<Boolean> callback = connection -> {
      byte[] keyB = redisTemplate.getKeySerializer().serialize(key);  //将键名转为byte[]
      byte[] randomValueB = redisTemplate.getKeySerializer().serialize(randomValue);  //将随机值转为byte[]
      Expiration expiration = Expiration.seconds(60); //过期时间
      RedisStringCommands.SetOption nx = RedisStringCommands.SetOption.ifAbsent();  //NX
      return connection.set(keyB, randomValueB, expiration, nx);   
   }
   return redisTemplate.execute(callback);  获取Redis锁
}

//释放锁(Lua脚本):
public boolean unRedisLock(){
     RedisScript<Boolean> script = RedisScript.of(
          "if redis.call(\"get\", KEYS[1]) == ARGV[1] then\n" +   //校验锁
          "   return redis.call(\"del\", KEYS[1])\n" +            //释放锁
          "else\n" +
          "   return 0\n" +
          "end", 
     Boolean.class);
     return (Boolean) redisTemplate.execute(script, Arrays.asList(key), randomValue);  //释放Redis锁
   }
}

//自动释放锁
@Override
public void close() throws Exception {
   unRedisLock(); //释放锁
}

(2)使用Redis锁:

@Autowired
RedisTemplate redisTemplate;

public void test(){
   try (RedisDisLock lock = new RedisDisLock(redisTemplate, "键名")) {
      if (lock.getRedisLock()){  //获取锁,true表示有锁,false为没有锁
         //此处执行锁内具体逻辑
      }
   } catch (Exception e) {
      e.printStackTrace();
   }
}

3.Redisson分布式锁(基于Redis实现,支持阻塞):

官方文档:

https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95

(1)使用Java代码配置方式,获取Redisson锁:

<1>导入Redisson依赖包,在pom.xml中:

<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.11.2</version>
</dependency>

<2>单点Redis:

public void test(){
   Config config = new Config();
   config.setTransportMode(TransportMode.EPOLL);
   config.useSingleServer()   //单点Redis
         .addNodeAddress("redis://192.168.133.141:7181");  //可以用"rediss://"来启用SSL连接
   RedissonClient redisson = Redisson.create(config);
   RLock lock = redisson.getLock("name");  //name为业务名称
   try{
      lock.lock(30, TimeUnit.SECONDS);  //获得锁
      //此处执行锁内具体逻辑
   } catch (Exception e){
      e.printStackTrace();
   } finally {
      try{
         lock.unlock();  //释放锁
      } catch (Exception e){
         e.printStackTrace();
      }
   }
}

<3>集群Redis:

public void test(){
   Config config = new Config();
   config.useClusterServers()  //集群Redis
       .setScanInterval(2000) // 集群状态扫描间隔时间,单位是毫秒
       .addNodeAddress("redis://192.168.233.133:6379", "redis://192.168.233.134:6379") 
       .addNodeAddress("redis://192.168.233.135:6379")  //可以用"rediss://"来启用SSL连接
   RedissonClient redisson = Redisson.create(config);
   //后面代码同上单点Redis方式
}

(2)使用SpringBoot配置方式,获取Redisson锁:

<1>导入依赖:

<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson-spring-boot-starter</artifactId>
   <version>3.11.2</version>
</dependency>

<2>application.yml文件中配置redis:

spring:
  redis:
    database: 0
    host: 192.168.233.133  #redis服务器IP
    port: 6379  #redis服务器端口
    ...   #其他redis的配置

<3>代码:

@Autowired
RedissonClient redisson;  //自动装载
public void test(){
   RLock lock = redisson.getLock("name");  //name为业务名称
   try{
      lock.lock(30, TimeUnit.SECONDS);  //获得锁
      //此处执行锁内具体逻辑
   } catch (Exception e){
      e.printStackTrace();
   } finally {
      try{
         lock.unlock();  //释放锁
      } catch (Exception e){
         e.printStackTrace();
      }
   }
}

4.Zookeeper分布式锁(支持阻塞):

(1)进入Zookeeper命令行:

#查看节点列表
ls /
#创建分布式锁节点
create /lock dis-lock
#获取指定节点数据
get /lock

(2)获取/释放ZooKeeper锁:

public class ZooKeeperLock implements AutoCloseable, Watcher {
   ZooKeeper zooKeeper;
   String tempNode;
   public ZooKeeperLock(){
      //第1个参数为ZooKeeper服务器的IP+端口,第2个参数为超时时间
      this.zooKeeper = new ZooKeeper("192.168.233.141:2181", 10000, this);
   }
   public boolean getZKLock(String name){ //获取锁,name为业务名称(自定义)
      try{
         Stat s = zookeeper.exists("/" + name, false); //判断根节点是否存在
         if(s == null){ //根节点不存在时
            zooKeeper.create("/" + name, name.getBytes() ZooDefs.lds.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  //1.创建根节点
         }
         tempNode = zooKeeper.create("/" + name + "/" + name + "_", name.getBytes() ZooDefs.lds.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //2.创建瞬时有序节点
         List<String> childNodeList = zooKeeper.getChildren("/" + name, false);  //获取子节点列表
         Collections.sort(childNodeList);                                        //对子节点列表排序
         if(tempNode.endsWith(childNodeList.get(0))) return true;  //判断创建的节点是否是第1个,是获得锁,不是进入下面监听前一个节点
         /*
         3.监听上一个节点
         */
         String preNode = childNodeList.get(0);
         for (String itemNode: childNodeList) {
            if(tempNode.endsWith(itemNode)){
               zookeeper.exists("/" + name + "/" + preNode, true);
               break;
            }
            preNode = itemNode;
         }
         synchronized (this) {
            wait(); //进入等待
         }
         return true;
      } catch (Exception e){
         e.printStackTrace();
      }
   }

   @Override
   public void close() throws Exception {  //自动释放锁
      zookeeper.delete(tempNode, -1);  //删除节点
      zookeeper.close();  //关闭
   }
   @Override
   public void process(WatchedEvent e) {
      if(e.getType() == Event.EventType.NodeDeleted){
         synchronized(this){
            notify(); //取消等待
         }
      }
   }
}

(3)使用ZooKeeper锁:

public void test(){
   try (ZooKeeperLock lock = new ZooKeeperLock()) {
      if (lock.getZKLock("业务编号")){  //获取锁,true表示有锁,false为没有锁
         //此处执行锁内具体逻辑
      }
   } catch (Exception e) {
      e.printStackTrace();   
   }
}

5.Curator分布式锁(依赖ZooKeeper):

(1)导入Curator依赖包,在pom.xml中:

<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-recipes</artifactId>
   <version>4.2.0</version>
</dependency>

(2)使用Curator锁:

Application类中:

//启动类中实例化CuratorFramework
@StringBootApplication
public class MyApplication{
   ...   //其他代码

   @Bean(initMethod = "start", destroyMethod = "close")  //初始化时调用CuratorFramework的start()方法,结束时调用CuratorFramework的close()方法
   public CuratorFramework getCuratorFramework(){ //实例化CuratorFramework
      RetryPolicy rp = new ExponentialBackoffRetry(1000, 3)
      return CuratorFrameworkFactory.newClient("192.168.233.141:2181", rp);  //连接ZooKeeper服务器IP与端口,返回CuratorFramework客户端对象
   }
}

在需要的地方使用锁:

@Autowired
CuratorFramework client;

public void test(){
   InterProcessMutex lock = new InterProcessMutex(client, "/name");  //name为业务名称
   try{
      if(lock.acquire(30, TimeUnit.SECONDS)){  //获取锁
         //此处执行锁内具体逻辑
      }
   } catch (Exception e){
      e.printStackTrace();
   } finally {
      try{
         lock.release();  //释放锁
      } catch (Exception e){
         e.printStackTrace();
      }
   }
}