Skip to content

客户端API操作

1. IDEA环境搭建

  1. 创建一个工程:zookeeper-client

  2. 添加pom依赖

xml
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.4</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
    <!-- 指明在测试中使用 -->
    <scope>test</scope>
</dependency>
  1. 创建logback.xml日志配置文件
    需要在项目的src/test/resources目录下,新建一个文件,命名为"logback-test.xml",在文件中填入以下内容。
详细信息
xml
<?xml version="1.0" encoding="UTF-8" ?>
<!--
configuration是logback的根标签
scan="true" 表示配置文件发生了改变会自动加载
scanPeriod="60 seconds" 检测配置文件修改的时间间隔,默认的单位是毫秒,这里设置的表示每分钟检测
debug="false" debug如果设置为true,表示会打印出来logback自身实时的运行信息,这里就不需要了,因为logback自身运行时非常稳定的,不需要对它做日志
-->
<configuration
        scan="true"
        scanPeriod="60 seconds"
        debug="false">

    <!--**************配置********************-->

    <!--优雅关机 为了防止进程退出时,内存中的数据丢失,请加上此选项
        5 seconds 表示服务会等待5秒后执行释放资源的相关操作,默认位0秒-->
    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook">
        <delay>5 seconds</delay>
    </shutdownHook>

    <!--**************定义常量参数********************-->

    <!--定义一些参数常量-->
    <!--定义日志的输出地址 用于windows电脑日志存放地址路径-->
    <property name="log.filepath" value="${user.dir}/log"/>

    <!--定义日志的输出地址 用于linux服务器日志存放地址路径
        source="logging.file.path" 指向 yml 或者 properties 配置文件里面的值-->
    <springProperty scope="context"  name="log.path.linux"  source="logging.file.path"/>

    <!--
    定义日志展示格式
    %d{yyyy-MM-dd HH:mm:ss} 定义一条日志的展示时间
    [%thread] 表示执行日志的线程名称
    %-5level 空出5个空格,然后显示日志级别
    %logger{100} 显示在哪个类(全限定类名)里面记录的日志,后面的{100}表示这个类名展示的最大长度是100
    %msg 表示的日志信息
    %n 表示换行
    -->
    <property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{100} - %msg%n"/>

    <!--彩色日志展示格式-->
    <property name="console.log.pattern" value="%red(%d{yyyy-MM-dd HH:mm:ss}) %green([%thread]) %highlight(%-5level) %boldMagenta(%logger{36}) - %msg%n"/>

    <!--**************设置输出媒介********************-->

    <!--定义日志输出媒介-->
    <!--定义控制台输出,class指定的具体输出要使动的类-->
    <appender name="consoleAppender" class="ch.qos.logback.core.ConsoleAppender">
        <!--appender其实是负责统一调度日志的输出工作,而具体的日志的格式化工作和输出的工作会交给encoder-->
        <encoder>
            <!--定义日志输出的格式-->
            <pattern>${console.log.pattern}</pattern>
            <!--设置输出内容编码-->
            <charset>utf-8</charset>
        </encoder>
        <!--过滤底层日志记录器向上层抛出的日志
            因为consoleAppender只需要记录INFO及其以上级别的日志,对于其他的日志不需要,
            所以只需要下面设置即可实现只显示INFO及其以上级别的日志-->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
    </appender>

    <!--
    接下来还要定义不同级别的日志输出的媒介
    对于日志框架来说,日志是有级别的,logback的日志级别有如下:
    trace(追踪) < debug(调试) < info(记录信息) < warn(警告) < error(错误)
    再记录日志的时候,一般需要使用到一个Logger类的对象,然后可以调用如下方法:
    looger.trace("xxxx"),logger.debug("xxxx"),....等等
    如果记录的日志级别是info的话,info是大于trace和debug的,这个时候调用trace和debug发方法记录的日志是无法显示的。而调用warn和error是可以展示的。
    一般情况下在开发项目的时候,只需要记录debug,info,error的日志就行了,而这个三个级别的日志需要放在不同的文件中
    -->
    <!--定义debug级别的日志输出,RollingFileAppender表示滚动日志,可以按天或者按月来生成不同的日志文件-->
    <appender name="debugAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!--文件路径-->
        <file>${log.filepath}/debug.log</file>
        <!--定义滚动策略,TimeBasedRollingPolicy是一个基于时间的滚动策略-->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--设置按时间的方式来生成文件,如果文件名是用.gz结尾的,那么logback会自动压
                缩日志文件,如果不需要压缩,可以直接把.gz删除就行了
                %d{yyyy-MM-dd} 表示按天保存日志
                %d{yyyy-MM-dd_HH_mm} 表示按分钟保存日志-->
            <fileNamePattern>${log.filepath}/debug-%d{yyyy-MM-dd}.log.gz</fileNamePattern>
            <!--表示日志文件的最大大小,当日志文件达到该大小时,会进行滚动输出,创建一个新的日志文件-->
            <maxFileSize>1024MB</maxFileSize>
            <!--设置文件最大保存的历史数据,这里就默认30份就行了,也就是最大保存30个文件-->
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <!--定义日志格式-->
        <encoder>
            <!--定义日志输出的格式-->
            <pattern>${log.pattern}</pattern>
            <!--设置输出内容编码-->
            <charset>utf-8</charset>
        </encoder>
        <!--需要注意 <onMatch>ACCEPT</onMatch> 和 <onMismatch>DENY</onMismatch>
            标签只用于日志记录到文件的类型、控制台输出不能使用
            因为debugAppender只需要记录debug级别的日志,对于其他的日志不需要,
            所以要加一个过滤器
            <onMatch>ACCEPT</onMatch> 和 <onMismatch>DENY</onMismatch> 两个标签
            可以实现只显示某一级别的日志,使用这两个标签后,会把非匹配级别的日志直接过滤掉,
            如果只是用了 <level>DEBUG</level> 标签,那么设置级别(包括设置的级别)以上
            的级别都会被记录下来-->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>DEBUG</level>
            <!--如果是DEBUG日志,同意被记录到文件-->
            <onMatch>ACCEPT</onMatch>
            <!--如果不是DEBUG日志,直接忽略-->
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!--定义info级别的日志的appender-->
    <appender name="infoAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!--文件路径-->
        <file>${log.filepath}/info.log</file>
        <!--定义滚动策略,TimeBasedRollingPolicy是一个基于时间的滚动策略-->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--设置按时间的方式来生成文件,如果文件名是用.gz结尾的,那么logback会自动压
                缩日志文件,如果不需要压缩,可以直接把.gz删除就行了
                %d{yyyy-MM-dd} 表示按天保存日志
                %d{yyyy-MM-dd_HH_mm} 表示按分钟保存日志-->
            <fileNamePattern>${log.filepath}/info-%d{yyyy-MM-dd}.log.gz</fileNamePattern>
            <!--表示日志文件的最大大小,当日志文件达到该大小时,会进行滚动输出,创建一个新的日志文件-->
            <maxFileSize>1024MB</maxFileSize>
            <!--设置文件最大保存的历史数据,这里就默认30份就行了,也就是最大保存30个文件-->
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <!--定义日志格式-->
        <encoder>
            <!--定义日志输出的格式-->
            <pattern>${log.pattern}</pattern>
            <!--设置输出内容编码-->
            <charset>utf-8</charset>
        </encoder>
        <!--需要注意 <onMatch>ACCEPT</onMatch> 和 <onMismatch>DENY</onMismatch>
            标签只用于日志记录到文件的类型、控制台输出不能使用
            因为infoAppender只需要记录debug级别的日志,对于其他的日志不需要,
            所以要加一个过滤器
            <onMatch>ACCEPT</onMatch> 和 <onMismatch>DENY</onMismatch> 两个标签
            可以实现只显示某一级别的日志,使用这两个标签后,会把非匹配级别的日志直接过滤掉,
            如果只是用了 <level>DEBUG</level> 标签,那么设置级别(包括设置的级别)以上
            的级别都会被记录下来-->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>INFO</level>
            <!--如果是INFO日志,同意被记录到文件-->
            <onMatch>ACCEPT</onMatch>
            <!--如果不是INFO日志,直接忽略-->
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!--定义error级别的日志的appender-->
    <appender name="errorAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!--文件路径-->
        <file>${log.filepath}/error.log</file>
        <!--定义滚动策略,TimeBasedRollingPolicy是一个基于时间的滚动策略-->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--设置按时间的方式来生成文件,如果文件名是用.gz结尾的,那么logback会自动压
                缩日志文件,如果不需要压缩,可以直接把.gz删除就行了
                %d{yyyy-MM-dd} 表示按天保存日志
                %d{yyyy-MM-dd_HH_mm} 表示按分钟保存日志-->
            <fileNamePattern>${log.filepath}/error-%d{yyyy-MM-dd_HH_mm}.log.gz</fileNamePattern>
            <!--表示日志文件的最大大小,当日志文件达到该大小时,会进行滚动输出,创建一个新的日志文件-->
            <maxFileSize>1024MB</maxFileSize>
            <!--设置文件最大保存的历史数据,这里就默认2份就行了,也就是最大保存2个文件-->
            <maxHistory>2</maxHistory>
        </rollingPolicy>
        <!--定义日志格式-->
        <encoder>
            <!--定义日志输出的格式-->
            <pattern>${log.pattern}</pattern>
            <!--设置输出内容编码-->
            <charset>utf-8</charset>
        </encoder>
        <!--需要注意 <onMatch>ACCEPT</onMatch> 和 <onMismatch>DENY</onMismatch>
            标签只用于日志记录到文件的类型、控制台输出不能使用
            因为errorAppender只需要记录debug级别的日志,对于其他的日志不需要,
            所以要加一个过滤器
            <onMatch>ACCEPT</onMatch> 和 <onMismatch>DENY</onMismatch> 两个标签
            可以实现只显示某一级别的日志,使用这两个标签后,会把非匹配级别的日志直接过滤掉,
            如果只是用了 <level>DEBUG</level> 标签,那么设置级别(包括设置的级别)以上
            的级别都会被记录下来-->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <!--如果是ERROR日志,同意被记录到文件-->
            <onMatch>ACCEPT</onMatch>
            <!--如果不是ERROR日志,直接忽略-->
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!--**************设置日志记录器(日志记录器+输出媒介即可输出日志)********************-->
    <!--注意一:日志记录器和输出媒介都可以设置日志输出级别,日志记录器是第一层过滤,输出媒介是第二层过滤
        注意二:自定义的日志记录器优先级大于根日志记录器-->

    <!--根日志记录器,所有包下面的日志都会被记录-->
    <root level="DEBUG">
        <appender-ref ref="debugAppender"/>
        <appender-ref ref="infoAppender"/>
        <appender-ref ref="errorAppender"/>
    </root>

    <!--根日志记录器,所有包下面的日志都会被记录-->
    <root level="DEBUG">
        <appender-ref ref="consoleAppender"/>
    </root>

    <!--自定义记录器
        配置自己写的代码的日志记录器
        name="com.dkk" 表示对应包(包含子包)里面的代码产生的日志,会被下面的日志记录器记录,否则不会被记录
        level="DEBUG" 表示日志记录的级别是DEBUG,如果不指定level的时候,这个日志记录器对应的级别自动继承根记录器
        additivity="true" 表示日志事件是否向父节点logger传递。当additivity属性设置为true时,日志事件不仅会被当前logger处理,也会被传递给父节点logger进行处理。这意味着,如果一个logger的配置的appender(媒介输出器)是上级logger(如root logger也就是根记录器)配置的appender,那么当此logger的additivity属性为true时,日志信息会同时输出到当前logger和上级logger的appender中。反之,如果additivity属性设为false,则日志事件只会被当前的logger处理,不会传递给上级logger。因此,根据实际的项目需求,可以灵活设置additivity属性来控制日志信息的输出路径-->
    <logger name="com.jack.zkcase1" level="ERROR" additivity="true">
        <appender-ref ref="consoleAppender"/>
    </logger>
</configuration>
  1. 在src/main/test/java目录下创建包名com.jack.zkcase1
  2. 创建类名称ZkClientTest Alt text

2. 创建ZooKeeper客户端

java
package com.jack.zkcase1;

import org.apache.zookeeper.*;
import org.junit.Before;
import org.junit.Test;
import static org.apache.zookeeper.Watcher.Event.*;

import java.util.List;

public class ZkClientTest {
    // zk地址前后不能包含空格,每个地址用逗号隔开
    private static String connectString = "hadoop105:2181,hadoop106:2181,hadoop107:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zkClient = null;

    @Before
    public void init() throws Exception {
        zkClient = new ZooKeeper(connectString, sessionTimeout, event -> {
            if (EventType.None.getIntValue()==event.getType().getIntValue()) {
                // 连接状态改变
                if (event.getState().getIntValue() == KeeperState.SyncConnected.getIntValue()) {
                    // 连接成功
                    System.out.println("--连接上zk集群--");
                } else if (event.getState().getIntValue() == KeeperState.Disconnected.getIntValue()) {
                    // 连接断开
                    System.out.println("--断开连接--");
                } else if (event.getState().getIntValue() == KeeperState.Expired.getIntValue()) {
                    // 会话过期
                    System.out.println("--会话过期--");
                }
            }
        });
    }
}

3. 创建子节点

java
// 创建子节点
@Test
public void createNode() throws Exception {
    // 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
    // 创建永久节点
    String nodeCreated1 = zkClient.create("/sanguo",
            "陈寿".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    System.out.println("路径已经创建: "+nodeCreated1);
    // 创建临时节点
    String nodeCreated2 = zkClient.create("/sanguo/diaochan",
            "貂蝉".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL);
    System.out.println("路径已经创建: "+nodeCreated2);
    // 创建永久带序号节点
    String nodeCreated3 = zkClient.create("/sanguo/xiaoqiao",
            "小乔".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT_SEQUENTIAL);
    System.out.println("路径已经创建: "+nodeCreated3);
    // 创建临时带序列化的节点
    String nodeCreated4 = zkClient.create("/sanguo/daqiao",
            "大乔".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("路径已经创建: "+nodeCreated4);
    // 3.5.3版本新增类型,Container容器节点,当容器中没有任何子节点,该容器节点会被zk定期删除
    String nodeCreated5 = zkClient.create("/sanguo/zhenfu",
            "甄宓".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.CONTAINER);
    System.out.println("路径已经创建: "+nodeCreated5);
    // 创建10s后自动删除的永久节点
    String nodeCreated6 = zkClient.create("/sanguo/sunsangxiang",
            "孙尚香".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT_WITH_TTL, null, 10000);
    System.out.println("路径已经创建: "+nodeCreated6);
    // 创建10s后自动删除的永久序列节点
    String nodeCreated7 = zkClient.create("/sanguo/caiwenji",
            "蔡文姬".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, null, 10000);
    System.out.println("路径已经创建: "+nodeCreated7);
}

执行代码,控制台日志输出:
Alt text 发现创建带ttl性质的节点报错,原来Zookeeper默认禁用创建带过期时间节点,需要在zoo.cfg中添加配置项extendedTypesEnabled=true手动开启。
程序执行结束后,查看zk节点树,发现永久和永久带序号的节点还在:
Alt text

提示

从日志可以看出,注册节点目录的监听只能监听一次,后续节点发生变化,监听会失效。

4. 获取子节点并监听节点变化

java
// 获取子节点
@Test
public void getChildren() throws Exception {
    List<String> children = zkClient.getChildren("/sanguo", event -> {
        // 收到事件通知后的回调函数
        // 节点事件
        if (event.getType().getIntValue() == EventType.NodeCreated.getIntValue()) {
            // 节点被创建
            System.out.println("节点被创建: "+event.getPath());
        } else if (event.getType().getIntValue() == EventType.NodeDeleted.getIntValue()) {
            // 节点被删除
            System.out.println("节点被删除: "+event.getPath());
        } else if (event.getType().getIntValue() == EventType.NodeDataChanged.getIntValue()) {
            // 节点数据改变
            System.out.println("节点数据改变: "+event.getPath());
        } else if (event.getType().getIntValue() == EventType.NodeChildrenChanged.getIntValue()) {
            // 子节点数据改变
            System.out.println(event.getPath()+"路径下子节点数据改变");
        }
    });
    System.out.println("/sanguo监听注册完成,目前路径有:  ");
    for (String child : children) {
        System.out.println("子路径:  /"+child);
    }
    // 延时阻塞,等待监听事件消息回调
    Thread.sleep(Long.MAX_VALUE);
}
  1. 执行代码,IDEA控制台上看到如下节点: Alt text
  2. hadoop105的客户端上创建一个节点/sanguo/ganfuren,观察IDEA控制台
sh
[zk: localhost:2181(CONNECTED) 12] create /sanguo/ganfuren "甘夫人"
Created /sanguo/ganfuren

Alt text 3. 在hadoop102的客户端上删除节点/sanguo/ganfuren,观察IDEA控制台

sh
[zk: localhost:2181(CONNECTED) 13] delete /sanguo/ganfuren

发现控制台没有变化,监听器已经失效。

5. 注册永久监听器

注册永久监听器有两种方式:

  1. 使用zookeeper的新API: addWatch()方法。
  2. 使用zookeeper的API: getChildren()方法,在触发事件回调里面最后再次注册监听。
    这里使用第1种方式:
java
@Test
public void registerForever() throws Exception {
    // 需要提前创建好/sanguo/shuguo
    zkClient.addWatch("/sanguo/shuguo", event -> {
        // 收到事件通知后的回调函数
        // 节点事件
        if (event.getType().getIntValue() == EventType.NodeCreated.getIntValue()) {
            // 节点被创建
            System.out.println("节点被创建: " + event.getPath());
        } else if (event.getType().getIntValue() == EventType.NodeDeleted.getIntValue()) {
            // 节点被删除
            System.out.println("节点被删除: " + event.getPath());
        } else if (event.getType().getIntValue() == EventType.NodeDataChanged.getIntValue()) {
            // 节点数据改变
            System.out.println("节点数据改变: " + event.getPath());
        } else if (event.getType().getIntValue() == EventType.NodeChildrenChanged.getIntValue()) {
            // 子节点数据改变
            System.out.println(event.getPath() + "路径下子节点数据改变");
        }
    }, AddWatchMode.PERSISTENT_RECURSIVE);
    System.out.println("/sanguo/shuguo监听注册完成");
    // 延时阻塞,等待监听事件消息回调
    Thread.sleep(Long.MAX_VALUE);
}
  1. 执行代码,控制台显示注册永久监听器成功 Alt text
  2. hadoop105的客户端上创建节点,观察IDEA控制台
sh
[zk: localhost:2181(CONNECTED) 28] create -e /sanguo/shuguo/zhugeliang "诸葛亮"
Created /sanguo/shuguo/zhugeliang
[zk: localhost:2181(CONNECTED) 29] create -s /sanguo/shuguo/zhangfei "张飞"
Created /sanguo/shuguo/zhangfei0000000005

Alt text

6. 判断Znode是否存在

java
 // 判断 znode 是否存在
@Test
public void exist() throws Exception {
    Stat stat = zkClient.exists("/jack", false);
    System.out.println("/jack " + (stat == null ? "not exist" : "exist"));
}

执行代码,,观察IDEA控制台 Alt text