Skip to content
标签
分布式锁
字数
2130 字
阅读时间
11 分钟

分布式锁

java

import com.commnetsoft.commons.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultScriptExecutor;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

/**
 * Redis-backed helper exposing the primitive operations of a distributed lock:
 * acquire, release and renew.
 * <p>
 * Every lock is stored under the key {@code {applicationName}:lock:{key}}. Its value is the
 * caller-supplied lock factor, which is compared before the lock is released or renewed.
 *
 * @author Brack.zhu
 * @date 2020/7/3
 */
@Component
public class CacheHelper {

    /**
     * Deletes the key only when its current value equals ARGV[1] (the lock factor).
     * Running the comparison and the delete in one Lua script makes them atomic.
     */
    private static final RedisScript<Boolean> UNLOCK_SCRIPT = RedisScript.of(
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
            Boolean.class);

    /**
     * Resets the key's time-to-live to ARGV[2] seconds only when its current value equals
     * ARGV[1] (the lock factor).
     */
    private static final RedisScript<Boolean> RENEW_SCRIPT = RedisScript.of(
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end",
            Boolean.class);

    @Value("#{coreConfig.applicationName}")
    private String applicationName;

    @Autowired
    private RedisTemplate redisTemplate;


    /**
     * Tries once to acquire the distributed lock.<br/>
     * Real key format: {@code {applicationName}:lock:{key}}
     *
     * @param key        lock key
     * @param lockFactor lock factor; identifies the holder for the later unlock / renew
     * @param expireTime time-to-live of the lock
     * @param timeUnit   unit of {@code expireTime}
     * @return {@code true} when the lock was acquired; {@code false} when the key is empty,
     *         the lock is already held, or Redis returned no result
     */
    public boolean tryLock(String key, String lockFactor, long expireTime, TimeUnit timeUnit) {
        if (!StringUtils.isNotEmpty(key)) {
            return false;
        }
        String realKey = buildRealKey(key);
        // SET ... NX: the value is only written when the key does not exist yet.
        Object acquired = redisTemplate.opsForValue().setIfAbsent(realKey, lockFactor, expireTime, timeUnit);
        // setIfAbsent may return null (e.g. inside a pipeline/transaction); never auto-unbox it.
        return Boolean.TRUE.equals(acquired);
    }


    /**
     * Releases the distributed lock if it is still held with the given lock factor.<br/>
     * Real key format: {@code {applicationName}:lock:{key}}
     *
     * @param key        lock key
     * @param lockFactor lock factor that was used to acquire the lock
     * @return {@code true} when the key was deleted; {@code false} otherwise
     */
    public boolean unLock(String key, String lockFactor) {
        if (!StringUtils.isNotEmpty(key)) {
            return false;
        }
        return executeForBoolean(UNLOCK_SCRIPT, buildRealKey(key), lockFactor);
    }

    /**
     * Renews the expiration of the distributed lock if it is still held with the given
     * lock factor.<br/>
     * Real key format: {@code {applicationName}:lock:{key}}
     *
     * @param key            lock key
     * @param lockFactor     lock factor that was used to acquire the lock
     * @param expirationTime new time-to-live, in seconds
     * @return {@code true} when the expiration was updated; {@code false} otherwise
     */
    public boolean lockRenewExpiration(String key, String lockFactor, int expirationTime) {
        if (!StringUtils.isNotEmpty(key)) {
            return false;
        }
        return executeForBoolean(RENEW_SCRIPT, buildRealKey(key), lockFactor, expirationTime);
    }

    /**
     * Runs a Lua script against a single key and maps the reply to a primitive boolean.
     * A {@code null} reply is treated as {@code false} instead of failing on unboxing.
     */
    private boolean executeForBoolean(RedisScript<Boolean> script, String realKey, Object... args) {
        DefaultScriptExecutor scriptExecutor = new DefaultScriptExecutor(redisTemplate);
        Object reply = scriptExecutor.execute(script, Collections.singletonList(realKey), args);
        return Boolean.TRUE.equals(reply);
    }

    /**
     * Builds the real Redis key of a lock.
     *
     * @param key lock key as seen by the caller
     * @return {@code {applicationName}:lock:{key}}
     */
    private String buildRealKey(String key) {
        return applicationName + ":lock:" + key;
    }

}

分布式锁注解

java

/**
 * Distributed lock annotation.<br/>
 * The annotation only takes effect when the method is invoked through the Spring proxy;
 * a direct call from inside the same bean is not intercepted by AOP.
 * <blockquote><pre>
 *  {@code
 *        Controller{
 *
 *             @Autowired
 *             private Service service
 *
 *              @GetMapping
 *              Result<Void> cacheTest(){
 *                  service.bMethod1();
 *                  service.bMethod2();
 *              }
 *        }
 *
 *        Service{
 *
 *           // Called through the Spring proxy: the annotation is effective
 *           @DistLock
 *           bMethod1() {
 *             // business logic
 *           }
 *
 *           bMethod2(){
 *              // Annotation NOT effective: the call bypasses the Spring proxy, so AOP cannot intercept it
 *              bInnerMethod();
 *              // Solution: call through the Spring proxy so that the annotation takes effect
 *              SpringContextUtil.getBean(Service.class).bInnerMethod();
 *              // business logic
 *           }
 *
 *           @DistLock
 *           bInnerMethod(){
 *               // business logic
 *           }
 *         }
 *  }
 *  </pre></blockquote>
 *
 * @author Brack.zhu
 * @date 2021/1/19
 */
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistLock {
    /**
     * Key of the distributed lock. Spring expressions (SpEL) are supported, e.g. {@code #name}.<br/>
     *
     * @return the lock key or the SpEL expression that produces it
     */
    String value();

}

分布式锁注解切面

java

import com.commnetsoft.core.CoreConstant;
import com.commnetsoft.core.annotation.DistLock;
import com.commnetsoft.core.utils.AopUtil;
import com.commnetsoft.core.utils.SpElUtil;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.expression.EvaluationContext;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/**
 * AOP aspect that runs every {@link DistLock}-annotated method while holding a distributed lock.
 *
 * @author Brack.zhu
 * @date 2021/1/19
 */
@Aspect
@Component
public class DistLockAspect {

    private Logger log = LoggerFactory.getLogger(DistLockAspect.class);

    @Value("#{coreConfig.applicationName}")
    private String applicationName;

    /**
     * Matches any method carrying {@link DistLock} and binds the annotation instance.
     *
     * @param distLock the annotation found on the intercepted method
     */
    @Pointcut("@annotation(distLock)")
    public void pointcut(DistLock distLock) {
    }

    /**
     * Evaluates the annotation's SpEL expression against the method arguments, acquires the
     * resulting lock, proceeds with the intercepted method and releases the lock when the
     * try-with-resources block exits.
     *
     * @param joinPoint intercepted invocation
     * @param distLock  annotation instance of the intercepted method
     * @return whatever the intercepted method returns
     * @throws Throwable anything raised while locking or by the intercepted method
     */
    @Around(value = "pointcut(distLock)")
    public Object distLockAround(ProceedingJoinPoint joinPoint, DistLock distLock) throws Throwable {
        Method targetMethod = AopUtil.getMethod(joinPoint);
        EvaluationContext spelContext = SpElUtil.methodBindParam(targetMethod, joinPoint.getArgs());
        String evaluatedKey = SpElUtil.evaluate(distLock.value(), spelContext);
        // Full lock key: application name + key separator + evaluated expression value.
        String fullKey = applicationName + CoreConstant.Cache.REDIS_KEY_MARK + evaluatedKey;

        try (CacheLock cacheLock = CacheLock.of(fullKey)) {
            // Parameterized debug logging is a no-op when the level is disabled.
            log.debug("方法:{},key:{},开始获取分布式锁...", targetMethod, fullKey);
            cacheLock.lock();
            log.debug("方法:{},key:{},获取分布式锁完成", targetMethod, fullKey);
            return joinPoint.proceed(joinPoint.getArgs());
        } catch (Exception e) {
            log.error("方法:{},key:{},分布式锁注解处理异常:", targetMethod, fullKey, e);
            throw e;
        }
    }
}

分布式锁对象

java

import com.commnetsoft.commons.utils.UUIDUtils;
import com.commnetsoft.core.CommonError;
import com.commnetsoft.core.cache.extend.watchdog.LockWatchDogManager;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.exception.MicroRuntimeException;
import org.apache.commons.lang.math.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * Distributed lock object backed by the cache.<br/>
 * The lock must be released manually, or automatically with try-with-resources as shown below.<br/>
 * <blockquote><pre>
 * {@code
 *      // automatic resource release, supported since JDK 1.7
 *      try (CacheLock c=CacheLock.of("test")){
 *          c.lock();
 *          System.out.println("lock acquired");
 *          // business logic
 *      }catch (Exception e){
 *          e.printStackTrace();
 *      }
 *      // the lock is released automatically when the try block exits
 * }
 * </pre></blockquote>
 *
 * @author Brack.zhu
 * @date 2020/7/3
 */
public class CacheLock implements AutoCloseable {

    private static final Logger log = LoggerFactory.getLogger(CacheLock.class);

    private String key;

    private String lockFactor;

    private long expireTime;

    private TimeUnit timeUnit;

    /**
     * Maximum time, in milliseconds, that {@link #lock()} waits for the lock.
     * -1: wait without limit
     */
    private long syncMaxTime = -1;

    /**
     * Whether this object currently holds the lock.
     * Volatile because the watchdog thread reads it in {@link #renew()}.
     */
    private volatile boolean isLocked = false;

    private CacheHelper cacheHelper = SpringContextUtil.getBean(CacheHelper.class);

    /**
     * Builds a distributed lock object.
     *
     * @param key         lock key
     * @param lockFactor  lock factor; identifies the holder when unlocking / renewing
     * @param expireTime  time-to-live of the lock
     * @param timeUnit    unit of {@code expireTime}
     * @param syncMaxTime maximum time {@link #lock()} waits for the lock, in milliseconds
     */
    public CacheLock(String key, String lockFactor, long expireTime, TimeUnit timeUnit, long syncMaxTime) {
        this.key = key;
        this.lockFactor = lockFactor;
        this.expireTime = expireTime;
        this.timeUnit = timeUnit;
        this.syncMaxTime = syncMaxTime;
    }

    /**
     * Tries once to acquire the distributed lock.
     * <p>
     * A successful attempt marks this object as locked, so that a later {@link #unLock()} /
     * {@link #close()} actually releases the lock. A failed attempt leaves the state untouched.
     *
     * @return {@code true} when the lock was acquired
     */
    public boolean tryLock() {
        boolean acquired = cacheHelper.tryLock(key, lockFactor, expireTime, timeUnit);
        if (acquired) {
            isLocked = true;
        }
        return acquired;
    }

    /**
     * Acquires the distributed lock, blocking until it is obtained or the wait limit is hit.<br/>
     * Retries with a random pause of up to 200 ms between attempts. After a successful
     * acquisition the lock is registered with the watchdog for renewal.
     * <p>
     * An interruption does not abort the wait; the thread's interrupt status is restored
     * before the method returns or throws.
     *
     * @throws MicroRuntimeException when the lock could not be obtained within {@code syncMaxTime}
     */
    public void lock() throws MicroRuntimeException {
        long start = System.currentTimeMillis();
        boolean interrupted = false;
        try {
            do {
                isLocked = tryLock();
                if (!isLocked) {
                    if (syncMaxTime > 0 && (System.currentTimeMillis() - start) >= syncMaxTime) {
                        throw new MicroRuntimeException(CommonError.cache_lock_timeout_failed);
                    }
                    int sleep = RandomUtils.nextInt(200);
                    try {
                        Thread.sleep(sleep);
                    } catch (InterruptedException e) {
                        log.error("", e);
                        // Remember the interruption; it is re-asserted in the finally block so
                        // that callers can still observe it.
                        interrupted = true;
                    }
                }
            } while (!isLocked);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        // Lock acquired: hand it to the watchdog so that it is renewed while the thread is alive.
        LockWatchDogManager lockWatchDogManager = SpringContextUtil.getBean(LockWatchDogManager.class);
        if (null == lockWatchDogManager) {
            log.warn("锁的看门狗未设置成功,因为LockWatchDogManager获取失败.{}", this);
        } else {
            lockWatchDogManager.onWatchDog(Thread.currentThread(), this);
        }

    }

    /**
     * Releases the distributed lock if this object holds it.
     * Calling it again after a release is a no-op.
     */
    public void unLock() {
        if (isLocked) {
            // Clear the flag first: the watchdog stops renewing and a repeated close() does
            // not attempt (and falsely report) a second release.
            isLocked = false;
            boolean rs = cacheHelper.unLock(key, lockFactor);
            if (!rs) {
                log.warn("释放分布式锁失败,{},{},{},{}", key, lockFactor, expireTime, timeUnit);
            }
        }
    }

    /**
     * Renews the expiration of the lock to its configured time-to-live.
     * <p>
     * The helper works in whole seconds. The value is clamped to the range
     * {@code [1, Integer.MAX_VALUE]} because a sub-second time-to-live would otherwise be
     * truncated to 0, and an EXPIRE of 0 deletes the key instead of renewing it.
     *
     * @return {@code true} when the lock is held and its expiration was updated
     */
    public boolean renew() {
        if (!isLocked) {
            return false;
        }
        long seconds = timeUnit.toSeconds(expireTime);
        int ttlSeconds = (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, seconds));
        return cacheHelper.lockRenewExpiration(key, lockFactor, ttlSeconds);
    }

    /**
     * Releases the lock when used in try-with-resources.
     *
     * @throws Exception declared by {@link AutoCloseable}
     */
    @Override
    public void close() throws Exception {
        unLock();
    }

    @Override
    public String toString() {
        return super.toString() + ";" + isLocked + "," + key + "," + lockFactor + "," + expireTime + "," + timeUnit + "," + syncMaxTime;
    }

    /**
     * Creates a distributed lock with default settings.<br/>
     * <ul>
     * <li>The time-to-live of the lock is {@link LockWatchDogManager#DISTLOCK_DEF_EXPIRETIME} milliseconds</li>
     * <li>Wait time of {@link #lock()} is -1: under contention it waits until the lock is acquired</li>
     * </ul>
     *
     * @param key distributed lock key, must not be {@code null}
     * @return the distributed lock object
     */
    public static CacheLock of(String key) {
        Objects.requireNonNull(key, "分布式锁主键不能为空");
        return new CacheLock(key, UUIDUtils.generate(), LockWatchDogManager.DISTLOCK_DEF_EXPIRETIME, TimeUnit.MILLISECONDS, -1);
    }

    /**
     * Creates a distributed lock with a caller-defined maximum wait time.<br/>
     * <ul>
     * <li>The time-to-live of the lock is {@link LockWatchDogManager#DISTLOCK_DEF_EXPIRETIME} milliseconds</li>
     * <li>Wait time of {@link #lock()} is given by {@code syncMaxTime}</li>
     * </ul>
     *
     * @param key         distributed lock key, must not be {@code null}
     * @param syncMaxTime maximum wait of {@link #lock()} under contention, in milliseconds;
     *                    -1 waits until the lock is acquired
     * @return the distributed lock object
     */
    public static CacheLock of(String key, long syncMaxTime) {
        Objects.requireNonNull(key, "分布式锁主键不能为空");
        return new CacheLock(key, UUIDUtils.generate(), LockWatchDogManager.DISTLOCK_DEF_EXPIRETIME, TimeUnit.MILLISECONDS, syncMaxTime);
    }


}

看门狗

管理类

java

import com.commnetsoft.core.cache.CacheLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Manager of the distributed lock watchdog.
 * <p>
 * Keeps the set of locks to be watched and owns the {@link LockWatchDog} thread that
 * periodically calls {@link #watch()} to renew them.
 *
 * @author Brack.zhu
 * @date 2021/1/29
 */
@Component
public class LockWatchDogManager {

    /**
     * Watched targets. Business threads add to it in {@link #onWatchDog(Thread, CacheLock)}
     * while the watchdog thread iterates and removes in {@link #watch()}, so a concurrent
     * set is required; a plain HashSet could throw ConcurrentModificationException or be
     * corrupted.
     */
    private final Set<DistLockDogTarget> distLockWatchSet = ConcurrentHashMap.newKeySet();

    /**
     * Default expiration of a distributed lock, in milliseconds.
     */
    public final static long DISTLOCK_DEF_EXPIRETIME = 30000;

    private LockWatchDog lockWatchDog;

    private Logger log = LoggerFactory.getLogger(LockWatchDogManager.class);

    /**
     * Starts the watchdog thread; it patrols three times per default expiration period.
     */
    @PostConstruct
    public void postConstruct() {
        lockWatchDog = new LockWatchDog(this, DISTLOCK_DEF_EXPIRETIME / 3);
        lockWatchDog.start();
    }

    /**
     * Asks the watchdog thread to stop. Does nothing if it was never created.
     */
    @PreDestroy
    public void preDestroy() {
        if (null != lockWatchDog) {
            lockWatchDog.callStop();
        }
    }

    /**
     * Registers a lock with the watchdog.
     *
     * @param thread    thread that acquired the lock
     * @param cacheLock the lock object; ignored together with {@code thread} when either is null
     */
    public void onWatchDog(Thread thread, CacheLock cacheLock) {
        if (null == thread || null == cacheLock) {
            return;
        }
        DistLockDogTarget distLockDogTarget = new DistLockDogTarget();
        distLockDogTarget.setLockCallThread(thread);
        distLockDogTarget.setCacheLock(cacheLock);
        distLockWatchSet.add(distLockDogTarget);
    }

    /**
     * One watchdog patrol: renews every watched lock whose owning thread is still alive.
     * <p>
     * A target is dropped when its thread or lock object is no longer reachable, the thread
     * has died, or the renewal did not succeed (including when it threw).
     */
    public void watch() {
        Iterator<DistLockDogTarget> it = distLockWatchSet.iterator();
        while (it.hasNext()) {
            DistLockDogTarget target = it.next();
            Thread thread = target.getLockCallThread().get();
            if (null != thread && thread.isAlive()) {
                CacheLock cacheLock = target.getCacheLock().get();
                if (null != cacheLock) {
                    boolean rs = false;
                    try {
                        rs = cacheLock.renew();
                    } catch (Exception e) {
                        log.error("缓存锁续租失败:{},{}", thread, cacheLock, e);
                    }
                    if (rs) {
                        // renewed successfully, keep watching this target
                        continue;
                    }
                }
            }
            it.remove();
        }
    }

}

看门狗线程

java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Watchdog thread of the distributed lock.
 * <p>
 * Calls {@link LockWatchDogManager#watch()} repeatedly, sleeping {@code interval}
 * milliseconds between two patrols, until {@link #callStop()} is invoked.
 *
 * @author Brack.zhu
 * @date 2021/1/6
 */
public class LockWatchDog extends Thread {


    /**
     * Stop flag. Written by the thread calling {@link #callStop()} and read by this thread,
     * hence volatile.
     */
    private volatile boolean stop = false;

    private LockWatchDogManager lockWatchDogManager;

    private long interval;

    private Logger log = LoggerFactory.getLogger(LockWatchDog.class);

    /**
     * @param lockWatchDogManager manager whose {@code watch()} is called on every patrol
     * @param interval            pause between two patrols, in milliseconds
     */
    public LockWatchDog(LockWatchDogManager lockWatchDogManager, long interval) {
        super("Lock Watch Dog Thread");
        this.lockWatchDogManager = lockWatchDogManager;
        this.interval = interval;
    }

    /**
     * Requests the thread to stop and interrupts it so that a pending sleep ends immediately.
     */
    public void callStop() {
        this.stop = true;
        this.interrupt();
    }


    @Override
    public void run() {
        while (!stop) {
            try {
                lockWatchDogManager.watch();
            } catch (RuntimeException e) {
                // One failed patrol must not kill the thread, otherwise no lock would ever
                // be renewed again.
                log.error("分布式锁看门狗巡查异常:", e);
            }
            try {
                Thread.sleep(interval);
            } catch (InterruptedException e) {
                // Ignored on purpose: callStop() interrupts to wake the thread up and the
                // loop condition re-checks the stop flag.
            }
        }
        log.warn("分布式锁看门狗线程退出!");
    }


}

看门狗模型

java

import com.commnetsoft.core.cache.CacheLock;

import java.lang.ref.WeakReference;

/**
 * A single target patrolled by the lock watchdog: a lock object together with the thread
 * that registered it. Both are held through weak references.
 *
 * @author Brack.zhu
 * @date 2021/1/29
 */
public class DistLockDogTarget {

    /**
     * Thread that called the cache lock.
     */
    private WeakReference<Thread> lockCallThread;

    /**
     * The cache lock object.
     */
    private WeakReference<CacheLock> cacheLock;


    /**
     * @return weak reference to the thread that called the lock
     */
    public WeakReference<Thread> getLockCallThread() {
        return this.lockCallThread;
    }

    /**
     * @return weak reference to the cache lock
     */
    public WeakReference<CacheLock> getCacheLock() {
        return this.cacheLock;
    }

    /**
     * Stores the given reference as is.
     */
    public void setLockCallThread(WeakReference<Thread> lockCallThread) {
        this.lockCallThread = lockCallThread;
    }

    /**
     * Wraps the thread in a new weak reference and stores it.
     */
    public void setLockCallThread(Thread lockCallThread) {
        this.lockCallThread = new WeakReference<Thread>(lockCallThread);
    }

    /**
     * Stores the given reference as is.
     */
    public void setCacheLock(WeakReference<CacheLock> cacheLock) {
        this.cacheLock = cacheLock;
    }

    /**
     * Wraps the lock in a new weak reference and stores it.
     */
    public void setCacheLock(CacheLock cacheLock) {
        this.cacheLock = new WeakReference<CacheLock>(cacheLock);
    }
}