死磕 java同步系列之mysql分布式锁
问题
(1)什么是分布式锁?
(2)为什么需要分布式锁?
(3)mysql如何实现分布式锁?
(4)mysql分布式锁的优点和缺点?
简介
随着并发量的不断增加,单机的服务迟早要向多节点或者微服务进化,这时候原来单机模式下使用的synchronized或者reentrantlock将不再适用,我们迫切地需要一种分布式环境下保证线程安全的解决方案,今天我们一起来学习一下mysql分布式锁如何实现分布式线程安全。
基础知识
mysql中提供了两个函数——get_lock('key', timeout)
和release_lock('key')
——来实现分布式锁,可以根据key
来加锁,这是一个字符串,可以设置超时时间(单位:秒),当调用release_lock('key')
或者客户端断线
的时候释放锁。
它们的使用方法如下:
mysql> select get_lock('user_1', 10); -> 1 mysql> select release_lock('user_1'); -> 1
get_lock('user_1', 10)
如果10秒之内获取到锁则返回1,否则返回0;
release_lock('user_1')
如果该锁是当前客户端持有的则返回1,如果该锁被其它客户端持有着则返回0,如果该锁没有被任何客户端持有则返回null;
多客户端案例
为了便于举例【本篇文章由“彤哥读源码”原创,请支持原创,谢谢!】,这里的超时时间全部设置为0,也就是立即返回。
时刻 | 客户端a | 客户端b |
---|---|---|
1 | get_lock('user_1', 0) -> 1 | - |
2 | - | get_lock('user_1', 0) -> 0 |
3 | - | release_lock('user_1', 0) -> 0 |
4 | release_lock('user_1', 0) -> 1 | - |
5 | release_lock('user_2', 0) -> null | - |
6 | - | get_lock('user_1', 0) -> 1 |
7 | - | release_lock('user_1', 0) -> 1 |
java实现
为了方便快速实现,这里使用 springboot2.1 + mybatis 实现,并且省略spring的配置,只列举主要的几个类。
定义locker接口
接口中只有一个方法,入参1为加锁的key,入参2为执行的命令。
public interface locker { void lock(string key, runnable command); }
mysql分布式锁实现
mysql的实现中要注意以下两点:
(1)加锁、释放锁必须在同一个session(同一个客户端)中,所以这里不能使用mapper接口的方式调用,因为mapper接口有可能会导致不在同一个session。
(2)可重入性是通过threadlocal保证的;
@slf4j @component public class mysqllocker implements locker { private static final threadlocal<sqlsessionwrapper> localsession = new threadlocal<>(); @autowired private sqlsessionfactory sqlsessionfactory; @override public void lock(string key, runnable command) { // 加锁、释放锁必须使用同一个session sqlsessionwrapper sqlsessionwrapper = localsession.get(); if (sqlsessionwrapper == null) { // 第一次获取锁 localsession.set(new sqlsessionwrapper(sqlsessionfactory.opensession())); } try { // 【本篇文章由“彤哥读源码”原创,请支持原创,谢谢!】 // -1表示没获取到锁一直等待 if (getlock(key, -1)) { command.run(); } } catch (exception e) { log.error("lock error", e); } finally { releaselock(key); } } private boolean getlock(string key, long timeout) { map<string, object> param = new hashmap<>(); param.put("key", key); param.put("timeout", timeout); sqlsessionwrapper sqlsessionwrapper = localsession.get(); integer result = sqlsessionwrapper.sqlsession.selectone("lockermapper.getlock", param); if (result != null && result.intvalue() == 1) { // 获取到了锁,state加1 sqlsessionwrapper.state++; return true; } return false; } private boolean releaselock(string key) { sqlsessionwrapper sqlsessionwrapper = localsession.get(); integer result = sqlsessionwrapper.sqlsession.selectone("lockermapper.releaselock", key); if (result != null && result.intvalue() == 1) { // 释放锁成功,state减1 sqlsessionwrapper.state--; // 当state减为0的时候说明当前线程获取的锁全部释放了,则关闭session并从threadlocal中移除 if (sqlsessionwrapper.state == 0) { sqlsessionwrapper.sqlsession.close(); localsession.remove(); } return true; } return false; } private static class sqlsessionwrapper { int state; sqlsession sqlsession; public sqlsessionwrapper(sqlsession sqlsession) { this.state = 0; this.sqlsession = sqlsession; } } }
lockermapper.xml
定义get_lock()、release_lock()的语句。
<?xml version="1.0" encoding="utf-8"?> <!doctype mapper public "-//mybatis.org//dtd mapper 3.0//en" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="lockermapper"> <select id="getlock" resulttype="integer"> select get_lock(#{key}, #{timeout}); </select> <select id="releaselock" resulttype="integer"> select release_lock(#{key}) </select> </mapper>
测试类
这里启动1000个线程,每个线程打印一句话并睡眠2秒钟。
@runwith(springrunner.class) @springboottest(classes = application.class) public class mysqllockertest { @autowired private locker locker; @test public void testmysqllocker() throws ioexception { for (int i = 0; i < 1000; i++) { // 多节点测试 try { thread.sleep(2000); } catch (interruptedexception e) { e.printstacktrace(); } new thread(()->{ locker.lock("lock", ()-> { // 可重入性测试 locker.lock("lock", ()-> { system.out.println(string.format("time: %d, threadname: %s", system.currenttimemillis(), thread.currentthread().getname())); try { thread.sleep(2000); } catch (interruptedexception e) { e.printstacktrace(); } }); }); }, "thread-"+i).start(); } system.in.read(); } }
运行结果
查看运行结果发现每隔2秒打印一个线程的信息,说明这个锁是有效的,至于分布式环境下面的验证也很简单,起多个mysqllockertest实例即可。
time: 1568715905952, threadname: thread-3 time: 1568715907955, threadname: thread-4 time: 1568715909966, threadname: thread-8 time: 1568715911967, threadname: thread-0 time: 1568715913969, threadname: thread-1 time: 1568715915972, threadname: thread-9 time: 1568715917975, threadname: thread-6 time: 1568715919997, threadname: thread-5 time: 1568715921999, threadname: thread-7 time: 1568715924001, threadname: thread-2
总结
(1)分布式环境下需要使用分布式锁,单机的锁将无法保证线程安全;
(2)mysql分布式锁是基于get_lock('key', timeout)
和release_lock('key')
两个函数实现的;
(3)mysql分布式锁是可重入锁;
彩蛋
使用mysql分布式锁需要注意些什么呢?
答:必须保证多个服务节点使用的是同一个mysql库【本篇文章由“彤哥读源码”原创,请支持原创,谢谢!】。
mysql分布式锁具有哪些优点?
答:1)方便快捷,因为基本每个服务都会连接数据库,但是不是每个服务都会使用redis或者zookeeper;
2)如果客户端断线了会自动释放锁,不会造成锁一直被占用;
3)mysql分布式锁是可重入锁,对于旧代码的改造成本低;
mysql分布式锁具有哪些缺点?
答:1)加锁直接打到数据库,增加了数据库的压力;
2)加锁的线程会占用一个session,也就是一个连接数,如果并发量大可能会导致正常执行的sql语句获取不到连接;
3)服务拆分后如果每个服务使用自己的数据库,则不合适;
4)相对于redis或者zookeeper分布式锁,效率相对要低一些;
欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。
上一篇: SpringBootSecurity学习(12)前后端分离版之简单登录
下一篇: 交换排序