Zookeeper学习笔记
1.Zookeeper学习笔记
1.1、概述
Zookeeper时一个开源的分布式的,为分布式框架提供协调服务的Apache项目。
Zookeeper的工作机制:
从设计模式角度来说:是基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接收观察者的注册,一旦这些数据的状态发生改变,Zookeeper就将负责通知已经在Zookeeper上注册的哪些观察者做出相应的反应。
1.2、特点
1)、一个领导者(Leader),多个跟随者(Follower)组成的集群。
2)、集群中只要有半数以上节点存活,Zookeeper集群就能正常服务,所以Zookeeper适合安装奇数台服务器。
3)、全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
4)、更新请求顺序执行:来自同一个Cilent的更新请求按其发送顺序依次执行。
5)、数据更新原子性:依次数据更新要么成功,要么失败。
6)、实时性:在一定时间范围内,Cilent能读到最新数据。
1.3、数据结构
Zookeeper数据模型 的结构与Unix文件系统很类似,整体可以看作是一棵树,每个节点称作一个ZNode,每个ZNode默认能存储1MB的数据,每个ZNode都可以通过其路径唯一标识。
1.4、应用场景
提供的服务包括:统一命名服务(例如:域名与IP的关系),统一配置管理(所有节点配置信息一致),统一集群管理,服务器节点动态上下线,软负载均衡等。
1.5、下载地址
https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/
1.6、本地模式安装与配置
1.6.1、下载
将下载好的压缩包,上传到Linux的指定路径
1.6.2、解压文件
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
1.6.3、修改解压后的文件夹名
mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
1.6.4、修改配置
1.6.4.1、首先进入conf目录
cd conf
1.6.4.2、修改zoo_sample.cfg的名字为zoo.cfg
mv sample.cfg zoo.cfg
1.6.4.3、使用vim编辑这个文件修改zookeeper的数据存放路径
vim zoo.cfg
数据的默认存放路径是/tmp/zookeeper,/tmp是linux的临时文件存放路径,tmp目录下的文件数据一个月之后会自动清除,所以更换路径的目的,就是不让zk数据被系统自动清除。
1.6.4.4、在zookeeper的安装目录下新建数据存放目录
mkdir zkData
1.6.4.5、再回到配置文件中,修改存放路径
1.6.5、配置参数说明
tickTime=2000: 通信心跳时间 Zookeeper服务器与客户端心跳时间,单位为毫秒
initLimit=10: LF(Learder Follower的简称)初始通信时限(即超过了10个心跳时间也就是20秒没有建立连接,即本次通信失败)
syncLimit=5: LF同步通信时限(已经建立好了连接的服务通信时的心跳时间不能超过5个心跳时间,即10秒,超过就表示通信异常)
dataDir: 保存Zookeeper数据的路径,一般默认是tmp目录,但容易被linux定期删除,所以需要新建存放目录
clientPort = 2181: 客户端端口号,一般不做修改
1.7、本地启动
zookeeper的启动需要先启动服务端,再启动客户端。
**注意:**zookeeper部署后, 3.5以后的版本, 会自动占用8080端口. 整合了Zookeeper的项目需要修改配置文件更改服务器端口。否则zk服务器启动不起来。
1.7.1、服务端启动:
**注意:**服务端启动需要加上start
,表示启动zookeeper服务
加status
表示查询zookeeper状态
加stop
表示停止zookeeper服务
使用jps
命令查看是否启动成功,成功后会出现zk进程
1.7.2、客户端启动:
**注意:**客户端启动不需要加start
客户端启动成功后会连接上服务端,输入ls /
能查看当前的zookeeper节点
使用quit
退出该界面。
1.8、集群安装与配置
具有Hadoop的基础,会方便很多,我没有学就用普通方法搭建了
1.8.1、集群规划
三台linux系统,都部署zookeeper服务器。
1.8.2、配置服务器编号
在各个主机的zookeeper目录下创建zkData文件夹
在各个主机的zkData目录下创建myid文件
编辑各个主机中的该文件,在文件中给定一个编号值即可,如:1号主机给1,2号主机给2以此类推。
**注意:**给定的编号上下左右不能有空格。
1.8.3、配置zoo.cfg文件
在各个主机中的zoo.cfg文件中添加
################Cluster###########
server.1=192.168.1.19:2888:3888
server.2=192.168.1.32:2888:3888
server.3=192.168.1.33:2888:3888
注意:格式为server.[myid文件中的编号]=[主机ip]:[Leader与Follower交换数据的端口]:[重新选举Leader时的通信端口]
zk根据这部分配置的server个数,判断集群中的服务数量,为Leader的选举做辅助。
1.8.4、运行各个主机的zookeeper服务
bin/zkServer.sh start 运行Zookeeper
查询启动与集群状态:
bin/zkServer.sh status 查看运行状态
若是出现这个情况,说明是因为启动的是第一台zk服务,zk的特性之一是:zk集群超过半数服务不可用会导致整个集群不可用。
所以,再继续把其它主机的zk服务启动起来就行。
启动完成后,查看各个主机的zk状态如下:
**注意:**在此之前最好将防火墙关闭掉。
出现该情况,说明zk集群已经成功运行起来了。启动两台之后已经选举出来一个leader了,再启动的zk服务只能是follower。
1.9、集群概念与使用
1.9.1、集群中的几个概念:
**SID:**服务器ID,用来唯一标识一台ZK集群中的机器,每台机器不饿能重复,和myid一致。
**ZXID:**事务ID,ZXID时一个事务ID,用来标识一次服务器状态的变更,再某一时刻,集群中的每台机器的ZXID值不一定完全一致,这和ZK服务器对于客户端“更新请求”的处理逻辑有关。
**Epoch:**每个LEADER任期的代号。没有Leader时同一投票过程中的逻辑时钟值是相同的。每投完一次票,这个数据就会增加。
1.9.2、当有五台zk服务器时,zk选举机制-第一次启动:
**1)、**服务器1启动,发起一次选举,服务器1投自己一票,此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1保持状态LOOKING
;
**2)、**服务器2启动,再发起一次选举,服务器1和服务器2分别头自己一票,并交换选票信息,此时服务器1发现服务器2的myid
比自己目前投票推举的(服务器1)大,更改选票为推举服务器2,此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1服务器2保持状态LOOKING
**3)、**服务器3启动,发起一次选举,此时服务器1和服务器2都会更改选票为服务器3,此时投票结果:服务器1为0票,服务器2为0票,服务器3为3票,此时,服务器3的票数已经超过半数,服务器3当选Leader
,服务器1和2更改状态为FOLLOWING
,服务器3更改状态为LEADING
。
**4)、**服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING
状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING
;
**5)、**和服务器4一样,一旦集群中选举出了Leader,后面加入的都会变成Follower
1.9.3、当有五台zk服务器时,zk选举机制-非第一次启动
1)、当ZK集群中的一台服务器出现以下两种情况之一时,会开始进入Leader选举
- 服务器初始化启动
- 服务器运行期间无法和Leader保持连接
2)、当一台机器进入Leader选举流程时,当前集群也可能会处于两种情况:
-
集群中本来就已经存在一个Leader
对于已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader 信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。
-
集群中确实不存在Leader
还能工作的ZK服务,按照规则选举Leader:①选举Epoch大的 (Epoch);②Epoch相同,则选举事务id大的(ZXID);③事务id相同,则选择服务器id大的(SID)
例如:假设Zookeeper有5台服务器,SID分别是1,2,3,4,5,ZXID分别是8,8,8,7,7,并且此时SID为3的服务器是Leader,此时3和5出故障,此时根据选举规则选择Leader时,1号(1,8,1)2号(1,8,2)4号(1,7,4),最后被选举成为Leader的是2号服务器。
1.9.4、ZK集群启动停止脚本
当zk服务器多的时候,比如100台,一个个去启动和停止是一件非常无聊的事情,那么,一键启动与停止的脚本就非常有必要了。
1.9.4.1、未设置免密登录的脚本:
在使用之前,需要安装sshpass
Linux系统中,使用yum install sshpass 即可安装
#!/bin/bash
case $1 in
"start"){
for i in 192.168.1.19 192.168.1.32 192.168.1.33
do
echo -------------- Zookeeper $i 启动 ---------------
sshpass -p 密码 ssh -o StrictHostKeyChecking=no $i "/root/Zookeeper/zookeeper-3.5.7/bin/zkServer.sh start"
done
}
;;
"stop"){
for i in 192.168.1.19 192.168.1.32 192.168.1.33
do
echo -------------- Zookeeper $i 停止 ---------------
sshpass -p 密码 ssh -o StrictHostKeyChecking=no $i "/root/Zookeeper/zookeeper-3.5.7/bin/zkServer.sh stop"
done
}
;;
"status"){
for i in 192.168.1.19 192.168.1.32 192.168.1.33
do
echo -------------- Zookeeper $i 状态 ---------------
sshpass -p 密码 ssh -o StrictHostKeyChecking=no $i "/root/Zookeeper/zookeeper-3.5.7/bin/zkServer.sh status"
done
}
;;
esac
1.9.4.2、设置了免密登录的脚本:
#!/bin/bash
case $1 in
"start"){
for i in 192.168.1.19 192.168.1.32 192.168.1.33
do
echo ----------------Zookeeper $i 启动 ---------------------
ssh $i "/root/Zookeeper/zookeeper-3.5.7/bin/zkServer.sh start"
done
}
;;
"stop"){
for i in 192.168.1.19 192.168.1.32 192.168.1.33
do
echo ----------------Zookeeper $i 停止 ---------------------
ssh $i "/root/Zookeeper/zookeeper-3.5.7/bin/zkServer.sh stop"
done
}
;;
"status"){
for i in 192.168.1.19 192.168.1.32 192.168.1.33
do
echo ----------------Zookeeper $i 状态 ---------------------
ssh $i "/root/Zookeeper/zookeeper-3.5.7/bin/zkServer.sh status"
done
}
;;
esac
注意:
1)、将上面的脚本命名为
zk.sh
放置在/home/zk/bin
目录下2)、使用
chmod 777 zk.sh
修改该脚本权限3)、使用jpsall 查看多个hadoop进程
4)、使用
zk.sh start
命令即可启动多个主机的zk服务5)、使用
zk.sh stop
命令即可停止多个主机的zk服务6)、使用
zk.sh status
命令即可查看多个主机上zk服务的状态,角色类型
1.9.4.3、以上脚本可能遇到的问题**:**
1)、JAVA_HOME设置问题
解决办法:
- 1、使用
witch java
,获取java安装路径
- 2、编辑zk的bin目录下的
zkEnv.sh
,添加JAVA_HOME
变量值:
2)、某一个未启动成功报错
原因是启动失败的那台机器没有关闭防火墙,关闭防火墙即可重新启动成功。
1.9.4.4、启动成功后
1.10、客户端命令行学习
1.10.1、连接指定zk服务器
当使用客户端连接zk服务器时,上面显示的是localhost
./bin/zkCli.sh 命令执行后,得到了以下的截图
如果需要显示zk服务器的ip地址,可以使用
./bin/zkCli.sh -server [ip地址]:[端口号2181]
使用这个命令就可以连接到指定的ZK服务器
1.10.1、ls命令
ls -s [路径] 命令 查看指定路径的信息
信息项说明:
**cZxid:**创建节点的事务zxid,每次修改Zk状态都会长生一个zk事务id,事务id是zk中所有修改的总次序。每次修改都会有唯一的一个zxid,如果zxid1小于zxid2,
**ctime:**znode被创建的毫秒数,从1970年开始算
**mZxid:**znode最后更新的事务zxid
**mtime:**znode最后修改的毫秒数,从1970年开始算
**pZxid:**znode最后更新的子节点zxid
**cversion:**子节点变化号,znode子节点修改次数
**dataVersion:**znode数据变化号
**aclVersion:**znode访问控制列表的变化号
**ephemeral0wner:**如果是临时节点,这个是znode拥有者的session id,如果不是临时节点则值为0
**dataLenth:**znode的数据长度
**numChildren:**znode子节点数量
携带参数 -w:
用于创建监听器,并监听指定节点的变化
注意:该监听的通知只会发送一次,即再次更改监听节点的子节点时,不会收到通知了,若想再接收通知,必须要再注册一次监听事件。
1.10.2、get命令
用于获取指定路径下的节点的值
get [节点路径]
例如:
get的参数:
携带 -s 参数:
携带 -w 参数:
用于创建监听器,并指定监听的节点的值的变化
注意:该监听的通知只会发送一次,即再次修改/pp/p2的值时,不会收到通知了,若想再接收通知,必须要再注册一次监听事件。
1.10.3、delete命令
删除指定节点路径的节点
delete [节点路径]
例如:
**注意:**如果一个节点下还有节点,可以使用deleteall
命令递归删除
1.10.4、set命令
用于修改指定节点路径中的节点的值
set [节点路径] [新的节点值]
例如:
1.10.5、create命令
用于创建节点
create [节点路径] [节点数据]
例如:
注意:
如果创建时不携带参数,那么生成的节点就是持久节点
若是创建时携带参数
-s
,那么生成的节点就是持久带序号的节点,带了序号的节点不会重复若是创建时携带参数
-e
,那么生成的节点就是临时节点,断开连接后,数据就被删除了。若是创建时,携带参数
-e -s
,那么生成的节点就是临时带序号的节点,带序号的节点不会重复,但断开连接后,也依旧会被删除。
1.10.6、quit命令
退出客户端
1.10.7、节点类型
节点类型分为:持久、临时
持久:客户端和服务器断开连接后,创建的节点不删除。
-
**持久目录节点:**客户端与zk断开连接后,该节点依旧存在。
-
**持久顺序编号目录节点:**客户端与zk断开连接后,该节点依旧存在,只是zk给该节点名称进行顺序编号。
临时:客户端和服务器断开连接后,创建的节点自己删除。
-
**临时目录节点:**客户端与zk断开连接后,该节点被删除。
-
临时顺序编号目录节点:客户端与zk断开连接后,该节点被删除,只是zk给该节点名称进行顺序编号。
**说明:**创建znode时,设置顺序标识,znode名称后会附加一个值,顺序号时一个单词递增的计数器,由父节点维护。
**注意:**在分布式系统中,顺序号可以被用于为所有时间进行全局排序,这样客户端可以通过顺序号推断事件的顺序。
1.10.8、监听器
**监听器的作用:**用于保证zk对任何数据的任何改变都能快速的响应到监听了该节点的应用程序。
监听器原理:
1)、首先要有一个main线程
2)、在main线程中,创建zookeeper客户端,这时就会创建两个线程,一个负责网络通信(connect),一个负责监听(listener)
3)、通过connect线程将注册的监听事件发送给zookeeper,告知zookeeper,客户端需要监听的是哪个或哪几个节点的变化
4)、在zookeeper的注册监听器列表中,将注册的监听事件添加到列表中
5)、zookeeper监听到节点数据或者路径有变化,就会将这个消息发送给listener线程
6)、listener线程内部调用了process()方法,将消息告知客户端
具体操作可以参考2.11客户端API下的案例。
1.11、客户端API
案例1:使用客户端操作Zookeeper集群,创建节点,写入数据
Maven构建程序,POM文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliang</groupId>
<artifactId>ZookeeperDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<!-- Zookeeper依赖包-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
<!--Junit依赖包-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
</dependencies>
</project>
Java操作代码如下:
package com.aliang.zk;
import org.apache.zookeeper.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class ZKClient {
String ip = "192.168.1.19:2181,192.168.1.32:2181,192.168.1.33:2181";
int ms = 20000; // 注意:默认15秒,如果报连接不上的错误信息,请将该值调大点
ZooKeeper zk ;
@Before
public void init() throws IOException, InterruptedException {
zk = new ZooKeeper(ip, ms, null);
}
@Test
public void createNode() throws InterruptedException, KeeperException {
/**
* create方法:参数1:节点路径,参数2:节点值,参数3:节点权限,参数4:节点类型
* 参数3:从ZooDefs.Ids.这个类中指定一个权限,常用的有:
* *OPEN_ACL_UNSAFE-->表示这是一个完全开放的 ACL
* *CREATOR_ALL_ACL-->表示此ACL授予创建者身份所有权限
* *READ_ACL_UNSAFE-->表示此ACL赋予任何人只读的权限
* AUTH_IDS-->表示此Id仅可用于设置 ACL。它将被客户端验证的 ID 替换。
* ANYONE_ID_UNSAFE-->表示此ID代表任何人。
* 参数4:从CreateMode类中指定一个节点类型,常用的有:
* *PERSISTENT-->持久节点类型
* *PERSISTENT_SEQUENTIAL-->持久带序号的节点类型
* *EPHEMERAL-->临时节点类型
* *EPHEMERAL_SEQUENTIAL-->临时带序号的节点类型
*/
String s = zk.create("/JavaTestCreate2", "testString2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(s); //输出/JavaTestCreate
}
}
测试结果:
Zookeeper中查看是否新增成功:
案例2:获取子节点
POM文件依旧采用案例1的,此处不再赘述。
java代码如下:
package com.aliang.zk;
import org.apache.zookeeper.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class ZKClient {
String ip = "192.168.1.19:2181,192.168.1.32:2181,192.168.1.33:2181";
int ms = 20000;
ZooKeeper zk ;
@Before
public void init() throws IOException, InterruptedException {
zk = new ZooKeeper(ip, ms, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
/**
* 获取指定节点的数据
*/
@Test
public void getChild() throws InterruptedException, KeeperException {
/**
* getChildren方法参数,
* 参数1:需要获取的节点路径,
* 参数2:可以为布尔值,也可以为new Watcher()对象,
* 使用布尔值时,会自动使用初始化时,指定的Watcher对象,
* 或者自己在这个位置创建一个Watcher
*/
List<String> children = zk.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
}
}
测试结果:
案例3:监听指定节点的变化
监听器Watcher的使用
POM文件依旧采用案例1的,此处不再赘述。
Java代码如下:
package com.aliang.zk;
import org.apache.zookeeper.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class ZKClient {
String ip = "192.168.1.19:2181,192.168.1.32:2181,192.168.1.33:2181";
int ms = 200000;
ZooKeeper zk ;
@Before
public void init() throws IOException, InterruptedException {
zk = new ZooKeeper(ip, ms, new Watcher() {
/**
*编写需要注册的监听事件
*/
@Override
public void process(WatchedEvent event) {
/**
* 重写的process方法用于注册监听事件,监听的是 "/" 节点的变化,当被监听的节点每次发生变化时,该方法就会被执行一次,
* 前提是getChild()中的Thread.sleep(Integer.MAX_VALUE);够长,程序还没结束。
*/
List<String> children = null;
try {
children = zk.getChildren("/", true);
System.out.println("#####################");
for (String child : children) {
System.out.println(child);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
/**
* 获取指定节点的数据
*/
@Test
public void getChild() throws InterruptedException, KeeperException {
List<String> children = zk.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
// 用于将当前线程休眠,否则当监听的节点发生改变时,程序已经结束,就获取不到zookeeper发来的通知消息了
Thread.sleep(Integer.MAX_VALUE);
}
}
测试结果:
案例4:判断指定节点是否存在
java代码:
package com.aliang.zk;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class ZKClient {
String ip = "192.168.1.19:2181,192.168.1.32:2181,192.168.1.33:2181";
int ms = 200000;
ZooKeeper zk ;
@Before
public void init() throws IOException, InterruptedException {
zk = new ZooKeeper(ip, ms, null);
}
/**
* 判断指定节点是否存在
*/
@Test
public void isExist() throws InterruptedException, KeeperException {
Stat exists = zk.exists("/pp", false); // false表示不使用监听器
System.out.println(exists.getDataLength());
}
}
测试结果:
1.12、写数据原理
场景1:3台zookeeper服务器,客户端发起对zookeeper的写操作到leader服务器
-
客户端发送写请求到leader
-
leader开始写入znode节点
-
leader通知第一台follower服务器,告知需要写入的数据
-
第一台follower服务器,写入完毕,返回一个ack的确认包到leader,表示已经写入完成
-
leader根据接收到的ack包数量,判断执行了写操作的zk服务器(包括leader和follower)是否已经达到了集群的半数(2),达到了就返回ack包到客户端,表示写操作已完成,这样的好处就是效率高,不用等待所有follower都写入完成,只要超过半数,就可以通知客户端了
-
leader继续通知其它的follower服务器,写入数据
-
其它的follower服务器写入完毕,发送ack确认包,告知leader,已经写入完成
-
至此,客户端的写入操作,执行完毕
场景2:3台zookeeper服务器,客户端发起对zookeeper的写操作到follower服务器
- 客户端发送写请求到follower
- follower将请求转发到leader,让leader安排
- leader开始写入znode节点
- leader开始通知第一台follower服务器,告知需要写入的数据
- 第一台follower服务器,写入完毕,返回一个ack的确认包到leader,表示已经写入完成
- leader根据接收到的ack包数量,判断执行了写操作的zk服务器(包括leader和follower)是否已经达到了集群的半数(2),达到了就返回ack包到接收写入请求的follower服务器,表示写操作已完成,可以响应客户端了
- leader继续通知其它的follower服务器,写入数据
- 其它的follower服务器写入完毕,发送ack确认包,告知leader,已经写入完成
- 至此,客户端的写入操作,执行完毕
1.13、服务器动态上下线案例学习
该案例的目的主要是巩固学习临时节点与监听器。
- 运行zk集群
- 编写客户端的逻辑代码
- 客户端连接zk集群(new zookeeper(IP,TimeOut,Watcher))
- 客户端添加监听事件,监听指定节点的变化,并打印输出节点名称(zk.getChildren(NodePath,Watcher))
- 编写服务端的逻辑代码
- 服务端连接zk集群(new zookeeper(IP,TimeOut,Watcher))
- 服务端在指定的节点下创建一个临时带序号的节点,模拟服务上线,程序执行完毕临时节点被删除,模拟服务下线(zkClient.create(NodeName,NodeValue,ACLType.NodeType))
- 客户端接收到zk返回的监听器消息,再次执行监听事件,打印输出节点名称,展示当前被监听的节点下的节点内容的变化
- 上线就是新增了节点,下线就是节点被删除
服务端代码,可在命令行输入不同主机名去测试,此处拿单个举例
package com.aliang.zk.Case1_OnlineStatusChange;
import org.apache.zookeeper.*;
import java.io.IOException;
public class Zk_Provider {
String ip = "192.168.1.19:2181,192.168.1.34:2181,192.168.1.33:2181";
int ms = 200000;
ZooKeeper zk ;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
//1、获取zk连接
Zk_Provider provider = new Zk_Provider();
provider.getConnect();
//2、注册服务到zk服务器
provider.regist(args[0]); // 将命令行的主机名称参数传递到方法中
///3、启动业务逻辑
provider.service();
}
private void service() throws InterruptedException {
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 用于注册服务到zk服务器
* @param hostName 服务名称
*/
private void regist(String hostName) throws InterruptedException, KeeperException {
//1、创建一个节点,表示服务上线,用于消费端订阅
zk.create("/servers/" hostName,hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("主机【" hostName "】已上线!");
}
private void getConnect() throws IOException {
zk = new ZooKeeper(ip, ms, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
}
消费端代码:
package com.aliang.zk.Case1_OnlineStatusChange;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.List;
public class Zk_Consumer {
String ip = "192.168.1.19:2181,192.168.1.34:2181,192.168.1.33:2181";
int ms = 200000;
ZooKeeper zk ;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
//1、获取zk连接
Zk_Consumer provider = new Zk_Consumer();
provider.getConnect();
//2、订阅指定节点
provider.distribute();
///3、启动业务逻辑
provider.service();
}
private void service() throws InterruptedException {
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 用于获取指定节点下的子节点
*/
private void distribute() throws InterruptedException, KeeperException {
//1、获取指定节点下的节点列表
zk.getChildren("/servers", true);
}
private void getConnect() throws IOException {
zk = new ZooKeeper(ip, ms, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 注册一个监听指定节点的事件
List<String> children = null;
try {
children = zk.getChildren("/servers", true);
if (children.size()!=0){
System.out.println("********被监听节点的当前子节点列表********");
for (String child : children) {
System.out.println("child:【" child "】");
}
System.out.println("**************************************");
}else{
System.out.println("状态码:" event.getState() ",表示服务已全部下线...");
}
}else{
System.out.println("被监听节点的子节点已清空");
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
测试1:当前zk的/servers节点下子节点
测试2:启动服务端开启注册一个节点
测试3:启动消费端监听节点上下线
测试4:再注册一个服务端,查看消费端的打印信息
测试5:服务下线
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhhhehcc
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
微信运动停用后别人还能看到步数吗
PHP中文网 07-22