前言
在Java多线程编程的时候,锁是一个很重要也很基础的概念,锁可以看做是多线程情况下访问共享资源的一种线程同步机制。这是对于单进程应用而言的,即所有线程都在同一个JVM进程里的时候,使用Java语言提供的锁机制可以起到对共享资源进行同步的作用。如果分布式环境下多个不同线程需要对共享资源进行同步,那么用Java的锁机制就无法实现了,这个时候就必须借助分布式锁来解决分布式环境下共享资源的同步问题。
基本要求
实现一个分布式锁, 我们至少要考虑它能满足以下的这些需求:
- 互斥, 就是要在任何的时刻, 同一个锁只能够有一个客户端用户占用
- 不会死锁, 就算持有锁的客户端在持有期间崩溃了, 但是也不会影响后续的客户端加锁
- 谁加锁谁解锁, 很好理解, 加锁和解锁的必须是同一个客户端
加锁
从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改:通过NX PX参数来实现加锁的效果(NX 参数可以保证在这个 key 不存在的情况下写入成功,再加上 PX 参数可以让该 key 在超时时间之后自动删除)。
public boolean lock(String key, String request, int expireTime) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String result = jedis.set(key, request, "NX", "PX", expireTime * TIME);
if (LOCK_SUCCESS_MSG.equals(result)) {
logger.info("Thread id:"+Thread.currentThread().getId() + " lock success!Time:"+ LocalTime.now());
//开启后台线程续期
if (openRenewal) {
long sleepTime = (long)(expireTime * TIME * renewalPercentage);
ExpirationRenewalProcessor processor = new ExpirationRenewalProcessor(jedisPool.getResource(),renewalScript, key,
request, expireTime*TIME, sleepTime);
scheduleExpirationRenewal(processor);
}
return true;
} else {
logger.info("Thread id:"+Thread.currentThread().getId() + " lock fail,Time:"+ LocalTime.now());
return false;
}
} catch (Exception ex) {
logger.error("lock error");
} finally {
if (jedis != null) {
jedis.close();
}
}
return false;
}
- 第一个参数Key是锁的名字, 这个由具体业务逻辑控制, 保证唯一即可
- 第二个参数是客户端请求ID, 这样做的目的主要是为了保证加解锁的唯一性. 这样我们就可以知道该锁是哪个客户端加的. 保证唯一(可使用UUID或者雪花算法)
- 第三个参数是过期时间, 以秒为单位。
上述代码通过设置NX保证了只能有一个客户端获取到锁, 满足互斥性; 加入了过期时间, 保证在客户端崩溃后不会造成死锁; 请求ID的作用是用来标识客户端, 这样客户端在解锁的时候可以进行校验是否同一个客户端.如果构造分布式锁的时候开启了自动续期过期时间,那么会启动后台线程自动续期.
解锁
当锁拥有的客户端完成了对共享资源的操作后, 释放锁需要用到Lua脚本, 使用lua脚本可以保证使用redis的多个命令操作的原子性: 每次解锁时都需要判断锁是否是自己的,相等才执行 del 命令
//lua 解锁脚本
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
//释放锁代码
public boolean unlock(String key, String request) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
Object result = jedis.eval(unlockScript, Collections.singletonList(key), Collections.singletonList(request));
if (UNLOCK_SUCCESS_MSG.equals(result)) {
logger.info("Thread id:"+Thread.currentThread().getId() + " unlock success!Time:"+ LocalTime.now());
return true;
} else {
logger.info("Thread id:"+Thread.currentThread().getId() + " unlock fail,Time:"+ LocalTime.now());
return false;
}
} catch (Exception ex) {
logger.error("unlock error");
} finally {
if (jedis != null) {
jedis.close();
}
if (openRenewal && processor != null) {
//停止续期
processor.stop();
daemonThread.interrupt();
}
}
return false;
}
解锁的时候如果使用的锁开启了自动续期,那么需要中断后台线程停止续期。
定时续期
如果key 超时之后业务并没有执行完毕但却自动释放锁了,这时其它线程就会拿到锁,同一时间有多个线程执行同一段代码,这样就会导致并发问题。
而合理设置key的过期时间又并不容易,需要对业务熟悉通过经验去配置,设置过小,锁自动超时释放 发生并发问题的概率就会增加;设置过大,万一服务出现异常无法正常释放锁,那么这种持有这把锁的异常时间也就越长,其它线程迟迟拿不到锁。想到一个解决方法:我们可以加锁成功后,然后启动一个守护线程,让守护线程在一段时间后,重新去设置这个锁的过期时间。但在实际操作中,我们要注意以下3点:
- 和释放锁的情况一致,我们需要先判断锁的对象是否没有变。否则会造成无论谁持有锁,守护线程都会去重新设置锁的过期时间。
- 如果持有锁的线程已经处理完业务了,那么守护线程也应该被销毁。不能主线程都挂了,守护者还在那里继续浪费资源。
- 守护线程要在合理的时间再去重新设置锁的过期时间,否则会造成资源的浪费。不能动不动就去续,选择一个合适的时间间隔定时续。
lua脚本以及代码实现
//续期lua脚本
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('pexpire',KEYS[1],ARGV[2])
else
return 0
end
//守护线程 实现Runnable
public class ExpirationRenewalProcessor implements Runnable {
private static Logger logger = LoggerFactory.getLogger(ExpirationRenewalProcessor.class);
//expire命令执行成功返回结果
private static final Long EXPIRE_SUCCESS_MSG = 1L;
private Jedis jedis;
// lua 脚本
private String script;
//键
private String key;
//代表获取锁的客户端标识
private String request;
//重置key的过期时间
private long expireTime;
//线程每次睡眠的时间
private long sleepTime;
//线程结束的标志
private volatile boolean signal = true;
public ExpirationRenewalProcessor(Jedis jedis, String script, String key,
String request, long expireTime, long sleepTime) {
this.jedis = jedis;
this.script = script;
this.key = key;
this.request = request;
this.expireTime = expireTime;
this.sleepTime = sleepTime;
}
//标志位设置为结束标识
public void stop() {
this.signal = false;
}
@Override
public void run() {
while (signal) {
try {
Thread.sleep(sleepTime);
Object result = jedis.eval(script, Collections.singletonList(key), Arrays.asList(request, String.valueOf(expireTime)));
if (EXPIRE_SUCCESS_MSG.equals(result)) {
logger.info("Daemon thread renewal success");
} else {
logger.info("Daemon thread renewal fail");
this.stop();
}
} catch (InterruptedException e) {
logger.info("Daemon thread is interrupted forcibly");
} catch (Exception e) {
logger.info("Daemon thread run error");
this.stop();
}
}
if (jedis != null) {
jedis.close();
}
logger.info("Daemon thread stopped");
}
}
续期的时候还是通过执行lua脚本保证原子性,每次续期先sleep等待一段时间(等待时间=key过期时间 * 比例);这个比例(等待时间占过期时间的比例)由用户决定,0到1的范围内。最后通过pexpire命令重置过期时间为当初设定的时间。
测试
构造线程池开启20个线程模拟业务执行:执行结果如下:
Thread id:62 lock success!Time:16:06:02.029
Thread id:63 lock fail,Time:16:06:02.037
Thread id:67 lock fail,Time:16:06:02.037
Thread id:61 lock fail,Time:16:06:02.041
Thread id:56 lock fail,Time:16:06:02.041
Thread id:51 lock fail,Time:16:06:02.041
Thread id:60 lock fail,Time:16:06:02.041
Thread id:57 lock fail,Time:16:06:02.041
Thread id:64 lock fail,Time:16:06:02.042
Thread id:59 lock fail,Time:16:06:02.042
Thread id:65 lock fail,Time:16:06:02.042
Thread id:58 lock fail,Time:16:06:02.042
Thread id:50 lock fail,Time:16:06:02.050
Thread id:54 lock fail,Time:16:06:02.050
Thread id:55 lock fail,Time:16:06:02.054
Thread id:68 lock fail,Time:16:06:02.055
Thread id:53 lock fail,Time:16:06:02.056
Thread id:52 lock fail,Time:16:06:02.054
Thread id:69 lock fail,Time:16:06:02.054
Thread id:66 lock fail,Time:16:06:02.055
Daemon thread renewal success
Thread id:62 unlock success!Time:16:06:04.275
Daemon thread is interrupted forcibly
Daemon thread stopped
总结
- 用Redis做分布式锁相比其他分布式锁(zookeeper)实现更简单,速度更快
- 在加锁成功且开启了守护线程的话 代码会初始化守护线程的内部参数,然后通过start函数启动线程,最后在业务执行完释放锁之后,设置守护线程的关闭标记,通过interrupt()去中断sleep状态,保证守护线程及时销毁。
- 本文适用于单机版,如果Redis是多机部署的,那么可以尝试使用Redisson实现分布式锁
源码以及使用示例请移步到 Github地址