亚洲农村老熟妇肥BBBB_无码人妻精品一区二区蜜桃色_精品亚洲AⅤ无码午夜在线观看_中文字幕熟妇人妻在线视频_囯产色无码精品视频免费

當(dāng)前位置: 首頁(yè) > 科技新聞 >

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

時(shí)間:2020-06-04 17:39來(lái)源:網(wǎng)絡(luò)整理 瀏覽:
前言上一章節(jié)我提到了基于zk分布式鎖的實(shí)現(xiàn),這章節(jié)就來(lái)說(shuō)一下基于Redis的分布式鎖實(shí)現(xiàn)吧。zk實(shí)現(xiàn)分布式鎖的傳送門:zk分布式鎖在開始提到
前言

上一章節(jié)我提到了基于zk分布式鎖的實(shí)現(xiàn),這章節(jié)就來(lái)說(shuō)一下基于Redis的分布式鎖實(shí)現(xiàn)吧。

zk實(shí)現(xiàn)分布式鎖的傳送門:zk分布式鎖

在開始提到Redis分布式鎖之前,我想跟大家聊點(diǎn)Redis的基礎(chǔ)知識(shí)。

說(shuō)一下Redis的兩個(gè)命令:

SETNX key value

setnx 是SET if Not eXists(如果不存在,則 SET)的簡(jiǎn)寫。

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

用法如圖,如果不存在set成功返回int的1,這個(gè)key存在了返回0。

SETEX key seconds value

將值 value 關(guān)聯(lián)到 key ,并將 key 的生存時(shí)間設(shè)為 seconds (以秒為單位)。

如果 key 已經(jīng)存在,setex命令將覆寫舊值。

有小伙伴肯定會(huì)疑惑萬(wàn)一set value 成功 set time失敗,那不就傻了么,這啊Redis官網(wǎng)想到了。

setex是一個(gè)原子性(atomic)操作,關(guān)聯(lián)值和設(shè)置生存時(shí)間兩個(gè)動(dòng)作會(huì)在同一時(shí)間內(nèi)完成。

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

我設(shè)置了10秒的失效時(shí)間,ttl命令可以查看倒計(jì)時(shí),負(fù)的說(shuō)明已經(jīng)到期了。

跟大家講這兩個(gè)命名也是有原因的,因?yàn)樗麄兪荝edis實(shí)現(xiàn)分布式鎖的關(guān)鍵。

正文

開始前還是看看場(chǎng)景:

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

我依然是創(chuàng)建了很多個(gè)線程去扣減庫(kù)存inventory,不出意外的庫(kù)存扣減順序變了,最終的結(jié)果也是不對(duì)的。

單機(jī)加synchronized或者Lock這些常規(guī)操作我就不說(shuō)了好吧,結(jié)果肯定是對(duì)的。

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

我先實(shí)現(xiàn)一個(gè)簡(jiǎn)單的Redis鎖,然后我們?cè)賹?shí)現(xiàn)分布式鎖,可能更方便大家的理解。

還記得上面我說(shuō)過(guò)的命令么,實(shí)現(xiàn)一個(gè)單機(jī)的其實(shí)比較簡(jiǎn)單,你們先思考一下,別往下看。

setnx阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

可以看到,第一個(gè)成功了,沒釋放鎖,后面的都失敗了,至少順序問(wèn)題問(wèn)題是解決了,只要加鎖,縮放后面的拿到,釋放如此循環(huán),就能保證按照順序執(zhí)行。

但是你們也發(fā)現(xiàn)問(wèn)題了,還是一樣的,第一個(gè)仔set成功了,但是突然掛了,那鎖就一直在那無(wú)法得到釋放,后面的線程也永遠(yuǎn)得不到鎖,又死鎖了。

所以....

setex

知道我之前說(shuō)這個(gè)命令的原因了吧,設(shè)置一個(gè)過(guò)期時(shí)間,就算線程1掛了,也會(huì)在失效時(shí)間到了,自動(dòng)釋放。

我這里就用到了nx和px的結(jié)合參數(shù),就是set值并且加了過(guò)期時(shí)間,這里我還設(shè)置了一個(gè)過(guò)期時(shí)間,就是這時(shí)間內(nèi)如果第二個(gè)沒拿到第一個(gè)的鎖,就退出阻塞了,因?yàn)榭赡苁强蛻舳藬噙B了。

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

加鎖

整體加鎖的邏輯比較簡(jiǎn)單,大家基本上都能看懂,不過(guò)我拿到當(dāng)前時(shí)間去減開始時(shí)間的操作感覺有點(diǎn)笨, System.currentTimeMillis()消耗很大的。

/**
*加鎖
*
*@paramid
*@return
*/
publicbooleanlock(Stringid){
Longstart=System.currentTimeMillis();
try{
for(;;){
//SET命令返回OK,則證明獲取鎖成功
Stringlock=jedis.set(LOCK_KEY,id,params);
if("OK".equals(lock)){
returntrue;
}
//否則循環(huán)等待,在timeout時(shí)間內(nèi)仍未獲取到鎖,則獲取失敗
longl=System.currentTimeMillis()-start;
if(l>=timeout){
returnfalse;
}
try{
Thread.sleep(100);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}finally{
jedis.close();
}
}

System.currentTimeMillis消耗大,每個(gè)線程進(jìn)來(lái)都這樣,我之前寫代碼,就會(huì)在服務(wù)器啟動(dòng)的時(shí)候,開一個(gè)線程不斷去拿,調(diào)用方直接獲取值就好了,不過(guò)也不是最優(yōu)解,日期類還是有很多好方法的。

@Service
publicclassTimeServcie{
privatestaticlongtime;
static{
newThread(newRunnable(){
@Override
publicvoidrun(){
while(true){
try{
Thread.sleep(5);
}catch(InterruptedExceptione){
e.printStackTrace();
}
longcur=System.currentTimeMillis();
setTime(cur);
}
}
}).start();
}

publicstaticlonggetTime(){
returntime;
}

publicstaticvoidsetTime(longtime){
TimeServcie.time=time;
}
}
解鎖

解鎖的邏輯更加簡(jiǎn)單,就是一段Lua的拼裝,把Key做了刪除。

你們發(fā)現(xiàn)沒,我上面加鎖解鎖都用了UUID,這就是為了保證,誰(shuí)加鎖了誰(shuí)解鎖,要是你刪掉了我的鎖,那不亂套了嘛。

LUA是原子性的,也比較簡(jiǎn)單,就是判斷一下Key和我們參數(shù)是否相等,是的話就刪除,返回成功1,0就是失敗。

/**
*解鎖
*
*@paramid
*@return
*/
publicbooleanunlock(Stringid){
Stringscript=
"ifredis.call('get',KEYS[1])==ARGV[1]then"+
"returnredis.call('del',KEYS[1])"+
"else"+
"return0"+
"end";
try{
Stringresult=jedis.eval(script,Collections.singletonList(LOCK_KEY),Collections.singletonList(id)).toString();
return"1".equals(result)?true:false;
}finally{
jedis.close();
}
}
驗(yàn)證

我們可以用我們寫的Redis鎖試試效果,可以看到都按照順序去執(zhí)行了

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

思考

大家是不是覺得完美了,但是上面的鎖,有不少瑕疵的,我沒思考很多點(diǎn),你或許可以思考一下,源碼我都開源到我的GItHub了。

而且,鎖一般都是需要可重入行的,上面的線程都是執(zhí)行完了就釋放了,無(wú)法再次進(jìn)入了,進(jìn)去也是重新加鎖了,對(duì)于一個(gè)鎖的設(shè)計(jì)來(lái)說(shuō)肯定不是很合理的。

我不打算手寫,因?yàn)槎加鞋F(xiàn)成的,別人幫我們寫好了。

redisson

redisson的鎖,就實(shí)現(xiàn)了可重入了,但是他的源碼比較晦澀難懂。

使用起來(lái)很簡(jiǎn)單,因?yàn)樗麄兊讓佣挤庋b好了,你連接上你的Redis客戶端,他幫你做了我上面寫的一切,然后更完美。

簡(jiǎn)單看看他的使用吧,跟正常使用Lock沒啥區(qū)別。

ThreadPoolExecutorthreadPoolExecutor=
newThreadPoolExecutor(inventory,inventory,10L,SECONDS,linkedBlockingQueue);
longstart=System.currentTimeMillis();
Configconfig=newConfig();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
finalRedissonClientclient=Redisson.create(config);
finalRLocklock=client.getLock("lock1");

for(inti=0;i<=NUM;i++){
threadPoolExecutor.execute(newRunnable(){
publicvoidrun(){
lock.lock();
inventory--;
System.out.println(inventory);
lock.unlock();
}
});
}
longend=System.currentTimeMillis();
System.out.println("執(zhí)行線程數(shù):"+NUM+"總耗時(shí):"+(end-start)+"庫(kù)存數(shù)為:"+inventory);

上面可以看到我用到了getLock,其實(shí)就是獲取一個(gè)鎖的實(shí)例。

RedissionLock也沒做啥,就是熟悉的初始化。

publicRLockgetLock(Stringname){
returnnewRedissonLock(connectionManager.getCommandExecutor(),name);
}

publicRedissonLock(CommandAsyncExecutorcommandExecutor,Stringname){
super(commandExecutor,name);
//命令執(zhí)行器
this.commandExecutor=commandExecutor;
//UUID字符串
this.id=commandExecutor.getConnectionManager().getId();
//內(nèi)部鎖過(guò)期時(shí)間
this.internalLockLeaseTime=commandExecutor.
getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName=id+":"+name;
}
加鎖

有沒有發(fā)現(xiàn)很多跟Lock很多相似的地方呢?

嘗試加鎖,拿到當(dāng)前線程,然后我開頭說(shuō)的ttl也看到了,是不是一切都是那么熟悉?

publicvoidlockInterruptibly(longleaseTime,TimeUnitunit)throwsInterruptedException{

//當(dāng)前線程ID
longthreadId=Thread.currentThread().getId();
//嘗試獲取鎖
Longttl=tryAcquire(leaseTime,unit,threadId);
//如果ttl為空,則證明獲取鎖成功
if(ttl==null){
return;
}
//如果獲取鎖失敗,則訂閱到對(duì)應(yīng)這個(gè)鎖的channel
RFuture<RedissonLockEntry>future=subscribe(threadId);
commandExecutor.syncSubscription(future);

try{
while(true){
//再次嘗試獲取鎖
ttl=tryAcquire(leaseTime,unit,threadId);
//ttl為空,說(shuō)明成功獲取鎖,返回
if(ttl==null){
break;
}
//ttl大于0則等待ttl時(shí)間后繼續(xù)嘗試獲取
if(ttl>=0){
getEntry(threadId).getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);
}else{
getEntry(threadId).getLatch().acquire();
}
}
}finally{
//取消對(duì)channel的訂閱
unsubscribe(future,threadId);
}
//get(lockAsync(leaseTime,unit));
}
獲取鎖

獲取鎖的時(shí)候,也比較簡(jiǎn)單,你可以看到,他也是不斷刷新過(guò)期時(shí)間,跟我上面不斷去拿當(dāng)前時(shí)間,校驗(yàn)過(guò)期是一個(gè)道理,只是我比較粗糙。

private<T>RFuture<Long>tryAcquireAsync(longleaseTime,TimeUnitunit,finallongthreadId){

//如果帶有過(guò)期時(shí)間,則按照普通方式獲取鎖
if(leaseTime!=-1){
returntryLockInnerAsync(leaseTime,unit,threadId,RedisCommands.EVAL_LONG);
}

//先按照30秒的過(guò)期時(shí)間來(lái)執(zhí)行獲取鎖的方法
RFuture<Long>ttlRemainingFuture=tryLockInnerAsync(
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS,threadId,RedisCommands.EVAL_LONG);

//如果還持有這個(gè)鎖,則開啟定時(shí)任務(wù)不斷刷新該鎖的過(guò)期時(shí)間
ttlRemainingFuture.addListener(newFutureListener<Long>(){
@Override
publicvoidoperationComplete(Future<Long>future)throwsException{
if(!future.isSuccess()){
return;
}

LongttlRemaining=future.getNow();
//lockacquired
if(ttlRemaining==null){
scheduleExpirationRenewal(threadId);
}
}
});
returnttlRemainingFuture;
}
底層加鎖邏輯

你可能會(huì)想這么多操作,在一起不是原子性不還是有問(wèn)題么?

大佬們肯定想得到呀,所以還是LUA,他使用了Hash的數(shù)據(jù)結(jié)構(gòu)。

主要是判斷鎖是否存在,存在就設(shè)置過(guò)期時(shí)間,如果鎖已經(jīng)存在了,那對(duì)比一下線程,線程是一個(gè)那就證明可以重入,鎖在了,但是不是當(dāng)前線程,證明別人還沒釋放,那就把剩余時(shí)間返回,加鎖失敗。

是不是有點(diǎn)繞,多理解一遍。

<T>RFuture<T>tryLockInnerAsync(longleaseTime,TimeUnitunit,
longthreadId,RedisStrictCommand<T>command){

//過(guò)期時(shí)間
internalLockLeaseTime=unit.toMillis(leaseTime);

returncommandExecutor.evalWriteAsync(getName(),LongCodec.INSTANCE,command,
//如果鎖不存在,則通過(guò)hset設(shè)置它的值,并設(shè)置過(guò)期時(shí)間
"if(redis.call('exists',KEYS[1])==0)then"+
"redis.call('hset',KEYS[1],ARGV[2],1);"+
"redis.call('pexpire',KEYS[1],ARGV[1]);"+
"returnnil;"+
"end;"+
//如果鎖已存在,并且鎖的是當(dāng)前線程,則通過(guò)hincrby給數(shù)值遞增1
"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+
"redis.call('hincrby',KEYS[1],ARGV[2],1);"+
"redis.call('pexpire',KEYS[1],ARGV[1]);"+
"returnnil;"+
"end;"+
//如果鎖已存在,但并非本線程,則返回過(guò)期時(shí)間ttl
"returnredis.call('pttl',KEYS[1]);",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime,getLockName(threadId));
}
解鎖

鎖的釋放主要是publish釋放鎖的信息,然后做校驗(yàn),一樣會(huì)判斷是否當(dāng)前線程,成功就釋放鎖,還有個(gè)hincrby遞減的操作,鎖的值大于0說(shuō)明是可重入鎖,那就刷新過(guò)期時(shí)間。

如果值小于0了,那刪掉Key釋放鎖。

是不是又和AQS很像了?

AQS就是通過(guò)一個(gè)volatile修飾status去看鎖的狀態(tài),也會(huì)看數(shù)值判斷是否是可重入的。

所以我說(shuō)代碼的設(shè)計(jì),最后就萬(wàn)劍歸一,都是一樣的。

publicRFuture<Void>unlockAsync(finallongthreadId){
finalRPromise<Void>result=newRedissonPromise<Void>();

//解鎖方法
RFuture<Boolean>future=unlockInnerAsync(threadId);

future.addListener(newFutureListener<Boolean>(){
@Override
publicvoidoperationComplete(Future<Boolean>future)throwsException{
if(!future.isSuccess()){
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}
//獲取返回值
BooleanopStatus=future.getNow();
//如果返回空,則證明解鎖的線程和當(dāng)前鎖不是同一個(gè)線程,拋出異常
if(opStatus==null){
IllegalMonitorStateExceptioncause=
newIllegalMonitorStateException("
attempttounlocklock,notlockedbycurrentthreadbynodeid:"
+id+"thread-id:"+threadId);
result.tryFailure(cause);
return;
}
//解鎖成功,取消刷新過(guò)期時(shí)間的那個(gè)定時(shí)任務(wù)
if(opStatus){
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});

returnresult;
}


protectedRFuture<Boolean>unlockInnerAsync(longthreadId){
returncommandExecutor.evalWriteAsync(getName(),LongCodec.INSTANCE,EVAL,

//如果鎖已經(jīng)不存在,發(fā)布鎖釋放的消息
"if(redis.call('exists',KEYS[1])==0)then"+
"redis.call('publish',KEYS[2],ARGV[1]);"+
"return1;"+
"end;"+
//如果釋放鎖的線程和已存在鎖的線程不是同一個(gè)線程,返回null
"if(redis.call('hexists',KEYS[1],ARGV[3])==0)then"+
"returnnil;"+
"end;"+
//通過(guò)hincrby遞減1的方式,釋放一次鎖
//若剩余次數(shù)大于0,則刷新過(guò)期時(shí)間
"localcounter=redis.call('hincrby',KEYS[1],ARGV[3],-1);"+
"if(counter>0)then"+
"redis.call('pexpire',KEYS[1],ARGV[2]);"+
"return0;"+
//否則證明鎖已經(jīng)釋放,刪除key并發(fā)布鎖釋放的消息
"else"+
"redis.call('del',KEYS[1]);"+
"redis.call('publish',KEYS[2],ARGV[1]);"+
"return1;"+
"end;"+
"returnnil;",
Arrays.<Object>asList(getName(),getChannelName()),
LockPubSub.unlockMessage,internalLockLeaseTime,getLockName(threadId));

}
總結(jié)

這個(gè)寫了比較久,但是不是因?yàn)閺?fù)雜什么的,是因?yàn)閭€(gè)人工作的原因,最近事情很多嘛,還是那句話,程序員才是我的本職寫文章只是個(gè)愛好,不能本末倒置了。

大家會(huì)發(fā)現(xiàn),你學(xué)懂一個(gè)技術(shù)棧之后,學(xué)新的會(huì)很快,而且也能發(fā)現(xiàn)他們的設(shè)計(jì)思想和技巧真的很巧妙,也總能找到相似點(diǎn),和讓你驚嘆的點(diǎn)。

就拿Doug Lea寫的AbstractQueuedSynchronizer(AQS)來(lái)說(shuō),他寫了一行代碼,你可能看幾天才能看懂,大佬們的思想是真的牛。

我看源碼有時(shí)候也頭疼,但是去谷歌一下,自己理解一下,突然恍然大悟的時(shí)候覺得一切又很值。

學(xué)習(xí)就是一條時(shí)而郁郁寡歡,時(shí)而開環(huán)大笑的路,大家加油,我們成長(zhǎng)路上一起共勉。

我是敖丙,一個(gè)在互聯(lián)網(wǎng)茍且偷生的工具人。

最好的關(guān)系是互相成就,大家的**「三連」**就是丙丙創(chuàng)作的最大動(dòng)力,我們下期見!

注:如果本篇博客有任何錯(cuò)誤和建議,歡迎人才們留言,你快說(shuō)句話啊!

你知道的越多,你不知道的越多

推薦內(nèi)容