Zookeeper Watch
1. 前言
在我們使用 Zookeeper 來實現(xiàn)服務注冊與發(fā)現(xiàn)、配置中心、分布式通知等功能時,需要使用 Zookeeper 的核心功能 Watch,來對節(jié)點進行監(jiān)聽。那么 Zookeeper 的 Watch 是如何實現(xiàn)對節(jié)點的監(jiān)聽,并響應事件到客戶端的呢?我們就帶著這個問題開始本節(jié)的內容。
2. Watch 的實現(xiàn)
我們在 Zookeeper 的數(shù)據(jù)模型這一小節(jié)中學習過 Znode 節(jié)點的類型和 Znode 節(jié)點的特點,這是 Zookeeper 核心特性之一。在大多數(shù)情況下,我們都會把 Znode 與 Watch 捆綁使用,接下來我們就使用 Zookeeper 的 Java 客戶端 Curator 來實現(xiàn) Watch 對 Znode 節(jié)點的監(jiān)聽。
我們可以繼續(xù)使用我們在 Zookeeper Curator 這一節(jié)中創(chuàng)建的 Spring Boot 測試項目,在測試方法中對 Watch 進行實現(xiàn)。
2.1 CuratorWatcher
在我們使用 Curator 的 Fluent 風格進行鏈式調用時,我們可以使用 usingWatcher 來注冊 CuratorWatcher 來對我們的節(jié)點變化事件進行監(jiān)聽:
@Test
void contextLoads() throws Exception {
// 獲取客戶端
CuratorFramework curatorClient = curatorService.getCuratorClient();
// 開啟會話
curatorClient.start();
// CuratorWatcher 為接口,我們需要實現(xiàn) process 方法
CuratorWatcher watcher = new CuratorWatcher(){
@Override
// 監(jiān)聽事件處理
public void process(WatchedEvent watchedEvent) {
// 輸出 監(jiān)聽事件
System.out.println(watchedEvent.toString());
}
};
// 在命名空間下創(chuàng)建持久節(jié)點 mooc,內容為 Wiki
curatorClient.create().forPath("/mooc","Wiki".getBytes());
// 獲取 mooc 節(jié)點的 data 數(shù)據(jù),并對 mooc 節(jié)點開啟監(jiān)聽
byte[] bytes = curatorClient.getData().usingWatcher(watcher).forPath("/mooc");
// 輸出 data
System.out.println(new String(bytes));
// 第一次更新
curatorClient.setData().forPath("/mooc", "Wiki001".getBytes());
// 第二次更新
curatorClient.setData().forPath("/mooc","Wiki002".getBytes());
}
控制臺輸出:
Wiki
WatchedEvent state:SyncConnected type:NodeDataChanged path:/mooc
控制臺輸出的第一行 Wiki 為 mooc 節(jié)點的 data 數(shù)據(jù)。第二行輸出的 WatchedEvent 為監(jiān)聽到的事件,state 表示監(jiān)聽狀態(tài);type 表示監(jiān)聽到的事件類型,我們可以判斷事件的類型來做相應的處理;path 表示監(jiān)聽的節(jié)點。
介紹完 WatchedEvent,我們發(fā)現(xiàn)控制臺只輸出了一次 WatchedEvent,也就是說 CuratorWatcher 只進行了一次監(jiān)聽。如果想要重復使用我們需要重新使用 usingWatcher 進行注冊。那么有沒有不需要重復注冊的監(jiān)聽呢?接下來我們就來介紹 Curator 一種功能強大的監(jiān)聽 CuratorCacheListener。
2.2 CuratorCacheListener
CuratorCacheListener 是基于 CuratorCache 緩存實現(xiàn)的監(jiān)聽器,CuratorCache 對 ZooKeeper 事件監(jiān)聽進行了封裝,能夠自動處理反復注冊監(jiān)聽。我們使用 CuratorCacheListener 時,需要使用構建器 CuratorCacheListenerBuilder 來對具體的事件監(jiān)聽進行構建,并且把 CuratorCacheListener 注冊到 CuratorCache 緩存中。
首先我們需要構建 CuratorCache 緩存實例,在 CuratorCache 接口中,build 為靜態(tài)方法,我們可以直接調用:
// 構建 CuratorCache 緩存實例
static CuratorCache build(CuratorFramework client, String path, CuratorCache.Options... options) {
return builder(client, path).withOptions(options).build();
}
我們來說明以下入?yún)ⅲ?code>CuratorFramework client 是 Curator 客戶端;String path
是需要被監(jiān)聽的節(jié)點的路徑;CuratorCache.Options... options
是對緩存設置的參數(shù),我們可以設置以下 3 種:
public static enum Options {
// 單節(jié)點緩存
SINGLE_NODE_CACHE,
// 對數(shù)據(jù)進行壓縮
COMPRESSED_DATA,
// CuratorCache 關閉后不清除緩存
DO_NOT_CLEAR_ON_CLOSE;
}
構建完緩存實例,我們再來構建 CuratorCacheListener ,在 CuratorCacheListener 接口中的構建方法 builder 為靜態(tài)方法,我們可以直接調用:
// builder 方法,返回 CuratorCacheListenerBuilder 構建器,我們就可以使用具體的監(jiān)聽方法了
static CuratorCacheListenerBuilder builder() {
return new CuratorCacheListenerBuilderImpl();
}
最后我們需要把 CuratorCacheListener 注冊到 CuratorCache 中,并開啟緩存:
// 注冊 CuratorCacheListener
cache.listenable().addListener(listener);
// 開啟緩存
cache.start();
我們來看一個完整的例子:
@Test
void contextLoads() throws Exception {
// 獲取客戶端
CuratorFramework client = curatorService.getCuratorClient();
// 開啟會話
client.start();
// 構建 CuratorCache 實例
CuratorCache cache = CuratorCache.build(client, "/mooc");
// 使用 Fluent 風格和 lambda 表達式來構建 CuratorCacheListener 的事件監(jiān)聽
CuratorCacheListener listener = CuratorCacheListener.builder()
// 開啟對所有事件的監(jiān)聽
// type 事件類型:NODE_CREATED, NODE_CHANGED, NODE_DELETED;
// oldNode 原節(jié)點:ChildData 類,包括節(jié)點路徑,節(jié)點狀態(tài) Stat,節(jié)點 data
// newNode 新節(jié)點:同上
.forAll((type, oldNode, newNode) -> {
System.out.println("forAll 事件類型:" + type);
System.out.println("forAll 原節(jié)點:" + oldNode);
System.out.println("forAll 新節(jié)點:" + newNode);
})
// 開啟對節(jié)點創(chuàng)建事件的監(jiān)聽
.forCreates(childData -> {
System.out.println("forCreates 新節(jié)點:" + childData);
})
// 開啟對節(jié)點更新事件的監(jiān)聽
.forChanges((oldNode, newNode) -> {
System.out.println("forChanges 原節(jié)點:" + oldNode);
System.out.println("forChanges 新節(jié)點:" + newNode);
})
// 開啟對節(jié)點刪除事件的監(jiān)聽
.forDeletes(oldNode -> {
System.out.println("forDeletes 原節(jié)點:" + oldNode);
})
// 初始化
.forInitialized(() -> {
System.out.println("forInitialized 初始化");
})
// 構建
.build();
// 注冊 CuratorCacheListener 到 CuratorCache
cache.listenable().addListener(listener);
// CuratorCache 開啟緩存
cache.start();
// mooc 節(jié)點創(chuàng)建
client.create().forPath("/mooc");
// mooc 節(jié)點更新
client.setData().forPath("/mooc","Wiki".getBytes());
// mooc 節(jié)點刪除
client.delete().forPath("/mooc");
}
我們來查看 CuratorCacheListenerBuilder 接口中具體的事件監(jiān)聽,我們需要監(jiān)聽哪種事件就使用哪種方法:
public interface CuratorCacheListenerBuilder {
// 全部事件
CuratorCacheListenerBuilder forAll(CuratorCacheListener var1);
// 創(chuàng)建事件
CuratorCacheListenerBuilder forCreates(Consumer<ChildData> var1);
// 更新事件
CuratorCacheListenerBuilder forChanges(CuratorCacheListenerBuilder.ChangeListener var1);
// 創(chuàng)建和更新事件
CuratorCacheListenerBuilder forCreatesAndChanges(CuratorCacheListenerBuilder.ChangeListener var1);
// 刪除事件
CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> var1);
// 初始化后開啟線程異步執(zhí)行
CuratorCacheListenerBuilder forInitialized(Runnable var1);
// 子節(jié)點的事件
CuratorCacheListenerBuilder forPathChildrenCache(String var1, CuratorFramework var2, PathChildrenCacheListener var3);
// 節(jié)點本身的事件和子節(jié)點的事件
CuratorCacheListenerBuilder forTreeCache(CuratorFramework var1, TreeCacheListener var2);
// 節(jié)點本身的事件
CuratorCacheListenerBuilder forNodeCache(NodeCacheListener var1);
// 初始化后開啟監(jiān)聽
CuratorCacheListenerBuilder afterInitialized();
// 構建方法
CuratorCacheListener build();
/*
* 更新事件時被調用
*/
@FunctionalInterface
public interface ChangeListener {
void event(ChildData var1, ChildData var2);
}
}
接下來我們執(zhí)行測試方法,查看控制臺輸出:
forInitialized 初始化
forAll 事件類型:NODE_CREATED
forAll 原節(jié)點:null
forAll 新節(jié)點:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forCreates 新節(jié)點:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forAll 事件類型:NODE_CHANGED
forAll 原節(jié)點:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forAll 新節(jié)點:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forChanges 原節(jié)點:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forChanges 新節(jié)點:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forAll 事件類型:NODE_DELETED
forAll 原節(jié)點:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forAll 新節(jié)點:null
forDeletes 原節(jié)點:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
我們發(fā)現(xiàn),我們設置的 create,setData,delete 這 3 種事件都被監(jiān)聽到了,而且 forAll 每一種事件都監(jiān)聽到了,所以我們在使用的時候,選擇我們需要的事件監(jiān)聽即可。
介紹完 CuratorCacheListener 監(jiān)聽器,并完成了事件監(jiān)聽的測試,那么 Zookeeper 的 Watch 是如何運行的呢?接下來我們就來介紹 Watch 的運行原理。
3. Watch 的原理
在介紹 Watch 的原理之前,我們先熟悉一個概念:Zookeeper 客戶端對 Znode 的寫操作,也就是新增節(jié)點、更新節(jié)點、刪除節(jié)點這些操作,默認會開啟監(jiān)聽;Zookeeper 客戶端對 Znode 的讀操作,也就是查詢節(jié)點數(shù)據(jù)、查詢節(jié)點是否存在、查詢子節(jié)點等操作,需要手動設置開啟監(jiān)聽。這也是為什么在 GetDataRequest 請求體中會有 watch 這個屬性的原因。
Watch 的運行過程分為 4 部分,分別是:客戶端注冊 Watch 、服務端注冊 Watch、服務端觸發(fā) Watch、客戶端處理回調。
-
客戶端注冊 Watch
當我們使用 Zookeeper 客戶端向 Zookeeper 服務端發(fā)送帶有事件監(jiān)聽的請求時,Zookeeper 客戶端會把該請求標記成帶有 Watch 的請求,然后把 Watch 監(jiān)聽器注冊到 ListenerManager 中。 -
服務端注冊 Watch
Zookeeper 服務端接收到 Zookeeper 客戶端發(fā)送過來的請求,解析請求體,判斷該請求是否帶有 Watch 事件,如果有 Watch 事件,就會把 Watch 事件注冊到 WatchManager 中。 -
服務端觸發(fā) Watch
Zookeeper 服務端注冊完 Watch 事件后,會調用 WatchManager 的 triggerWatch 方法來觸發(fā) Watch 事件,Watch 事件完成后,向客戶端發(fā)送響應。 -
客戶端處理回調
Zookeeper 客戶端接收到 Zookeeper 服務端的響應后,解析響應體,根據(jù)響應體的類型去 ListenerManager 中查找相對應的 Watch 監(jiān)聽器,然后觸發(fā)監(jiān)聽器的回調函數(shù)。
4. 總結
在本節(jié)中,我們學習了使用 Curator 的兩種方式開啟對事件的監(jiān)聽,也了解了 Watch 運行過程的 4 個部分。以下是本節(jié)內容的總結:
- Zookeeper Watch 的實現(xiàn)。
- Zookeeper Watch 的原理。