Zookeeper Curator
1. 前言
在上一節(jié)中我們學(xué)習(xí)了 Zookeeper 的 Java 客戶端之一 ZkClient ,除了 ZkClient 之外,還有就是 Zookeeper 最流行的 Java 客戶端之一的 Curator。Curator 與 ZkClient 相比較又有什么區(qū)別呢?接下來我們就開始 Curator 的學(xué)習(xí)。
2. Curator 簡介
我們來看一下 Curator 的官網(wǎng)介紹:
Curator 是 Netflix 公司開源的一套 Zookeeper 客戶端框架,后來捐獻(xiàn)給 Apache 成為頂級的開源項(xiàng)目。
Curator 和 ZkClient 同樣簡化了 Zookeeper 原生 API 的開發(fā)工作,而 Curator 提供了一套易用性和可讀性更強(qiáng)的 Fluent 風(fēng)格的客戶端 API ,還提供了 Zookeeper 各種應(yīng)用場景的抽象封裝,比如:分布式鎖服務(wù)、集群領(lǐng)導(dǎo)選舉、共享計(jì)數(shù)器、緩存機(jī)制、分布式隊(duì)列等。
Curator 相較其它 Zookeeper 客戶端功能更強(qiáng)大,應(yīng)用更廣泛,使用更便捷,所以它能成為當(dāng)下最流行的 Zookeeper 的 Java 客戶端之一。
接下來我們就開始學(xué)習(xí)如何使用 Curator 客戶端對 Zookeeper 服務(wù)進(jìn)行操作。
Tips: Fluent 風(fēng)格類似于鏈?zhǔn)骄幊蹋褂?Fluent 風(fēng)格編寫的類,調(diào)用該類的方法會返回該類本身,然后可以繼續(xù)調(diào)用該類方法。
3. Curator 使用
我們新建一個(gè) Spring Boot 項(xiàng)目來對 Curator 進(jìn)行集成。首先我們要在 pom.xml 文件中加入 Curator 的 Maven 依賴。
3.1 Curator 依賴
pom.xml 文件配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.cdd</groupId>
<artifactId>curator-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>curator-demo</name>
<description>curator-demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
引入 Curator 的依賴后,我們先來介紹一下 Curator 的 API ,然后再編寫測試用例進(jìn)行 API 測試。
3.2 Curator API
本小節(jié)我們來對 Curator 的 API 進(jìn)行介紹,主要有 Curator 客戶端實(shí)例的創(chuàng)建,session 的重連策略,節(jié)點(diǎn)的添加,獲取節(jié)點(diǎn)數(shù)據(jù),修改節(jié)點(diǎn)的數(shù)據(jù),刪除節(jié)點(diǎn)等。
3.2.1 創(chuàng)建客戶端
我們這里講解 3 種創(chuàng)建客戶端的方法,Curator 客戶端的實(shí)現(xiàn)類為 CuratorFrameworkImpl,我們可以用它的接口 CuratorFramework 來接收創(chuàng)建客戶端的返回值 。
- 第 1 種: 使用創(chuàng)建 Curator 客戶端的 API newClient 方法,其中第一個(gè)參數(shù) connectString 為 Zookeeper 服務(wù)端的地址字符串,第二個(gè)參數(shù) RetryPolicy 為會話重連策略,關(guān)于重連策略我們稍后再進(jìn)行詳細(xì)的講解。
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) { return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy); }
- 第 2 種: 在上面的 newClient 方法中,其實(shí)還是調(diào)用的下面的 newClient 方法,增加了參數(shù) sessionTimeoutMs 會話超時(shí)時(shí)間,connectionTimeoutMs 連接超時(shí)時(shí)間。
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) { return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build(); }
- 第 3 種: 我們可以直接調(diào)用工廠類 CuratorFrameworkFactory 的 builder 方法,并且使用 Fluent 風(fēng)格的寫法來完成客戶端的實(shí)例化。寫法如下:
/** * 獲取 CuratorClient * 使用 Fluent 風(fēng)格 * @return CuratorFramework */ public CuratorFramework getCuratorClient(){ // 使用 CuratorFrameworkFactory 來構(gòu)建 CuratorFramework CuratorFramework client = CuratorFrameworkFactory.builder() // Zookeeper 服務(wù)器地址字符串 .connectString(connectString) // session 會話超時(shí)時(shí)間 .sessionTimeoutMs(sessionTimeoutMs) // 使用哪種重連策略 .retryPolicy(retryOneTime) // 命名空間,表示當(dāng)前客戶端的父節(jié)點(diǎn),我們可以用它來做業(yè)務(wù)區(qū)分 .namespace(namespace) .build(); return client; }
Curator 客戶端創(chuàng)建完畢后,我們使用 start 方法就可以創(chuàng)建會話,使用 close 方法結(jié)束會話。
client.start();
client.close();
3.2.2 會話重連策略
Curator 提供了會話重連策略的接口 RetryPolicy,并且提供了幾種默認(rèn)的實(shí)現(xiàn),下面我們介紹幾種常用的策略。
- RetryForever
// RetryForever:間隔{參數(shù)1}毫秒后重連,永遠(yuǎn)重試 private RetryPolicy retryForever = new RetryForever(3000);
- RetryOneTime
// RetryOneTime:{參數(shù)1}毫秒后重連,只重連一次 private RetryPolicy retryOneTime = new RetryOneTime(3000);
- RetryNTimes
// RetryNTimes: {參數(shù)2}毫秒后重連,重連{參數(shù)1}次 private RetryPolicy retryNTimes = new RetryNTimes(3,3000);
- RetryUntilElapsed
// RetryUntilElapsed:每{參數(shù)2}毫秒重連一次,總等待時(shí)間超過{參數(shù)1}毫秒后停止重連 private RetryPolicy retryUntilElapsed = new RetryUntilElapsed(10000,3000);
- ExponentialBackoffRetry
// ExponentialBackoffRetry:可重連{參數(shù)2}次,并增加每次重連之間的睡眠時(shí)間,遞增加公式如下: // {參數(shù)1} * Math.max(1,random.nextInt(1 << ({參數(shù)2} + 1))) private RetryPolicy exponential = new ExponentialBackoffRetry(1000,3);
Curator 的會話重連策略方案介紹完畢,我們選擇其中一種實(shí)現(xiàn)即可。
3.2.3 創(chuàng)建節(jié)點(diǎn)
創(chuàng)建好客戶端實(shí)例,開啟會話之后,我們就可以開始創(chuàng)建節(jié)點(diǎn)了,我們使用 create 方法來創(chuàng)建節(jié)點(diǎn),F(xiàn)luent 風(fēng)格的方式可以讓我們自由組合創(chuàng)建方式。
// 節(jié)點(diǎn)路徑前必須加上/
String path = "/imooc";
// forPath 指定路徑創(chuàng)建節(jié)點(diǎn),內(nèi)容默認(rèn)為客戶端ip。默認(rèn)為持久節(jié)點(diǎn)。
client.create().forPath(path);
// 創(chuàng)建 imooc 節(jié)點(diǎn),內(nèi)容為 Wiki,內(nèi)容參數(shù)需要字節(jié)數(shù)組。
client.create().forPath(path,"Wiki".getBytes());
// 創(chuàng)建節(jié)點(diǎn)時(shí),同時(shí)創(chuàng)建它的父節(jié)點(diǎn)。withMode 聲明節(jié)點(diǎn)是什么類型的,可以使用枚舉類型CreateMode來確定。
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPa
th(path);
3.2.4 獲取節(jié)點(diǎn)數(shù)據(jù)
獲取節(jié)點(diǎn)數(shù)據(jù)我們使用 getData 方法,同時(shí)我們還可以使用 Stat 來獲取節(jié)點(diǎn)的最新狀態(tài)信息。
// 普通查詢
client.getData().forPath(path);
// 包含狀態(tài)的查詢
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
3.2.5 更新節(jié)點(diǎn)數(shù)據(jù)
更新節(jié)點(diǎn)數(shù)據(jù)我們使用 setData 方法,我們可以指定 version 來更新對應(yīng)版本的數(shù)據(jù)。如果 version 已過期,則拋出 BadVersionException 異常,表示更新節(jié)點(diǎn)數(shù)據(jù)失敗。
// 普通更新
client.setData().forPath(path,"wiki".getBytes());
// 指定版本更新
client.setData().withVersion(1).forPath(path);
3.2.4 刪除節(jié)點(diǎn)
刪除節(jié)點(diǎn)我們使用 delete 的方法,我們可以對節(jié)點(diǎn)進(jìn)行遞歸刪除,我們也可以指定 version 進(jìn)行刪除,我們還可以強(qiáng)制刪除一個(gè)節(jié)點(diǎn),只要當(dāng)前客戶端的會話有效,客戶端在后臺就會持續(xù)進(jìn)行刪除操作,直到刪除成功。
// 普通刪除
client.delete().forPath(path);
// 遞歸刪除子節(jié)點(diǎn)
client.delete().deletingChildrenIfNeeded().forPath(path);
// 指定版本刪除
client.delete().withVersion(1).forPath(path);
// 強(qiáng)制刪除
client.delete().guaranteed().forPath(path);
這里的 version 過期也會拋出 BadVersionException 異常,表示刪除失敗。
Curator 的 API 介紹完畢,我們接下來進(jìn)行 API 測試。
3.3 API 測試
我們在 Spring Boot 主函數(shù)的同級新建 service 目錄,在 service 目錄中新建 CuratorService 類來獲取客戶端實(shí)例:
package cn.cdd.curatordemo.service;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.retry.RetryUntilElapsed;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class CuratorService {
// Zookeeper 服務(wù)器地址
@Value("${curator.connectString}")
private String connectString;
// session 會話超時(shí)時(shí)間
@Value("${curator.sessionTimeoutMs}")
private int sessionTimeoutMs;
// 名稱空間:在操作節(jié)點(diǎn)時(shí),會以 namespace 為父節(jié)點(diǎn)
@Value("${curator.namespace}")
private String namespace;
/**
* session 重連策略,使用其中一種即可
*/
// RetryForever:間隔{參數(shù)1}毫秒后重連,永遠(yuǎn)重試
private RetryPolicy retryForever = new RetryForever(3000);
// RetryOneTime:{參數(shù)1}毫秒后重連,只重連一次
private RetryPolicy retryOneTime = new RetryOneTime(3000);
// RetryNTimes: {參數(shù)2}毫秒后重連,重連{參數(shù)1}次
private RetryPolicy retryNTimes = new RetryNTimes(3,3000);
// RetryUntilElapsed:每{參數(shù)2}毫秒重連一次,總等待時(shí)間超過{參數(shù)1}毫秒后停止重連
private RetryPolicy retryUntilElapsed = new RetryUntilElapsed(10000,3000);
// ExponentialBackoffRetry:可重連{參數(shù)2}次,并增加每次重連之間的睡眠時(shí)間,增加公式如下:
// {參數(shù)1} * Math.max(1,random.nextInt(1 << ({參數(shù)2:maxRetries} + 1)))
private RetryPolicy exponential = new ExponentialBackoffRetry(1000,3);
/**
* 獲取 CuratorClient
* 使用 Fluent 風(fēng)格
* @return CuratorFramework
*/
public CuratorFramework getCuratorClient(){
// 使用 CuratorFrameworkFactory 來構(gòu)建 CuratorFramework
return CuratorFrameworkFactory.builder()
// Zookeeper 服務(wù)器地址字符串
.connectString(connectString)
// session 會話超時(shí)時(shí)間
.sessionTimeoutMs(sessionTimeoutMs)
// 使用哪種重連策略
.retryPolicy(retryOneTime)
// 配置父節(jié)點(diǎn)
.namespace(namespace)
.build();
}
}
在 application.properties 配置文件中添加配置:
# Zookeeper 地址
curator.connectString=192.168.0.77:2181,192.168.0.88:2181,192.168.0.88:2181
# 會話超時(shí)時(shí)間
curator.sessionTimeoutMs=5000
# 命名空間,當(dāng)前客戶端的父節(jié)點(diǎn)
curator.namespace=imooc
配置完成后,在 CuratorDemoApplicationTests 測試類中編寫測試用例。
首先我們來測試節(jié)點(diǎn)的創(chuàng)建:
package cn.cdd.curatordemo;
import cn.cdd.curatordemo.service.CuratorService;
import org.apache.curator.framework.CuratorFramework;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class CuratorDemoApplicationTests {
// 注入 CuratorService 依賴
@Autowired
private CuratorService curatorService;
@Test
void contextLoads() throws Exception {
// 獲取客戶端
CuratorFramework curatorClient = curatorService.getCuratorClient();
// 開啟會話
curatorClient.start();
// 在 namespace 下創(chuàng)建節(jié)點(diǎn) Mooc , 節(jié)點(diǎn)前需要加 “/” 表示命名空間下的子節(jié)點(diǎn)
// 節(jié)點(diǎn)內(nèi)容為 Wiki ,使用字節(jié)數(shù)組傳入
String mooc = curatorClient.create().forPath("/Mooc", "Wiki".getBytes());
// 返回 /Mooc
System.out.println(mooc);
curatorClient.close();
}
}
控制臺輸出當(dāng)前創(chuàng)建的節(jié)點(diǎn):
/Mooc
創(chuàng)建完成后我們來查詢命名空間下的子節(jié)點(diǎn):
@Test
void contextLoads() throws Exception {
// 獲取客戶端
CuratorFramework curatorClient = curatorService.getCuratorClient();
// 開啟會話
curatorClient.start();
// 查詢命名空間下的子節(jié)點(diǎn)
List<String> strings = curatorClient.getChildren().forPath("/");
System.out.println(strings);
curatorClient.close();
}
控制臺輸出命名空間的子節(jié)點(diǎn)列表:
[Mooc]
Tips: 在我們創(chuàng)建客戶端使用了命名空間時(shí),API 中可用
/
表示命名空間,也表示當(dāng)前客戶端的根節(jié)點(diǎn)。
獲取節(jié)點(diǎn)數(shù)據(jù)測試:
@Test
void contextLoads() throws Exception {
// 獲取客戶端
CuratorFramework curatorClient = curatorService.getCuratorClient();
// 開啟會話
curatorClient.start();
// 獲取 Mooc 節(jié)點(diǎn)的內(nèi)容
byte[] bytes = curatorClient.getData().forPath("/Mooc");
// 輸出
System.out.println(new String(bytes));
curatorClient.close();
}
控制臺輸出當(dāng)前節(jié)點(diǎn)的內(nèi)容:
Wiki
更新節(jié)點(diǎn)數(shù)據(jù)測試:
@Test
void contextLoads() throws Exception {
// 獲取客戶端
CuratorFramework curatorClient = curatorService.getCuratorClient();
// 開啟會話
curatorClient.start();
// 更新節(jié)點(diǎn)數(shù)據(jù),返回當(dāng)前節(jié)點(diǎn)狀態(tài)
Stat stat = curatorClient.setData().forPath("/Mooc", "wiki".getBytes());
// 輸出
System.out.println(stat);
curatorClient.close();
}
控制臺輸出表示當(dāng)前節(jié)點(diǎn)狀態(tài)的數(shù)字:
4294967345,4294967352,1597805299226,1597850397723,1,0,0,0,4,0,4294967345
上面這串?dāng)?shù)字表示當(dāng)前節(jié)點(diǎn)的狀態(tài) Stat,我們可以查看 Stat 類來找到對應(yīng)的信息:
public class Stat implements Record {
// 創(chuàng)建節(jié)點(diǎn)時(shí)的事務(wù) id
private long czxid;
// 修改節(jié)點(diǎn)時(shí)的事務(wù) id
private long mzxid;
// 節(jié)點(diǎn)創(chuàng)建時(shí)的毫秒值
private long ctime;
// 節(jié)點(diǎn)修改時(shí)的毫秒值
private long mtime;
// 節(jié)點(diǎn)數(shù)據(jù)修改的次數(shù)
private int version;
// 子節(jié)點(diǎn)修改的次數(shù)
private int cversion;
// ACL修改的次數(shù)
private int aversion;
// 如果是臨時(shí)節(jié)點(diǎn),該值為節(jié)點(diǎn)的 SessionId,其它類型的節(jié)點(diǎn)則為 0
private long ephemeralOwner;
// 數(shù)據(jù)長度
private int dataLength;
// 子節(jié)點(diǎn)數(shù)量
private int numChildren;
// 添加和刪除子節(jié)點(diǎn)的事務(wù) id
private long pzxid;
}
刪除節(jié)點(diǎn)數(shù)據(jù)測試:
@Test
void contextLoads() throws Exception {
// 獲取客戶端
CuratorFramework curatorClient = curatorService.getCuratorClient();
// 開啟會話
curatorClient.start();
// 刪除節(jié)點(diǎn)
curatorClient.delete().forPath("/Mooc");
curatorClient.close();
}
執(zhí)行完成后,我們再次查詢命名空間下的子節(jié)點(diǎn):
@Test
void contextLoads() throws Exception {
// 獲取客戶端
CuratorFramework curatorClient = curatorService.getCuratorClient();
// 開啟會話
curatorClient.start();
// 查詢命名空間下的子節(jié)點(diǎn)
List<String> strings = curatorClient.getChildren().forPath("/");
System.out.println(strings);
curatorClient.close();
}
控制臺輸出為空,表示刪除成功
[]
Tips: 使用 API 時(shí),我們需要注意是否配置 namespace ,如果沒有配置 namespace 的話,我們使用 API 進(jìn)行操作時(shí),path 參數(shù)需要填寫全路徑。如果配置了 namespace ,我們使用 API 時(shí),Curator 會自動幫我們在 path 前加上 namespace 。
4. 總結(jié)
本節(jié)我們學(xué)習(xí)了 Curator 是什么,Curator 可以是實(shí)現(xiàn)什么功能,我們還介紹了 Curator 常用的 API,并做了相應(yīng)的測試。以下是本節(jié)內(nèi)容的總結(jié):
- 為什么要學(xué)習(xí)使用 Curator 客戶端。
- Curator 常用的 API。
- 使用 Spring Boot 集成 Curator。