本文是大数据系列第 30 篇,通过 ZkClient 库演示如何用 Java 代码操作 ZooKeeper,包括建立连接、节点增删、子节点监听与数据变更监听。
为什么用 ZkClient
ZooKeeper 官方 Java 客户端(org.apache.zookeeper.ZooKeeper)API 较底层,需要手动处理 Session 重连、Watcher 重新注册等细节。ZkClient 是对官方客户端的高层封装,提供:
- 自动重连与 Session 恢复
- 持久化 Watcher(内部自动重注册)
- 同步 API,减少回调嵌套
Maven 依赖
<dependencies>
<!-- ZooKeeper 核心库,版本与服务端保持一致 -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.4</version>
</dependency>
<!-- ZkClient 高层封装 -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
</dependencies>
建立 Session 连接
import org.I0Itec.zkclient.ZkClient;
public class ZkDemo {
// 连接字符串:多节点用逗号分隔,客户端会自动负载均衡
private static final String ZK_SERVERS = "h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181";
public static void main(String[] args) throws InterruptedException {
ZkClient zkClient = new ZkClient(ZK_SERVERS);
System.out.println("ZooKeeper session created.");
// 保持主线程运行,等待异步通知
Thread.sleep(Long.MAX_VALUE);
}
}
ZkClient 构造函数会阻塞直到连接建立成功。
节点操作
创建持久节点(递归)
// 第二个参数 true 表示递归创建父级目录(类似 mkdir -p)
// 如果 /wzk-java 不存在,会自动创建
zkClient.createPersistent("/wzk-java/temp", true);
System.out.println("ZooKeeper create ZNode: /wzk-java/temp");
等价 zkCli 命令:
create -p /wzk-java ""
create /wzk-java/temp ""
创建临时节点
// 临时节点:Session 断开后自动删除,不可有子节点
zkClient.createEphemeral("/wzk-java/session-lock");
System.out.println("Ephemeral node created.");
写入数据
// 节点已存在时写入,不存在时报错(需先 createPersistent)
zkClient.writeData("/wzk-java/temp", "hello-zookeeper");
读取数据
String data = zkClient.readData("/wzk-java/temp");
System.out.println("Node data: " + data);
删除节点(递归)
// deleteRecursive 会递归删除节点及所有子节点
zkClient.deleteRecursive("/wzk-java");
System.out.println("ZooKeeper delete recursive: /wzk-java");
等价 zkCli 命令:
deleteall /wzk-java
监听:子节点变更
subscribeChildChanges 监听指定路径下子节点列表的增删变化,不监听目标节点本身的数据变化。
// 先创建父节点
zkClient.createPersistent("/wzk-data", true);
zkClient.subscribeChildChanges("/wzk-data", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) {
System.out.println("子节点变化!");
System.out.println(" 父节点路径: " + parentPath);
System.out.println(" 当前子节点列表: " + currentChilds);
}
});
System.out.println("已注册子节点监听,等待变更...");
触发测试(zkCli):
# 创建子节点 → 触发通知
create /wzk-data/child-1 "data1"
# 再创建一个
create /wzk-data/child-2 "data2"
# 删除子节点 → 再次触发
delete /wzk-data/child-1
输出示例:
子节点变化!
父节点路径: /wzk-data
当前子节点列表: [child-1]
子节点变化!
父节点路径: /wzk-data
当前子节点列表: [child-1, child-2]
子节点变化!
父节点路径: /wzk-data
当前子节点列表: [child-2]
ZkClient 的
subscribeChildChanges内部会在每次通知后自动重新注册,无需手动处理。
监听:节点数据变更
subscribeDataChanges 监听指定节点的数据修改和节点删除事件。
由于 ZkClient 默认序列化器无法反序列化普通字符串,需切换为 SerializableSerializer(或自定义序列化器):
import org.I0Itec.zkclient.serialize.SerializableSerializer;
// 切换序列化器(String 实现了 Serializable,可直接用)
zkClient.setZkSerializer(new SerializableSerializer());
// 先写入初始数据
zkClient.createPersistent("/wzk-data/test-data", true);
zkClient.writeData("/wzk-data/test-data", "initial-value");
// 注册数据变更监听
zkClient.subscribeDataChanges("/wzk-data/test-data", new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) {
System.out.println("数据改变: " + dataPath + " → " + data);
}
@Override
public void handleDataDeleted(String dataPath) {
System.out.println("节点被删除: " + dataPath);
}
});
System.out.println("已注册数据监听,等待变更...");
触发测试(zkCli):
# 修改数据 → 触发 handleDataChange
set /wzk-data/test-data "updated-value"
# 再次修改
set /wzk-data/test-data "final-value"
# 删除节点 → 触发 handleDataDeleted
delete /wzk-data/test-data
输出示例:
数据改变: /wzk-data/test-data → updated-value
数据改变: /wzk-data/test-data → final-value
节点被删除: /wzk-data/test-data
ZkClient vs 原生 API 对比
| 能力 | 原生 ZooKeeper API | ZkClient |
|---|---|---|
| Watcher 重注册 | 手动,每次触发后需重新调用 | 自动,内部持久监听 |
| Session 重连 | 需自行实现重连逻辑 | 自动重连 |
| 异常处理 | 需处理 KeeperException | 封装为 ZkException |
| API 风格 | 异步回调为主 | 同步 + 异步均支持 |
| 递归创建/删除 | 不支持 | createPersistent(path, true) / deleteRecursive |
完整示例代码结构
public class ZooKeeperJavaDemo {
public static void main(String[] args) throws InterruptedException {
ZkClient zkClient = new ZkClient("h121.wzk.icu:2181");
// 1. 创建节点
zkClient.createPersistent("/wzk-java/data", true);
zkClient.writeData("/wzk-java/data", "hello");
// 2. 注册子节点监听
zkClient.subscribeChildChanges("/wzk-java", (parent, children) -> {
System.out.println("Children changed: " + children);
});
// 3. 注册数据监听
zkClient.setZkSerializer(new SerializableSerializer());
zkClient.subscribeDataChanges("/wzk-java/data", new IZkDataListener() {
public void handleDataChange(String path, Object data) {
System.out.println("Data changed: " + data);
}
public void handleDataDeleted(String path) {
System.out.println("Node deleted: " + path);
}
});
// 4. 等待事件
Thread.sleep(60_000);
// 5. 清理
zkClient.deleteRecursive("/wzk-java");
System.out.println("Cleanup done.");
}
}
小结
ZkClient 大幅简化了 ZooKeeper Java 编程的复杂度:递归节点操作、自动重连、持久化 Watcher 都开箱即用。实际生产中,Curator Framework(Apache 维护)提供更完善的高层抽象(分布式锁、Leader 选举等),是 ZkClient 的进阶替代方案。本系列的 ZooKeeper 基础篇到此结束,下一阶段将进入 Kafka 消息队列的学习。