技术交流28群

服务热线

135-6963-3175

微信服务号

redis分布式锁的实现及注意点 更新时间 2022-3-5 浏览1463次

redis分布式锁的实现

背景

应用分布式部署或并发调用所产生的问题,使得多个应用对同一个操作进行了重复操作,就会出现数据不一致问题。

性质:在任意一个时刻, 只有一个客户端持有锁, 即独享。

实现过程

基于setnx key value

key设置为业务键或者固定字符串(看场景,比如批次处理任务可能就是固定字符串)

value可以设置为线程id或者随机字符串uuid(用于标记当前获取锁的对象)

注意问题1(死锁问题)

若一台应用实例逻辑执行完程序出现异常,则锁会一直存在,没有得到释放,会无法获得所,此时就会造成死锁问题;

改进方式:给锁加一个过期时间。但是由于业务逻辑执行的时间不同,过期的时间设置也是一个问题,故通常分布式锁不能应用于业务逻辑执行较长的程序;(注意:由于redis每条指令都是原子性操作所以setnx和expire不要分开执行,redis2.8提供了set key value [EX seconds] [PX milliseconds] [NX|XX])。

参数列表解释

EX seconds: 设定过期时间,单位为秒

PX milliseconds: 设定过期时间,单位为毫秒

NX: key不存在时设置值

XX: key存在时设置值

public boolean lock(Jedis jedis,String key,String val,int expireTime){
  String lock = jedis.set(key, val, "NX", "PX",
      expireTime);
  return "OK".equals(lock);
}

注意问题2(锁超时问题):

当业务任务执行时间过长但是我们设置的有效期又过短的情况下导致还没执行完,锁失效了,这时候第二台应用获得锁。这个时候第二个线程就提前重新持有了这把锁,导致临界区代码不能得到严格的串行执行。

解决思路:在执行计算期间检查若发现锁快超时了,客户端可以给redis服务实例发送一个Lua脚本让redis服务端延长锁的时间,只要这个锁的key还存在而且值还等于客户端设置的那个值。(启动另外一个线程去检查的问题,若能获取到且快超时则去延时客户端应当只能在失效时间内无法延长锁时再去重新获取锁,保证锁只能自己手动去释放)

延时伪代码:

if redis.call("get",KEYS[1]) == ARGV[1] then
    redis.call("set",KEYS[1],ex=3000)  
else
    getDLock();//重新获取锁

注意问题3(同时锁问题):

可能发生在主从模式,主节点挂掉后,从节点会取而代之,客户端上却并没有明显感知。原先第一个客户端在主节点中申请成功了一把锁,但是这把锁还没有来得及同步到从节点,主节点突然挂掉了。然后从节点变成了主节点,这个新的节点内部没有这个锁,所以当另一个客户端过来请求加锁时,立即就批准了。这样就会导致系统中同样一把锁被两个客户端同时持有,不安全性由此产生。

方案:可以先给锁设置一个LockTime,然后启动一个守护线程,让守护线程在一段时间后,重新去设置这个锁的LockTime。类似锁延时,我们推荐使用Redission框架来实现锁超时间延长。


延时守护线程例子代码(该类线程也可用于问题2锁延时):

public class SurvivalClamProcessor implements Runnable {
    private static final int REDIS_EXPIRE_SUCCESS = 1;
    SurvivalClamProcessor(String field, String key, String value, int lockTime) {
        this.field = field;
        this.key = key;
        this.value = value;
        this.lockTime = lockTime;
        this.signal = Boolean.TRUE;
    }
 
    private String field;
 
    private String key;
 
    private String value;
 
    private int lockTime;
 
    //线程关闭的标记,记得是volatile,线程可见强同步
    private volatile Boolean signal;
 
    void stop() {//关闭
        this.signal = Boolean.FALSE;
    } 
    @Override
    public void run() {
        //守护线程需要等待waitTime后才可以去重新设置锁的超时时间,避免了资源的浪费。
        int waitTime = lockTime * 1000 * 2 / 3;
        while (signal) {
            try {
                Thread.sleep(waitTime);
                if (cacheUtils.expandLockTime(field, key, value, lockTime) == REDIS_EXPIRE_SUCCESS) {
                    if (logger.isInfoEnabled()) {
                        logger.info("expandLockTime 成功,本次等待{}ms,将重置锁超时时间重置为{}s,其中field为{},key为{}", waitTime, lockTime, field, key);
                    }
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("expandLockTime 失败,将导致SurvivalClamConsumer中断");
                    }
                    this.stop();
                }
            } catch (InterruptedException e) {
                if (logger.isInfoEnabled()) {
                    logger.info("SurvivalClamProcessor 处理线程被强制中断");
                }
            } catch (Exception e) {
                logger.error("SurvivalClamProcessor run error", e);
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info("SurvivalClamProcessor 处理线程已停止");
        }
    }
}

expandLockTime是通过Lua脚本实现的

//在expandLockTime时候也去判断了当前持有锁的对象是否一致,避免了胡乱重置锁超时时间的情况
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1],ARGV[2]) else return '0' end";

说明:为何一般用lua脚本?Redis保证以一种原子性的方式来执行脚本:当lua脚本在执行的时候,不会有其他脚本和命令同时执行,这种语义类似于 MULTI/EXEC。从别的客户端的视角来看,一个lua脚本要么不可见,要么已经执行完。然而这也意味着,执行一个较慢的lua脚本是不建议的,由于脚本的开销非常低,构造一个快速执行的脚本并非难事。但是你要注意到,当你正在执行一个比较慢的脚本时,所以其他的客户端都无法执行命令。

获取锁之后启动守护线程代码:

SurvivalClamProcessor survivalClamProcessor 
= new SurvivalClamProcessor(lockField, lockKey, randomValue, lockTime);
Thread survivalThread = new Thread(survivalClamProcessor);
survivalThread.setDaemon(Boolean.TRUE);//后台守护
survivalThread.start();
//执行业务
Object returnObject = joinPoint.proceed(args);

//finally执行
survivalClamProcessor.stop();
survivalThread.interrupt();//通过interrupt()去中断sleep状态,保证线程及时销毁。
return returnObject;

redission框架例子代码(并实现自动锁延时):

public void test() throws Exception{
        RLock lock = redissonClient.getLock("guodong");    // 拿锁失败时会不停的重试
        //具有Watch Dog 自动延期机制 默认续30s 每隔30/3=10 秒续到30s
        lock.lock();//在该行代码里有锁延时相关操作
        //尝试拿锁10s后停止重试,返回false 具有Watch Dog 自动延期机制 默认续30s
        boolean res1 = lock.tryLock(10, TimeUnit.SECONDS); 
        //没有Watch Dog ,10s后自动释放
        lock.lock(10, TimeUnit.SECONDS);
        //尝试拿锁100s后停止重试,返回false 没有Watch Dog ,10s后自动释放
        boolean res2 = lock.tryLock(100, 10, TimeUnit.SECONDS);
        Thread.sleep(40000L);
        lock.unlock();
}
//scheduleExpirationRenewal(threadId);` -> `renewExpiration();`

参考:

blog.csdn.net/jaryle/article/details/102502884

cnblogs.com/jelly12345/p/14699492.html

blog.csdn.net/weixin_42683134/article/details/103166739