本文是大数据系列第 32 篇,演示如何利用 ZooKeeper 临时顺序节点实现公平分布式锁,并提供完整 Java 代码。
为什么需要分布式锁
单机场景下 JVM 的 synchronized / ReentrantLock 可以保证线程安全,但在多节点部署时,各 JVM 进程之间互相隔离,本地锁无法跨进程协调。典型的超卖问题:多台服务器同时处理同一商品的库存扣减请求,若无跨进程锁,就会出现并发冲突。
ZooKeeper 分布式锁原理
ZooKeeper 利用临时顺序节点实现公平锁:
- 所有客户端在
/lock目录下创建临时顺序节点,例如/lock/0000000001、/lock/0000000002 - 获取
/lock下所有子节点并排序 - 序号最小的节点对应的客户端持有锁
- 其他客户端只监听自己的前驱节点(而非最小节点),避免”羊群效应”
- 前驱节点被删除后,收到通知的客户端重新检查是否成为最小节点
临时节点的特性保证了持锁客户端崩溃后锁自动释放,不会产生死锁。
环境配置
ZooKeeper 集群地址:h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181(三节点 2C4G)
Maven 依赖引入 ZkClient:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
完整代码实现
LockTest — 并发测试入口
package icu.wzk.zk.demo02;
public class LockTest {
public static void main(String[] args) {
// 启动 10 个并发线程模拟分布式竞争
for (int i = 0; i < 10; i++) {
new Thread(new LockRunnable()).start();
}
}
static class LockRunnable implements Runnable {
@Override
public void run() {
final ClientTest clientTest = new ClientTest();
clientTest.getLock(); // 获取锁
try {
Thread.sleep(2000); // 模拟业务处理
} catch (InterruptedException e) {
e.printStackTrace();
}
clientTest.deleteLock(); // 释放锁
}
}
}
ClientTest — 分布式锁核心实现
package icu.wzk.zk.demo02;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ClientTest {
private ZkClient zkClient = new ZkClient(
"h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181"
);
String beforeNodePath; // 前驱节点路径
String currentNodePath; // 当前客户端创建的节点路径
CountDownLatch countDownLatch = null;
public ClientTest() {
// 确保 /lock 父节点存在,synchronized 防止重复创建
synchronized (ClientTest.class) {
if (!zkClient.exists("/lock")) {
zkClient.createPersistent("/lock");
}
}
}
/**
* 尝试获取锁:创建临时顺序节点,判断是否为序号最小的节点
*/
public boolean tryGetLock() {
if (null == currentNodePath || currentNodePath.isEmpty()) {
currentNodePath = zkClient.createEphemeralSequential("/lock/", "lock");
}
final List<String> childs = zkClient.getChildren("/lock");
Collections.sort(childs);
final String minNode = childs.get(0);
if (currentNodePath.equals("/lock/" + minNode)) {
// 当前节点序号最小,获锁成功
return true;
} else {
// 找到前驱节点
final int i = Collections.binarySearch(
childs, currentNodePath.substring("/lock/".length())
);
beforeNodePath = "/lock/" + childs.get(i - 1);
return false;
}
}
/**
* 等待前驱节点删除,使用 CountDownLatch 阻塞当前线程
*/
public void waitForLock() {
final IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
// 数据变更不关心
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// 前驱节点删除,唤醒等待线程
countDownLatch.countDown();
}
};
zkClient.subscribeDataChanges(beforeNodePath, iZkDataListener);
if (zkClient.exists(beforeNodePath)) {
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await(); // 阻塞等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
zkClient.unsubscribeDataChanges(beforeNodePath, iZkDataListener);
}
/**
* 释放锁:删除当前节点并关闭连接
*/
public void deleteLock() {
if (zkClient != null) {
zkClient.delete(currentNodePath);
zkClient.close();
}
}
/**
* 获取锁入口:先尝试,失败则等待前驱后递归重试
*/
public void getLock() {
final String threadName = Thread.currentThread().getName();
if (tryGetLock()) {
System.out.println(threadName + ": 获锁成功!");
} else {
System.out.println(threadName + ": 等待锁...");
waitForLock();
getLock(); // 递归重试
}
}
}
执行流程分析
- 10 个线程同时调用
getLock(),各自在/lock下创建临时顺序节点 - 序号最小的线程立即获锁,其余线程监听各自的直接前驱节点
- 持锁线程执行业务逻辑(
sleep(2000)模拟),调用deleteLock()删除节点 - 前驱被删除的下一个线程收到 Watcher 事件,
CountDownLatch解除阻塞,递归调用getLock()获得锁 - 重复上述过程直到 10 个线程全部完成
关键设计要点
| 要点 | 说明 |
|---|---|
| 临时顺序节点 | 保证会话断开后锁自动释放,避免死锁 |
| 只监听前驱 | 防止羊群效应,释放锁只唤醒一个等待者 |
| CountDownLatch | 协调异步 Watcher 事件与主线程等待 |
| 递归重试 | 处理 Watcher 到达和节点已删除之间的竞态条件 |
小结
ZooKeeper 分布式锁通过临时顺序节点 + 前驱监听的方式,实现了公平、高可靠的互斥访问。相比 Redis 锁,ZooKeeper 锁在网络分区场景下一致性更强,适合对正确性要求高的关键业务场景。