本文是大数据系列第 32 篇,演示如何利用 ZooKeeper 临时顺序节点实现公平分布式锁,并提供完整 Java 代码。

完整图文版(含截图):CSDN 原文 | 掘金

为什么需要分布式锁

单机场景下 JVM 的 synchronized / ReentrantLock 可以保证线程安全,但在多节点部署时,各 JVM 进程之间互相隔离,本地锁无法跨进程协调。典型的超卖问题:多台服务器同时处理同一商品的库存扣减请求,若无跨进程锁,就会出现并发冲突。

ZooKeeper 分布式锁原理

ZooKeeper 利用临时顺序节点实现公平锁:

  1. 所有客户端在 /lock 目录下创建临时顺序节点,例如 /lock/0000000001/lock/0000000002
  2. 获取 /lock 下所有子节点并排序
  3. 序号最小的节点对应的客户端持有锁
  4. 其他客户端只监听自己的前驱节点(而非最小节点),避免”羊群效应”
  5. 前驱节点被删除后,收到通知的客户端重新检查是否成为最小节点

临时节点的特性保证了持锁客户端崩溃后锁自动释放,不会产生死锁。

环境配置

ZooKeeper 集群地址:h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181(三节点 2C4G)

Maven 依赖引入 ZkClient:

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>

完整代码实现

LockTest — 并发测试入口

package icu.wzk.zk.demo02;

public class LockTest {

    public static void main(String[] args) {
        // 启动 10 个并发线程模拟分布式竞争
        for (int i = 0; i < 10; i++) {
            new Thread(new LockRunnable()).start();
        }
    }

    static class LockRunnable implements Runnable {
        @Override
        public void run() {
            final ClientTest clientTest = new ClientTest();
            clientTest.getLock();          // 获取锁
            try {
                Thread.sleep(2000);        // 模拟业务处理
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            clientTest.deleteLock();       // 释放锁
        }
    }
}

ClientTest — 分布式锁核心实现

package icu.wzk.zk.demo02;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ClientTest {

    private ZkClient zkClient = new ZkClient(
        "h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181"
    );

    String beforeNodePath;    // 前驱节点路径
    String currentNodePath;   // 当前客户端创建的节点路径
    CountDownLatch countDownLatch = null;

    public ClientTest() {
        // 确保 /lock 父节点存在,synchronized 防止重复创建
        synchronized (ClientTest.class) {
            if (!zkClient.exists("/lock")) {
                zkClient.createPersistent("/lock");
            }
        }
    }

    /**
     * 尝试获取锁:创建临时顺序节点,判断是否为序号最小的节点
     */
    public boolean tryGetLock() {
        if (null == currentNodePath || currentNodePath.isEmpty()) {
            currentNodePath = zkClient.createEphemeralSequential("/lock/", "lock");
        }
        final List<String> childs = zkClient.getChildren("/lock");
        Collections.sort(childs);
        final String minNode = childs.get(0);

        if (currentNodePath.equals("/lock/" + minNode)) {
            // 当前节点序号最小,获锁成功
            return true;
        } else {
            // 找到前驱节点
            final int i = Collections.binarySearch(
                childs, currentNodePath.substring("/lock/".length())
            );
            beforeNodePath = "/lock/" + childs.get(i - 1);
            return false;
        }
    }

    /**
     * 等待前驱节点删除,使用 CountDownLatch 阻塞当前线程
     */
    public void waitForLock() {
        final IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                // 数据变更不关心
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                // 前驱节点删除,唤醒等待线程
                countDownLatch.countDown();
            }
        };

        zkClient.subscribeDataChanges(beforeNodePath, iZkDataListener);

        if (zkClient.exists(beforeNodePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();   // 阻塞等待
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        zkClient.unsubscribeDataChanges(beforeNodePath, iZkDataListener);
    }

    /**
     * 释放锁:删除当前节点并关闭连接
     */
    public void deleteLock() {
        if (zkClient != null) {
            zkClient.delete(currentNodePath);
            zkClient.close();
        }
    }

    /**
     * 获取锁入口:先尝试,失败则等待前驱后递归重试
     */
    public void getLock() {
        final String threadName = Thread.currentThread().getName();
        if (tryGetLock()) {
            System.out.println(threadName + ": 获锁成功!");
        } else {
            System.out.println(threadName + ": 等待锁...");
            waitForLock();
            getLock();   // 递归重试
        }
    }
}

执行流程分析

  1. 10 个线程同时调用 getLock(),各自在 /lock 下创建临时顺序节点
  2. 序号最小的线程立即获锁,其余线程监听各自的直接前驱节点
  3. 持锁线程执行业务逻辑(sleep(2000) 模拟),调用 deleteLock() 删除节点
  4. 前驱被删除的下一个线程收到 Watcher 事件,CountDownLatch 解除阻塞,递归调用 getLock() 获得锁
  5. 重复上述过程直到 10 个线程全部完成

关键设计要点

要点说明
临时顺序节点保证会话断开后锁自动释放,避免死锁
只监听前驱防止羊群效应,释放锁只唤醒一个等待者
CountDownLatch协调异步 Watcher 事件与主线程等待
递归重试处理 Watcher 到达和节点已删除之间的竞态条件

小结

ZooKeeper 分布式锁通过临时顺序节点 + 前驱监听的方式,实现了公平、高可靠的互斥访问。相比 Redis 锁,ZooKeeper 锁在网络分区场景下一致性更强,适合对正确性要求高的关键业务场景。