This is article 32 in the Big Data series. It demonstrates how to implement a fair distributed lock using ZooKeeper ephemeral sequential nodes, with complete Java code.


Why a Distributed Lock Is Needed

In a single-machine scenario, the JVM's synchronized / ReentrantLock can guarantee thread safety, but in a multi-node deployment each JVM process is isolated from the others, and a local lock cannot coordinate across processes. The typical symptom is the overselling problem: multiple servers process inventory-deduction requests for the same product at the same time, and without a cross-process lock the concurrent read-check-write sequences conflict.
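
The overselling problem can be reproduced deterministically by interleaving two read-check-write sequences by hand. This is a hypothetical sketch: the map below stands in for a shared database row, and the two "servers" are just two calls whose reads happen before either write.

```java
import java.util.HashMap;
import java.util.Map;

public class OversellDemo {
    // Simulates the shared database row: productId -> remaining stock
    static final Map<String, Integer> store = new HashMap<>();

    // One "server" handling a purchase: decides on a stale snapshot, then writes back blindly
    static boolean deduct(String productId, int snapshotStock) {
        if (snapshotStock > 0) {
            store.put(productId, snapshotStock - 1);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        store.put("p1", 1);
        // Both servers read the stock BEFORE either one writes (the bad interleaving)
        int readByServerA = store.get("p1");
        int readByServerB = store.get("p1");
        boolean soldA = deduct("p1", readByServerA);
        boolean soldB = deduct("p1", readByServerB);
        // One unit in stock, yet both sales succeed: the item is oversold
        System.out.println("soldA=" + soldA + " soldB=" + soldB + " stock=" + store.get("p1"));
    }
}
```

A cross-process lock forces the second server's read to happen after the first server's write, so the second deduction sees stock 0 and is rejected.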

ZooKeeper Distributed Lock Principle

ZooKeeper implements a fair lock using ephemeral sequential nodes:

  1. Every client creates an ephemeral sequential node under /lock, e.g., /lock/0000000001, /lock/0000000002
  2. The client fetches all children of /lock and sorts them
  3. The client whose node has the smallest sequence number holds the lock
  4. Every other client watches only its immediate predecessor (not the smallest node), avoiding the "herd effect"
  5. When the predecessor node is deleted, the notified client re-checks whether its own node is now the smallest

Because the nodes are ephemeral, the lock is released automatically when the lock-holding client's session ends (for example, after a crash), so no deadlock can occur.
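
The sort-and-find-predecessor logic of steps 2–4 can be sketched in isolation. The node names below are made up for illustration; in practice ZooKeeper assigns the 10-digit sequence suffixes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PredecessorDemo {
    public static void main(String[] args) {
        // Child names as getChildren("/lock") might return them: arrival order, not sorted
        List<String> children = new ArrayList<>(
            List.of("0000000003", "0000000001", "0000000002"));
        // Lexicographic sort equals numeric sort because the names are fixed-width
        Collections.sort(children);

        String myNode = "0000000002";
        if (children.get(0).equals(myNode)) {
            System.out.println("lock acquired");
        } else {
            // binarySearch is valid here because the list was just sorted
            int i = Collections.binarySearch(children, myNode);
            System.out.println("watch predecessor: /lock/" + children.get(i - 1));
        }
    }
}
```

Fixed-width zero-padded names are what make plain string sorting safe; without the padding, "10" would sort before "2".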

Environment Configuration

ZooKeeper cluster address: h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181 (three nodes, 2C4G each)

Maven dependency for ZkClient:

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>

Complete Code Implementation

LockTest — Concurrency Test Entry

package icu.wzk.zk.demo02;

public class LockTest {

    public static void main(String[] args) {
        // Start 10 concurrent threads simulating distributed competition
        for (int i = 0; i < 10; i++) {
            new Thread(new LockRunnable()).start();
        }
    }

    static class LockRunnable implements Runnable {
        @Override
        public void run() {
            final ClientTest clientTest = new ClientTest();
            clientTest.getLock();          // Acquire lock
            try {
                Thread.sleep(2000);        // Simulate business processing
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            clientTest.deleteLock();       // Release lock
        }
    }
}

ClientTest — Distributed Lock Core Implementation

package icu.wzk.zk.demo02;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ClientTest {

    private ZkClient zkClient = new ZkClient(
        "h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181"
    );

    String beforeNodePath;    // Predecessor node path
    String currentNodePath;   // Current client created node path
    CountDownLatch countDownLatch = null;

    public ClientTest() {
        // Ensure the /lock parent node exists. Note: synchronized only guards
        // threads within this JVM; across processes createPersistent can still
        // race, and production code should catch ZkNodeExistsException
        synchronized (ClientTest.class) {
            if (!zkClient.exists("/lock")) {
                zkClient.createPersistent("/lock");
            }
        }
    }

    /**
     * Try to get lock: create ephemeral sequential node, check if smallest sequence
     */
    public boolean tryGetLock() {
        if (null == currentNodePath || currentNodePath.isEmpty()) {
            currentNodePath = zkClient.createEphemeralSequential("/lock/", "lock");
        }
        final List<String> children = zkClient.getChildren("/lock");
        Collections.sort(children);
        final String minNode = children.get(0);

        if (currentNodePath.equals("/lock/" + minNode)) {
            // Current node has the smallest sequence: lock acquired
            return true;
        } else {
            // Locate this node in the sorted list and record its predecessor
            final int i = Collections.binarySearch(
                children, currentNodePath.substring("/lock/".length())
            );
            beforeNodePath = "/lock/" + children.get(i - 1);
            return false;
        }
    }

    /**
     * Wait for the predecessor node to be deleted, blocking the current thread
     * on a CountDownLatch
     */
    public void waitForLock() {
        // Create the latch BEFORE subscribing: if the predecessor were deleted
        // between subscribe() and the latch assignment, the callback would hit
        // a null countDownLatch
        countDownLatch = new CountDownLatch(1);

        final IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                // Data changes are irrelevant; only deletion matters
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                // Predecessor node deleted: wake up the waiting thread
                countDownLatch.countDown();
            }
        };

        zkClient.subscribeDataChanges(beforeNodePath, iZkDataListener);

        if (zkClient.exists(beforeNodePath)) {
            try {
                countDownLatch.await();   // Block until the predecessor is gone
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        zkClient.unsubscribeDataChanges(beforeNodePath, iZkDataListener);
    }

    /**
     * Release lock: delete current node and close connection
     */
    public void deleteLock() {
        if (zkClient != null) {
            zkClient.delete(currentNodePath);
            zkClient.close();
        }
    }

    /**
     * Get lock entry: try first, if fail wait for predecessor then recursive retry
     */
    public void getLock() {
        final String threadName = Thread.currentThread().getName();
        if (tryGetLock()) {
            System.out.println(threadName + ": Lock acquired!");
        } else {
            System.out.println(threadName + ": Waiting for lock...");
            waitForLock();
            getLock();   // Recursive retry
        }
    }
}

Execution Flow Analysis

  1. Ten threads call getLock() concurrently; each creates an ephemeral sequential node under /lock
  2. The thread with the smallest sequence number acquires the lock immediately; every other thread watches its direct predecessor
  3. The lock holder runs its business logic (simulated by sleep(2000)), then calls deleteLock() to remove its node
  4. The thread whose predecessor was just deleted receives the Watcher event, its CountDownLatch unblocks, and it recursively calls getLock() to re-check ownership
  5. This repeats until all 10 threads have completed
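
The chain-style hand-off in steps 2–4 can be simulated locally without ZooKeeper; the deque below stands in for the sorted children of /lock, with the head as the current lock holder.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class HandoffDemo {
    public static void main(String[] args) {
        // Sorted sequence nodes: head holds the lock, each other entry
        // "watches" the entry directly in front of it
        Deque<String> queue = new ArrayDeque<>();
        for (int i = 1; i <= 3; i++) {
            queue.addLast(String.format("%010d", i));
        }
        while (!queue.isEmpty()) {
            String holder = queue.pollFirst();   // holder deletes its node
            System.out.println(holder + " held the lock");
            if (!queue.isEmpty()) {
                // Deleting a node notifies exactly one waiter: the new head
                System.out.println("  -> " + queue.peekFirst() + " is notified");
            }
        }
    }
}
```

The point of the simulation is that each deletion wakes exactly one waiter, which is why watching only the predecessor avoids the herd effect.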

Key Design Points

  Ephemeral sequential node: the lock is released automatically when the session ends, avoiding deadlock
  Watch only the predecessor: prevents the herd effect; releasing the lock wakes exactly one waiter
  CountDownLatch: bridges the asynchronous Watcher callback and the blocked caller thread
  Recursive retry: re-checks lock ownership after each wake-up, handling the race between Watcher delivery and node deletion
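
The CountDownLatch coordination used in waitForLock() can be shown on its own. This is a minimal sketch with no ZooKeeper involved: a background thread plays the role of the ZkClient event thread delivering handleDataDeleted.

```java
import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        // Stand-in for the Watcher callback firing asynchronously
        new Thread(() -> {
            try {
                Thread.sleep(200);  // pretend the predecessor node lives a little longer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("watcher fired");
            latch.countDown();      // wake the waiting thread
        }).start();

        latch.await();              // blocks until countDown() is called
        System.out.println("predecessor gone, retrying lock acquisition");
    }
}
```

Because await() cannot return before countDown() runs, "watcher fired" is always printed before the retry line, mirroring how the blocked client only resumes after the predecessor's deletion event.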

Summary

A ZooKeeper distributed lock achieves fair, highly reliable mutual exclusion through ephemeral sequential nodes plus predecessor watching. Compared with Redis-based locks, ZooKeeper locks offer stronger consistency under network partitions, which suits critical business flows with strict correctness requirements.