Zookeeper Watch
1. 前言
在我們使用 Zookeeper 來實(shí)現(xiàn)服務(wù)注冊與發(fā)現(xiàn)、配置中心、分布式通知等功能時,需要使用 Zookeeper 的核心功能 Watch,來對節(jié)點(diǎn)進(jìn)行監(jiān)聽。那么 Zookeeper 的 Watch 是如何實(shí)現(xiàn)對節(jié)點(diǎn)的監(jiān)聽,并響應(yīng)事件到客戶端的呢?我們就帶著這個問題開始本節(jié)的內(nèi)容。
2. Watch 的實(shí)現(xiàn)
我們在 Zookeeper 的數(shù)據(jù)模型這一小節(jié)中學(xué)習(xí)過 Znode 節(jié)點(diǎn)的類型和 Znode 節(jié)點(diǎn)的特點(diǎn),這是 Zookeeper 核心特性之一。在大多數(shù)情況下,我們都會把 Znode 與 Watch 捆綁使用,接下來我們就使用 Zookeeper 的 Java 客戶端 Curator 來實(shí)現(xiàn) Watch 對 Znode 節(jié)點(diǎn)的監(jiān)聽。
我們可以繼續(xù)使用我們在 Zookeeper Curator 這一節(jié)中創(chuàng)建的 Spring Boot 測試項(xiàng)目,在測試方法中對 Watch 進(jìn)行實(shí)現(xiàn)。
2.1 CuratorWatcher
在我們使用 Curator 的 Fluent 風(fēng)格進(jìn)行鏈?zhǔn)秸{(diào)用時,我們可以使用 usingWatcher 來注冊 CuratorWatcher 來對我們的節(jié)點(diǎn)變化事件進(jìn)行監(jiān)聽:
@Test
void contextLoads() throws Exception {
// 獲取客戶端
CuratorFramework curatorClient = curatorService.getCuratorClient();
// 開啟會話
curatorClient.start();
// CuratorWatcher 為接口,我們需要實(shí)現(xiàn) process 方法
CuratorWatcher watcher = new CuratorWatcher(){
@Override
// 監(jiān)聽事件處理
public void process(WatchedEvent watchedEvent) {
// 輸出 監(jiān)聽事件
System.out.println(watchedEvent.toString());
}
};
// 在命名空間下創(chuàng)建持久節(jié)點(diǎn) mooc,內(nèi)容為 Wiki
curatorClient.create().forPath("/mooc","Wiki".getBytes());
// 獲取 mooc 節(jié)點(diǎn)的 data 數(shù)據(jù),并對 mooc 節(jié)點(diǎn)開啟監(jiān)聽
byte[] bytes = curatorClient.getData().usingWatcher(watcher).forPath("/mooc");
// 輸出 data
System.out.println(new String(bytes));
// 第一次更新
curatorClient.setData().forPath("/mooc", "Wiki001".getBytes());
// 第二次更新
curatorClient.setData().forPath("/mooc","Wiki002".getBytes());
}
控制臺輸出:
Wiki
WatchedEvent state:SyncConnected type:NodeDataChanged path:/mooc
控制臺輸出的第一行 Wiki 為 mooc 節(jié)點(diǎn)的 data 數(shù)據(jù)。第二行輸出的 WatchedEvent 為監(jiān)聽到的事件,state 表示監(jiān)聽狀態(tài);type 表示監(jiān)聽到的事件類型,我們可以判斷事件的類型來做相應(yīng)的處理;path 表示監(jiān)聽的節(jié)點(diǎn)。
介紹完 WatchedEvent,我們發(fā)現(xiàn)控制臺只輸出了一次 WatchedEvent,也就是說 CuratorWatcher 只進(jìn)行了一次監(jiān)聽。如果想要重復(fù)使用我們需要重新使用 usingWatcher 進(jìn)行注冊。那么有沒有不需要重復(fù)注冊的監(jiān)聽呢?接下來我們就來介紹 Curator 一種功能強(qiáng)大的監(jiān)聽 CuratorCacheListener。
2.2 CuratorCacheListener
CuratorCacheListener 是基于 CuratorCache 緩存實(shí)現(xiàn)的監(jiān)聽器,CuratorCache 對 ZooKeeper 事件監(jiān)聽進(jìn)行了封裝,能夠自動處理反復(fù)注冊監(jiān)聽。我們使用 CuratorCacheListener 時,需要使用構(gòu)建器 CuratorCacheListenerBuilder 來對具體的事件監(jiān)聽進(jìn)行構(gòu)建,并且把 CuratorCacheListener 注冊到 CuratorCache 緩存中。
首先我們需要構(gòu)建 CuratorCache 緩存實(shí)例,在 CuratorCache 接口中,build 為靜態(tài)方法,我們可以直接調(diào)用:
// 構(gòu)建 CuratorCache 緩存實(shí)例
static CuratorCache build(CuratorFramework client, String path, CuratorCache.Options... options) {
return builder(client, path).withOptions(options).build();
}
我們來說明以下入?yún)ⅲ?code>CuratorFramework client 是 Curator 客戶端;String path
是需要被監(jiān)聽的節(jié)點(diǎn)的路徑;CuratorCache.Options... options
是對緩存設(shè)置的參數(shù),我們可以設(shè)置以下 3 種:
public static enum Options {
// 單節(jié)點(diǎn)緩存
SINGLE_NODE_CACHE,
// 對數(shù)據(jù)進(jìn)行壓縮
COMPRESSED_DATA,
// CuratorCache 關(guān)閉后不清除緩存
DO_NOT_CLEAR_ON_CLOSE;
}
構(gòu)建完緩存實(shí)例,我們再來構(gòu)建 CuratorCacheListener ,在 CuratorCacheListener 接口中的構(gòu)建方法 builder 為靜態(tài)方法,我們可以直接調(diào)用:
// builder 方法,返回 CuratorCacheListenerBuilder 構(gòu)建器,我們就可以使用具體的監(jiān)聽方法了
static CuratorCacheListenerBuilder builder() {
return new CuratorCacheListenerBuilderImpl();
}
最后我們需要把 CuratorCacheListener 注冊到 CuratorCache 中,并開啟緩存:
// 注冊 CuratorCacheListener
cache.listenable().addListener(listener);
// 開啟緩存
cache.start();
我們來看一個完整的例子:
@Test
void contextLoads() throws Exception {
// 獲取客戶端
CuratorFramework client = curatorService.getCuratorClient();
// 開啟會話
client.start();
// 構(gòu)建 CuratorCache 實(shí)例
CuratorCache cache = CuratorCache.build(client, "/mooc");
// 使用 Fluent 風(fēng)格和 lambda 表達(dá)式來構(gòu)建 CuratorCacheListener 的事件監(jiān)聽
CuratorCacheListener listener = CuratorCacheListener.builder()
// 開啟對所有事件的監(jiān)聽
// type 事件類型:NODE_CREATED, NODE_CHANGED, NODE_DELETED;
// oldNode 原節(jié)點(diǎn):ChildData 類,包括節(jié)點(diǎn)路徑,節(jié)點(diǎn)狀態(tài) Stat,節(jié)點(diǎn) data
// newNode 新節(jié)點(diǎn):同上
.forAll((type, oldNode, newNode) -> {
System.out.println("forAll 事件類型:" + type);
System.out.println("forAll 原節(jié)點(diǎn):" + oldNode);
System.out.println("forAll 新節(jié)點(diǎn):" + newNode);
})
// 開啟對節(jié)點(diǎn)創(chuàng)建事件的監(jiān)聽
.forCreates(childData -> {
System.out.println("forCreates 新節(jié)點(diǎn):" + childData);
})
// 開啟對節(jié)點(diǎn)更新事件的監(jiān)聽
.forChanges((oldNode, newNode) -> {
System.out.println("forChanges 原節(jié)點(diǎn):" + oldNode);
System.out.println("forChanges 新節(jié)點(diǎn):" + newNode);
})
// 開啟對節(jié)點(diǎn)刪除事件的監(jiān)聽
.forDeletes(oldNode -> {
System.out.println("forDeletes 原節(jié)點(diǎn):" + oldNode);
})
// 初始化
.forInitialized(() -> {
System.out.println("forInitialized 初始化");
})
// 構(gòu)建
.build();
// 注冊 CuratorCacheListener 到 CuratorCache
cache.listenable().addListener(listener);
// CuratorCache 開啟緩存
cache.start();
// mooc 節(jié)點(diǎn)創(chuàng)建
client.create().forPath("/mooc");
// mooc 節(jié)點(diǎn)更新
client.setData().forPath("/mooc","Wiki".getBytes());
// mooc 節(jié)點(diǎn)刪除
client.delete().forPath("/mooc");
}
我們來查看 CuratorCacheListenerBuilder 接口中具體的事件監(jiān)聽,我們需要監(jiān)聽哪種事件就使用哪種方法:
public interface CuratorCacheListenerBuilder {
// 全部事件
CuratorCacheListenerBuilder forAll(CuratorCacheListener var1);
// 創(chuàng)建事件
CuratorCacheListenerBuilder forCreates(Consumer<ChildData> var1);
// 更新事件
CuratorCacheListenerBuilder forChanges(CuratorCacheListenerBuilder.ChangeListener var1);
// 創(chuàng)建和更新事件
CuratorCacheListenerBuilder forCreatesAndChanges(CuratorCacheListenerBuilder.ChangeListener var1);
// 刪除事件
CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> var1);
// 初始化后開啟線程異步執(zhí)行
CuratorCacheListenerBuilder forInitialized(Runnable var1);
// 子節(jié)點(diǎn)的事件
CuratorCacheListenerBuilder forPathChildrenCache(String var1, CuratorFramework var2, PathChildrenCacheListener var3);
// 節(jié)點(diǎn)本身的事件和子節(jié)點(diǎn)的事件
CuratorCacheListenerBuilder forTreeCache(CuratorFramework var1, TreeCacheListener var2);
// 節(jié)點(diǎn)本身的事件
CuratorCacheListenerBuilder forNodeCache(NodeCacheListener var1);
// 初始化后開啟監(jiān)聽
CuratorCacheListenerBuilder afterInitialized();
// 構(gòu)建方法
CuratorCacheListener build();
/*
* 更新事件時被調(diào)用
*/
@FunctionalInterface
public interface ChangeListener {
void event(ChildData var1, ChildData var2);
}
}
接下來我們執(zhí)行測試方法,查看控制臺輸出:
forInitialized 初始化
forAll 事件類型:NODE_CREATED
forAll 原節(jié)點(diǎn):null
forAll 新節(jié)點(diǎn):ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forCreates 新節(jié)點(diǎn):ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forAll 事件類型:NODE_CHANGED
forAll 原節(jié)點(diǎn):ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forAll 新節(jié)點(diǎn):ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forChanges 原節(jié)點(diǎn):ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forChanges 新節(jié)點(diǎn):ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forAll 事件類型:NODE_DELETED
forAll 原節(jié)點(diǎn):ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forAll 新節(jié)點(diǎn):null
forDeletes 原節(jié)點(diǎn):ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
我們發(fā)現(xiàn),我們設(shè)置的 create,setData,delete 這 3 種事件都被監(jiān)聽到了,而且 forAll 每一種事件都監(jiān)聽到了,所以我們在使用的時候,選擇我們需要的事件監(jiān)聽即可。
介紹完 CuratorCacheListener 監(jiān)聽器,并完成了事件監(jiān)聽的測試,那么 Zookeeper 的 Watch 是如何運(yùn)行的呢?接下來我們就來介紹 Watch 的運(yùn)行原理。
3. Watch 的原理
在介紹 Watch 的原理之前,我們先熟悉一個概念:Zookeeper 客戶端對 Znode 的寫操作,也就是新增節(jié)點(diǎn)、更新節(jié)點(diǎn)、刪除節(jié)點(diǎn)這些操作,默認(rèn)會開啟監(jiān)聽;Zookeeper 客戶端對 Znode 的讀操作,也就是查詢節(jié)點(diǎn)數(shù)據(jù)、查詢節(jié)點(diǎn)是否存在、查詢子節(jié)點(diǎn)等操作,需要手動設(shè)置開啟監(jiān)聽。這也是為什么在 GetDataRequest 請求體中會有 watch 這個屬性的原因。
Watch 的運(yùn)行過程分為 4 部分,分別是:客戶端注冊 Watch 、服務(wù)端注冊 Watch、服務(wù)端觸發(fā) Watch、客戶端處理回調(diào)。
-
客戶端注冊 Watch
當(dāng)我們使用 Zookeeper 客戶端向 Zookeeper 服務(wù)端發(fā)送帶有事件監(jiān)聽的請求時,Zookeeper 客戶端會把該請求標(biāo)記成帶有 Watch 的請求,然后把 Watch 監(jiān)聽器注冊到 ListenerManager 中。 -
服務(wù)端注冊 Watch
Zookeeper 服務(wù)端接收到 Zookeeper 客戶端發(fā)送過來的請求,解析請求體,判斷該請求是否帶有 Watch 事件,如果有 Watch 事件,就會把 Watch 事件注冊到 WatchManager 中。 -
服務(wù)端觸發(fā) Watch
Zookeeper 服務(wù)端注冊完 Watch 事件后,會調(diào)用 WatchManager 的 triggerWatch 方法來觸發(fā) Watch 事件,Watch 事件完成后,向客戶端發(fā)送響應(yīng)。 -
客戶端處理回調(diào)
Zookeeper 客戶端接收到 Zookeeper 服務(wù)端的響應(yīng)后,解析響應(yīng)體,根據(jù)響應(yīng)體的類型去 ListenerManager 中查找相對應(yīng)的 Watch 監(jiān)聽器,然后觸發(fā)監(jiān)聽器的回調(diào)函數(shù)。
4. 總結(jié)
在本節(jié)中,我們學(xué)習(xí)了使用 Curator 的兩種方式開啟對事件的監(jiān)聽,也了解了 Watch 運(yùn)行過程的 4 個部分。以下是本節(jié)內(nèi)容的總結(jié):
- Zookeeper Watch 的實(shí)現(xiàn)。
- Zookeeper Watch 的原理。