分布式锁——redis实现分布式锁(手写redis分布式锁)

不适用分布式锁会怎样?

以抢购商品的高并发场景为例,通常单体的应用可以通过同步代码来实现顺序对数据库的操作,保证程序是按照预想来执行扣减操作的,不发生超卖情况。

但是在分布式系统中,同一个服务多实例部署,同步代码就不能解决该问题,简单来讲就是同步代码在多实例的情况下只能管好自己,管不了别人,而且因为synchronized的粗粒度,单线程执行造成请求挤压情况。

使用redis实现分布式锁

使用redis实现分布式锁主要是使用其SETNX操作SETNX [key] [value],当key不存在时,将value存入redis,成功返回1,否则返回0,由于redis单线程的特点,所以可以通过使用SETNX来实现分布式锁。

实现思路:当第一个请求进入代码后,在执行下单扣减库存之前,向redis中SETNX一个商品id为key的value(value可以为时间戳),当下单扣减库存执行结束之后,再删除这个key value。由于key已经存在,其他请求SETNX不进去,所以就保证了多实例间的线程安全。

注意

  • 要考虑执行业务代码抛异常之后无法解锁的问题
  • 要考虑某个线程执行完加锁操作后,突然程序挂了,其他实例无法解锁的情况

手写redis实现分布式锁

这里使用的是springboot2.x,所以spring-data-redis是lettuce实现的

为了避免意外死锁的情况,就要设置redis锁的超时时间,但是设置超时时间会存在如下问题:

  • 超时时间内一旦没有执行完成业务代码之后删除锁,此时就会被其他客户端请求在某些分布式实例节点上获取到锁
  • 当执行缓慢的线程执行完成后,删除的可能是别的线程的锁,高并发情况下,可能造成锁连环失效(删除锁的时候要判断是否是自己的锁)

以下demo中解决了以上问题

controller:

package com.leolee.msf.controller;

import com.google.gson.Gson;
import com.leolee.msf.service.serviceInterface.DistributedTransactionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

/**
 * @ClassName DistributedTransactionController
 * @Description: 分布式事务测试
 * 产品下单为例
 * @Author LeoLee
 * @Date 2020/11/20
 * @Version V1.0
 **/
@RestController
@RequestMapping("/product")
public class DistributedTransactionController {

    //限购一件
    private final int num = 1;

    @Autowired
    Gson gson;

    @Autowired
    DistributedTransactionService distributedTransactionService;




    @GetMapping("/{id}")
    public String productQuantity(@PathVariable(name = "id")String productId) {

        return gson.toJson(distributedTransactionService.getProductQuantity(productId));
    }


    @GetMapping("/order/{id}")
    public String order(@PathVariable(name = "id")String productId) {

        boolean b = distributedTransactionService.orderByProductId(productId);
        Map<String, Object> resultMap = distributedTransactionService.getProductQuantity(productId);
        if (b) {
            resultMap.put("msg", "抢购成功");
            resultMap.put("code", true);
        } else {
            resultMap.put("msg", "抢购成功");
            resultMap.put("code", false);
        }
        return gson.toJson(resultMap);
    }

}

service:

package com.leolee.msf.service;

import com.leolee.msf.service.serviceInterface.DistributedTransactionService;
import com.leolee.msf.utils.RedisLockUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @ClassName DistributedTransactionServiceImpl
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/20
 * @Version V1.0
 **/
@Service("distributedTransactionService")
public class DistributedTransactionServiceImpl implements DistributedTransactionService {

    @Autowired
    RedisTemplate redisTemplate;

    @Autowired
    RedisLockUtil redisLockUtil;



    //库存 key-productId value-数量
    private HashMap<String, Long> productStockQuantity;

    //订单 key-uuid value-productId
    private HashMap<String, String> order;

    //总量 key-productId value-数量
    private HashMap<String, Long> total;

    //抢购商品id写死
    private final String productId = "123";


    public DistributedTransactionServiceImpl() {
        this.total = new HashMap<String, Long>();
        this.productStockQuantity = new HashMap<String, Long>();
        this.total.put(productId, 10000l);
        this.productStockQuantity.put(productId, 10000l);
        this.order = new HashMap<String, String>();
    }


    @Override
    public Map<String, Object> getProductQuantity(String productId) {
        Map<String, Object> info = new HashMap<>();
        info.put("productId", productId);
        info.put("soldOut", order.size());//已售
        info.put("total", total.get(productId));
        info.put("stock", productStockQuantity.get(productId));
        return info;
    }

    /*该方案存在问题
      1.当前锁过期之后,高并发情况下多个客户端同时执行getAndSet方法,那么虽然最终只有一个客户端可以加锁,虽然其他没有获得锁的请求没有成功执行业务操作,但是覆盖了锁的value时间戳
      2.虽然这样为了处理死锁问题,由于存在一个客户端请求在锁失效前还是没有执行完毕,甚至计算库存是否>0都没有完成,下一个客户端请求的时候,判断前一个锁已经失效,覆盖了前一个锁,所以两个线程间还是会出现超卖的问题。
    */
    @Override
    public boolean orderByProductId(String productId) {

        //加分布式锁
        //value设置为10秒后
        String cuurentTimeMills = String.valueOf(System.currentTimeMillis() + 10000);
        if (!redisLockUtil.redisLock(productId, cuurentTimeMills)) {
            return false;
        }

        boolean result = false;
        try {
            //=======================执行业务逻辑=========================

            //判断是否存在该商品
            if (checkExist(productId)) {
                try {
                    //模拟数据库操作
                    Thread.sleep(1000);
                    //产生订单,扣减库存
                    order.put(UUID.randomUUID().toString(), productId);
                    productStockQuantity.put(productId, productStockQuantity.get(productId) - 1);
                    result = true;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //=======================业务逻辑结束=========================
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //解锁
            redisLockUtil.deleteLock(productId, cuurentTimeMills);
        }

        return result;
    }

    /*
     * 功能描述: <br>
     * 〈检查商品是否存在,是否有库存〉
     * @Param: [productId]
     * @Return: boolean
     * @Author: LeoLee
     * @Date: 2020/11/20 10:52
     */
    private boolean checkExist(String productId) {

        return total.containsKey(productId) && productStockQuantity.containsKey(productId) && productStockQuantity.get(productId) > 0 ? true : false;
    }
}

RedisLock:

package com.leolee.msf.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;

/**
 * @ClassName RedisLockUtil
 * @Description: 分布式锁(还是存在超卖的情况,该实例仅供理解学习redis分布式锁)
 * @Author LeoLee
 * @Date 2020/11/20
 * @Version V1.0
 **/
public class RedisLockUtil {

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    /*
     * 功能描述: <br>
     * 〈分布式锁——加锁〉
     * @Param: [key, timeMillis 设置要大于当前时间戳]
     * @Return: boolean
     * @Author: LeoLee
     * @Date: 2020/11/20 12:24
     */
    public boolean redisLock(String key, String timeMillis) {

        //加锁成功直接返回true,证明目前还没有该key
        if (redisTemplate.opsForValue().setIfAbsent(key, timeMillis)) {
            return true;
        }

        //解决死锁
        String current = (String) redisTemplate.opsForValue().get(key);
        if (current != null && Long.valueOf(current) < System.currentTimeMillis()) {//之前的锁过期了,应该允许新的请求获取锁
            //Set value of key and return its old value.设置新值返回旧值,
            // 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才有权利加锁
            String old = (String) redisTemplate.opsForValue().getAndSet(key, timeMillis);//getAndSet线程安全
            if (old != null && old.equals(current)) {
                return true;
            }
        }
        return false;
    }

    public boolean deleteLock(String key, String timeMillis) {

        if (String.valueOf(redisTemplate.opsForValue().get(key)).equals(timeMillis)) {
            return redisTemplate.delete(key);
        }
        return false;
    }
}

上面就是一个简单的redis锁实现,但是!!!!!这个实现方式还是存在问题:

  • 各个服务端实例所在服务器的时间要一致
  • 当前锁过期之后,高并发情况下多个客户端同时执行getAndSet方法,那么虽然最终只有一个客户端可以加锁,虽然其他没有获得锁的请求没有成功执行业务操作,但是覆盖了锁的value时间戳
  • 虽然这样为了处理死锁问题,由于存在一个客户端请求在锁失效前还是没有执行完毕,甚至计算库存是否>0都没有完成,下一个客户端请求的时候,判断前一个锁已经失效,覆盖了前一个锁,所以两个线程间还是会出现超卖的问题。

废了这么大劲只是为了解释清楚redis分布式锁的实现思路,以及可能存在哪些问题。

那什么方案才是最完美的呢?

问题的根本是可能存在当前获取到锁的客户端,业务执行过慢,超过了锁的有效期,又不能把锁的有效期设置的过大

这里就需要一个机制来在业务执行超过锁有效期后能延长锁的有效时间,这就引出了一种新的解决思路:

  • 当某一个客户端获取到一把锁之后,另起一个持有该客户端锁特征的线程
  • 这个线程通过该客户端锁的特征可以去redis中找到对应的锁,每个一定时间就延长该锁的有效期
  • 当该客户端对应线程执行完毕后,停止其对应的延长锁有效期的线程,并删除redis锁

这种延长分布式锁存活时间的思想是和Redisson框架实现分布式锁的思想一致的。之后会详细介绍一下Redisson

依照上面的思路,做出了如下改动:

  1. 当第一个客户端请求进来之后,使用SETNX创建了一个key为productId,value为UUID的分布式锁,超时时间是5000毫秒
  2. 设置分布式锁成功之后,开始执行下单扣减库存的逻辑,同时通过另起一个线程来延长当前线程的redis锁超时时间,延长超时时间的逻辑为
    • 根据分布式锁的超时时间/3,作为检查分布式锁超时的轮询间隔
    • 在每一次轮询中获取当前分布式锁剩余的存活时间,如果剩余存活时间小于轮询时间,则续期2秒
  3. 当业务执行完成后会去删除当前的分布式锁,并通知该续期线程interrupt,由于轮询执行在Runnable中,并使用了Thread.sleep(),无法向上层代码抛出InterruptedException(只能try/catch)并且isInterrupted状态会被变为false,线程并没有停下来,所以在catch到InterruptedException: sleep interruped之后,手动执行Thread.currentThread().interrupt()重置线程中断状态为true,这样就可以在上层代码判断该线程的状态来进行可控的操作,由于我这里还是需要让该线程停下,所以我最终还是调用了extensionExpirationTime.stop();。但是这种中断线程的逻辑是没有问题的。具体可以参考:https://blog.csdn.net/trackle400/article/details/81775189

controller不变

service:

package com.leolee.msf.service;

import com.leolee.msf.service.serviceInterface.DistributedTransactionService;
import com.leolee.msf.utils.redisLock.RedisLockUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @ClassName DistributedTransactionServiceImpl
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/20
 * @Version V1.0
 **/
@Service("distributedTransactionService")
public class DistributedTransactionServiceImpl implements DistributedTransactionService {

    @Autowired
    RedisTemplate redisTemplate;

    @Autowired
    RedisLockUtil redisLockUtil;



    //库存 key-productId value-数量
    private HashMap<String, Long> productStockQuantity;

    //订单 key-uuid value-productId
    private HashMap<String, String> order;

    //总量 key-productId value-数量
    private HashMap<String, Long> total;

    //抢购商品id写死
    private final String productId = "123";


    public DistributedTransactionServiceImpl() {
        this.total = new HashMap<String, Long>();
        this.productStockQuantity = new HashMap<String, Long>();
        this.total.put(productId, 10000l);
        this.productStockQuantity.put(productId, 10000l);
        this.order = new HashMap<String, String>();
    }


    @Override
    public Map<String, Object> getProductQuantity(String productId) {
        Map<String, Object> info = new HashMap<>();
        info.put("productId", productId);
        info.put("soldOut", order.size());//已售
        info.put("total", total.get(productId));
        info.put("stock", productStockQuantity.get(productId));
        return info;
    }

    //====================================================================================================
    public boolean orderByProductId2(String productId) {

        //加分布式锁
        String uuid = UUID.randomUUID().toString();
        redisLockUtil.redisLock(productId, uuid, 5000);

        boolean result = false;
        try {
            //=======================执行业务逻辑=========================
            //判断是否存在该商品
            if (checkExist(productId)) {
                try {
                    //模拟数据库操作
                    Thread.sleep(4000);
                    //产生订单,扣减库存
                    order.put(UUID.randomUUID().toString(), productId);
                    productStockQuantity.put(productId, productStockQuantity.get(productId) - 1);
                    result = true;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //=======================业务逻辑结束=========================
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //解锁
            redisLockUtil.newDeleteLock(productId, uuid);
        }


        return result;
    }





    /*
     * 功能描述: <br>
     * 〈检查商品是否存在,是否有库存〉
     * @Param: [productId]
     * @Return: boolean
     * @Author: LeoLee
     * @Date: 2020/11/20 10:52
     */
    private boolean checkExist(String productId) {

        return total.containsKey(productId) && productStockQuantity.containsKey(productId) && productStockQuantity.get(productId) > 0 ? true : false;
    }
}

RedisLock:

package com.leolee.msf.utils.redisLock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName RedisLockUtil
 * @Description: 分布式锁(还是存在超卖的情况,该实例仅供理解学习redis分布式锁)
 * @Author LeoLee
 * @Date 2020/11/20
 * @Version V1.0
 **/
public class RedisLockUtil {

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;


    //========================方案2============================
    ExtensionExpirationTime extensionExpirationTime = null;
    /*
     * 功能描述: <br>
     * 〈分布式加锁,延长过期时间版〉
     * @Param: [key, value, time]
     * @Return: boolean
     * @Author: LeoLee
     * @Date: 2020/11/20 17:02
     */
    public boolean redisLock(String key, String value, long time) {


        if (redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.MILLISECONDS)) {

            extensionExpirationTime = new ExtensionExpirationTime(key, value, time, redisTemplate);
            extensionExpirationTime.start();
            return true;
        }
        return false;
    }

    public boolean newDeleteLock(String key, String value) {

        if (String.valueOf(redisTemplate.opsForValue().get(key)).equals(value)) {
            extensionExpirationTime.interrupt();//终止续期线程
            if (extensionExpirationTime.isInterrupted()) {
                try {
                    extensionExpirationTime.stop();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return redisTemplate.delete(key);
        }
        return false;
    }

}

延长时间的线程类:

package com.leolee.msf.utils.redisLock;

import org.springframework.data.redis.core.RedisTemplate;

import java.util.concurrent.TimeUnit;

/**
 * @ClassName ExtensionExpirationTime
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/20
 * @Version V1.0
 **/
public class ExtensionExpirationTime extends Thread {


    private String productId;

    private String value;

    private long checkTime;

    private RedisTemplate<String,Object> redisTemplate;

    private int i;

    public ExtensionExpirationTime(String productId, String value, long time, RedisTemplate<String,Object> redisTemplate) {
        this.productId = productId;
        this.value = value;
        this.checkTime = time/3 > 0 ? time/3 : 5000;
        this.redisTemplate = redisTemplate;
    }


    @Override
    public void run() {

        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(checkTime);
                //延长过期时间
                System.out.println("prudctId:" + productId + ",第" + ++i + "次续期");
                checkExpiretion();
            } catch (InterruptedException e) {
                e.printStackTrace();
                //中断状态在抛出异常前,被清除掉,因此在此处重置中断状态
                Thread.currentThread().interrupt();
            }
        }
    }

    private void checkExpiretion() {
        long currentExpire = redisTemplate.opsForValue().getOperations().getExpire(productId);
        if (currentExpire < checkTime/1000) {
            redisTemplate.expire(productId, checkTime + currentExpire * 2000, TimeUnit.MILLISECONDS);
        }
    }
}

执行结果:

那么这样一个简单的分布式锁就完成了。目前测试是没有出现问题的,如果谁发现了什么问题,请评论联系我。

对于使用该代码出现的任何问题,本人不负责任QaQ~~~

 

 

 

本文地址:https://blog.csdn.net/qq_25805331/article/details/109843493

(0)
上一篇 2022年3月21日
下一篇 2022年3月21日

相关推荐