Zookeeper Curator使用介绍

2023-01-21 0 2,876

目录

从编码风格上来讲,curator提供了基于Fluent的编程风格支持

1、添加依赖

pom.xml文件中添加如下内容:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>

2、创建会话

Curator的创建会话方式与原生的API和ZkClient的创建方式区别很⼤。Curator创建客户端是通过CuratorFrameworkFactory工厂类来实现的。具体如下:

1. 使用CuratorFramework这个工厂类的两个静态方法来创建⼀个客户端

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

其中参数RetryPolicy提供重试策略的接口,可以让⽤户实现⾃定义的重试策略,默认提供了以下实现, 分别为ExponentialBackoffRetry(基于backoff的重连策略)、RetryNTimes(重连N次策略)、RetryForever(永远重试策略)

2. 通过调用CuratorFramework中的start()方法来启动会话

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); 
CuratorFramework client = CuratorFrameworkFactory.newClient(\"127.0.0.1:2181\",retryPolicy); client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); 
CuratorFramework client = CuratorFrameworkFactory.newClient(\"127.0.0.1:2181\", 5000,1000,retryPolicy);
client.start();

其实进⼀步查看源代码可以得知,其实这两种方法内部实现⼀样,只是对外包装成不同的方法。它们的底层都是通过第三个⽅法builder来实现的。

RetryPolicy retryPolicy	= new ExponentialBackoffRetry(1000,3);
private static CuratorFramework Client = CuratorFrameworkFactory.builder()
            .connectString(\"server1:2181,server2:2181,server3:2181\")
            .sessionTimeoutMs(50000)
            .connectionTimeoutMs(30000)
            .retryPolicy(retryPolicy)
            .build();
client.start();

参数:

connectString:zk的server地址,多个server之间使⽤英⽂逗号分隔开

connectionTimeoutMs: 连 接 超 时 时 间 , 如 上 是 30s, 默 认 是 15

ssessionTimeoutMs: 会 话 超 时 时 间 , 如 上 是 50s, 默 认 是 60s

retryPolicy:失败重试策略

ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间

计 算 公 式 : 当 前 sleep 时 间 =baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))

maxRetries:最大重试次数

maxSleepMs:最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间,默认的最大时间是Integer.MAX_VALUE毫秒。

其他,查看org.apache.curator.RetryPolicy接⼝的实现类

start():完成会话的创建

package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CreateSession {
    // 创建会话
    public static void main(String[] args) {
        // 不使用fluent编程风格
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(\"8.142.8.105:2181\", retryPolicy);
        curatorFramework.start();
        System.out.println(\"会话被建立了\");
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(\"8.142.8.105:2181\")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace(\"base\")  // 独立的命名空间 /base
                .build();
        client.start();
        System.out.println(\"会话2被创建了\");
    }
}

需要注意的是session2会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对/base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离

3、创建节点

curator提供了⼀系列Fluent风格的接口,通过使用Fluent编程风格的接口,开发人员可以进行自由组合来完成各种类型节点的创建。

下面简单介绍⼀下常用的几个节点创建场景。

(1)创建⼀个初始内容为空的节点

client.create().forPath(path);

Curator默认创建的是持久节点,内容为空。

(2)创建⼀个包含内容的节点

client.create().forPath(path,\"我是内容\".getBytes());

Curator和ZkClient不同的是依旧采用Zookeeper原生API的⻛格,内容使用byte[]作为方法参数。

(3)递归创建父节点,并选择节点类型

client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);

creatingParentsIfNeeded这个接口非常有用,在使用ZooKeeper 的过程中,开发人员经常会碰到NoNodeException 异常,其中⼀个可能的原因就是试图对⼀个不存在的父节点创建子节点。因此,开发人员不得不在每次创建节点之前,都判断⼀下该父节点是否存在——这个处理通常比较麻烦。在使用Curator 之后,通过调用creatingParentsIfNeeded接口,Curator就能够自动地递归创建所有需要的⽗节点。

下面通过一个实际例子来演示如何在代码中使用这些API。

package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class CreateNote_curator {
    // 创建会话
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(\"8.142.8.105:2181\")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace(\"base\")  // 独立的命名空间 /base
                .build();
        client.start();
        System.out.println(\"会话2被创建了\");
        // 创建节点
        String path = \"/lg-curator/c1\";
        String s = client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).forPath(path, \"init\".getBytes());
        System.out.println(\"节点递归创建成功,该节点路径:\" + s);
    }
}

4、删除节点

删除节点的方法也是基于Fluent方式来进行操作,不同类型的操作调用新增不同的方法调用即可。

(1)删除一个子节点

client.delete().forPath(path);

(2)删除节点并递归删除其子节点

client.delete().deletingChildrenIfNeeded().forPath(path);

(3)指定版本进行删除

client.delete().withVersion(1).forPath(path);

如果此版本已经不存在,则删除异常,异常信息如下。

org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =BadVersion for

(4)强制保证删除一个节点

client.delete().guaranteed().forPath(path);

只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。比如遇到⼀些网络异常的情况,此guaranteed的强制删除就会很有效果。

演示实例:

package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class DeleteNote_curator {
    // 创建会话
    public static void main(String[] args) throws Exception {
        // 不使用fluent编程风格
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(\"8.142.8.105:2181\")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace(\"base\")  // 独立的命名空间 /base
                .build();
        client.start();
        System.out.println(\"会话2被创建了\");
        // 删除节点
        String path = \"/lg-curator\";
        client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
        System.out.println(\"删除成功,删除的节点:\" + path);
    }
}

5、获取数据

获取节点数据内容API相当简单,同时Curator提供了传入一个Stat变量的方式来存储服务器端返回的最新的节点状态信息

// 普通查询
client.getData().forPath(path);
// 包含状态查询
Stat stat = new Stat(); 
client.getData().storingStatIn(stat).forPath(path);

演示:

package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class GetNote_curator {
    // 创建会话
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(\"8.142.8.105:2181\")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace(\"base\")  // 独立的命名空间 /base
                .build();
        client.start();
        System.out.println(\"会话2被创建了\");
        // 创建节点
        String path = \"/lg-curator/c1\";
        String s = client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).forPath(path, \"init\".getBytes());
        System.out.println(\"节点递归创建成功,该节点路径:\" + s);
        // 获取节点的数据内容以及状态信息
        // 数据内容
        byte[] bytes = client.getData().forPath(path);
        System.out.println(\"获取到的节点数据内容:\" + new String(bytes));
        // 状态信息
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        System.out.println(\"获取节点的状态信息:\" + stat);
    }
}

6、更新数据

更新数据,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如 果version已经变更,则抛出异常。

// 普通更新
client.setData().forPath(path,\"新内容\".getBytes());
// 指定版本更新
client.setData().withVersion(1).forPath(path);

版本不一致异常信息:

org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for

案例演示:

package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class UpdateNote_curator {
    // 创建会话
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(\"8.142.8.105:2181\")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace(\"base\")  // 独立的命名空间 /base
                .build();
        client.start();
        System.out.println(\"会话2被创建了\");
        String path = \"/lg-curator/c1\";
        // 获取节点的数据内容以及状态信息
        // 数据内容
        byte[] bytes = client.getData().forPath(path);
        System.out.println(\"获取到的节点数据内容:\" + new String(bytes));
        // 状态信息
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        System.out.println(\"获取节点的状态信息:\" + stat);
        // 更新节点内容
        int version = client.setData().withVersion(stat.getVersion()).forPath(path, \"修改内容1\".getBytes()).getVersion();
        System.out.println(\"当前的最新版本是\"+version);
        byte[] byte2 = client.getData().forPath(path);
        System.out.println(\"修改后的节点数据内容:\" + new String(byte2));
    }
}

:本文采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可, 转载请附上原文出处链接。
1、本站提供的源码不保证资源的完整性以及安全性,不附带任何技术服务!
2、本站提供的模板、软件工具等其他资源,均不包含技术服务,请大家谅解!
3、本站提供的资源仅供下载者参考学习,请勿用于任何商业用途,请24小时内删除!
4、如需商用,请购买正版,由于未及时购买正版发生的侵权行为,与本站无关。
5、本站部分资源存放于百度网盘或其他网盘中,请提前注册好百度网盘账号,下载安装百度网盘客户端或其他网盘客户端进行下载;
6、本站部分资源文件是经压缩后的,请下载后安装解压软件,推荐使用WinRAR和7-Zip解压软件。
7、如果本站提供的资源侵犯到了您的权益,请邮件联系: 442469558@qq.com 进行处理!

猪小侠源码-最新源码下载平台 Java教程 Zookeeper Curator使用介绍 http://www.20zxx.cn/464073/xuexijiaocheng/javajc.html

猪小侠源码,优质资源分享网

常见问题
  • 本站所有资源版权均属于原作者所有,均只能用于参考学习,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担
查看详情
  • 最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,建议提前注册好百度网盘账号,使用百度网盘客户端下载
查看详情

相关文章

官方客服团队

为您解决烦忧 - 24小时在线 专业服务