08-Zookeeper操作

nobility 发布于 2022-07-11 2188 次阅读


Zookeeper操作

ZooKeeper命令行客户端

进入Zookeeper根目录执行./bin/zkCli.sh脚本可连接到本机2181端口的Zookeeper服务,如果想链接到远程则可以加 -server IP:端口参数,输入quit命令可退出命令行客户端

查询操作

  • ls:查看当前节点的子节点
    • -s:查看当前节点的子节点,同时查看当前节点的状态信息,老版本的Zookeeper是使用ls2命令
  • stat:查看当前节点的状态信息
  • get:获取当前节点存储的数据

节点状态包含以下信息:

cZxid = 0x0                                #创建znode更改的事务ID
ctime = Thu Jan 01 08:00:00 CST 1970       #节点创建时间
mZxid = 0x0                                #修改znode更改的事务ID
mtime = Thu Jan 01 08:00:00 CST 1970       #节点修改时间
pZxid = 0x0                                #添加或删除子节点的znode更改的事务ID
cversion = -1                              #对该znode的子节点进行的更改次数
dataVersion = 0                            #对该znode的数据所做的更改次数
aclVersion = 0                             #对该znode的ACL进行更改的次数
ephemeralOwner = 0x0                       #临时节点拥有者SessionID,如果是永久节点该值为0
dataLength = 0                             #znode数据字段的长度
numChildren = 1                            #子节点个数

增删改操作

  • create:创建节点,后跟创建路径即znode,空格分隔可填入该znode的数据
    • e:创建临时节点
    • s:顺序节点,会自动在节点后增加自增序列
  • set:设置节点数据,后跟创建路径即znode,空格分隔可填入该znode的数据
    • v:dataVersion实现乐观锁(版本不匹配会报错),老版本是在path后跟空格填入dataVersion
  • delete:删除节点,后跟创建路径即znode
    • v:dataVersion实现乐观锁(版本不匹配会报错),老版本是在path后跟空格填入dataVersion

watcher机制

  • 对每一个znode的操作,都有一个监督者wathcer,其实就是一个触发器,当被监控的znode发生变化时就会触发wathcer事件
  • ZooKeeper中的wathcer是一次性的,触发后就会立即销毁
  • 父节点、子节点增删改都能触发wathcher事件,针对不同类型的操作,watcher事件分为
    • 当前节点创建事件NodeCreated
    • 当前节点删除事件NodeDataChanged
    • 当前节点更新事件NodeDeleted
    • 当前节点子节点增删NodeChildrenChanged

为指定znode设置watcher事件主要有如下几种方式:无论节点是否存在都可以设置

  • 新版本get -w 路径,老版本get 路径 watch
  • 新版本stat -w 路径,老版本stat 路径 watch
  • 新版本ls -w 路径,老版本ls 路径 wathc

watcher机制主要用于统一资源配置:通常ZooKeeper是以集群形式进行部署,集群中所有节点的znode信息是一致的,znode中的数据为配置信息,当更新某个节点的配置信息时,由于ZooKeeper集群具有强一致性,所以无论当前我们的服务器连接的是集群中的哪一个节点,都会收到wathcher事件,触发事件后做出响应的动作,比如更新配置

权限控制

每个znode都可以设置相关的读写权限来保证数据的安全性,acl通过scheme:auth:permissions三个字段来构成的字符串,分别代表:

  • permissions:权限组合字符串crdwa,分别是以下的缩写,如果有该权限就有该首字母
    • create:创建子节点
    • read:获取当前节点和子节点列表
    • write:设置当前节点的数据
    • delete:删除子节点
    • admin:设置当前节点的权限
  • auth:允许访问的认证授权组合,比如冒号分隔的用户名和密码、ip
  • scheme:权限机制,主要有如下5种
    • world:世界上的全部,world下只有一个auth即anyone,并且不包含密码
    • auth:认证登入,需要添加认证授权信息后才具有权限(输入的是明文密码;输入的是明文,保存到库中的是密文)
    • digest:认证登入,需要添加认证授权信息后才具有权限(输入的是,先通过SHA1加密,再通过BASE54加密,的密文密码;输入的是什么,保存到库中的就是什么)
    • ip:当设置为指定ip时(也可以是一个网段),只有该ip地址才有权限访问
    • super:超级管理员,拥有所有权限,需要修改zkServer.sh文件,搜索nohup,在该启动命令中添加参数"-Dzookeeper.DigestAuthenticationProvider.superDigest=用户名:先通过SHA1加密再通过BASE54加密的密文密码",再启动Zookeeper

权限控制主要涉及到以下命令:

  • getAcl:获取节点的acl权限信息
  • setAcl:设置某个节点acl权限信息
  • addauth digest:添加认证授权组合

ALC主要用于将开发/测试环境分离、生产环境控制指定ip的服务可访问节点

四字命令

Zookeeper可以通过非CLI方式进行交互,因为所有命令都是四个字母所以称为四字命令,需要使用到nc命令,所以需要使用yum install nc安装该命令,使用该命令首先需要在Zookeeper的配置文件中增加4lw.commands.whitelist=*配置项进行启动,否则会报错命令 is not executed because it is not in the whitelist表示该命令不在白名单中

命令的使用格式为:echo 命令 | nc ip 端口,常用命令如下:

  • echo stat | nc localhost 2181 :查看zookeeper状态信息
  • echo ruok | nc localhost 2181:查看zookeeper是否启动
  • echo dump | nc localhost 2181:查看未处理的会话和临时节点
  • echo conf | nc localhost 2181:查看zookeeper配置信息
  • echo cons | nc localhost 2181:查看连接到zookeeper的客户端信息
  • echo envi | nc localhost 2181:查看zookeeper的环境变量信息
  • echo mntr | nc localhost 2181:查看zookeeper健康信息
  • echo wchs | nc localhost 2181:查看zookeeper的watch数量
  • echo wchc | nc localhost 2181:查看通过session维度对watch进行分类
  • echo wchp | nc localhost 2181:查看通过path维度对watch进行分类

ZooKeeper原生Java API客户端

基本配置

将下面的依赖代码加入pom.xml文件中

<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>x.x.x</version>
</dependency>

连接代码如下

public class Main {
    public static void main(String[] args) throws IOException, InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        // 客户端连接zookeeper是一个异步操作,watcher事件回调函数,如果不需要那就设置为null
        ZooKeeper zooKeeper = new ZooKeeper(
                "localhost:2181",  // 连接服务器的ip字符串,多个ip代表集群逗号分隔,也可以在ip后加路径
                2000,  // session超时时间,无法收到心跳请求就超时
                watchedEvent -> {
                    System.out.println("连接成功,触发的watcher事件为" + watchedEvent);
                    countDownLatch.countDown();
                }
        );
        System.out.println("连接状态为" + zooKeeper.getState());  // CONNECTING
        countDownLatch.await();
        System.out.println("连接状态为" + zooKeeper.getState());  // CONNECTED
    }
}

如果连接中途由于未收到心跳,导致断开连接,可以重新连接

public class Main {
    public static void main(String[] args) throws IOException, InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000, watchedEvent -> countDownLatch.countDown());
        
        System.out.println("连接状态为" + zooKeeper.getState());
        countDownLatch.await();
        System.out.println("连接状态为" + zooKeeper.getState());

        long sessionId = zooKeeper.getSessionId();  // 连接后暂存sessionId
        byte[] sessionPasswd = zooKeeper.getSessionPasswd();  // 连接后暂存sessionPasswd

        /* 重连 */
        final CountDownLatch reCountDownLatch = new CountDownLatch(1);
        ZooKeeper reZooKeeper = new ZooKeeper("localhost:2181", 2000, watchedEvent -> reCountDownLatch.countDown(), sessionId, sessionPasswd);
        // 通过sessionId和sessionPasswd进行重连
        System.out.println("重新连接状态为" + reZooKeeper.getState());
        reCountDownLatch.await();
        System.out.println("重新连接状态为" + reZooKeeper.getState());

        System.out.println("对比sessionId:" + (sessionId == reZooKeeper.getSessionId()));
        System.out.println("对比sessionPasswd:" + Arrays.equals(sessionPasswd, reZooKeeper.getSessionPasswd()));

        System.out.println("转化sessionId:0x" + Long.toHexString(sessionId));  // 将sessionId转化为字符串
    }
}

查询操作

获取节点数据
public class Main {
    public static void single() throws IOException, InterruptedException, KeeperException {
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000,
                watchedEvent -> {
                    if (watchedEvent.getType() == Watcher.Event.EventType.None) {
                        System.out.println("连接成功");
                    } else {
                        System.out.println("触发其他Watched事件为:" + watchedEvent);
                        countDownLatch.countDown();
                    }
                });
        /* 同步获取节点数据 */
        Stat stat = new Stat();  // 作为getData方法对参数,会回填状态对象
        byte[] data = zooKeeper.getData(
                "/test", // 获取节点路径
                true,  // 是否为该节点设置watch事件,若设置为true,会执行创建zooKeeper时的Watcher(只会执行一次)
                stat  // 回填stat对象
        );
        System.out.println("回填stat对象为:" + stat);
        System.out.println("节点的数据为:" + new String(data, StandardCharsets.UTF_8));

        /* 异步获取节点数据 */
        zooKeeper.getData(
                "/test", // 获取节点路径
                true,  // 是否为该节点设置watch事件,若设置为true,会执行创建zooKeeper时的Watcher(只会执行一次)
                (rc, path, ctx, callbackData, callbackStat) -> {
                    System.out.println("return code为:" + rc);  // 0
                    System.out.println("获取的节点为:" + path); // /test
                    System.out.println("节点的数据为:" + new String(callbackData));
                    System.out.println("传入的上下文为:" + ctx); // ctx
                    System.out.println("回填stat对象为:" + callbackStat);
                    countDownLatch.countDown();
                },
                "ctx"
        );
        /* 手动多次修改该节点数据(修改子节点个数无用),验证创建zooKeeper时的Watcher只会执行一次 */
        countDownLatch.await(60, TimeUnit.SECONDS);
    }

    public static void repeatedly() throws IOException, InterruptedException, KeeperException {
        final CountDownLatch countDownLatch = new CountDownLatch(4);
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000, null);
        /* 同步获取节点数据 */
        Stat stat = new Stat();  // 作为getData方法对参数,会回填状态对象
        byte[] data = zooKeeper.getData(
                "/test", // 获取节点路径
                watchedEvent -> {
                    System.out.println("触发Watched事件为:" + watchedEvent);
                    countDownLatch.countDown();
                },  // 为该节点设置watch事件,并且监听,只会执行一次
                stat  // 回填stat对象
        );
        System.out.println("回填stat对象为:" + stat);
        System.out.println("节点的数据为:" + new String(data, "UTF-8"));

        /* 异步获取节点数据 */
        zooKeeper.getData(
                "/test", // 获取节点路径
                watchedEvent -> {
                    System.out.println("触发Watched事件为:" + watchedEvent);
                    countDownLatch.countDown();
                },  // 为该节点设置watch事件,并且监听,只会执行一次
                (rc, path, ctx, callbackData, callbackStat) -> {
                    System.out.println("return code为:" + rc);  // 0
                    System.out.println("获取的节点为:" + path); // /test
                    System.out.println("节点的数据为:" + new String(callbackData));
                    System.out.println("传入的上下文为:" + ctx); // ctx
                    System.out.println("回填stat对象为:" + callbackStat);
                    countDownLatch.countDown();
                },
                "ctx"
        );
        /* 手动多次修改该节点数据(修改子节点个数无用),验证Watcher只会执行一次 */
        countDownLatch.await(60, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
//        repeatedly();
        single();
    }
}
获取子节点
public class Main {
    public static void single() throws IOException, InterruptedException, KeeperException {
        final CountDownLatch countDownLatch = new CountDownLatch(4);
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000,
                watchedEvent -> {
                    if (watchedEvent.getType() == Watcher.Event.EventType.None) {
                        System.out.println("连接成功");
                    } else {
                        System.out.println("触发其他Watched事件为:" + watchedEvent);
                        countDownLatch.countDown();  // 1
                    }
                });
        /* 同步获取子节点 */
        List<String> children = zooKeeper.getChildren(
                "/test", // 获取节点路径
                true // 是否为该节点设置子节点个数变化对watch事件,若设置为true,会执行创建zooKeeper时的Watcher(只会执行一次)
        );
        System.out.println("子节点为:"); children.forEach(System.out::println); System.out.println("----");

        /* 异步获取子节点 */
        zooKeeper.getChildren(
                "/test", // 获取节点路径
                true,  // 是否为该节点设置子节点个数变化对watch事件,若设置为true,会执行创建zooKeeper时的Watcher(只会执行一次)
                (rc, path, ctx, list) -> {
                    System.out.println("return code为:" + rc);  // 0
                    System.out.println("获取的节点为:" + path); // /test
                    System.out.println("传入的上下文为:" + ctx); // ctx
                    System.out.println("子节点为:"); list.forEach(System.out::println); System.out.println("----");
                    countDownLatch.countDown();  // 2
                },
                "ctx"
        );

        /* 异步获取子节点V2 */
        zooKeeper.getChildren(
                "/test", // 获取节点路径
                true,  // 是否为该节点设置子节点个数变化对watch事件,若设置为true,会执行创建zooKeeper时的Watcher(只会执行一次)
                (rc, path, ctx, list, stat) -> {
                    System.out.println("return code为:" + rc);  // 0
                    System.out.println("获取的节点为:" + path); // /test
                    System.out.println("传入的上下文为:" + ctx); // ctx
                    System.out.println("该节点对stat对象为:" + stat);
                    System.out.println("子节点为:"); list.forEach(System.out::println); System.out.println("----");
                    countDownLatch.countDown();  // 3
                },
                "ctx"
        );
        /* 手动多次增删该节点的子节点(修改该节点数据无用),验证创建zooKeeper时的Watcher只会执行一次 */
        countDownLatch.await(60, TimeUnit.SECONDS);
    }

    public static void repeatedly() throws IOException, InterruptedException, KeeperException {
        final CountDownLatch countDownLatch = new CountDownLatch(6);
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000, null);
        /* 同步获取节点数据 */
        List<String> children = zooKeeper.getChildren(
                "/test", // 获取节点路径
                watchedEvent -> {
                    System.out.println("触发Watched事件为:" + watchedEvent);
                    countDownLatch.countDown();  // 1
                }  // 为该节点设置子节点数量变化的watch事件,并且监听(只会执行一次)
        );
        System.out.println("子节点为:"); children.forEach(System.out::println); System.out.println("----");

        /* 异步获取子节点 */
        zooKeeper.getChildren(
                "/test", // 获取节点路径
                watchedEvent -> {
                    System.out.println("触发Watched事件为:" + watchedEvent);
                    countDownLatch.countDown();  // 2
                },  // 为该节点设置子节点数量变化的watch事件,并且监听(只会执行一次)
                (rc, path, ctx, list) -> {
                    System.out.println("return code为:" + rc);  // 0
                    System.out.println("获取的节点为:" + path); // /test
                    System.out.println("传入的上下文为:" + ctx); // ctx
                    System.out.println("子节点为:"); list.forEach(System.out::println); System.out.println("----");
                    countDownLatch.countDown();  // 3
                },
                "ctx"
        );

        /* 异步获取子节点V2 */
        zooKeeper.getChildren(
                "/test", // 获取节点路径
                watchedEvent -> {
                    System.out.println("触发Watched事件为:" + watchedEvent);
                    countDownLatch.countDown();  // 4
                },  // 为该节点设置子节点数量变化的watch事件,并且监听(只会执行一次)
                (rc, path, ctx, list, stat) -> {
                    System.out.println("return code为:" + rc);  // 0
                    System.out.println("获取的节点为:" + path); // /test
                    System.out.println("传入的上下文为:" + ctx); // ctx
                    System.out.println("该节点对stat对象为:" + stat);
                    System.out.println("子节点为:"); list.forEach(System.out::println); System.out.println("----");
                    countDownLatch.countDown();  // 5
                },
                "ctx"
        );
        /* 手动多次增删该节点的子节点(修改该节点数据无用),验证Watcher只会执行一次 */
        countDownLatch.await(60, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
//        repeatedly();
        single();
    }
}
判断节点是否存在
public class Main {
    public static void single() throws IOException, InterruptedException, KeeperException {
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000,
                watchedEvent -> {
                    if (watchedEvent.getType() == Watcher.Event.EventType.None) {
                        System.out.println("连接成功");
                    } else {
                        System.out.println("触发其他Watched事件为:" + watchedEvent);
                        countDownLatch.countDown();
                    }
                });
        /* 同步判断节点是否存在 */
        Stat stat = zooKeeper.exists(
                "/test", // 获取节点路径
                true  // 是否为该节点设置watch事件,若设置为true,会执行创建zooKeeper时的Watcher(只会执行一次)
        );
        if (stat != null) {
            System.out.println("该节点存在,并且stat为:" + stat);
        }

        /* 异步判断节点是否存在 */
        zooKeeper.exists(
                "/test", // 获取节点路径
                true,  // 是否为该节点设置watch事件,若设置为true,会执行创建zooKeeper时的Watcher(只会执行一次)
                (rc, path, ctx, callbackStat) -> {
                    System.out.println("return code为:" + rc);  // 0
                    System.out.println("获取的节点为:" + path); // /test
                    System.out.println("传入的上下文为:" + ctx); // ctx
                    if (stat != null) {
                        System.out.println("该节点存在,并且stat为:" + callbackStat);
                    }
                    countDownLatch.countDown();
                },
                "ctx"
        );
        /* 手动多次修改该节点数据(修改子节点个数无用),验证创建zooKeeper时的Watcher只会执行一次 */
        countDownLatch.await(60, TimeUnit.SECONDS);
    }

    public static void repeatedly() throws IOException, InterruptedException, KeeperException {
        final CountDownLatch countDownLatch = new CountDownLatch(4);
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000, null);
        /* 同步判断节点是否存在 */
        Stat stat = zooKeeper.exists(
                "/test", // 获取节点路径
                watchedEvent -> {
                    System.out.println("触发Watched事件为:" + watchedEvent);
                    countDownLatch.countDown();
                }  // 为该节点设置watch事件,并且监听,只会执行一次
        );
        if (stat != null) {
            System.out.println("该节点存在,并且stat为:" + stat);
        }

        /* 异步判断节点是否存在 */
        zooKeeper.exists(
                "/test", // 获取节点路径
                watchedEvent -> {
                    System.out.println("触发Watched事件为:" + watchedEvent);
                    countDownLatch.countDown();
                },  // 为该节点设置watch事件,并且监听,只会执行一次
                (rc, path, ctx, callbackStat) -> {
                    System.out.println("return code为:" + rc);  // 0
                    System.out.println("获取的节点为:" + path); // /test
                    System.out.println("传入的上下文为:" + ctx); // ctx
                    if (stat != null) {
                        System.out.println("该节点存在,并且stat为:" + stat);
                    }
                    countDownLatch.countDown();
                },
                "ctx"
        );
        /* 手动多次修改该节点数据(修改子节点个数无用),验证Watcher只会执行一次 */
        countDownLatch.await(60, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
//        repeatedly();
        single();
    }
}

增删改操作

创建节点
public class Main {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000, null);

        /*同步创建节点*/
        String result = zooKeeper.create(
                "/syncTest",  //创建的路径节点
                "data".getBytes(),  //节点存储的数据
                ZooDefs.Ids.OPEN_ACL_UNSAFE,  //控制权限策略
                // Ids.OPEN_ACL_UNSAFE = world:anyone:cdrwa; 任何人都能访问
                // CREATOR_ALL_ACL = auth:user:password:cdrwa; 添加添加认证授权组合才能访问
                CreateMode.PERSISTENT  //节点类型
                // PERSISTENT:持久节点
                // PERSISTENT_SEQUENTIAL:持久顺序节点
                // EPHEMERAL:临时节点
                // EPHEMERAL_SEQUENTIAL:临时顺序节点
        );
        System.out.println(result);  // /syncTest

        /*异步创建节点*/
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        zooKeeper.create(
                "/asyncTest",
                "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT,
                (rc, path, ctx, name) -> {
                    System.out.println("return code为:" + rc);  // 0
                    System.out.println("创建节点为:" + path + "真实节点为(区分顺序节点):" + name); // /asyncTest
                    System.out.println("传入的上下文为:" + ctx); // ctx
                    countDownLatch.countDown();
                },
                "ctx"  // 向回掉函数中传入的上线文
        );  // 异步或者异步创建节点
        countDownLatch.await();
    }
}
修改节点
public class Main {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000, null);

        /*同步修改节点*/
        Stat stat = zooKeeper.setData(
                "/syncTest",  // 要修改的节点路径
                "test".getBytes(),  // 要修改为的节点数据
                0  // 乐观锁版本
        );
        System.out.println(stat);  // stat对象

        /*异步修改节点*/
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        zooKeeper.setData(
                "/asyncTest",
                "test".getBytes(),
                0,  // 乐观锁版本
                (rc, path, ctx, stat1) -> {
                    System.out.println("return code为:" + rc);  // 0
                    System.out.println("修改节点为:" + path); // /asyncTest
                    System.out.println("传入的上下文为:" + ctx); // ctx
                    System.out.println("修改后的stat为:" + stat1);  // stat对象
                    countDownLatch.countDown();
                },
                "ctx"
        );
        countDownLatch.await();
    }
}
删除节点
public class Main {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000, null);

        /*同步删除节点*/
        zooKeeper.delete(
                "/syncTest",  // 要修改的节点路径
                1  // 乐观锁版本
        );

        /*异步删除节点*/
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        zooKeeper.delete(
                "/asyncTest",
                1,  // 乐观锁版本
                (rc, path, ctx) -> {
                    System.out.println("return code为:" + rc);  // 0
                    System.out.println("删除节点为:" + path); // /asyncTest
                    System.out.println("传入的上下文为:" + ctx); // ctx
                    countDownLatch.countDown();
                },
                "ctx"
        );
        countDownLatch.await();
    }
}

watcher机制

public class Main {
    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
        final CountDownLatch countDownLatch = new CountDownLatch(6);
        Watcher watcher = watchedEvent -> {
            Watcher.Event.EventType[] eventTypes = Watcher.Event.EventType.values();
            switch (eventTypes[watchedEvent.getType().ordinal()]) {
                case None:
                    System.out.println("连接成功");
                    break;
                case NodeCreated:
                    System.out.println("节点被创建");
                    break;
                case NodeDataChanged:
                    System.out.println("节点数据发生变化");
                    break;
                case NodeChildrenChanged:
                    System.out.println("子节点数量发生变化");
                    break;
                case NodeDeleted:
                    System.out.println("节点被删除");
                    break;
            }
            countDownLatch.countDown();
        };

        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000, watcher);
        
        AsyncCallback.StatCallback statCallback = new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                zooKeeper.exists(path, true, this, ctx);  // 递归绑定该节点变化的watch事件
            }
        };

        AsyncCallback.ChildrenCallback childrenCallback = new AsyncCallback.ChildrenCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<String> list) {
                zooKeeper.getChildren(path, true, this, ctx);  // 递归绑定该节点的子节点数量变化的watch事件
            }
        };

        zooKeeper.exists("/test", true, statCallback, "ctx");  // 递归调用的首次调用
        zooKeeper.getChildren("/test", true, childrenCallback, "ctx");  // 递归调用的首次调用

        /* 手动多次修改该节点数据,以及子节点数量 */
        countDownLatch.await();
        /*
         create /test
         set /test test
         create /test/one
         delete /test/one
         delete /test
        */
    }
}

权限控制

public class Main {
    public static void anyone() throws IOException, InterruptedException, KeeperException {
        /* 任何人都可以对该节点进行增删该查,以及权限修改 */
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000, null);
        ArrayList<ACL> anyone = ZooDefs.Ids.OPEN_ACL_UNSAFE;
        zooKeeper.create("/anyoneTest", "data".getBytes(), anyone, CreateMode.PERSISTENT);
        zooKeeper.delete("/anyoneTest", 0);
    }

    public static void auth() throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
        /* 自定义认证组合进行访问 */
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000, null);
        List<ACL> auth = new ArrayList<>();  // 创建权限列表
        Id user1 = new Id("digest", DigestAuthenticationProvider.generateDigest("user1:password"));  //模式采用digest,需要加密认证组合
        Id user2 = new Id("digest", DigestAuthenticationProvider.generateDigest("user2:password"));  //模式采用digest,需要加密认证组合
        Id user3 = new Id("digest", DigestAuthenticationProvider.generateDigest("user3:password"));  //模式采用digest,需要加密认证组合
        auth.add(new ACL(ZooDefs.Perms.READ, user1));  //  认证组合1具有读权限
        auth.add(new ACL(ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE, user2));  // 认证组合2具有创建和删除子节点权限
        auth.add(new ACL(ZooDefs.Perms.ALL, user3));  // 认证组合3具有全部权限
        zooKeeper.create("/authTest", "data".getBytes(), auth, CreateMode.PERSISTENT);

        // 读权限验证
        try {
            byte[] data = zooKeeper.getData("/authTest", false, null);
            System.out.println("节点内容为:" + new String(data, StandardCharsets.UTF_8));
        } catch (Exception e) {
            System.out.println("调用失败:" + e.getMessage());
        }
        zooKeeper.addAuthInfo(AuthScheme.DIGEST.toString().toLowerCase(), "user1:password".getBytes());  // 添加认证组合1
        byte[] data = zooKeeper.getData("/authTest", false, null);
        System.out.println("节点内容为:" + new String(data, StandardCharsets.UTF_8));

        // 子节点创建修改权限验证
        try {
            zooKeeper.create("/authTest/test", "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zooKeeper.delete("/authTest/test", 0);
        } catch (Exception e) {
            System.out.println("调用失败:" + e.getMessage());
        }
        zooKeeper.addAuthInfo(AuthScheme.DIGEST.toString().toLowerCase(), "user2:password".getBytes());  // 添加认证组合2
        zooKeeper.create("/authTest/test", "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zooKeeper.delete("/authTest/test", 0);

        // 权限修改验证
        try {
            zooKeeper.setACL("/authTest", ZooDefs.Ids.OPEN_ACL_UNSAFE, 0);
        } catch (Exception e) {
            System.out.println("调用失败:" + e.getMessage());
        }
        zooKeeper.addAuthInfo(AuthScheme.DIGEST.toString().toLowerCase(), "user3:password".getBytes());  // 添加认证组合3
        List<ACL> before = zooKeeper.getACL("/authTest", null);
        System.out.println("修改前的ACL为:"); before.forEach(System.out::println); System.out.println("----");
        zooKeeper.setACL("/authTest", ZooDefs.Ids.OPEN_ACL_UNSAFE, 0);
        List<ACL> after = zooKeeper.getACL("/authTest", null);
        System.out.println("修改后的ACL为:"); after.forEach(System.out::println); System.out.println("----");

        // 删除节点
        zooKeeper.delete("/authTest", 0);
    }

    public static void ip() throws IOException, NoSuchAlgorithmException, KeeperException, InterruptedException {
        /* 任何人都可以对该节点进行增删该查,以及权限修改 */
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 2000, null);
        ArrayList<ACL> ip = new ArrayList<>();
        Id host = new Id("ip", InetAddress.getLocalHost().getHostAddress());  // 获取本机IP,如果zookeeper是外网ip,暂时无法验证
        Id user = new Id("digest", DigestAuthenticationProvider.generateDigest("user:password"));
        ip.add(new ACL(ZooDefs.Perms.READ, host));  // 当前主机只有读权限
        ip.add(new ACL(ZooDefs.Perms.ALL, user));  // 该让认证组合具有所有权限
        zooKeeper.create("/ipTest", "data".getBytes(), ip, CreateMode.PERSISTENT);

        // 验证当前ip只具有读权限
        try {
            zooKeeper.setData("/ipTest", "newData".getBytes(), 0);
        } catch (Exception e) {
            System.out.println("调用失败:" + e.getMessage());
        }
        byte[] data = zooKeeper.getData("/ipTest", false, null);
        System.out.println(new String(data, StandardCharsets.UTF_8));

        // 添加认证组合删除节点
        zooKeeper.addAuthInfo(AuthScheme.DIGEST.toString().toLowerCase(), "user:password".getBytes());  // 添加认证组合
        zooKeeper.delete("/ipTest", 0);
    }

    public static void main(String[] args) throws IOException, InterruptedException, NoSuchAlgorithmException, KeeperException {
        anyone();
        auth();
        ip();
    }
}

Apache Curator客户端

相对于原生Java API客户端具有以下功能:

  • 同步连接,超时重连
  • 链式调用
  • 递归操作
  • 注册一次,多次使用

基本配置

将下面的依赖代码加入pom.xml文件中

<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-framework</artifactId>
  <version>x.x.x</version>
</dependency>
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>x.x.x</version>
</dependency>

连接代码如下:

public class Main {
    public static void main(String[] args) {
        /**
         * 具有以下几种重试策略:
         * 永远重试:RetryForever(每次重试时间间隔)
         * 重试一次:RetryOneTime(重试时间间隔)
         * 指数退避重试:ExponentialBackoffRetry(初始sleep的时间, 最大重试次数, 最大重试时间)
         * N次重试:RetryNTimes(重试次数, 每次重试间隔的时间)
         * 重试到过期:RetryUntilElapsed(最大重试时间, 每次重试时间间隔)
         */
        RetryPolicy retryPolicy = new RetryForever(2000);
        
        /**
         * 具有以下几种创建方式(集群模式多个URL使用逗号分隔)
         * CuratorFrameworkFactory.newClient(连接字符串, 重试策略)
         * CuratorFrameworkFactory.newClient(连接字符串, session超时时间, 连接超时时间, 重试策略);
         * CuratorFrameworkFactory.builder()
         *                 .connectString(连接字符串)
         *                 .sessionTimeoutMs(session超时时间)
         *                 .retryPolicy(重试策略)
         *                 .namespace(命名空间)  // 之后所有操作都会以当前命名空间
         *                 .build();
         */
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
        
        System.out.println("连接前,连接状态为:" + client.isStarted());
        client.start();  // 开始连接
        System.out.println("连接后,连接状态为:" + client.isStarted());
        client.close();  // 释放连接,一般在finally中释放连接
        System.out.println("关闭连接后,连接状态为:" + client.isStarted());
    }
}

查询操作

public class Main {
    public static void main(String[] args) throws Exception {
        /* 如果有命名空间,所有操作都会在该命名空间下进行 */
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new RetryForever(2000))
                .build();
        client.start();  // 开始连接

        /* 查询节点数据 */
        Stat reStat = new Stat();  // 回填状态对象
        byte[] data = client.getData()  // 获取节点数据动作
                .storingStatIn(reStat)  // 回填状态对象动作
                .forPath("/test");  // 节点路径
        System.out.println("回填stat对象为:" + reStat);
        System.out.println("节点的数据为:" + new String(data, StandardCharsets.UTF_8));

        /* 查询子节点 */
        List<String> children = client.getChildren()  // 获取子节点动作
                .forPath("/test");  // 节点路径
        System.out.println("子节点为:"); children.forEach(System.out::println); System.out.println("----");

        /* 判断节点是否存在 */
        Stat stat = client.checkExists().forPath("/test");
        if (stat != null) {
            System.out.println("该节点存在,并且stat为:" + stat);
        }

        client.close();  // 释放连接,一般在finally中释放连接
    }
}

增删改操作

public class Main {
    public static void main(String[] args) throws Exception {
        /* 如果有命名空间,所有操作都会在该命名空间下进行 */
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new RetryForever(2000))
                .build();
        client.start();  // 开始连接

        /* 创建节点 */
        client.create()  // 创建动作
                .creatingParentsIfNeeded()  // 如果需要,创建父节点
                .withMode(CreateMode.PERSISTENT)  // 永久节点
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)  // 所有人都可以访问权限
                .forPath("/parent/child", "data".getBytes());  // 节点路径

        /* 修改节点 */
        client.setData()  // 修改动作
                .withVersion(0)  // 版本
                .forPath("/parent/child", "newData".getBytes());  // 节点路径

        /* 删除节点 */
        client.delete()  // 删除动作
                .guaranteed()  // 保证删除动作成功
                .deletingChildrenIfNeeded()  // 如果需要,删除子节点
                .withVersion(0)  // 版本
                .forPath("/parent");  // 节点路径

        client.close();  // 释放连接,一般在finally中释放连接
    }
}

watcher机制

单次监听
public class Main {
    /* 如果有命名空间,所有操作都会在该命名空间下进行 */
    public static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("localhost:2181")
            .retryPolicy(new RetryForever(2000))
            .build();

    public static CountDownLatch countDownLatch = new CountDownLatch(5 * 2);

    public static CuratorWatcher curatorWatcher = new CuratorWatcher() {
        @Override
        public void process(WatchedEvent watchedEvent) throws Exception {
            Watcher.Event.EventType[] eventTypes = Watcher.Event.EventType.values();
            switch (eventTypes[watchedEvent.getType().ordinal()]) {
                case None:
                    System.out.println("连接成功");  // 并不会触发该事件
                    break;
                case NodeCreated:
                    System.out.println("节点被创建");
                    break;
                case NodeDataChanged:
                    System.out.println("节点数据发生变化");
                    break;
                case NodeChildrenChanged:
                    System.out.println("子节点数量发生变化");
                    break;
                case NodeDeleted:
                    System.out.println("节点被删除");
                    break;
            }
            try {  // 递归绑定
                client.checkExists().usingWatcher(this).forPath("/test");  // 绑定该节点变化的watch事件,使用CuratorWatcher
                client.checkExists().usingWatcher(watcher).forPath("/test");  // 绑定该节点变化的watch事件,使用watcher
                if (client.checkExists().forPath("/test") != null) {
                    client.getChildren().usingWatcher(this).forPath("/test");  // 绑定该节点子节点数量变化的watch事件,使用CuratorWatcher
                    client.getChildren().usingWatcher(watcher).forPath("/test");  // 绑定该节点子节点数量变化的watch事件,使用watcher
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
            throw new Exception("抛出的异常");
        }
    };

    public static Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            Watcher.Event.EventType[] eventTypes = Watcher.Event.EventType.values();
            switch (eventTypes[watchedEvent.getType().ordinal()]) {
                case None:
                    System.out.println("连接成功");  // 并不会触发该事件
                    break;
                case NodeCreated:
                    System.out.println("节点被创建");
                    break;
                case NodeDataChanged:
                    System.out.println("节点数据发生变化");
                    break;
                case NodeChildrenChanged:
                    System.out.println("子节点数量发生变化");
                    break;
                case NodeDeleted:
                    System.out.println("节点被删除");
                    break;
            }
            try {  // 递归绑定
                client.checkExists().usingWatcher(curatorWatcher).forPath("/test");  // 绑定该节点变化的watch事件,使用CuratorWatcher
                client.checkExists().usingWatcher(this).forPath("/test");  // 绑定该节点变化的watch事件,使用watcher
                if (client.checkExists().forPath("/test") != null) {
                    client.getChildren().usingWatcher(curatorWatcher).forPath("/test");  // 绑定该节点子节点数量变化的watch事件,使用CuratorWatcher
                    client.getChildren().usingWatcher(this).forPath("/test");  // 绑定该节点子节点数量变化的watch事件,使用watcher
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
        }
    };

    public static void main(String[] args) throws Exception {
        client.start();  // 开始连接

        client.checkExists().usingWatcher(curatorWatcher).forPath("/test");  // 绑定该节点变化的watch事件,使用CuratorWatcher
        client.checkExists().usingWatcher(watcher).forPath("/test");  // 绑定该节点变化的watch事件,使用watcher

        client.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath("/test");  // 创建节点

        client.getChildren().usingWatcher(curatorWatcher).forPath("/test");  // 绑定该节点子节点数量变化的watch事件,使用CuratorWatcher
        client.getChildren().usingWatcher(watcher).forPath("/test");  // 绑定该节点子节点数量变化的watch事件,使用watcher
      
				/* 手动多次修改该节点数据,以及子节点数量 */
        countDownLatch.await();
        client.close();  // 释放连接,一般在finally中释放连接
        /*
         set /test test
         create /test/one
         delete /test/one
         delete /test
        */
    }
}
持续监听
public class Main {
    public static void nodeCache() throws Exception {
        /* 如果有命名空间,所有操作都会在该命名空间下进行 */
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new RetryForever(2000))
                .build();
        CountDownLatch countDownLatch = new CountDownLatch(3);
        client.start();

        NodeCache nodeCache = new NodeCache(client, "/test");  // 创建node要监听的节点
        nodeCache.start(true);  // 开始缓存要监听的节点

        if (nodeCache.getCurrentData().getData() != null) {
            // 要保证该节点存在,否则getCurrentData(),导致空指针
            // 要保证使用的是nodeCache.start(true),否则getCurrentData(),导致空指针
            // 要保证节点数据存在,否则getData()为null,会走到另一个分支
            System.out.println("使用的是nodeCache.start(true),可以获取初始化的节点数据,数据为:" + new String(nodeCache.getCurrentData().getData()));
        } else {
            System.out.println("该节点不存在,或者节点存在但是没有数据,或者使用的是nodeCache.start()");
        }

        nodeCache.getListenable().addListener(() -> {  // 添加 节点数据变化的监听器
            // 节点创建和删除不会触发该监听器
            System.out.println("节点数据发生变化,变化为:" + new String(nodeCache.getCurrentData().getData()));
            System.out.println("节点路径为:" + nodeCache.getCurrentData().getPath());
            System.out.println("节点状态信息为:" + nodeCache.getCurrentData().getStat());
            countDownLatch.countDown();
        });

        countDownLatch.await();
        client.close();  // 释放连接,一般在finally中释放连接
    }

    public static void pathChildrenCache() throws Exception {
        /* 如果有命名空间,所有操作都会在该命名空间下进行 */
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new RetryForever(2000))
                .build();
        CountDownLatch countDownLatch = new CountDownLatch(3);
        client.start();  // 开始连接

        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/test", true);  // 如果设置为true就回缓存该节点到stat对象
        /*
         * 除了像NodeCache那样到两种启动方式,PathChildrenCache还提供了几种启动模式,使用的是StartMode枚举类,含义如下:
         * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发初始化事件;初始化完成后无法立刻子节点
         * NORMAL:异步初始化,不会触发任何事件;初始化完成后无法立刻子节点
         * BUILD_INITIAL_CACHE:同步初始化;初始化完成后可以立即获取到子节点
         **/
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        System.out.println("同步打印,子节点信息为:");
        List<ChildData> childDataList = pathChildrenCache.getCurrentData();
        childDataList.forEach((child) -> System.out.println("path:" + child.getPath() + ";" +
                "data:" + new String(child.getData()) + ";" +  // 要保证子节点数据存在,否则回空指针
                "stat:" + child.getStat()));
        System.out.println("----");

        /*
         * 没有该节点会自动创建
         * 只有节点有数据,这些方法才会执行
         * 如果该节点下的子节点,本身就有具有数据的子节点,就会立即执行添加事件
         **/
        pathChildrenCache.getListenable().addListener((curatorFramework, pathChildrenCacheEvent) -> {
            PathChildrenCacheEvent.Type[] eventTypes = PathChildrenCacheEvent.Type.values();
            switch (eventTypes[pathChildrenCacheEvent.getType().ordinal()]) {
                case INITIALIZED:
                    System.out.println("异步打印,子节点信息为:");
                    List<ChildData> children = pathChildrenCache.getCurrentData();
                    children.forEach((child) -> System.out.println("path:" + child.getPath() + ";" +
                            "data:" + new String(child.getData()) + ";" +
                            "stat:" + child.getStat()));
                    System.out.println("----");
                    break;
                case CHILD_ADDED:
                    System.out.println("子节点数量添加事件,被添加的子节点path为:" + pathChildrenCacheEvent.getData().getPath() + ";"
                            + "添加的子节点data为:" + new String(pathChildrenCacheEvent.getData().getData()) + ";" +
                            "添加的子节点stat为:" + pathChildrenCacheEvent.getData().getStat());
                    break;
                case CHILD_REMOVED:
                    System.out.println("子节点数量减少事件,被删除的子节点path为:" + pathChildrenCacheEvent.getData().getPath() + ";"
                            + "被删除的子节点data为:" + new String(pathChildrenCacheEvent.getData().getData()) + ";" +
                            "被删除的子节点stat为:" + pathChildrenCacheEvent.getData().getStat());
                    break;
                case CHILD_UPDATED:
                    System.out.println("子节点数据被更新事件,被更新的子节点path为:" + pathChildrenCacheEvent.getData().getPath() + ";"
                            + "被删除的子节点data为:" + new String(pathChildrenCacheEvent.getData().getData()) + ";" +
                            "被删除的子节点stat为:" + pathChildrenCacheEvent.getData().getStat());
                    break;
            }
            countDownLatch.countDown();
        });


        countDownLatch.await();
        client.close();  // 释放连接,一般在finally中释放连接
    }

    public static void main(String[] args) throws Exception {
//        nodeCache();
        pathChildrenCache();
    }
}

权限控制

public class Main {
    public static void noAuth() throws Exception {
        /* 如果有命名空间,所有操作都会在该命名空间下进行 */
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new RetryForever(2000))
                .build();
        client.start();  // 开始连接

        List<ACL> auth = new ArrayList<>();  // 创建权限列表
        Id user1 = new Id("digest", DigestAuthenticationProvider.generateDigest("user1:password"));  //模式采用digest,需要加密认证组合
        Id user2 = new Id("digest", DigestAuthenticationProvider.generateDigest("user2:password"));  //模式采用digest,需要加密认证组合
        Id user3 = new Id("digest", DigestAuthenticationProvider.generateDigest("user3:password"));  //模式采用digest,需要加密认证组合
        auth.add(new ACL(ZooDefs.Perms.READ, user1));  //  认证组合1具有读权限
        auth.add(new ACL(ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE, user2));  // 认证组合2具有创建和删除子节点权限
        auth.add(new ACL(ZooDefs.Perms.ALL, user3));  // 认证组合3具有全部权限

        /* 创建节点,并赋予权限 */
        client.create()  // 创建动作
                .creatingParentsIfNeeded()  // 如果需要,创建父节点
                .withMode(CreateMode.PERSISTENT)
                .withACL(auth)
                .forPath("/test", "data".getBytes());  // 节点路径

        // 读权限验证
        try {
            byte[] data = client.getData().forPath("/test");
            System.out.println("节点内容为:" + new String(data, StandardCharsets.UTF_8));
        } catch (Exception e) {
            System.out.println("调用失败:" + e.getMessage());
        }
        // 子节点创建修改权限验证
        try {
            client.create().creatingParentsIfNeeded().forPath("/test/child");
            client.delete().deletingChildrenIfNeeded().forPath("/test/child");
        } catch (Exception e) {
            System.out.println("调用失败:" + e.getMessage());
        }
        // 权限修改验证
        try {
            client.setACL().withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath("/test");
        } catch (Exception e) {
            System.out.println("调用失败:" + e.getMessage());
        }
        client.close();  // 释放连接,一般在finally中释放连接
    }

    public static void auth() throws Exception {
        ArrayList<AuthInfo> auth = new ArrayList<>();
        AuthInfo user1 = new AuthInfo("digest", "user1:password".getBytes());
        AuthInfo user2 = new AuthInfo("digest", "user2:password".getBytes());
        AuthInfo user3 = new AuthInfo("digest", "user3:password".getBytes());
        auth.add(user1);  //  认证组合1具有读权限
        auth.add(user2);  // 认证组合2具有创建和删除子节点权限
        auth.add(user3);  // 认证组合3具有全部权限
        CuratorFramework clientAuth = CuratorFrameworkFactory.builder()
                .authorization(auth)  // 添加认证组合
                .connectString("localhost:2181")
                .retryPolicy(new RetryForever(2000))
                .build();

        clientAuth.start();  // 开始连接

        // 读权限验证
        byte[] data = clientAuth.getData().forPath("/test");
        System.out.println("节点内容为:" + new String(data, StandardCharsets.UTF_8));
        // 子节点创建修改权限验证
        clientAuth.create().creatingParentsIfNeeded().forPath("/test/child");
        clientAuth.delete().deletingChildrenIfNeeded().forPath("/test/child");
        // 权限修改验证
        clientAuth.setACL().withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath("/test");
        // 删除节点
        clientAuth.delete().deletingChildrenIfNeeded().forPath("/test");
      
        clientAuth.close();  // 释放连接,一般在finally中释放连接
    }

    public static void main(String[] args) throws Exception {
        noAuth();
        auth();
    }
}
此作者没有提供个人介绍
最后更新于 2022-07-11