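How it works
The typical usage looks like this:
RLock lock = redissonClient.getLock("myLock");
lock.lock();
try {
    // business logic
} finally {
    lock.unlock();
}
Redisson's distributed lock is built on the Redis hash type. The key is the lock name (here myLock), the field is uuid + ":" + threadId, and the value is the number of times that holder has acquired the lock. Because the count is tracked per holder, the lock is reentrant.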
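The key / field / value layout described above can be modelled in plain Java. This is a minimal in-memory sketch, not Redisson code: the class HashLockModel and its methods are hypothetical names, and a nested HashMap stands in for the Redis hash.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the Redis hash; not Redisson code.
final class HashLockModel {
    // lock name -> (holder field "uuid:threadId" -> reentrancy count)
    private final Map<String, Map<String, Integer>> store = new HashMap<>();

    /** Returns true if the holder owns the lock afterwards (first acquire or re-entry). */
    boolean lock(String name, String holder) {
        Map<String, Integer> hash = store.get(name);
        if (hash == null) {
            hash = new HashMap<>();
            store.put(name, hash);
        } else if (!hash.containsKey(holder)) {
            return false; // the key exists but belongs to another holder
        }
        hash.merge(holder, 1, Integer::sum);
        return true;
    }

    /** Decrements the count and removes the key once it drops to zero. */
    void unlock(String name, String holder) {
        Map<String, Integer> hash = store.get(name);
        if (hash == null || !hash.containsKey(holder)) {
            throw new IllegalMonitorStateException("not locked by " + holder);
        }
        if (hash.merge(holder, -1, Integer::sum) <= 0) {
            store.remove(name);
        }
    }

    int holdCount(String name, String holder) {
        Map<String, Integer> hash = store.get(name);
        return hash == null ? 0 : hash.getOrDefault(holder, 0);
    }
}
```

Locking twice with the same holder raises the count to 2, and a second holder is refused until both holds have been released.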
1. Initializing the lock: RedissonLock
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    // If the lock is held by another thread, Redis pub/sub is used to subscribe to the unlock message
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
    // 1. Sets RedissonObject.name
    super(commandExecutor, name);
    // 2. The executor that runs the Lua scripts
    this.commandExecutor = commandExecutor;
    // 3. The id generated when the ConnectionManager is created (UUID id = UUID.randomUUID()), e.g. e12badb5-7396-4f05-8290-aaa352e04bc4
    //    Combined with the threadId it forms the hash field, which is used to check that the thread releasing the lock is the one that acquired it
    this.id = commandExecutor.getConnectionManager().getId();
    // 4. Config.lockWatchdogTimeout, which defaults to 30 * 1000 ms
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    this.entryName = id + ":" + name;
}
2. Acquiring the lock: lock()
// RedissonLock.lock()
@Override
public void lock(long leaseTime, TimeUnit unit) {
    try {
        lock(leaseTime, unit, false);
    } catch (InterruptedException e) {
        throw new IllegalStateException();
    }
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    // 1. Get the current thread id
    long threadId = Thread.currentThread().getId();
    // 2. Try to acquire the lock
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // 3. Lock acquired: return immediately
    if (ttl == null) {
        return;
    }
    // 4. The lock is held by another thread: subscribe to the unlock notification and block until the subscription future completes
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        // Block until the future completes, i.e. until the listener runnable has been created; only the first thread that fails to acquire the lock actually registers the unlock listener
        // int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
        // Maximum blocking time: 3s + 1.5s * 3 = 7.5s
        commandExecutor.syncSubscription(future);
    }
    try {
        // 5. Once the subscription is in place, loop until the lock is acquired
        while (true) {
            // 5.1 Try to acquire again; on success leave the loop
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }
            // waiting for message
            // 5.2 Not acquired, so another thread still holds the lock: block and wait
            if (ttl >= 0) {
                try {
                    // Wait on the Semaphore for at most the lock's remaining TTL. Semaphore.release() is called from org.redisson.pubsub.LockPubSub#onMessage, the handler of the unlock listener: when another thread releases the lock it publishes an unlock message, the listener receives it and releases a permit, which wakes the thread blocked here
                    // tryAcquire() parks the current thread waiting for a permit, which avoids hammering Redis with acquire requests inside the while loop
                    // If no permit arrives within the maximum wait, move on to the next iteration and try to acquire again
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                // The lock's remaining TTL is negative: wait indefinitely
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // 6. Unsubscribe
        unsubscribe(future, threadId);
    }
    // get(lockAsync(leaseTime, unit));
}
// Try to acquire the lock and wait for the result
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
// Run the Lua script that tries to acquire the lock
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    // 1. Run the Lua script asynchronously to try to acquire the lock
    if (leaseTime != -1) {
        // use the caller's lease time
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // use the default 30s
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    // 2. Handle the result of the attempt
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            // the attempt failed with an exception
            return;
        }
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                // remember the caller's lease time
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 2.1 Locked with the default 30s lease: a timer task resets the expiry every 10s until the lock is released
                // With a custom lease time the expiry is never extended
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            // KEYS[1]: the lock name, i.e. myLock in redissonClient.getLock("myLock")
            // ARGV[1]: the lease time in milliseconds, e.g. lock.lock(3, TimeUnit.MINUTES) gives 3 * 60s; the default is internalLockLeaseTime = 30s
            // ARGV[2]: the unique holder id, uuid + ":" + threadId
            // Overall logic: the holder is stored in a hash (via hincrby); the key is the lock name myLock, the field is uuid + ":" + threadId, and the value is that field's lock count
            // 1. If the key does not exist, take the lock and return nil (null on the Java side)
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                // 1.1 Lock the key and set the lock count to 1
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                // 1.2 Set the key's expiry
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            // 2. If the key exists, check whether this thread is the holder; if so increment the count and return nil (null)
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                // 2.1 Reentrancy count + 1
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                // 2.2 Reset the expiry
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            // 3. Reaching here means the lock was not acquired because another thread holds it: return the lock's remaining TTL
            "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
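The return contract of the acquire script (null means acquired, any other value is the remaining TTL) can be reproduced with a small Java model. This is only a sketch under stated assumptions: AcquireScriptModel is a hypothetical class, it models a single lock key, and the current time is passed in explicitly instead of being read from Redis.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-key model of the acquire script; time is passed in explicitly.
final class AcquireScriptModel {
    private final Map<String, Integer> hash = new HashMap<>(); // field -> lock count
    private long expiresAt = 0;                                 // absolute time in millis

    /** Returns null when the lock is acquired, otherwise the remaining TTL in millis. */
    Long tryLock(String holder, long leaseMillis, long now) {
        if (now >= expiresAt) {
            hash.clear();                        // key has expired: same as 'exists' == 0
        }
        if (hash.isEmpty() || hash.containsKey(holder)) {
            hash.merge(holder, 1, Integer::sum); // hincrby KEYS[1] ARGV[2] 1
            expiresAt = now + leaseMillis;       // pexpire KEYS[1] ARGV[1]
            return null;
        }
        return expiresAt - now;                  // pttl KEYS[1]
    }
}
```

A re-entry by the same holder resets the expiry, which is why a competing holder sees a TTL measured from the most recent acquire rather than from the first one.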
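Points worth noting about the code above:
- The expiry is renewed only when lock.lock() is called without a leaseTime, i.e. with the default internalLockLeaseTime of 30s. In that case the expiry is reset every internalLockLeaseTime / 3 = 10s, which extends the life of the lock (scheduleExpirationRenewal() is covered later). If a leaseTime is passed to lock.lock(), the lock is not renewed. Which approach is more suitable? Each has its own advantages and drawbacks:
a. With a leaseTime:
Advantage: even if unlock() is never executed because of an exception, the key still expires on its own, after which other threads can acquire the lock.
Drawback: if the business logic has not finished when the lock expires, concurrent access can corrupt data. If that is a concern, use a larger leaseTime.
b. Without a leaseTime:
Advantage: it solves the problem above of the lock being released before the business logic finishes.
Drawback: if an exception prevents the renewal task from being cancelled, the lock is never released automatically and becomes a deadlock. Other threads can acquire it only after the service goes down (which stops the renewal task) or the key is deleted by hand. That is why unlock() normally goes in a finally block, to avoid deadlock as far as possible.
In summary: I personally prefer specifying a suitable leaseTime. It ensures that if a failure leaves the lock unreleased, the lock is still released within a bounded time, which avoids deadlock.
- When an acquire attempt fails, the thread subscribes to the unlock event through a Redis pub/sub channel and tries again. If that attempt also fails, it blocks in Semaphore.tryAcquire() waiting for a permit, i.e. for the lock to be released. In lock() the wait is bounded by the lock's remaining TTL, after which the loop retries; in the timed tryLock() variants, exceeding the maximum wait time makes the acquire fail and return false. When the unlock notification does arrive, the thread enters the next iteration of the while loop and retries. (This also shows that the lock discussed here is a non-fair lock; Redisson implements a fair lock, read-write locks and others as well.) Blocking this way avoids wasting resources on a busy while loop that keeps retrying in vain.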
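The "park on a semaphore instead of spinning" idea in the last point can be sketched locally. The following is an assumption-laden stand-in, not Redisson code: WaitLoopSketch is a hypothetical class in which an AtomicReference plays the Redis key and a java.util.concurrent.Semaphore plays the latch that the unlock message releases.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical local stand-in for the wait loop; no Redis or pub/sub involved.
final class WaitLoopSketch {
    private final AtomicReference<String> owner = new AtomicReference<>();
    private final Semaphore latch = new Semaphore(0);

    boolean tryAcquire(String who) {
        return owner.compareAndSet(null, who);
    }

    void unlock(String who) {
        if (owner.compareAndSet(who, null)) {
            latch.release(); // plays the role of the published unlock message
        }
    }

    /** Blocks until the lock is acquired; returns the number of acquire attempts made. */
    int lock(String who, long ttlMillis) {
        int attempts = 1;
        while (!tryAcquire(who)) {
            try {
                // Park for at most the remaining TTL instead of retrying in a tight loop.
                latch.tryAcquire(ttlMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            attempts++;
        }
        return attempts;
    }

    /** t1 holds the lock, t2 waits on the latch, t1 unlocks; returns t2's attempt count. */
    static int demo() {
        WaitLoopSketch lock = new WaitLoopSketch();
        lock.lock("t1", 5_000);
        int[] attempts = new int[1];
        Thread waiter = new Thread(() -> attempts[0] = lock.lock("t2", 5_000));
        waiter.start();
        while (!lock.latch.hasQueuedThreads()) {
            Thread.yield(); // wait until t2 is actually parked on the latch
        }
        lock.unlock("t1");
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return attempts[0];
    }
}
```

In demo() the waiter needs exactly two attempts: one that fails while t1 holds the lock, and one that succeeds right after the release wakes it.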
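Overall flow of acquiring the lock:
Simulating several threads requesting the lock at the same time:
(note how the lock's value in Redis changes)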
3. Releasing the lock: unlock()
// RedissonLock.unlock()
@Override
public void unlock() {
    try {
        // Release the lock held by the current thread
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<>();
    // 1. Release the lock with a Lua script
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
        // 2. Cancel the renewal timer task
        cancelExpirationRenewal(threadId);
        if (e != null) {
            result.tryFailure(e);
            return;
        }
        // 2.1 A null result means this thread does not hold the lock: fail with an exception
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }
        result.trySuccess(null);
    });
    return result;
}
// Release the lock with a Lua script
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // KEYS[1]: the lock name, i.e. myLock in redissonClient.getLock("myLock")
            // KEYS[2]: the pub/sub channel, redisson_lock__channel:{myLock}
            // ARGV[1]: the unlock event type to publish, LockPubSub.UNLOCK_MESSAGE
            // ARGV[2]: the lock expiry time
            // ARGV[3]: the unique holder id, uuid + ":" + threadId
            // 1. If the field does not exist, the current thread does not hold the lock: return nil (null)
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // 2. The field exists, so the current thread holds the lock: decrement the value, then check the remaining reentrancy count
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            // 2.1 Still > 0 after the decrement: this thread locked more than once, so reset the remaining time and return 0
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                // 2.2 Not > 0 after the decrement: this thread no longer holds the lock, so delete the key, publish LockPubSub.UNLOCK_MESSAGE and return 1
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end; " +
            // 3. Fallback: return nil
            "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
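The three outcomes of the release script (nil, 0, 1) map to null, false and true on the Java side. A minimal sketch of that mapping, using a hypothetical ReleaseScriptModel class with an in-memory map and a list standing in for the pub/sub channel:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the release script; the list stands in for the pub/sub channel.
final class ReleaseScriptModel {
    static final long UNLOCK_MESSAGE = 0L;
    final Map<String, Integer> hash = new HashMap<>(); // field -> lock count
    final List<Long> published = new ArrayList<>();    // messages "published" so far

    void lock(String holder) {
        hash.merge(holder, 1, Integer::sum);
    }

    /** null: caller is not the holder; FALSE: still held after decrement; TRUE: fully released. */
    Boolean unlock(String holder) {
        if (!hash.containsKey(holder)) {
            return null;                               // hexists == 0 -> nil
        }
        int counter = hash.merge(holder, -1, Integer::sum);
        if (counter > 0) {
            return Boolean.FALSE;                      // re-entrant hold remains -> 0
        }
        hash.clear();                                  // del KEYS[1]
        published.add(UNLOCK_MESSAGE);                 // publish KEYS[2] ARGV[1]
        return Boolean.TRUE;                           // -> 1
    }
}
```

Only the final unlock of a re-entrant holder publishes a message, so waiters are woken once per full release rather than once per unlock() call.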
Overall flow of releasing the lock:
4. Renewing the lock / cancelling renewal
- Renewal: invoked after the lock has been acquired with the default expiry
// RedissonBaseLock.scheduleExpirationRenewal()
// Expiry renewal: when a thread newly takes the lock, a timer task starts that periodically refreshes the key's expiry
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        // 1. A renewal task already exists for this lock (this branch is taken on re-entry, since the lock is reentrant); record one more hold, which matters when deciding whether to stop renewing
        oldEntry.addThreadId(threadId);
    } else {
        // record one hold for this thread
        entry.addThreadId(threadId);
        // 2. Start refreshing the expiry periodically
        renewExpiration();
    }
}
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // After a successful acquire with the default expiry, the renewal task runs once after 10s; if a thread still holds the lock, the expiry is reset to internalLockLeaseTime and the task runs again 10s later
    // The timer task stops once the lock (uuid + name) has no ExpirationEntry, i.e. no thread holds it
    // RedissonBaseLock.cancelExpirationRenewal() removes the ExpirationEntry once it has no threads left
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // 1. Check whether any thread is still registered as a holder
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // 2. A thread is still registered: a Lua script checks that it still holds the lock and, if so, resets the expiry
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                if (res) {
                    // 3. The thread still holds the lock: call itself to renew again in 10s
                    renewExpiration();
                }
            });
        }
        // runs after 10s
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}
// Check whether the threadId field exists; if it does, reset the key's expiry
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // KEYS[1]: the lock name, myLock
            // ARGV[1]: the expiry time
            // ARGV[2]: uuid + ":" + threadId
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                // 1. The thread still holds the lock: reset the expiry and return 1
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            // 2. Otherwise return 0
            "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}
Note: the lock is renewed only with the default expiry, i.e. when lock() is called without a leaseTime.
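The renewal cadence (a tick every internalLockLeaseTime / 3 that pushes the expiry out by a full lease while the lock is held) can be illustrated with a tick-driven simulation. This is a rough sketch with hypothetical names (WatchdogModel, simulate); it uses a simulated clock rather than real timers, and it assumes unlock() cancels the renewal at the time given by releaseAt.

```java
// Hypothetical tick-driven model of the watchdog; no real timers or Redis involved.
final class WatchdogModel {
    final long leaseMillis;
    long expiresAt;
    boolean held = true;

    WatchdogModel(long leaseMillis, long now) {
        this.leaseMillis = leaseMillis;
        this.expiresAt = now + leaseMillis;
    }

    /** One renewal tick: extends the expiry only while the lock is still held and not expired. */
    boolean renew(long now) {
        if (!held || now >= expiresAt) {
            return false;
        }
        expiresAt = now + leaseMillis;
        return true;
    }

    /** Ticks every leaseMillis / 3 until releaseAt; returns how many renewals succeeded. */
    static int simulate(long leaseMillis, long releaseAt) {
        long period = leaseMillis / 3;
        if (period <= 0) {
            throw new IllegalArgumentException("leaseMillis must be at least 3");
        }
        WatchdogModel w = new WatchdogModel(leaseMillis, 0);
        int renewals = 0;
        for (long now = period; ; now += period) {
            if (now >= releaseAt) {
                w.held = false; // assumed: unlock() has cancelled the renewal by now
            }
            if (!w.renew(now)) {
                break;
            }
            renewals++;
        }
        return renewals;
    }
}
```

With a 30s lease and a release at 95s, the ticks at 10s, 20s, ... 90s each renew the lock, and the tick at 100s finds it released.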
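- Cancelling renewal: invoked when the lock is released
// RedissonBaseLock.cancelExpirationRenewal()
// Cancel the renewal timer task
protected void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        return;
    }
    if (threadId != null) {
        // 1. Decrement this thread's hold count; if it reaches 0, remove the thread from the entry
        // The count exists to support reentrancy: after three lock() calls, the first unlock() leaves two holds in place, so renewal must continue
        task.removeThreadId(threadId);
    }
    // 2. If no thread holds the lock any more, cancel the renewal timer task set up in lock() and remove the lock's ExpirationEntry
    // Once it is removed, the renewal task scheduled after a successful lock() does nothing, because it can no longer find the ExpirationEntry
    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) {
            timeout.cancel();
        }
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}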
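The catch: if an exception after lock() means the renewal is never cancelled, the lock stays held for as long as this process keeps running, and no other thread can acquire it. This is why unlock() must always go in a finally {} block.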
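The per-thread counting that cancelExpirationRenewal() relies on can be sketched as follows. RenewalEntryModel is a hypothetical, simplified stand-in for ExpirationEntry: it only shows why renewal must continue until every re-entrant hold has been released.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simplified stand-in for ExpirationEntry.
final class RenewalEntryModel {
    // threadId -> number of holds registered by that thread
    private final Map<Long, Integer> threadIds = new LinkedHashMap<>();

    void addThreadId(long threadId) {
        threadIds.merge(threadId, 1, Integer::sum);
    }

    /** Drops one hold; the thread is removed once its last hold is gone. */
    void removeThreadId(long threadId) {
        threadIds.computeIfPresent(threadId, (id, n) -> n > 1 ? n - 1 : null);
    }

    /** True when renewal can be cancelled because nobody holds the lock any more. */
    boolean hasNoThreads() {
        return threadIds.isEmpty();
    }
}
```

After three addThreadId() calls for the same thread, hasNoThreads() only becomes true on the third removeThreadId(), which is the point at which the timer would be cancelled.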
5. Subscribing while waiting for the lock
This is a brief walk-through of how publish/subscribe is used to obtain the lock; Redis pub/sub itself will be covered in detail in a separate article.
- Subscribing to the unlock message
// PublishSubscribe
// entryName: uuid:name
// channelName: redisson_lock__channel:{name}
public RFuture<E> subscribe(String entryName, String channelName) {
    // 1. Get the semaphore; it is initialised with 1 permit, and the same lock always gets the same semaphore
    AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
    RPromise<E> newPromise = new RedissonPromise<>();
    // 2. Add the task to the listener queue and call tryRun()
    semaphore.acquire(() -> {
        if (!newPromise.setUncancellable()) {
            semaphore.release();
            return;
        }
        E entry = entries.get(entryName);
        if (entry != null) {
            // 2.1 Reaching here means this is the 2nd, 3rd, ... thread that failed to acquire the lock; the listener is saved to ...
            entry.acquire();
            // run the next queued listener task
            semaphore.release();
            entry.getPromise().onComplete(new TransferListener<E>(newPromise));
            return;
        }
        // 2.2 Reaching here means this is the first thread that failed to acquire the lock
        // 2.2.1 Create a RedissonLockEntry (its latch starts with 0 permits), then increment its counter
        E value = createEntry(newPromise);
        value.acquire();
        // 2.2.2 Add the new lock entry to entries; it is used again when subscribing and unsubscribing
        E oldValue = entries.putIfAbsent(entryName, value);
        if (oldValue != null) {
            oldValue.acquire();
            semaphore.release();
            oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
            return;
        }
        // 2.2.3 This is where the listener is actually created and registered on redisson_lock__channel:{name}
        RedisPubSubListener<Object> listener = createListener(channelName, value);
        service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
    });
    return newPromise;
}
// redisson_lock__channel:{name}
private RedisPubSubListener<Object> createListener(String channelName, E value) {
    RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
        // 1. On a message: poll the first listener task, start listening for the next unlock message, and release one permit
        @Override
        public void onMessage(CharSequence channel, Object message) {
            if (!channelName.equals(channel.toString())) {
                return;
            }
            PublishSubscribe.this.onMessage(value, (Long) message);
        }
        @Override
        public boolean onStatus(PubSubType type, CharSequence channel) {
            if (!channelName.equals(channel.toString())) {
                return false;
            }
            if (type == PubSubType.SUBSCRIBE) {
                value.getPromise().trySuccess(value);
                return true;
            }
            return false;
        }
    };
    return listener;
}
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
    public static final Long UNLOCK_MESSAGE = 0L;
    public static final Long READ_UNLOCK_MESSAGE = 1L;
    public LockPubSub(PublishSubscribeService service) {
        super(service);
    }
    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }
    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(UNLOCK_MESSAGE)) {
            // 1. Poll the first task in the listener queue and run it (which sets up listening for the next unlock)
            // 2. Release one permit to wake a thread waiting in entry.getLatch().tryAcquire so it can try for the lock
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }
            value.getLatch().release();
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {
            // 2. This is the event published when a RedissonWriteLock is released: run every queued listener and wake all waiting threads
            while (true) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
                    break;
                }
                runnableToExecute.run();
            }
            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }
}
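// AsyncSemaphore
// Defaults to 1, so only one listener can run at a time, because only one thread may hold the lock at any moment
// A counter of 2 would let two threads acquire the lock at once, which defeats the purpose of the lock
private final AtomicInteger counter;
// Determines how many unlock events to listen for, i.e. how many times the semaphore is released, i.e. how many threads are waiting for the lock
private final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>();
// Created when PublishSubscribeService is initialised, with permits = 1
public AsyncSemaphore(int permits) {
    counter = new AtomicInteger(permits);
}
public void acquire(Runnable listener) {
    // 1. Add the listener to the queue; when an unlock message arrives, the runnable at the head of listeners is polled and run
    listeners.add(listener);
    // 2. Try to set up the unlock listener
    tryRun();
}
// Decide from the counter whether to set up the unlock listener
private void tryRun() {
    // Defaults to 1: only one listener at a time
    // 1. Either this is the 2nd, 3rd, ... thread that failed to acquire the lock (no permit left), or an unlock arrived and there is no next listener task (null); in both cases run() is not called
    if (counter.get() == 0
            || listeners.peek() == null) {
        return;
    }
    if (counter.decrementAndGet() >= 0) {
        // 2. This is the first thread that failed to acquire the lock: a permit is available, so run the listener task directly without blocking
        Runnable listener = listeners.poll();
        if (listener == null) {
            counter.incrementAndGet();
            return;
        }
        listener.run();
    } else {
        // 3. In principle this branch should not be reached
        if (counter.incrementAndGet() > 0) {
            // give the permit back and call itself to run the listener
            tryRun();
        }
    }
}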
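How acquire() and tryRun() interact with a matching release() is easier to see in a stripped-down version. AsyncSemaphoreSketch below is a hypothetical class modelled on the snippet above; the release() method is an assumption (increment the counter, then call tryRun()), since the original excerpt does not show it.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stripped-down semaphore that runs queued tasks when a permit is free.
final class AsyncSemaphoreSketch {
    private final AtomicInteger counter;
    private final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>();

    AsyncSemaphoreSketch(int permits) {
        counter = new AtomicInteger(permits);
    }

    /** Queues the task and runs it at once if a permit is available. */
    void acquire(Runnable listener) {
        listeners.add(listener);
        tryRun();
    }

    /** Assumed counterpart of acquire(): returns a permit and runs the next queued task. */
    void release() {
        counter.incrementAndGet();
        tryRun();
    }

    private void tryRun() {
        if (counter.get() == 0 || listeners.peek() == null) {
            return; // no permit, or nothing queued
        }
        if (counter.decrementAndGet() >= 0) {
            Runnable listener = listeners.poll();
            if (listener == null) {
                counter.incrementAndGet(); // lost a race for the task: give the permit back
                return;
            }
            listener.run();
        } else if (counter.incrementAndGet() > 0) {
            tryRun(); // lost a race for the permit: undo and retry
        }
    }
}
```

With one permit, the first queued task runs immediately and the second stays queued until release() is called.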
Overall flow of subscribing to the unlock message:
- Unsubscribing from the unlock message
// PublishSubscribe
// entryName: uuid:name
// channelName: redisson_lock__channel:{name}
public void unsubscribe(E entry, String entryName, String channelName) {
    AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
    semaphore.acquire(() -> {
        if (entry.release() == 0) {
            // 1. The counter reached 0 after the decrement: no waiter needs the listener any more, and this is the entry created when the listener was set up in subscribe() above
            // Send PubSubType.UNSUBSCRIBE and release the semaphore once it completes
            // just an assertion
            boolean removed = entries.remove(entryName) == entry;
            if (!removed) {
                throw new IllegalStateException();
            }
            service.unsubscribe(PubSubType.UNSUBSCRIBE, new ChannelName(channelName))
                    .onComplete((r, e) -> {
                        semaphore.release();
                    });
        } else {
            semaphore.release();
        }
    });
}
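6. A known problem with the Redisson distributed lock
In redis-cluster mode under high concurrency, thread 1 writes the myLock key to a master. Replication to that master's slave is asynchronous, so the master can crash before the key has reached the slave. After failover the slave becomes the new master; thread 2 then requests the lock, finds that nobody holds it, and acquires it as well. Threads 1 and 2 now both hold the lock at the same time, which can produce dirty data.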
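The failover scenario can be replayed with two in-memory maps. This is a toy model under obvious assumptions: FailoverModel and its methods are hypothetical, replication is triggered by hand to stand in for asynchronous replication, and no real Redis semantics (expiry, hashes, scripts) are modelled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of a master with one asynchronously replicated slave.
final class FailoverModel {
    private Map<String, String> master = new HashMap<>();
    private Map<String, String> replica = new HashMap<>();

    /** Writes the lock to the current master only; true if the key was free. */
    boolean tryLock(String key, String owner) {
        return master.putIfAbsent(key, owner) == null;
    }

    /** Copies the master's data to the slave; in a real deployment this happens asynchronously. */
    void replicate() {
        replica = new HashMap<>(master);
    }

    /** The master crashes and the slave is promoted with whatever data it had received. */
    void failover() {
        master = replica;
        replica = new HashMap<>();
    }
}
```

If failover() happens before replicate(), a second caller acquires a lock that the first caller still believes it holds; if replication completed first, the second attempt is refused.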