Redis分布式锁
1、相关题目
Redis除了拿来做缓存,你还见过基于Redis的什么用法?
数据共享,分布式Session
分布式锁
全局ID
计算器、点赞
位统计
购物车
轻量级消息队列 list、stream
抽奖
点赞、签到、打卡
差集交集并集。用户关注、可能认识的人、推荐模型
热点新闻、热搜排行
Redis做分布式锁的时候有需要注意的问题?
你们公司自己实现的分布式锁是否用的setnx命令实现?
这个是最合适的吗?你如何考虑分布式锁的可重入问题?
如果是Redis是单点部署的,会带来什么问题?怎么解决?
Redis集群模式下,比如主从模式,CAP方面有没有什么问题呢?
CAP,redis集群是AP,redis单机是C,一致性
那你简单的介绍一下Redlock 吧?你简历上写redisson,你谈谈
Redis分布式锁如何续期?看门狗知道吗?
2、锁的种类
单机版同一个JVM虚拟机内,synchronized或者Lock接口
分布式多个不同VM虚拟机,单机的线程锁机制不再起作用,资源类在不同的服务器之间共享了。
一个靠谱分布式锁需要具备的条件和刚需
独占性
任何时候有且仅有一个线程持有
高可用
如果在redis集群的情况下,不能因为某一个节点挂了而出现获得锁和释放锁失败的现象,要在高并发请求下依旧性能好用。
防死锁
杜绝死锁,必须有超时控制机制或者撤销操作,有一个兜底的终止方案。
不乱抢
不能私下unlock别人的锁,只能自己加锁自己释放,自己加的锁必须解自己的。
重入性
同一个节点的同一个线程如果获得锁之后,它也可以再次获取这个锁。
3、案例
场景:多个服务间保证同一时刻同一时间段内同一用户只能有一个请求(防止关键业务出现并发攻击)
3.1、基础环境配置(样板)
目录结构
在redis中新加key inventory001 模拟库存1号有100件物品
配置类
RedisConfig
@Configuration
public class RedisConfig
{
@Bean
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory)
{
RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
//设置key序列化方式string
redisTemplate.setKeySerializer(new StringRedisSerializer());
//设置value的序列化方式json
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
public Redisson redisson()
{
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.111.27:6379").setDatabase(0).setPassword("123456");
return (Redisson) Redisson.create(config);
}
}
Swagger2Config
@Configuration
@EnableSwagger2
public class Swagger2Config
{
@Value("${swagger2.enabled}")
private Boolean enabled;
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.enable(enabled)
.select()
.apis(RequestHandlerSelectors.basePackage("com.zm.redislock")) //你自己的package
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("springboot利用swagger2构建api接口文档 "+"\t"+ DateTimeFormatter.ofPattern("yyyy-MM-dd").format(LocalDateTime.now()))
.description("springboot+redis整合")
.version("1.0")
.termsOfServiceUrl("https://www.baidu.com/")
.build();
}
}
InventoryController
@RestController
@Api(tags = "redis分布式锁测试")
public class InventoryController
{
@Autowired
private InventoryService inventoryService;
@ApiOperation("扣减库存,一次卖一个")
@GetMapping(value = "/inventory/sale")
public String sale()
{
return inventoryService.sale();
}
@ApiOperation("扣减库存saleByRedisson,一次卖一个")
@GetMapping(value = "/inventory/saleByRedisson")
public String saleByRedisson()
{
return inventoryService.saleByRedisson();
}
}
application.properties
server.port=7777
spring.application.name=redis_distributed_lock2
# ========================swagger2=====================
# http://localhost:7777/swagger-ui.html
swagger2.enabled=true
spring.mvc.pathmatch.matching-strategy=ant_path_matcher
# ========================redis=====================
spring.redis.database=0
spring.redis.host=192.168.111.27
spring.redis.port=6379
spring.redis.password=123456
spring.redis.lettuce.pool.max-active=8
spring.redis.lettuce.pool.max-wait=-1ms
spring.redis.lettuce.pool.max-idle=8
spring.redis.lettuce.pool.min-idle=0
InventoryService第一版
@Service
public class InventoryService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String port;
private Lock lock = new ReentrantLock();
public String sale() {
String retMessage = "";
lock.lock();
try {
// 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
// 判断库存够不够,如果为空则设置为0,有则转化为integger
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
if (inventoryNumber > 0){
// 减扣库存,每次减少一个
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余:"+inventoryNumber;
System.out.println(retMessage+"\t"+"服务端口号:"+port);
}else {
retMessage = "商品卖完了,去别处看看吧(0-0)ll";
}
} finally {
lock.unlock();
}
return retMessage+"\t"+"服务端口号:"+port;
}
public String saleByRedisson() {
return null;
}
}
测试,启动微服务
去redis看一下,没有问题
3.2、手写分布式锁
3.2.1、初始化版本简单添加
将项目redis_distributed_lock2复制一份改名成redis_distributed_lock3
加了synchronized或者Lock
3.2.2、使用nginx微服务架构进行反向代理和负载均衡
一些问题
案例项目V2.0版本代码分布式部署后,单机锁还是出现超卖现象,需要分布式锁
Nginx配置负载均衡
Nginx 安装配置 | 菜鸟教程 (runoob.com)如果没有安装参照这个安装即可
/usr/local/nginx/conf目录下修改配置文件nginx.conf新增反向代理和负载均衡配置
upstream mynginx { server 10.7.41.11:7777 weight=1; ###windows的IP server 10.7.41.11:8888 weight=1; } server { listen 80; server_name localhost; #charset koi8-r; #access_log logs/host.access.log main; location / { #root html; proxy_pass http://mynginx; index index.html index.htm; }
启动 并查看nginx进程
./nginx -c /usr/local/nginx/conf/nginx.conf ps -ef |grep nginx
案例V2.0版本代码修改+启动两个微服务
启动7777微服务
启动8888微服务
通过nginx访问,你的linux服务器地址IP,反向代理加负载均衡
可以点击看到效果,一边一个,默认轮询
192.168.111.27//inventory/sale
刷新一下
再刷新
查看后台:
没有问题两个微服务交替减少商品
上面纯手点验证OK,下面高并发模拟
打开apacheJMeter接口测试工具进行测试
添加HTTP请求
再添加聚合报告
启动测试
查看redis还剩多少
按照正常的应该是0,但现实还剩了8个说明有同一个商品卖了两次的情况,也就是超卖了
查看微服务后台,这些数字的商品都超卖了
V2.0版本,单机版加锁配合Nginx和meter压测后,不满足高并发分布式锁的性能要求,出现超卖
为什么会出现bug也就是超卖现象?
在单机环境下,可以使用synchronized或Lock来实现上锁。但是在分布式系统中,因为线程的竞争是不在同一个节点的(同一个jvm中)所以需要一个让所有进程都能访问到的锁来实现(比如redis或者zookeeper来构建),不同进程jvm层面的锁就不用管了,那么就可以使用第三方的一个组件,来获取锁,未获取到锁的进程就会被阻塞。
分布式锁的出现
能干什么?
跨进程+跨服务
解决超卖
防止缓存击穿
3.2.3、redis分布式锁的使用
上redis分布式锁setnx
InventoryService类的sale方法修改
public String sale(){
String retMessage = "";
String key = "redisLock";
String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);
//flag=false,抢不到锁就要递归重试,但是不能马上重试,暂停20毫秒
if (!flag){
//暂停20毫秒重式
try { TimeUnit.MICROSECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
sale();
}else {
//这是抢到锁了
try {
// 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
// 判断库存够不够,如果为空则设置为0,有则转化为integger
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
if (inventoryNumber > 0){
// 减扣库存,每次减少一个
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余:"+inventoryNumber;
System.out.println(retMessage+"\t"+"服务端口号:"+port);
}else {
retMessage = "商品卖完了,去别处看看吧(0-0)ll";
}
} finally {
//释放锁,删除key
stringRedisTemplate.delete(key);
}
}
return retMessage+"\t"+"服务端口号:"+port;
}
进行测试,这次设置inventory001为3000个,使用接口测试工具测试
结果并不是设想的那样库存会是0,中间发生报错,堆栈溢出,在递归的情况下,如果一个进程等待时间过长,后面的进程都在排队就会堆栈溢出
java.lang.StackOverflowError,它通常是由于递归调用过深导致的。高并发情况下是严禁使用递归的
所以,要解决堆栈溢出问题,还有在final那一块,没有检查当前是否有锁的持有者。这可能会导致一个线程释放了其他线程持有的锁,所以应该在释放锁之前检查当前线程是否是锁的持有者。
多线程判断想想JUC里面说过的虚假唤醒,用while替代if
用自旋替代递归重试
V3.1,递归重试,容易导致stackoverfLowerror,所以不推荐,另外,高并发唤醒后推荐用whiLe判断而不是if
自旋锁
自旋锁(Spinlock)是一种同步机制,用于保护对共享资源的访问。当一个线程尝试获取一个已经被其他线程持有的自旋锁时,这个线程不会被阻塞(即进入等待状态),而是会在一个循环中不断地检查锁的状态,这个过程被称为自旋。
自旋锁的主要优点是避免了线程切换的开销,因为线程在等待锁的过程中始终保持运行状态。这在锁被持有的时间很短的情况下是非常有效的。
然而,自旋锁的主要缺点是它会消耗CPU资源。如果锁被持有的时间较长,或者有很多线程在等待同一个锁,自旋锁可能会导致CPU使用率非常高,甚至可能导致系统的性能下降。
修改之后
public String sale(){
String retMessage = "";
String key = "redisLock";
String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
//不用递归了,高并发容易堆栈溢出,我们使用自旋代替递归的方式调用,也不用if,使用while
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)){
//暂停20毫秒重式
try { TimeUnit.MICROSECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
}
//这是抢到锁了
try {
// 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
// 判断库存够不够,如果为空则设置为0,有则转化为integger
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
if (inventoryNumber > 0){
// 减扣库存,每次减少一个
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余:"+inventoryNumber;
System.out.println(retMessage+"\t"+"服务端口号:"+port);
}else {
retMessage = "商品卖完了,去别处看看吧(0-0)ll";
}
} finally {
//释放锁,删除key
if(uuidValue.equals(stringRedisTemplate.opsForValue().get(key))){
stringRedisTemplate.delete(key);
}
}
return retMessage+"\t"+"服务端口号:"+port;
}
测试,还是3000个
最终是正确的,没有发生错误
高并发程序必须要有对服务宕机和防止死锁做处理,当前程序就没有预防宕机的功能,假如,一个线程拿到了锁,然后正在执行业务逻辑代码,还没到final呢,服务突然挂掉了,而且当时设置key的时候压根就没有设置过期时间,那么这锁就一直还在,它不可能自己删除,然而后面的线程还在等着你进去的释放,你挂了就不会释放,然后就死锁了,就G了。
3.2.4、宕机与过期+防止死锁
直接在中间添加过期时间,是不具备原子性的
可以直接使用setIfAbsent方法中的重载方法,自带过期时间
while部分的修改
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)){
//暂停20毫秒重式
try { TimeUnit.MICROSECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
}
剩下的压测当然也没有错误,这里就不再写了。
3.2.5、防止误删key的问题
上面的程序还存在的问题就是,假如A线程由于网络卡顿他在第30秒的时候还在干业务逻辑呢,30秒到了,key失效了,此时一直等待在外面的B线程看到机会了,把key拿到了,然后到了32秒A的活干完了,理所当然的执行了final删除key,这个时候的key已经不是A的了,它是B的,那B干完活出来一脸懵逼,我锁被别人删了。
其实在上面我已经改了
finally {
//释放锁,删除key,只能删除自己的
if(uuidValue.equals(stringRedisTemplate.opsForValue().get(key))){
stringRedisTemplate.delete(key);
}
}
但是这里仍然会有问题,判断和删除他们还不是原子性的,所以需要使用lua脚本保证原子性。
3.2.6、使用lua脚本保证原子性
参考官网的https://redis.io/docs/reference/patterns/distributed-locks/
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
Redis调用Lua脚本通过eval命令保证代码执行的原子性,直接用return返回脚本执行后的结果值
eval luascript numkeys [key [key ...]] [arg [arg ...]]
- luascript 代表要执行的lua脚本
- numkeys 代表参数的个数,这里就是key的个数,写0就是没有参数
- [key [key ...]] numkeys有几个这里就写几个,比如2个 就写k1,k2
- [arg [arg ...]] 就是key的值与前面的key一一对应
lua脚本里面的参数应该这样写
KEYS[1] 下标是从1开始的!
ARGV[1]
来个hello world入门
127.0.0.1:6379> EVAL "return 'hello lua'" 0
"hello lua"
127.0.0.1:6379>
复杂一点点
使用lua脚本将三个命令合在一起,在lua脚本中想使用命令需要有标志,redis.call(命令)
set k1 v1
expire k1 30
get k1
合在一起后
EVAL "redis.call('set','k1','v1') redis.call('expire','k1','30') return redis.call('get','k1')" 0
使用带参数的,根据你传进来的参数设置key和value
127.0.0.1:6379> eval "return redis.call('mset',KEYS[1],ARGV[1],KEYS[2],ARGV[2])" 2 lua1 lua2 l1 l2
OK
127.0.0.1:6379> mget lua1 lua2
1) "l1"
2) "l2"
下面使用带判断的lua脚本,官网上的判断一个锁是否存在,存在就删除它,不存在就返回0
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
先设置一个key为lock,直接使用官网的脚本试试
参数错误,这里的参数当然不会错误,官网的参数加的是双引号,而实际上我们要使用参数必须是单引号,改一下就行了。
这样就成功了
看一下条件判断的语法
if(布尔条件) then
业务代码
elseif(布尔条件) then
业务代码
elseif(布尔条件) then
业务代码
else
业务代码
end
写一个不使用redis命令的纯判断lua脚本
if KEYS[1] > KEYS[2] then return ARGV[1] elseif KEYS[1] < KEYS[2] then return ARGV[2] else return ARGV[3] end
在redis中执行
现在回归项目代码,实现将判断锁和删除锁合二为一,实现原子性操作
改进:修改为lua脚本的redis分布式锁调用,必须保证原子性,参考官网脚本案例
//此处进行改进,删除操作还是没有原子性,使用lua脚本进行原子操作
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) " +
"else return 0 end";
stringRedisTemplate.execute(new DefaultRedisScript(luaScript,boolean.class), Arrays.asList(key),uuidValue);
启动测试,高并发测试还是3000次请求
OK!测试没有问题
4、可重入锁+设计模式
在上面的案例中还有一个问题
如何坚固锁的可重入性问题?
再来回顾一下一个锁要具备的条件
独占性
高可用
防死锁
不乱抢
重入性
可重入锁(又名递归锁)
是指在同一个线程在外层方法获取锁之后,再进入该线程的内层方法自动获取锁(锁的对象得是同一个对象),不会业务因为之前已经获取过还没释放而阻塞。
如果是1个有synchronized 修饰的递归调用方法,程序第2次进入被自己阻塞了岂不是天大的笑话,出现了作茧自缚。所以Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。
进入同步域(即同步代码块/方法或显式锁锁定的代码)
一句话总结:
一个线程中的多个流程可以获取同一把锁,持有这把同步锁可以再次进入。自己可以获取自己的内部锁。
小案例
public class ReEntryLockDemo {
final Object obj = new Object();
public void entry01(){
new Thread(() ->{
synchronized (obj){
System.out.println(Thread.currentThread().getName()+"\t"+"外层调用");
synchronized (obj){
System.out.println(Thread.currentThread().getName()+"\t"+"中层调用");
synchronized (obj){
System.out.println(Thread.currentThread().getName()+"\t"+"内层调用");
}
}
}
},"t1").start();
}
public static void main(String[] args) {
ReEntryLockDemo reEntryLockDemo = new ReEntryLockDemo();
reEntryLockDemo.entry01();
}
}
1. obj,它被用作同步锁。
2. entry01方法创建了一个新的线程("t1"),并在这个线程中进行了三层嵌套的同步块。每一层同步块都使用obj作为锁。如果synchronized不是可重入的,那么就会发生中层代码等待外层释放锁才能执行,而外层的执行内容就是启动中层,那就死锁了。
3. main方法创建了一个ReEntryLockDemo对象,并调用了entry01方法。
你会看到线程"t1"在外层、中层和内层都成功获取了锁,并打印出了相应的消息。这是因为Java中的内置锁synchronized是可重入的,一个线程可以多次获取同一个锁,而不会导致死锁。
再来一个,将synchronized作为方法声明
public void entry02(){
m1();
}
private synchronized void m1() {
System.out.println(Thread.currentThread().getName()+"\t"+"外层调用synchronized");
m2();
}
private synchronized void m2() {
System.out.println(Thread.currentThread().getName()+"\t"+"中层调用synchronized");
m3();
}
private synchronized void m3() {
System.out.println(Thread.currentThread().getName()+"\t"+"内层调用synchronized");
}
同第一个测试类似的,使用synchronized修饰的方法或者代码块的内部调用本类的其他synchronized修饰的方法或者代码块时,是永远可以得到锁的
可重入锁,指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁,这样的锁就叫做可重入锁。
再来一个测试,使用显式锁lock手动lock和unlock
Lock lock = new ReentrantLock();
public void entry03(){
new Thread(() ->{
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"\t"+"外层调用lock");
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"\t"+"内层调用lock");
}finally {
lock.unlock();
}
}finally {
lock.unlock();
}
},"t1").start();
try {
TimeUnit.MICROSECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() ->{
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"\t"+"外层调用lock");
}finally {
lock.unlock();
}
},"t2").start();
}
这样没有问题,原本是上一次锁之后就需要解对应的锁,现在故意将内层的锁不释放,仍然进行这个程序试一下
可以看见它会一直卡在这里不会执行t2的业务,在这里t1是将它的俩业务执行完了,它上了两次锁,但是只释放了一次,它能正常执行业务,但是t1和t2毕竟使用的是同一把锁lock,所以此时的lock认为它还没有被释放呢,所以t2就会迟迟等待一直卡在这里。
隐式锁Synchronized的重入的实现机理
每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。
当执行monitorenter时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。
在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么Java虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。
当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1。计数器为零代表锁已被释放。
参照于AQS源码分析,对于可重入锁计数问题,redis中的Map<String, Map<0bject,object>>可以帮助我们使用
K K V
举个例子:
127.0.0.1:6379> exists redisLock 先判断有没有这个锁
(integer) 0
127.0.0.1:6379> hset redisLock asdij24eihjqher3:1 1
(integer) 1
没有就创建,流水号:线程id 默认重入次数是1
127.0.0.1:6379> HINCRBY redisLock asdij24eihjqher3:1 1
(integer) 2
使用HINCRBY增加一次代表重入了一次,现在是2次
127.0.0.1:6379> HINCRBY redisLock asdij24eihjqher3:1 1
(integer) 3
127.0.0.1:6379> HINCRBY redisLock asdij24eihjqher3:1 1
(integer) 4
127.0.0.1:6379> hget redisLock asdij24eihjqher3:1
"4"
127.0.0.1:6379> HINCRBY redisLock asdij24eihjqher3:1 -1
(integer) 3
127.0.0.1:6379> HINCRBY redisLock asdij24eihjqher3:1 -1
(integer) 2
127.0.0.1:6379> HINCRBY redisLock asdij24eihjqher3:1 -1
(integer) 1
127.0.0.1:6379> HINCRBY redisLock asdij24eihjqher3:1 -1
(integer) 0
重入有几次就要解锁几次直到为0可以删除
127.0.0.1:6379> del redisLock
(integer) 1
127.0.0.1:6379>
小结
setnx,只能解决有无的问题,够用但是不完美。
hset,不但解决有无,还解决可重入问题。
4.1、设计
目前的目的是保证同一个时候只能有一个线程持有锁进去redis做扣减库存动作
有俩分支
保证加锁和解锁。lock/unlock,这需要使用lua脚本进行判断等复杂操作保证原子性。
扣减库存redis命令的原子性。这个已经在原来的代码上实现了。
4.2、使用lua脚本优化原代码,锁实现可重入
此时setnx就不好用了,要使用hset代替
上锁脚本lock
先判断redis分布式锁这个key是否存在
EXISTS key判断,如果返回是0就是不存在,需要创建属于自己的锁;
如果是1就存在说明已经有这个锁了,但还需要进一步判断是不是当前线程自己的
使用HEXISTS key uuid:ThreadID判断是不是自己的,返回0就不是自己的,返回1就是自己的,然后再自增一次表示重入。
需改为lua脚本
三个版本
v1
if redis.call('exists','key') == 0 then redis.call('hset','key','uuid:threadid',1) redis.call('expire','key',30) return 1 elseif redis.call('hexists'.'key','uuid:threadid') == 1 then redis.call('hincrby','key','uuid:threadid',1) redis.call('expire','key',30) return 1 else return 0 end
这个脚本还可以优化,相同的部分可以替换,HINCRBY可以代替掉hset
HINCRBY在没有key或field是会自动创建
v2合并相同的代码,用HINCRBY代替hset,精简代码
if redis.call('exists','key') == 0 or redis.call('hexists'.'key','uuid:threadid') == 1 then redis.call('hincrby','key','uuid:threadid',1) redis.call('expire','key',30) return 1 else return 0 end
v3脚本OK了,换上我们的参数来替代ARGV[2]是30秒过期时间
if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end
测试搞成一行
if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end
解锁脚本unlock
要先有锁并且还得是自己的,还需要判断可重入性
使用HEXISTS key uuid:threadid先判断有没有这个锁,没有返回的0,程序块返回nil
不是0,说明自己有锁还是自己的锁,直接调用HINCRBY -1 表示每次减少一个,也就是解锁一次,直到它变为0了就是可以删除key了然后再del key
设计脚本
v1
if redis.call('hexists',key,uuid:threadid) == 0 then return nil elseif redis.call('hincrby',key,uuid:threadid,-1) == 0 then return redis.call('del',key) else return 0 end
v2 替换参数
if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end
换成一行
if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end
测试全套 luck加4次,unlock减4次
4.3、将上面的Lua脚本整合到微服务java程序里
我们将程序恢复到最为原始的状态,基于那个最原始的开始创建新版本
新建RedisDistributedLock类并实现JUC里面的Lock接口
实际上干活的是带时间参数的trylock
通过实现JUC里面的Lock接口,实现Redis分布式锁RedisDistributedLock
//自研的redis分布式带重入的锁
public class RedisDistributedLock implements Lock {
private StringRedisTemplate stringRedisTemplate;
private String lockName;//KEYS[1]
private String uuidValue;//ARGV[1]
private long expireTime;//ARGV[2]
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
this.expireTime = 50L;
}
@Override
public void lock() {
tryLock();
}
@Override
public boolean tryLock() {
try {
tryLock(-1L,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time == -1L){
String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))){
try { TimeUnit.MICROSECONDS.sleep(60); } catch (InterruptedException e) { e.printStackTrace(); }
}
return true;
}
return false;
}
@Override
public void unlock() {
String script = "if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end";
Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
if (flag == null){
throw new RuntimeException("不存在这个锁哦!!!");
}
}
//这俩用不到
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public Condition newCondition() {
return null;
}
}
如果现在就直接到InventoryService中使用会有问题
redis获得锁被写死了,假如以后还有zookeeper、mysql做分布式锁,需要再加,所以引入工厂模式
工厂设计模式的引入
@Component
public class DistributedLock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String lockName;
public Lock getDistributedLock(String lockType){
if (lockType == null) return null;
if (lockType.equals("REDIS")){
this.lockName = "redisLock";
return new RedisDistributedLock(stringRedisTemplate,lockName);
}else if (lockType.equals("ZOOKEEPER")){
//ZOOKEEPER版本的分布式锁
this.lockName = "zookeeperLock";
return null;
}else if (lockType.equals("MYSQL")){
//MYSQL分布式锁
this.lockName = "mysqlLock";
return null;
}
return null;
}
}
最终启动微服务测试,直接使用jmeter进行压测5000次
查看后台,没有错误成功卖完。
这样虽然是正常的流程,但是我们的可重入性还是没有体现出来,再来改造一下
在执行业务逻辑片段我们添加一个方法进行再次上锁,也就是重入一次
private void testReEntry() {
Lock myRedisLock = distributedLock.getDistributedLock("REDIS");
myRedisLock.lock();
try {
System.out.println("=========测试可重入锁=======");
} finally {
myRedisLock.unlock();
}
}
在这里进行调用
if (inventoryNumber > 0){
// 减扣库存,每次减少一个
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余:"+inventoryNumber;
System.out.println(retMessage+"\t"+"服务端口号:"+port);
testReEntry();
}else {
retMessage = "商品卖完了,去别处看看吧(0-0)ll";
}
然后在浏览器上来一次单机的访问一次
报错了,查看后台
发现问题,线程的id是同一个线程62号确实没错,加锁两次又解锁两次也没错,但是uuid发生了变化,所以导致无法解锁。
问题出在这里
那么就进行一下修改,改工厂模式的片段
我们在使用工厂模式时只使用一个distributedLock依赖对象,那么这个是全局唯一的,所以在它创建被应用时将uuid从这里生成,添加到它的无参构造器中,然后再通过RedisDistributedLock的有参构造方法下放进来使uuidValue组合成uuid+线程id的形式就和原来的value一样的格式了。
DistributedLock修改
@Component
public class DistributedLock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String lockName;
private String uuid;
public DistributedLock() {
this.uuid = IdUtil.simpleUUID();
}
public Lock getDistributedLock(String lockType){
if (lockType == null) return null;
if (lockType.equals("REDIS")){
this.lockName = "redisLock";
return new RedisDistributedLock(stringRedisTemplate,lockName,uuid);
}else if (lockType.equals("ZOOKEEPER")){
//ZOOKEEPER版本的分布式锁
this.lockName = "zookeeperLock";
return null;
}else if (lockType.equals("MYSQL")){
//MYSQL分布式锁
this.lockName = "mysqlLock";
return null;
}
return null;
}
}
RedisDistributedLock修改它的构造方法即可
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName,String uuid) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValue = uuid+":"+Thread.currentThread().getId();
this.expireTime = 25L;
}
下面进行高并发测试
这里不明白为什么打印的顺序很乱,上锁前的打印信息没有出来,代码顺序也没错
看一下redis的数据
这里的数据最后库存为0,也没有锁了,一切正常,下面这个可以看到它重入了两次。
5、业务续期
确保redisLock过期时间大于业务执行时间的问题,防止别人删错锁
写个自动续期加钟的Lua脚本
if redis.call('EXISTS',KEYS[1],ARGV[1]) == 1 then
return redis.call('expire',KEYS[1],ARGV[2])
else
return 0
end
测试加钟脚本
eval "if redis.call('EXISTS',KEYS[1],ARGV[1]) == 1 then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end" 1 testLock uuid:11 50
测试的没有错误,那就去改写一下java程序,
这一版本是实现自动续期功能的完善,后台自定义扫描程序,如果规定时间内没有完成业务逻辑,会调用加钟自动续期的脚本。
RedisDistributedLock修改部分
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time == -1L){
String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
System.out.println("lockName:"+lockName+"\t"+"uuidValue:"+uuidValue);
while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))){
try { TimeUnit.MICROSECONDS.sleep(60); } catch (InterruptedException e) { e.printStackTrace(); }
}
//新建一个后台扫描程序,来检查key目前的ttl,是否到了我们规定的二分之一或者三分之一来实现续期
renewExpire();
return true;
}
return false;
}
private void renewExpire() {
String script = "if redis.call('EXISTS',KEYS[1],ARGV[1]) == 1 then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end";
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if(stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))){
renewExpire();
}
}
},(this.expireTime * 1000)/3);
}
InventoryService每10秒钟进行一个调度
//实现自动续期功能的完善,后台自定义扫描程序,如果规定时间内没有完成业务逻辑,会调用加钟自动续期的脚本
public String sale() {
String retMessage = "";
Lock myRedisLock = distributedLock.getDistributedLock("REDIS");
myRedisLock.lock();
try {
// 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
// 判断库存够不够,如果为空则设置为0,有则转化为integger
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
if (inventoryNumber > 0){
// 减扣库存,每次减少一个
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余:"+inventoryNumber;
System.out.println(retMessage+"\t"+"服务端口号:"+port);
//在这里故意将暂停120秒线程,演示自动续期功能
try {
TimeUnit.SECONDS.sleep(120);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
retMessage = "商品卖完了,去别处看看吧(0-0)ll";
}
} finally {
myRedisLock.unlock();
}
return retMessage+"\t"+"服务端口号:"+port;
}
启动测试,单例请求
测试成功!
6、Redis分布式锁总结
synchronized
单机版oK,上分布式就不行了
nginx分布式微服务单机锁不行了o(╥﹏╥)o
取消单机锁,上redis分布式锁setnx
只加了锁,没有释放锁, 出异常的话,可能无法释放锁,必须要在代码层面finally释放锁
宕机了,部署了微服务代码层面根本没有走到finally这块,没办法保证解锁,这个key没有被删除, 需要有lockKey的过期时间设定
为redis的分布式锁key,增加过期时间 此外,还必须要setnx+过期时间必须同一行
必须规定只能自己删除自己的锁,你不能把别人的锁删除了, 防止张冠李戴,1删2,2删3
unlock变为Lua脚本
锁重入,hset替代setnx+lock变为Lua脚本保证
自动续期
OK这一章节到此结束,下一章节------->Redlock算法和底层源码分析