本文是大数据系列第 30 篇,通过 ZkClient 库演示如何用 Java 代码操作 ZooKeeper,包括建立连接、节点增删、子节点监听与数据变更监听。

完整图文版(含截图):CSDN 原文 | 掘金

为什么用 ZkClient

ZooKeeper 官方 Java 客户端(org.apache.zookeeper.ZooKeeper)API 较底层,需要手动处理 Session 重连、Watcher 重新注册等细节。ZkClient 是对官方客户端的高层封装,提供:

  • 自动重连与 Session 恢复
  • 持久化 Watcher(内部自动重注册)
  • 同步 API,减少回调嵌套

Maven 依赖

<dependencies>
    <!-- ZooKeeper 核心库,版本与服务端保持一致 -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.8.4</version>
    </dependency>

    <!-- ZkClient 高层封装 -->
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.11</version>
    </dependency>
</dependencies>

建立 Session 连接

import org.I0Itec.zkclient.ZkClient;

public class ZkDemo {
    // 连接字符串:多节点用逗号分隔,客户端会自动负载均衡
    private static final String ZK_SERVERS = "h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181";

    public static void main(String[] args) throws InterruptedException {
        ZkClient zkClient = new ZkClient(ZK_SERVERS);
        System.out.println("ZooKeeper session created.");

        // 保持主线程运行,等待异步通知
        Thread.sleep(Long.MAX_VALUE);
    }
}

ZkClient 构造函数会阻塞直到连接建立成功。

节点操作

创建持久节点(递归)

// 第二个参数 true 表示递归创建父级目录(类似 mkdir -p)
// 如果 /wzk-java 不存在,会自动创建
zkClient.createPersistent("/wzk-java/temp", true);
System.out.println("ZooKeeper create ZNode: /wzk-java/temp");

等价 zkCli 命令:

create -p /wzk-java ""
create /wzk-java/temp ""

创建临时节点

// 临时节点:Session 断开后自动删除,不可有子节点
zkClient.createEphemeral("/wzk-java/session-lock");
System.out.println("Ephemeral node created.");

写入数据

// 节点已存在时写入,不存在时报错(需先 createPersistent)
zkClient.writeData("/wzk-java/temp", "hello-zookeeper");

读取数据

String data = zkClient.readData("/wzk-java/temp");
System.out.println("Node data: " + data);

删除节点(递归)

// deleteRecursive 会递归删除节点及所有子节点
zkClient.deleteRecursive("/wzk-java");
System.out.println("ZooKeeper delete recursive: /wzk-java");

等价 zkCli 命令:

deleteall /wzk-java

监听:子节点变更

subscribeChildChanges 监听指定路径下子节点列表的增删变化,不监听目标节点本身的数据变化。

// 先创建父节点
zkClient.createPersistent("/wzk-data", true);

zkClient.subscribeChildChanges("/wzk-data", new IZkChildListener() {
    @Override
    public void handleChildChange(String parentPath, List<String> currentChilds) {
        System.out.println("子节点变化!");
        System.out.println("  父节点路径: " + parentPath);
        System.out.println("  当前子节点列表: " + currentChilds);
    }
});

System.out.println("已注册子节点监听,等待变更...");

触发测试(zkCli):

# 创建子节点 → 触发通知
create /wzk-data/child-1 "data1"

# 再创建一个
create /wzk-data/child-2 "data2"

# 删除子节点 → 再次触发
delete /wzk-data/child-1

输出示例:

子节点变化!
  父节点路径: /wzk-data
  当前子节点列表: [child-1]

子节点变化!
  父节点路径: /wzk-data
  当前子节点列表: [child-1, child-2]

子节点变化!
  父节点路径: /wzk-data
  当前子节点列表: [child-2]

ZkClient 的 subscribeChildChanges 内部会在每次通知后自动重新注册,无需手动处理。

监听:节点数据变更

subscribeDataChanges 监听指定节点的数据修改节点删除事件。

由于 ZkClient 默认序列化器无法反序列化普通字符串,需切换为 SerializableSerializer(或自定义序列化器):

import org.I0Itec.zkclient.serialize.SerializableSerializer;

// 切换序列化器(String 实现了 Serializable,可直接用)
zkClient.setZkSerializer(new SerializableSerializer());

// 先写入初始数据
zkClient.createPersistent("/wzk-data/test-data", true);
zkClient.writeData("/wzk-data/test-data", "initial-value");

// 注册数据变更监听
zkClient.subscribeDataChanges("/wzk-data/test-data", new IZkDataListener() {
    @Override
    public void handleDataChange(String dataPath, Object data) {
        System.out.println("数据改变: " + dataPath + " → " + data);
    }

    @Override
    public void handleDataDeleted(String dataPath) {
        System.out.println("节点被删除: " + dataPath);
    }
});

System.out.println("已注册数据监听,等待变更...");

触发测试(zkCli):

# 修改数据 → 触发 handleDataChange
set /wzk-data/test-data "updated-value"

# 再次修改
set /wzk-data/test-data "final-value"

# 删除节点 → 触发 handleDataDeleted
delete /wzk-data/test-data

输出示例:

数据改变: /wzk-data/test-data → updated-value
数据改变: /wzk-data/test-data → final-value
节点被删除: /wzk-data/test-data

ZkClient vs 原生 API 对比

能力原生 ZooKeeper APIZkClient
Watcher 重注册手动,每次触发后需重新调用自动,内部持久监听
Session 重连需自行实现重连逻辑自动重连
异常处理需处理 KeeperException封装为 ZkException
API 风格异步回调为主同步 + 异步均支持
递归创建/删除不支持createPersistent(path, true) / deleteRecursive

完整示例代码结构

public class ZooKeeperJavaDemo {
    public static void main(String[] args) throws InterruptedException {
        ZkClient zkClient = new ZkClient("h121.wzk.icu:2181");

        // 1. 创建节点
        zkClient.createPersistent("/wzk-java/data", true);
        zkClient.writeData("/wzk-java/data", "hello");

        // 2. 注册子节点监听
        zkClient.subscribeChildChanges("/wzk-java", (parent, children) -> {
            System.out.println("Children changed: " + children);
        });

        // 3. 注册数据监听
        zkClient.setZkSerializer(new SerializableSerializer());
        zkClient.subscribeDataChanges("/wzk-java/data", new IZkDataListener() {
            public void handleDataChange(String path, Object data) {
                System.out.println("Data changed: " + data);
            }
            public void handleDataDeleted(String path) {
                System.out.println("Node deleted: " + path);
            }
        });

        // 4. 等待事件
        Thread.sleep(60_000);

        // 5. 清理
        zkClient.deleteRecursive("/wzk-java");
        System.out.println("Cleanup done.");
    }
}

小结

ZkClient 大幅简化了 ZooKeeper Java 编程的复杂度:递归节点操作、自动重连、持久化 Watcher 都开箱即用。实际生产中,Curator Framework(Apache 维护)提供更完善的高层抽象(分布式锁、Leader 选举等),是 ZkClient 的进阶替代方案。本系列的 ZooKeeper 基础篇到此结束,下一阶段将进入 Kafka 消息队列的学习。