目录
- 问题背景
- 解决方案
- 主要实现原理:
- 可靠性:
- springboot 集成使用 redis 分布式锁
- 使用示例
- 参考文档
问题背景
现在的应用程序架构中,很多服务都是多副本运行,从而保证服务的稳定性。一个服务实例挂了,其他服务依旧可以接收请求。但是服务的多副本运行随之也会引来一些分布式问题,比如某个接口的处理逻辑是这样的:接收到请求后,先查询 db 看是否有相关的数据,如果没有则插入数据,如果有则更新数据。在这种场景下如果相同的 n 个请求并发发到后端服务实例,就会出现重复插入数据的情况:
解决方案
针对上面问题,一般的解决方案是使用分布式锁来解决。同一个进程内的话用本进程内的锁即可解决,但是服务多实例部署的话是分布式的,各自进程独立,这种情况下可以设置一个全局获取锁的地方,各个进程都可以通过某种方式获取这个全局锁,获得到锁后就可以执行相关业务逻辑代码,没有拿到锁则跳过不执行,这个全局锁就是我们所说的分布式锁。分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于redis的分布式锁;3. 基于zookeeper的分布式锁。
我们这里介绍如何基于 redis 的分布式锁来解决分布式并发问题:redis 充当获取全局锁的地方,每个实例在接收到请求的时候首先从 redis 获取锁,获取到锁后执行业务逻辑代码,没争抢到锁则放弃执行。
主要实现原理:
redis 锁主要利用 redis 的 setnx 命令:
加锁命令:setnx key value,当键不存在时,对键进行设置操作并返回成功,否则返回失败。key 是锁的唯一标识,一般按业务来决定命名。value 一般用 uuid 标识,确保锁不被误解。
解锁命令:del key,通过删除键值对释放锁,以便其他线程可以通过 setnx 命令来获取锁。
锁超时:expire key timeout, 设置 key 的超时时间,以保证即使锁没有被显式释放,锁也可以在一定时间后自动释放,避免资源被永远锁住。
可靠性:
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,保证只有一台机器的一个线程可以持有锁;
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁;
- 具备非阻塞性。一旦获取不到锁就立刻返回加锁失败;
- 加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了;
springboot 集成使用 redis 分布式锁
写了一个 redislock 工具类,用于业务逻辑执行前加锁和业务逻辑执行完解锁操作。这里的加锁操作可能实现的不是很完善,有加锁和锁过期两个操作原子性问题,如果 springboot 版本是2.x的话是可以用注释中的代码在加锁的时候同时设置锁过期时间,如果 springboot 版本是2.x以下的话建议使用 lua 脚本来确保操作的原子性,这里为了简单就先这样写:
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.data.redis.core.stringredistemplate;
import org.springframework.data.redis.core.script.defaultredisscript;
import org.springframework.stereotype.component;
import java.util.arraylist;
import java.util.list;
import java.util.concurrent.timeunit;
/**
* @description: redis分布式锁实现工具类
* @author: qianghaohao
* @time: 2021/7/19
*/
@component
public class redislock {
@autowired
stringredistemplate redistemplate;
/**
* 获取锁
*
* @param lockkey 锁
* @param identity 身份标识(保证锁不会被其他人释放)
* @param expiretime 锁的过期时间(单位:秒)
* @return
*/
public boolean lock(string lockkey, string identity, long expiretime) {
// 由于我们目前 springboot 版本比较低,1.5.9,因此还不支持下面这种写法
// return redistemplate.opsforvalue().setifabsent(lockkey, identity, expiretime, timeunit.seconds);
if (redistemplate.opsforvalue().setifabsent(lockkey, identity)) {
redistemplate.expire(lockkey, expiretime, timeunit.seconds);
return true;
}
return false;
}
/**
* 释放锁
*
* @param lockkey 锁
* @param identity 身份标识(保证锁不会被其他人释放)
* @return
*/
public boolean releaselock(string lockkey, string identity) {
string luascript = "if " +
" redis.call('get', keys[1]) == argv[1] " +
"then " +
" return redis.call('del', keys[1]) " +
"else " +
" return 0 " +
"end";
defaultredisscript<boolean> redisscript = new defaultredisscript<>();
redisscript.setresulttype(boolean.class);
redisscript.setscripttext(luascript);
list<string> keys = new arraylist<>();
keys.add(lockkey);
object result = redistemplate.execute(redisscript, keys, identity);
return (boolean) result;
}
}
使用示例
这里只贴出关键的使用代码,注意:锁的 key 根据自己的业务逻辑命名,能唯一标示同一个请求即可。value 这里设置为 uuid,为了确保释放锁的时候能正确释放(只释放自己加的锁)。
@autowired private redislock redislock; // redis 分布式锁
string redislockkey = string.format("%s:docker-image:%s", redis_lock_prefix, imagevo.getimagerepository());
string redislockvalue = uuid.randomuuid().tostring();
try {
if (!redislock.lock(redislockkey, redislockvalue, redis_lock_timeout)) {
logger.info("redislockkey [" + redislockkey + "] 已存在,不执行镜像插入和更新");
result.setmessage("新建镜像频繁,稍后重试,锁占用");
return result;
}
... // 执行业务逻辑
catch (execpion e) {
... // 异常处理
} finally { // 释放锁
if (!redislock.releaselock(redislockkey, redislockvalue)) {
logger.error("释放redis锁 [" + redislockkey + "] 失败);
} else {
logger.error("释放redis锁 [" + redislockkey + "] 成功");
}
}
参考文档
https://www.jianshu.com/p/6c2f85e2c586
https://xiaomi-info.github.io/2019/12/17/redis-distributed-lock/
到此这篇关于springboot 使用 redis 分布式锁解决并发问题的文章就介绍到这了,更多相关springboot redis 分布式锁内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!