使用過(guò)zookeeper的都知道,當(dāng)我們使用zookeeper創(chuàng)建一個(gè)節(jié)點(diǎn)時(shí),我們能選擇節(jié)點(diǎn)的類(lèi)型是“臨時(shí)節(jié)點(diǎn)”還是“永久節(jié)點(diǎn)”。臨時(shí)節(jié)點(diǎn)和永久節(jié)點(diǎn)的區(qū)別是,臨時(shí)節(jié)點(diǎn)會(huì)在客戶(hù)端斷開(kāi)連接時(shí)被刪除,而永久節(jié)點(diǎn)無(wú)論客戶(hù)端是否斷開(kāi)連接,都會(huì)保留。
臨時(shí)節(jié)點(diǎn)非常重要,我們經(jīng)常利用它來(lái)實(shí)現(xiàn)分布式鎖、選舉等等
而為了實(shí)現(xiàn)臨時(shí)節(jié)點(diǎn)的功能,zookeeper服務(wù)端就勢(shì)必要有一套高效的session管理機(jī)制,它能實(shí)現(xiàn)如下功能:當(dāng)客戶(hù)端session失效后,服務(wù)端能感知到,隨后刪掉客戶(hù)端當(dāng)前創(chuàng)建的臨時(shí)節(jié)點(diǎn),并通知給其他的客戶(hù)端。這篇文章會(huì)深入探討zookeeper的session管理機(jī)制。
zookeeper的心跳機(jī)制
為什么要有心跳機(jī)制
zookeeper底層支持兩種網(wǎng)絡(luò)庫(kù),一種是zookeeper基于NIO自己寫(xiě)的,一種是Netty。那么zookeeper能不能直接通過(guò)感知TCP連接是否斷開(kāi)來(lái)感知客戶(hù)端連接是否斷開(kāi)呢?
答案是不能,原因有很多,個(gè)人覺(jué)得最重要的一點(diǎn)是,基于TCP連接來(lái)判斷客戶(hù)端是否存活是不靠譜的。
這里舉兩個(gè)異常case 1、客戶(hù)端進(jìn)程直接crash了,還沒(méi)來(lái)得及發(fā)送FIN報(bào)文,這種情況下,zookeeper在TCP這一側(cè)是沒(méi)辦法及時(shí)感知到TCP連接已經(jīng)失效了。(也就是說(shuō),由于zookeeper沒(méi)收到client的FIN包,雖然client已經(jīng)掛了,TCP側(cè)還認(rèn)為客戶(hù)端還活著。只能等弱雞keepalive了)
2、客戶(hù)端和服務(wù)端之間很久沒(méi)有報(bào)文交互,TCP連接其實(shí)已經(jīng)失效了。(失效原因很多,比如路由器出問(wèn)題了,網(wǎng)絡(luò)設(shè)備故障了等等)這時(shí)候,如果客戶(hù)端發(fā)送報(bào)文給服務(wù)端,linux會(huì)進(jìn)行重試,默認(rèn)差不多要重試15分鐘,才能感知到這個(gè)連接已經(jīng)失效。
這里只是舉個(gè)例子。實(shí)際來(lái)說(shuō),client和zookeeper的報(bào)文交互可能不會(huì)那么少。也就是第二種情況,客戶(hù)端很久不給服務(wù)端發(fā)報(bào)文,要發(fā)了,才發(fā)現(xiàn)tcp連接已經(jīng)有問(wèn)題了,這種情況不太可能出現(xiàn)。(當(dāng)然,選舉這種場(chǎng)景是有可能出現(xiàn)這種情況的,后面會(huì)單獨(dú)寫(xiě)文章分析這個(gè)case) 但是從上面兩個(gè)例子,我們也不難得出結(jié)論,通過(guò)tcp連接的斷開(kāi)與否來(lái)感知客戶(hù)端是否存活,似乎并不太靠譜。
client發(fā)送心跳
因?yàn)閠cp的“不靠譜”,zookeeper為了能夠?qū)崿F(xiàn)可靠的連接管理(也為了?;睿?,選擇自己實(shí)現(xiàn)心跳機(jī)制。
正如上圖所示,client隔一段時(shí)間就會(huì)發(fā)送一個(gè)心跳報(bào)文給服務(wù)端,告訴zookeeper自己還活著,別把我連接關(guān)了。
注意:這里亂入了一個(gè)create報(bào)文,因?yàn)檎?bào)文也算是一種“心跳”。反正,zookeeper只要能收到報(bào)文,就能知道客戶(hù)端還活著。
服務(wù)端收到報(bào)文后,會(huì)更新session信息
這里帶大家看看代碼。很簡(jiǎn)單,收到報(bào)文后,會(huì)調(diào)用SessionTracker.touchSession()來(lái)更新session信息
sessionId是zookeeper為每一個(gè)連接分配的唯一id
管理session的難點(diǎn)在哪
zookeeper 管理session的需求分析
這里我們思考下zookeeper對(duì)于session管理的需求是什么?
1、zookeeper需要有個(gè)地方存session
2、當(dāng)客戶(hù)端一段時(shí)間沒(méi)發(fā)生心跳時(shí),zookeeper要能感知到
第一個(gè)問(wèn)題比較簡(jiǎn)單,java提供了各式各樣的集合,無(wú)論是Map或者是List理論上都能存session。由于我們一般是通過(guò)sessionId來(lái)找到關(guān)聯(lián)的Session的,因此使用Map更合適點(diǎn)。Zookeeper也是這么做的protected final ConcurrentHashMap
第二個(gè)問(wèn)題看起來(lái)也很簡(jiǎn)單,客戶(hù)端不是會(huì)發(fā)送心跳么?我給每一個(gè)session記錄一個(gè)上一個(gè)報(bào)文到達(dá)時(shí)間,一旦收到新的報(bào)文,我就更新這個(gè)時(shí)間。然后我再不斷地掃描每個(gè)session是否很久沒(méi)收到報(bào)文不就行了?
如何檢測(cè)Session失效?
按照上面的說(shuō)法,收到一個(gè)報(bào)文,我就更新Session的“上次報(bào)文”字段。假設(shè)session失效時(shí)間是4秒,我就每隔4秒掃描一次session的集合,找出那個(gè)超過(guò)4秒沒(méi)有新的報(bào)文的session不就行了?像上圖一樣,假設(shè)sessionTimeout是4秒,現(xiàn)在已經(jīng)11點(diǎn)50分05秒了,理論上每個(gè)session上一個(gè)報(bào)文時(shí)間應(yīng)該大于11點(diǎn)50分01秒。很明顯,session1失效了。
定時(shí)任務(wù)的選型
既然要定時(shí)掃描,我們就需要跑一個(gè)定時(shí)任務(wù)。jdk本身也提供了很多的定時(shí)任務(wù)方案。不知道定時(shí)任務(wù)的同學(xué)可以參考這篇文章Java中定時(shí)任務(wù)的6種實(shí)現(xiàn)方式,你知道幾種?- 掘金既然如此,我們完全可以使用jdk自帶的定時(shí)任務(wù),定時(shí)去掃描這個(gè)集合啊,這樣不就能很輕易的找到失效的session了么?
這里有兩個(gè)問(wèn)題:1、每一個(gè)客戶(hù)端和服務(wù)端的連接,sessionTimeout都是可配的。我們例子是4秒沒(méi)收到報(bào)文,就認(rèn)為連接失效。實(shí)際超時(shí)時(shí)間可能有1秒、2秒、3秒.... 所以如果用定時(shí)任務(wù)來(lái)實(shí)現(xiàn),我們可能需要啟動(dòng)不止一個(gè)定時(shí)任務(wù)。2、jdk提供的定時(shí)任務(wù)不夠靈活,什么意思呢。比如我設(shè)置的sessionTimeout是4秒,現(xiàn)在是11點(diǎn)。然后我在11點(diǎn)00分02秒就收到了一個(gè)心跳,那么下次檢測(cè)時(shí)間應(yīng)該變成11點(diǎn)00分06秒。而jdk的定時(shí)任務(wù)限定了,只能每隔4秒檢測(cè)一次。比如:11點(diǎn)、11點(diǎn)00分04秒、11點(diǎn)00分08秒、這樣檢測(cè)下去。而如果用jdk的定時(shí)任務(wù),我們只能簡(jiǎn)單的隔一段時(shí)間,檢測(cè)一次。這里可以仔細(xì)體會(huì)下兩者的差異。
想一想這些問(wèn)題,是不是發(fā)現(xiàn)想實(shí)現(xiàn)一個(gè)高效的session管理機(jī)制是不是沒(méi)那么簡(jiǎn)單。接下來(lái)我們看看,zookeeper是如何巧妙地實(shí)現(xiàn)session的管理。
zookeeper session管理機(jī)制
接下來(lái)就要介紹zookeeper的session管理機(jī)制了。
1、通過(guò)expiryMap存儲(chǔ)過(guò)期時(shí)間與session集合的對(duì)應(yīng)關(guān)系
首先,zookeeper內(nèi)部有一個(gè)expiryMap
非常簡(jiǎn)單,Key是過(guò)期時(shí)間,value是一個(gè)Set,里面放了一個(gè)個(gè)的Session。
舉個(gè)例子:S1在11點(diǎn)50分02秒的key下面,表示如果11點(diǎn)50分02秒前沒(méi)收到新的報(bào)文,就認(rèn)為S1過(guò)期了。
2、當(dāng)收到某條連接的報(bào)文時(shí),更新expiryMap
拿上圖的S1為例,它的sessionTimeout是4秒,在11點(diǎn)50分01秒收到報(bào)文。那么理論上下個(gè)session檢測(cè)時(shí)間會(huì)是 11點(diǎn)50分05秒。
這里要說(shuō)下expiryMap的第一個(gè)特征,它的key并不是隨意一個(gè)時(shí)間。它會(huì)間隔一個(gè)固定的時(shí)間叫做expirationInterval,數(shù)值上它等于zookeeper的配置tickTime(默認(rèn)配的2秒)
所以說(shuō),這里計(jì)算出11點(diǎn)50分05秒后,它會(huì)round下,round到11點(diǎn)50分06秒
如圖所示:
S1在截止時(shí)間前更新了session,我們就要把它從舊的桶里移除,挪到新的桶里。
3、循環(huán)檢測(cè)ExpiryMap
有個(gè)SessionTracker線(xiàn)程會(huì)循環(huán)檢測(cè)這個(gè)expiryMap,找到最近的那個(gè)key對(duì)應(yīng)的session集合,把他們?nèi)慷歼^(guò)期掉。
就像上面的例子,一旦時(shí)間到了11點(diǎn)50分02秒,就把對(duì)應(yīng)的session全部過(guò)期掉。
小結(jié)下
1、如果收到報(bào)文,會(huì)把session放到下一個(gè)過(guò)期桶里。2、SessionTracker會(huì)按次序,不斷地取出過(guò)期的桶,把桶里的session全部過(guò)期掉(過(guò)期會(huì)刪除臨時(shí)節(jié)點(diǎn),當(dāng)然還有其他一系列操作) 3、zookeeper底層使用了非常簡(jiǎn)單的Map就實(shí)現(xiàn)了非常高效的Session管理機(jī)制。
session管理機(jī)制源碼分析
接下來(lái)我們來(lái)看看源碼
1、server端的心跳續(xù)約
//org.apache.zookeeper.server.ExpiryQueue#update public Long update(E elem, int timeout) { //1、除了上面我們介紹的ExpiryMap,zookeeper內(nèi)部還有一個(gè)elemMap,用于存放 Session -> 過(guò)期時(shí)間 Long prevExpiryTime = elemMap.get(elem); long now = Time.currentElapsedTime(); //2、收到心跳后,我們會(huì)計(jì)算session應(yīng)該更新到哪個(gè)桶里 Long newExpiryTime = roundToNextInterval(now + timeout); //桶不變,就不用更新expiryMap了 if (newExpiryTime.equals(prevExpiryTime)) { // No change, so nothing to update return null; } // First add the elem to the new expiry time bucket in expiryMap. //3. 找到新的桶,插入進(jìn)去 Setset = expiryMap.get(newExpiryTime); if (set == null) { // Construct a ConcurrentHashSet using a ConcurrentHashMap set = Collections.newSetFromMap(new ConcurrentHashMap ()); // Put the new set in the map, but only if another thread // hasn't beaten us to it Set existingSet = expiryMap.putIfAbsent(newExpiryTime, set); if (existingSet != null) { set = existingSet; } } set.add(elem); // Map the elem to the new expiry time. If a different previous // mapping was present, clean up the previous expiry bucket. prevExpiryTime = elemMap.put(elem, newExpiryTime); //4. 從舊的桶里移除 if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) { Set prevSet = expiryMap.get(prevExpiryTime); if (prevSet != null) { prevSet.remove(elem); } } return newExpiryTime; }
其實(shí)就是這幅圖 1、放入一個(gè):Session -> 過(guò)期時(shí)間 的Map中 2、收到心跳后,我們會(huì)計(jì)算session應(yīng)該更新到哪個(gè)桶里 3、找到新的桶,插入進(jìn)去 4、把session從舊的桶里移除
2、SessionTracker不斷地輪訓(xùn),找到過(guò)期的Session集合,然后都過(guò)期掉
//org.apache.zookeeper.server.SessionTrackerImpl#run @Override public void run() { try { while (running) { //1. 這個(gè)其實(shí)就是不斷地輪訓(xùn)下一個(gè)要檢測(cè)的key // 比如按我們的例子,應(yīng)該是11點(diǎn)50分02秒檢測(cè)一次、11點(diǎn)50分04秒檢測(cè)一次、11點(diǎn)50分06秒檢測(cè)一次... // 這里的waitTime就是找到下次檢測(cè)需要等待多久,比如現(xiàn)在是11點(diǎn)50分01秒了,這個(gè)waitTime就是1秒 // 如果是11點(diǎn)50分02秒了,waitTime就是0,我們要開(kāi)始把過(guò)期的session都失效掉了 long waitTime = sessionExpiryQueue.getWaitTime(); if (waitTime > 0) { Thread.sleep(waitTime); continue; } //2. 取出過(guò)期的Session集合,全部都expire掉,如果session都及時(shí)發(fā)送了心跳了,這里就會(huì)拿到一個(gè)空的集合 for (SessionImpl s : sessionExpiryQueue.poll()) { ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1); setSessionClosing(s.sessionId); expirer.expire(s); } } } catch (InterruptedException e) { handleException(this.getName(), e); } LOG.info("SessionTrackerImpl exited loop!"); }
其實(shí)就是這個(gè)圖
-
路由器
+關(guān)注
關(guān)注
22文章
3720瀏覽量
113679 -
TCP
+關(guān)注
關(guān)注
8文章
1353瀏覽量
79046 -
客戶(hù)端
+關(guān)注
關(guān)注
1文章
290瀏覽量
16674 -
zookeeper
+關(guān)注
關(guān)注
0文章
33瀏覽量
3668
原文標(biāo)題:深入詳解zookeeper的session管理機(jī)制
文章出處:【微信號(hào):magedu-Linux,微信公眾號(hào):馬哥Linux運(yùn)維】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論