目录
- redis分布式锁如何续期
- redis分布式锁的正确姿势
- 如何回答
- 源码分析
- 真相大白
- redis分布式锁的5个坑
- 一、锁未被释放
- 二、b的锁被a给释放了
- 三、数据库事务超时
- 四、锁过期了,业务还没执行完
- 五、redis主从复制的坑
redis分布式锁如何续期
redis分布式锁的正确姿势
据肥朝了解,很多同学在用分布式锁时,都是直接百度搜索找一个redis分布式锁工具类就直接用了.关键是该工具类中还充斥着很多system.out.println();等语句.其实redis分布式锁比较正确的姿势是采用redisson这个客户端工具.具体介绍可以搜索最大的同性交友网站github.
如何回答
首先如果你之前用redis的分布式锁的姿势正确,并且看过相应的官方文档的话,这个问题so easy.我们来看
坦白说,如果你英文棒棒哒那么看英文文档可能更好理解
by default lock watchdog timeout is 30 seconds and can be changed through config.lockwatchdogtimeout setting.
但是你如果看的是中文文档
看门狗检查锁的超时时间默认是30秒
这句话肥朝从语文角度分析就是一个歧义句,他有两个意思
1.看门狗默认30秒去检查一次锁的超时时间
2.看们狗会去检查锁的超时时间,锁的时间时间默认是30秒
看到这里,我希望大家不要黑我的小学体育老师,虽然他和语文老师是同个人.语文不行,我们可以源码来凑!
源码分析
我们根据官方文档给出的例子,写了一个最简单的demo,例子根据上面截图中ctr+c和ctr+v一波操作,如下
public class demomain {
public static void main(string[] args) throws exception {
config config = new config();
config.usesingleserver().setaddress("redis://127.0.0.1:6379");
redissonclient redisson = redisson.create(config);
rlock lock = redisson.getlock("anylock");
lock.lock();
//lock.unlock();
}
}
create
从这里我们知道,internallockleasetime 和 lockwatchdogtimeout这两个参数是相等的.
lockwatchdogtimeout默认值如下
public class config {
private long lockwatchdogtimeout = 30 * 1000;
public long getlockwatchdogtimeout() {
return lockwatchdogtimeout;
}
//省略无关代码
}
从internallockleasetime这个单词也可以看出,这个加的分布式锁的超时时间默认是30秒.但是还有一个问题,那就是这个看门狗,多久来延长一次有效期呢?我们往下看
lock
从我图中框起来的地方我们就知道了,获取锁成功就会开启一个定时任务,也就是watchdog,定时任务会定期检查去续期renewexpirationasync(threadid).
这里定时用的是netty-common包中的hashedwheeltimer,肥朝公众号已经和各大搜索引擎建立了密切的合作关系,你只需要把这个类在任何搜索引擎一搜,都能知道相关api参数的意义.
从图中我们明白,该定时调度每次调用的时间差是internallockleasetime / 3.也就10秒.
真相大白
通过源码分析我们知道,默认情况下,加锁的时间是30秒.如果加锁的业务没有执行完,那么到 30-10 = 20秒的时候,就会进行一次续期,把锁重置成30秒.那这个时候可能又有同学问了,那业务的机器万一宕机了呢?宕机了定时任务跑不了,就续不了期,那自然30秒之后锁就解开了呗.
redis分布式锁的5个坑
一、锁未被释放
这种情况是一种低级错误,就是我上边犯的错,由于当前线程 获取到redis 锁,处理完业务后未及时释放锁,导致其它线程会一直尝试获取锁阻塞,例如:用jedis客户端会报如下的错误信息
redis.clients.jedis.exceptions.jedisconnectionexception: could not get a resource from the pool
redis线程池已经没有空闲线程来处理客户端命令。
解决的方法也很简单,只要我们细心一点,拿到锁的线程处理完业务及时释放锁,如果是重入锁未拿到锁后,线程可以释放当前连接并且sleep一段时间。
public void lock() {
while (true) {
boolean flag = this.getlock(key);
if (flag) {
todo .........
} else {
// 释放当前redis连接
redis.close();
// 休眠1000毫秒
sleep(1000);
}
}
}
二、b的锁被a给释放了
我们知道redis实现锁的原理在于 setnx命令。当 key不存在时将 key的值设为 value ,返回值为 1;若给定的 key已经存在,则 setnx不做任何动作,返回值为 0 。
setnx key value
我们来设想一下这个场景:a、b两个线程来尝试给key mylock加锁,a线程先拿到锁(假如锁3秒后过期),b线程就在等待尝试获取锁,到这一点毛病没有。
那如果此时业务逻辑比较耗时,执行时间已经超过redis锁过期时间,这时a线程的锁自动释放(删除key),b线程检测到mylock这个key不存在,执行 setnx命令也拿到了锁。
但是,此时a线程执行完业务逻辑之后,还是会去释放锁(删除key),这就导致b线程的锁被a线程给释放了。
为避免上边的情况,一般我们在每个线程加锁时要带上自己独有的value值来标识,只释放指定value的key,否则就会出现释放锁混乱的场景。
三、数据库事务超时
emm~ 聊redis锁咋还扯到数据库事务上来了?别着急往下看,看下边这段代码:
@transaction
public void lock() {
while (true) {
boolean flag = this.getlock(key);
if (flag) {
insert();
}
}
}
给这个方法添加一个@transaction注解开启事务,如代码中抛出异常进行回滚,要知道数据库事务可是有超时时间限制的,并不会无条件的一直等一个耗时的数据库操作。
比如:我们解析一个大文件,再将数据存入到数据库,如果执行时间太长,就会导致事务超时自动回滚。
一旦你的key长时间获取不到锁,获取锁等待的时间远超过数据库事务超时时间,程序就会报异常。
一般为解决这种问题,我们就需要将数据库事务改为手动提交、回滚事务。
@autowired
datasourcetransactionmanager datasourcetransactionmanager;
@transaction
public void lock() {
//手动开启事务
transactionstatus transactionstatus = datasourcetransactionmanager.gettransaction(transactiondefinition);
try {
while (true) {
boolean flag = this.getlock(key);
if (flag) {
insert();
//手动提交事务
datasourcetransactionmanager.commit(transactionstatus);
}
}
} catch (exception e) {
//手动回滚事务
datasourcetransactionmanager.rollback(transactionstatus);
}
}
四、锁过期了,业务还没执行完
这种情况和我们上边提到的第二种比较类似,但解决思路上略有不同。
同样是redis分布式锁过期,而业务逻辑没执行完的场景,不过,这里换一种思路想问题,把redis锁的过期时间再弄长点不就解决了吗?
那还是有问题,我们可以在加锁的时候,手动调长redis锁的过期时间,可这个时间多长合适?业务逻辑的执行时间是不可控的,调的过长又会影响操作性能。
要是redis锁的过期时间能够自动续期就好了。
为了解决这个问题我们使用redis客户端redisson,redisson很好的解决了redis在分布式环境下的一些棘手问题,它的宗旨就是让使用者减少对redis的关注,将更多精力用在处理业务逻辑上。
redisson对分布式锁做了很好封装,只需调用api即可。
rlock lock = redissonclient.getlock("stocklock");
redisson在加锁成功后,会注册一个定时任务监听这个锁,每隔10秒就去查看这个锁,如果还持有锁,就对过期时间进行续期。默认过期时间30秒。这个机制也被叫做:“看门狗”,这名字。。。
举例子:假如加锁的时间是30秒,过10秒检查一次,一旦加锁的业务没有执行完,就会进行一次续期,把锁的过期时间再次重置成30秒。
通过分析下边redisson的源码实现可以发现,不管是加锁、解锁、续约都是客户端把一些复杂的业务逻辑,通过封装在lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性。
@slf4j
@service
public class redisdistributionlockplus {
/**
* 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象
*/
private static final long default_lock_timeout = 30;
private static final long time_seconds_five = 5 ;
/**
* 每个key的过期时间 {@link lockcontent}
*/
private map<string, lockcontent> lockcontentmap = new concurrenthashmap<>(512);
/**
* redis执行成功的返回
*/
private static final long exec_success = 1l;
/**
* 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestid,arg2:超时时间
*/
private static final string lock_script = "if redis.call('exists', keys[2]) == 1 then argv[2] = math.floor(redis.call('get', keys[2]) + 10) end " +
"if redis.call('exists', keys[1]) == 0 then " +
"local t = redis.call('set', keys[1], argv[1], 'ex', argv[2]) " +
"for k, v in pairs(t) do " +
"if v == 'ok' then return tonumber(argv[2]) end " +
"end " +
"return 0 end";
/**
* 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestid,arg2:业务耗时 arg3: 业务开始设置的timeout
*/
private static final string unlock_script = "if redis.call('get', keys[1]) == argv[1] then " +
"local ctime = tonumber(argv[2]) " +
"local biz_timeout = tonumber(argv[3]) " +
"if ctime > 0 then " +
"if redis.call('exists', keys[2]) == 1 then " +
"local avg_time = redis.call('get', keys[2]) " +
"avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
"if avg_time >= biz_timeout - 5 then redis.call('set', keys[2], avg_time, 'ex', 24*60*60) " +
"else redis.call('del', keys[2]) end " +
"elseif ctime > biz_timeout -5 then redis.call('set', keys[2], argv[2], 'ex', 24*60*60) end " +
"end " +
"return redis.call('del', keys[1]) " +
"else return 0 end";
/**
* 续约lua脚本
*/
private static final string renew_script = "if redis.call('get', keys[1]) == argv[1] then return redis.call('expire', keys[1], argv[2]) else return 0 end";
private final stringredistemplate redistemplate;
public redisdistributionlockplus(stringredistemplate redistemplate) {
this.redistemplate = redistemplate;
scheduletask task = new scheduletask(this, lockcontentmap);
// 启动定时任务
scheduleexecutor.schedule(task, 1, 1, timeunit.seconds);
}
/**
* 加锁
* 取到锁加锁,取不到锁一直等待知道获得锁
*
* @param lockkey
* @param requestid 全局唯一
* @param expire 锁过期时间, 单位秒
* @return
*/
public boolean lock(string lockkey, string requestid, long expire) {
log.info("开始执行加锁, lockkey ={}, requestid={}", lockkey, requestid);
for (; ; ) {
// 判断是否已经有线程持有锁,减少redis的压力
lockcontent lockcontentold = lockcontentmap.get(lockkey);
boolean unlocked = null == lockcontentold;
// 如果没有被锁,就获取锁
if (unlocked) {
long starttime = system.currenttimemillis();
// 计算超时时间
long bizexpire = expire == 0l ? default_lock_timeout : expire;
string lockkeyrenew = lockkey + "_renew";
redisscript<long> script = redisscript.of(lock_script, long.class);
list<string> keys = new arraylist<>();
keys.add(lockkey);
keys.add(lockkeyrenew);
long lockexpire = redistemplate.execute(script, keys, requestid, long.tostring(bizexpire));
if (null != lockexpire && lockexpire > 0) {
// 将锁放入map
lockcontent lockcontent = new lockcontent();
lockcontent.setstarttime(starttime);
lockcontent.setlockexpire(lockexpire);
lockcontent.setexpiretime(starttime + lockexpire * 1000);
lockcontent.setrequestid(requestid);
lockcontent.setthread(thread.currentthread());
lockcontent.setbizexpire(bizexpire);
lockcontent.setlockcount(1);
lockcontentmap.put(lockkey, lockcontent);
log.info("加锁成功, lockkey ={}, requestid={}", lockkey, requestid);
return true;
}
}
// 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁
if (thread.currentthread() == lockcontentold.getthread()
&& requestid.equals(lockcontentold.getrequestid())){
// 计数 +1
lockcontentold.setlockcount(lockcontentold.getlockcount()+1);
return true;
}
// 如果被锁或获取锁失败,则等待100毫秒
try {
timeunit.milliseconds.sleep(100);
} catch (interruptedexception e) {
// 这里用lombok 有问题
log.error("获取redis 锁失败, lockkey ={}, requestid={}", lockkey, requestid, e);
return false;
}
}
}
/**
* 解锁
*
* @param lockkey
* @param lockvalue
*/
public boolean unlock(string lockkey, string lockvalue) {
string lockkeyrenew = lockkey + "_renew";
lockcontent lockcontent = lockcontentmap.get(lockkey);
long consumetime;
if (null == lockcontent) {
consumetime = 0l;
} else if (lockvalue.equals(lockcontent.getrequestid())) {
int lockcount = lockcontent.getlockcount();
// 每次释放锁, 计数 -1,减到0时删除redis上的key
if (--lockcount > 0) {
lockcontent.setlockcount(lockcount);
return false;
}
consumetime = (system.currenttimemillis() - lockcontent.getstarttime()) / 1000;
} else {
log.info("释放锁失败,不是自己的锁。");
return false;
}
// 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁
lockcontentmap.remove(lockkey);
redisscript<long> script = redisscript.of(unlock_script, long.class);
list<string> keys = new arraylist<>();
keys.add(lockkey);
keys.add(lockkeyrenew);
long result = redistemplate.execute(script, keys, lockvalue, long.tostring(consumetime),
long.tostring(lockcontent.getbizexpire()));
return exec_success.equals(result);
}
/**
* 续约
*
* @param lockkey
* @param lockcontent
* @return true:续约成功,false:续约失败(1、续约期间执行完成,锁被释放 2、不是自己的锁,3、续约期间锁过期了(未解决))
*/
public boolean renew(string lockkey, lockcontent lockcontent) {
// 检测执行业务线程的状态
thread.state state = lockcontent.getthread().getstate();
if (thread.state.terminated == state) {
log.info("执行业务的线程已终止,不再续约 lockkey ={}, lockcontent={}", lockkey, lockcontent);
return false;
}
string requestid = lockcontent.getrequestid();
long timeout = (lockcontent.getexpiretime() - lockcontent.getstarttime()) / 1000;
redisscript<long> script = redisscript.of(renew_script, long.class);
list<string> keys = new arraylist<>();
keys.add(lockkey);
long result = redistemplate.execute(script, keys, requestid, long.tostring(timeout));
log.info("续约结果,true成功,false失败 lockkey ={}, result={}", lockkey, exec_success.equals(result));
return exec_success.equals(result);
}
static class scheduleexecutor {
public static void schedule(scheduletask task, long initialdelay, long period, timeunit unit) {
long delay = unit.tomillis(initialdelay);
long period_ = unit.tomillis(period);
// 定时执行
new timer("lock-renew-task").schedule(task, delay, period_);
}
}
static class scheduletask extends timertask {
private final redisdistributionlockplus redisdistributionlock;
private final map<string, lockcontent> lockcontentmap;
public scheduletask(redisdistributionlockplus redisdistributionlock, map<string, lockcontent> lockcontentmap) {
this.redisdistributionlock = redisdistributionlock;
this.lockcontentmap = lockcontentmap;
}
@override
public void run() {
if (lockcontentmap.isempty()) {
return;
}
set<map.entry<string, lockcontent>> entries = lockcontentmap.entryset();
for (map.entry<string, lockcontent> entry : entries) {
string lockkey = entry.getkey();
lockcontent lockcontent = entry.getvalue();
long expiretime = lockcontent.getexpiretime();
// 减少线程池中任务数量
if ((expiretime - system.currenttimemillis())/ 1000 < time_seconds_five) {
//线程池异步续约
threadpool.submit(() -> {
boolean renew = redisdistributionlock.renew(lockkey, lockcontent);
if (renew) {
long expiretimenew = lockcontent.getstarttime() + (expiretime - lockcontent.getstarttime()) * 2 - time_seconds_five * 1000;
lockcontent.setexpiretime(expiretimenew);
} else {
// 续约失败,说明已经执行完 or redis 出现问题
lockcontentmap.remove(lockkey);
}
});
}
}
}
}
}
五、redis主从复制的坑
redis高可用最常见的方案就是主从复制(master-slave),这种模式也给redis分布式锁挖了一坑。
redis cluster集群环境下,假如现在a客户端想要加锁,它会根据路由规则选择一台master节点写入key mylock,在加锁成功后,master节点会把key异步复制给对应的slave节点。
如果此时redis master节点宕机,为保证集群可用性,会进行主备切换,slave变为了redis master。b客户端在新的master节点上加锁成功,而a客户端也以为自己还是成功加了锁的。
此时就会导致同一时间内多个客户端对一个分布式锁完成了加锁,导致各种脏数据的产生。
至于解决办法嘛,目前看还没有什么根治的方法,只能尽量保证机器的稳定性,减少发生此事件的概率。
小结一下:上面就是我在使用redis 分布式锁时遇到的一些坑,有点小感慨,经常用一个方法填上这个坑,没多久就发现另一个坑又出来了,其实根本没有什么十全十美的解决方案,哪有什么银弹,只不过是在权衡利弊后,选一个在接受范围内的折中方案而已。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持www.887551.com。