Skip to content

Zookeeper应用实操

1. 服务器动态上下线监听

1.1 需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

1.2 需求分析

Alt text

1.3 具体实现

  1. 先在集群上创建/servers节点
sh
[zk: localhost:2181(CONNECTED) 10] create /servers "servers"
Created /servers
  2. 在IDEA中创建包:com.jack.zkcase2
  3. 客户端代码:监听zk上/servers目录的子节点信息(也就是各服务器注册的主机名信息),动态获取当前在线的主机,再进行下一步操作。
java
package com.jack.zkcase2;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import static org.apache.zookeeper.Watcher.Event.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributedClient {
    private static String connectString = "hadoop105:2181,hadoop106:2181,hadoop107:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zk = null;

    // 创建到 zk 的客户端连接
    public void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, event -> {
            try {
                if (event.getType().getIntValue() == EventType.None.getIntValue()) {
                    if (Watcher.Event.EventType.None.getIntValue() == event.getType().getIntValue()) {
                        // 连接状态改变
                        if (event.getState().getIntValue() == KeeperState.SyncConnected.getIntValue()) {
                            // 连接成功
                            System.out.println("--连接上zk集群--");
                        } else if (event.getState().getIntValue() == KeeperState.Disconnected.getIntValue()) {
                            // 连接断开
                            System.out.println("--断开连接--");
                        } else if (event.getState().getIntValue() == KeeperState.Expired.getIntValue()) {
                            // 会话过期
                            System.out.println("--会话过期--");
                        }
                    }
                } else {
                    // 再次启动监听节点变化
                    getServerList();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    // 获取服务器列表信息
    public void getServerList() throws Exception {
        // 1 获取服务器子节点信息,并且对父节点进行监听
        List<String> children = zk.getChildren("/servers", true);
        // 2 存储服务器信息列表
        ArrayList<String> servers = new ArrayList<>();
        System.out.println("/servers监听注册完成,目前路径有:  ");
        // 3 遍历所有节点,获取节点中的主机名称信息
        for (String child : children) {
            byte[] data = zk.getData("/servers/" + child, false, null);
            servers.add(new String(data));
        }
        // 4 打印服务器列表信息
        System.out.println(servers);
    }

    // 业务功能
    public void business() throws Exception {
        System.out.println("client is working ...");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        // 1 获取 zk 连接
        DistributedClient client = new DistributedClient();
        client.getConnect();
        // 2 获取 servers 的子节点信息,从中获取服务器信息列表
        client.getServerList();
        // 3 业务进程启动
        client.business();
    }
}
  4. 服务器端向Zookeeper注册主机名:运行main方法时通过args传入主机名,服务端据此在/servers下注册一个临时顺序节点(EPHEMERAL_SEQUENTIAL),节点数据即为主机名。实现代码如下:
java
import org.apache.zookeeper.*;

import java.io.IOException;

/**
 * Server side of the online/offline demo.
 *
 * <p>On start-up, it registers its host name as an ephemeral sequential child of
 * {@code /servers}. The node disappears automatically when this process's ZooKeeper
 * session ends.
 */
public class DistributedServer {
    private static String connectString = "hadoop105:2181,hadoop106:2181,hadoop107:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zk = null;
    private String parentNode = "/servers";

    /**
     * Opens the ZooKeeper session. The server does not react to any events, so the
     * default watcher is a no-op.
     *
     * @throws IOException if the ZooKeeper client cannot be created
     */
    public void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // intentionally empty: the server only writes, it watches nothing
            }
        });
    }

    /**
     * Registers this server under {@link #parentNode}.
     *
     * <p>An EPHEMERAL node is used so that it vanishes when the session ends. The
     * SEQUENTIAL flag makes ZooKeeper append a counter to the node name. The host
     * name is stored as the node data.
     *
     * @param hostname host name to publish
     * @throws Exception if the node cannot be created
     */
    public void registServer(String hostname) throws Exception {
        String create = zk.create(parentNode + "/" + hostname,
                hostname.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname + " is online " + create);
    }

    /** Simulated business work: keeps the session (and thus the node) alive. */
    public void business(String hostname) throws Exception {
        System.out.println(hostname + " is working ...");
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the host name to register
     * @throws IllegalArgumentException if no host name is given
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("Usage: DistributedServer <hostname>");
        }
        String hostname = args[0];
        // 1. Connect to ZooKeeper.
        DistributedServer server = new DistributedServer();
        server.getConnect();
        // 2. Register this server's host name.
        server.registServer(hostname);
        // 3. Start the business work.
        server.business(hostname);
    }
}

1.4 测试

  1. 启动DistributedClient客户端,监听可用的主机名。
  2. 启动DistributedServer服务,需要在IDEA中设置运行参数(Program arguments),参数为主机名。
    1️⃣ 点击Edit Configurations…
    Alt text 2️⃣ 在弹出的窗口中(Program arguments)输入想启动的主机名,例如hadoop107 Alt text 3️⃣ 回到DistributedServer的main方法,右键,在弹出的菜单中点击Run->"DistributedServer.main()"
    Alt text
    4️⃣ 观察DistributedServer控制台,提示hadoop107 is working
    Alt text 5️⃣ 观察DistributedClient控制台,提示hadoop107已经上线 Alt text

2. ZooKeeper分布式锁

什么叫做分布式锁呢?
分布式锁用于保证分布式系统中的多个进程互斥地访问同一个临界资源。比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源;"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁。通过这个锁机制,我们就能保证分布式系统中多个进程有序地访问该临界资源。我们把分布式环境下的这种锁叫作分布式锁。

2.1 分布式锁分析

Alt text

3. 原生Zookeeper实现分布式锁

java
/**
 * Demo: two independent {@code DistributedLock} instances (two ZooKeeper sessions)
 * compete for the same lock from two threads.
 */
public class DistributedLockTest {

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        // Distributed lock 1
        DistributedLock lock1 = new DistributedLock();
        // Distributed lock 2
        DistributedLock lock2 = new DistributedLock();
        new Thread(() -> runWithLock(lock1, "线程1")).start();
        new Thread(() -> runWithLock(lock2, "线程2")).start();
    }

    /**
     * Acquires {@code lock}, simulates 5 s of work, then releases the lock.
     *
     * <p>The release sits in a {@code finally} block so that an interrupted worker
     * still deletes its lock node.
     *
     * @param lock the lock to hold
     * @param name label printed in the log lines (e.g. "线程1")
     */
    private static void runWithLock(DistributedLock lock, String name) {
        lock.zkLock();
        System.out.println(name + "获取锁。。。");
        try {
            System.out.println("开始处理业务逻辑。。。。");
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            lock.zkUnlock();
            System.out.println(name + "释放锁");
        }
    }
}
java
// zookeeper server 列表
    private String connectString = "hadoop105:2181,hadoop106:2181,hadoop107:2181";
    // 超时时间
    private int sessionTimeout = 5000;
    private ZooKeeper zk;
    private String rootNode = "/locks";
    private String subNode = "seq-";
    // 当前 client 等待的子节点
    private String createdPath;
    private String waitPath;
    //ZooKeeper 连接
    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);

    DistributedLock() throws IOException, InterruptedException, KeeperException {
        // 建立连接
        zk = new ZooKeeper(connectString, sessionTimeout, event -> {
            // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
            if(event.getState()== Watcher.Event.KeeperState.SyncConnected){
                connectLatch.countDown();
            }
            // 发生了 waitPath 的删除事件
            if(event.getType() == Watcher.Event.EventType.NodeDeleted && event.getPath().equals(waitPath)){
                waitLatch.countDown();
            }
        });
        // 等待zk连接成功
        connectLatch.await();
        // 判断根路径/locks,没有就创建
        Stat stat = zk.exists(rootNode, false);
        if (stat == null) {
            // 创建根节点
            zk.create(rootNode, "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    // 加锁
    public void zkLock() {
        try {
            createdPath = zk.create(rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            List<String> children = zk.getChildren(rootNode, false);
            // 如果子节点只有一个,那么就是最小的
            if (children.size() == 1) {
                return;
            } else {
                // 子节点不止一个,比对自己的节点是否是子节点中最小的
                Collections.sort(children);
                String createChild = createdPath.substring((rootNode+"/").length());
                int index = children.indexOf(createChild);
                if(index ==-1){
                    throw new RuntimeException("数据异常");
                } else if (index==0) {
                    return;
                }else {
                    // 如果不是最小的,需要监听比自己小的节点路径,等待他消失
                    waitPath = rootNode+"/"+children.get(index - 1);
                    zk.getChildren(waitPath, true);
                    waitLatch.await();
                }
            }
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // 解锁
    public void zkUnlock() {
        //删除节点
        try {
            zk.delete(createdPath, -1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        }
    }
}

执行测试:
Alt text

4. Curator框架实现分布式锁

4.1 Curator优点

  1. 原生的Java API开发存在的问题:
    • 会话连接是异步的,需要自己去处理。比如使用CountDownLatch
    • Watch 需要重复注册,不然就不能生效
    • 开发的复杂性还是比较高的
    • 不支持多节点删除和创建。需要自己去递归
  2. Curator是Apache开源的ZooKeeper客户端框架,封装了连接管理、重试、Watch重复注册等细节,并提供了分布式锁等常用的recipes,解决了原生Java API开发分布式应用时遇到的上述问题。官网地址:https://curator.apache.org/index.html

4.2 Curator案例实操

  1. 添加依赖
xml
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>5.6.0</version>
</dependency>
  2. 代码实现
java
/**
 * Curator {@code InterProcessMutex} demo: two threads compete for the same lock
 * path, and each demonstrates re-entrant acquisition.
 */
public class CuratorLockTest {
    private String rootNode = "/locks";
    // ZooKeeper server list
    private String connectString = "hadoop105:2181,hadoop106:2181,hadoop107:2181";
    // Connection timeout (ms)
    private int connectionTimeout = 2000;
    // Session timeout (ms)
    private int sessionTimeout = 2000;
    private CuratorFramework client;

    @Before
    public void init(){
        // Retry policy: exponential backoff starting at 3 s, at most 3 retries
        RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
        client = CuratorFrameworkFactory.newClient(connectString, sessionTimeout, connectionTimeout, policy);
        client.start();
        System.out.println("zookeeper 初始化完成...");
    }

    /**
     * Runs both lock holders and waits for them to finish.
     *
     * <p>The joins matter: without them the test method returns, and the runner can
     * exit before the threads have done anything.
     */
    @Test
    public void testLock(){
        final InterProcessLock lock1 = new InterProcessMutex(client, rootNode);
        final InterProcessLock lock2 = new InterProcessMutex(client, rootNode);
        Thread first = new Thread(() -> acquireTwiceAndRelease(lock1, "线程1"), "线程1");
        Thread second = new Thread(() -> acquireTwiceAndRelease(lock2, "线程2"), "线程2");
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Acquires {@code lock}, re-acquires it to show re-entrancy, and releases it once
     * per acquisition. Each release is in a {@code finally} block, so a failure cannot
     * leave the mutex held.
     *
     * @param lock the mutex to exercise
     * @param name label printed in the log lines (e.g. "线程1")
     */
    private static void acquireTwiceAndRelease(InterProcessLock lock, String name) {
        try {
            lock.acquire();
            try {
                System.out.println(name + "获取到锁");
                Thread.sleep(5);
                // Test re-entrancy: the same thread acquires the same mutex again.
                lock.acquire();
                try {
                    System.out.println(name + "再次获取到锁");
                } finally {
                    lock.release();
                    System.out.println(name + "释放锁");
                }
                Thread.sleep(5);
            } finally {
                lock.release();
                System.out.println(name + "再次释放锁");
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }
}

执行代码,查看IDEA控制台: Alt text