• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Zookeeper学习笔记

武飞扬头像
阿莨去爬山了
帮助1

1.Zookeeper学习笔记

1.1、概述

Zookeeper时一个开源的分布式的,为分布式框架提供协调服务的Apache项目。

Zookeeper的工作机制:

从设计模式角度来说:是基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接收观察者的注册,一旦这些数据的状态发生改变,Zookeeper就将负责通知已经在Zookeeper上注册的哪些观察者做出相应的反应。

1.2、特点

学新通

1)、一个领导者(Leader),多个跟随者(Follower)组成的集群。

2)、集群中只要有半数以上节点存活,Zookeeper集群就能正常服务,所以Zookeeper适合安装奇数台服务器。

3)、全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。

4)、更新请求顺序执行:来自同一个Cilent的更新请求按其发送顺序依次执行。

5)、数据更新原子性:依次数据更新要么成功,要么失败。

6)、实时性:在一定时间范围内,Cilent能读到最新数据。

1.3、数据结构

Zookeeper数据模型 的结构与Unix文件系统很类似,整体可以看作是一棵树,每个节点称作一个ZNode,每个ZNode默认能存储1MB的数据,每个ZNode都可以通过其路径唯一标识。

学新通

1.4、应用场景

提供的服务包括:统一命名服务(例如:域名与IP的关系),统一配置管理(所有节点配置信息一致),统一集群管理,服务器节点动态上下线,软负载均衡等。

1.5、下载地址

https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/

学新通

1.6、本地模式安装与配置

1.6.1、下载

将下载好的压缩包,上传到Linux的指定路径

学新通

1.6.2、解压文件

tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz

1.6.3、修改解压后的文件夹名

mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7

学新通

1.6.4、修改配置
1.6.4.1、首先进入conf目录

cd conf

1.6.4.2、修改zoo_sample.cfg的名字为zoo.cfg

mv sample.cfg zoo.cfg

1.6.4.3、使用vim编辑这个文件修改zookeeper的数据存放路径

vim zoo.cfg

数据的默认存放路径是/tmp/zookeeper,/tmp是linux的临时文件存放路径,tmp目录下的文件数据一个月之后会自动清除,所以更换路径的目的,就是不让zk数据被系统自动清除。

学新通

1.6.4.4、在zookeeper的安装目录下新建数据存放目录

mkdir zkData

学新通

1.6.4.5、再回到配置文件中,修改存放路径

学新通

1.6.5、配置参数说明

学新通

tickTime=2000: 通信心跳时间 Zookeeper服务器与客户端心跳时间,单位为毫秒

initLimit=10: LF(Learder Follower的简称)初始通信时限(即超过了10个心跳时间也就是20秒没有建立连接,即本次通信失败)

syncLimit=5: LF同步通信时限(已经建立好了连接的服务通信时的心跳时间不能超过5个心跳时间,即10秒,超过就表示通信异常)

dataDir: 保存Zookeeper数据的路径,一般默认是tmp目录,但容易被linux定期删除,所以需要新建存放目录

clientPort = 2181: 客户端端口号,一般不做修改

1.7、本地启动

zookeeper的启动需要先启动服务端,再启动客户端。

**注意:**zookeeper部署后, 3.5以后的版本, 会自动占用8080端口. 整合了Zookeeper的项目需要修改配置文件更改服务器端口。否则zk服务器启动不起来。

1.7.1、服务端启动:

**注意:**服务端启动需要加上start,表示启动zookeeper服务

status表示查询zookeeper状态

stop表示停止zookeeper服务

学新通

使用jps命令查看是否启动成功,成功后会出现zk进程

学新通

1.7.2、客户端启动:

**注意:**客户端启动不需要加start

学新通

客户端启动成功后会连接上服务端,输入ls /能查看当前的zookeeper节点

学新通

使用quit退出该界面。

1.8、集群安装与配置

具有Hadoop的基础,会方便很多,我没有学就用普通方法搭建了

1.8.1、集群规划

三台linux系统,都部署zookeeper服务器。

1.8.2、配置服务器编号

在各个主机的zookeeper目录下创建zkData文件夹

在各个主机的zkData目录下创建myid文件

编辑各个主机中的该文件,在文件中给定一个编号值即可,如:1号主机给1,2号主机给2以此类推。

**注意:**给定的编号上下左右不能有空格。

1.8.3、配置zoo.cfg文件

在各个主机中的zoo.cfg文件中添加

################Cluster###########

server.1=192.168.1.19:2888:3888
server.2=192.168.1.32:2888:3888
server.3=192.168.1.33:2888:3888

注意:格式为server.[myid文件中的编号]=[主机ip]:[Leader与Follower交换数据的端口]:[重新选举Leader时的通信端口]

zk根据这部分配置的server个数,判断集群中的服务数量,为Leader的选举做辅助。

1.8.4、运行各个主机的zookeeper服务

bin/zkServer.sh start 运行Zookeeper

学新通

查询启动与集群状态:

bin/zkServer.sh status 查看运行状态

学新通

若是出现这个情况,说明是因为启动的是第一台zk服务,zk的特性之一是:zk集群超过半数服务不可用会导致整个集群不可用。

所以,再继续把其它主机的zk服务启动起来就行。

启动完成后,查看各个主机的zk状态如下:

学新通

**注意:**在此之前最好将防火墙关闭掉。

出现该情况,说明zk集群已经成功运行起来了。启动两台之后已经选举出来一个leader了,再启动的zk服务只能是follower。

1.9、集群概念与使用

1.9.1、集群中的几个概念:

**SID:**服务器ID,用来唯一标识一台ZK集群中的机器,每台机器不饿能重复,和myid一致。

**ZXID:**事务ID,ZXID时一个事务ID,用来标识一次服务器状态的变更,再某一时刻,集群中的每台机器的ZXID值不一定完全一致,这和ZK服务器对于客户端“更新请求”的处理逻辑有关。

**Epoch:**每个LEADER任期的代号。没有Leader时同一投票过程中的逻辑时钟值是相同的。每投完一次票,这个数据就会增加。

1.9.2、当有五台zk服务器时,zk选举机制-第一次启动:

**1)、**服务器1启动,发起一次选举,服务器1投自己一票,此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1保持状态LOOKING

**2)、**服务器2启动,再发起一次选举,服务器1和服务器2分别头自己一票,并交换选票信息,此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2,此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1服务器2保持状态LOOKING

**3)、**服务器3启动,发起一次选举,此时服务器1和服务器2都会更改选票为服务器3,此时投票结果:服务器1为0票,服务器2为0票,服务器3为3票,此时,服务器3的票数已经超过半数,服务器3当选Leader,服务器1和2更改状态为FOLLOWING ,服务器3更改状态为LEADING

**4)、**服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;

**5)、**和服务器4一样,一旦集群中选举出了Leader,后面加入的都会变成Follower

1.9.3、当有五台zk服务器时,zk选举机制-非第一次启动

1)、当ZK集群中的一台服务器出现以下两种情况之一时,会开始进入Leader选举

  • 服务器初始化启动
  • 服务器运行期间无法和Leader保持连接

2)、当一台机器进入Leader选举流程时,当前集群也可能会处于两种情况:

  • 集群中本来就已经存在一个Leader

    对于已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader 信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。

  • 集群中确实不存在Leader

    还能工作的ZK服务,按照规则选举Leader:①选举Epoch大的 (Epoch)②Epoch相同,则选举事务id大的(ZXID)③事务id相同,则选择服务器id大的(SID)

    例如:假设Zookeeper有5台服务器,SID分别是1,2,3,4,5,ZXID分别是8,8,8,7,7,并且此时SID为3的服务器是Leader,此时3和5出故障,此时根据选举规则选择Leader时,1号(1,8,1)2号(1,8,2)4号(1,7,4),最后被选举成为Leader的是2号服务器。

1.9.4、ZK集群启动停止脚本

当zk服务器多的时候,比如100台,一个个去启动和停止是一件非常无聊的事情,那么,一键启动与停止的脚本就非常有必要了。

1.9.4.1、未设置免密登录的脚本:

在使用之前,需要安装sshpass

Linux系统中,使用yum install sshpass 即可安装

#!/bin/bash
case $1 in
"start"){
  for i in 192.168.1.19 192.168.1.32 192.168.1.33
  do
  	echo -------------- Zookeeper $i 启动 ---------------
    sshpass -p 密码 ssh -o StrictHostKeyChecking=no $i "/root/Zookeeper/zookeeper-3.5.7/bin/zkServer.sh start"
  done
}
;;
"stop"){
  for i in 192.168.1.19 192.168.1.32 192.168.1.33
  do
  	echo -------------- Zookeeper $i 停止 ---------------
  	sshpass -p 密码 ssh -o StrictHostKeyChecking=no $i "/root/Zookeeper/zookeeper-3.5.7/bin/zkServer.sh stop"
  done
}
;;
"status"){
  for i in 192.168.1.19 192.168.1.32 192.168.1.33
  do
  	echo -------------- Zookeeper $i 状态 ---------------
  	sshpass -p 密码 ssh -o StrictHostKeyChecking=no $i "/root/Zookeeper/zookeeper-3.5.7/bin/zkServer.sh status"
  done
}
;;
esac
学新通
1.9.4.2、设置了免密登录的脚本:
#!/bin/bash
case $1 in
"start"){
	for i in 192.168.1.19 192.168.1.32 192.168.1.33
	do
		echo ----------------Zookeeper $i 启动 ---------------------
		ssh $i "/root/Zookeeper/zookeeper-3.5.7/bin/zkServer.sh start"
	done
}
;;
"stop"){
	for i in 192.168.1.19 192.168.1.32 192.168.1.33
	do
		echo ----------------Zookeeper $i 停止 ---------------------
		ssh $i "/root/Zookeeper/zookeeper-3.5.7/bin/zkServer.sh stop"
	done
}
;;
"status"){
	for i in 192.168.1.19 192.168.1.32 192.168.1.33
	do
		echo ----------------Zookeeper $i 状态 ---------------------
		ssh $i "/root/Zookeeper/zookeeper-3.5.7/bin/zkServer.sh status"
	done
}
;;
esac
学新通

注意:

1)、将上面的脚本命名为zk.sh放置在/home/zk/bin 目录下

2)、使用chmod 777 zk.sh 修改该脚本权限

3)、使用jpsall 查看多个hadoop进程

4)、使用zk.sh start命令即可启动多个主机的zk服务

5)、使用zk.sh stop 命令即可停止多个主机的zk服务

6)、使用zk.sh status 命令即可查看多个主机上zk服务的状态,角色类型

1.9.4.3、以上脚本可能遇到的问题**:**
1)、JAVA_HOME设置问题

学新通

解决办法:

  • 1、使用witch java,获取java安装路径

学新通

  • 2、编辑zk的bin目录下的zkEnv.sh,添加JAVA_HOME变量值:

学新通

2)、某一个未启动成功报错

学新通

原因是启动失败的那台机器没有关闭防火墙,关闭防火墙即可重新启动成功。

1.9.4.4、启动成功后

学新通

1.10、客户端命令行学习

1.10.1、连接指定zk服务器

当使用客户端连接zk服务器时,上面显示的是localhost

./bin/zkCli.sh 命令执行后,得到了以下的截图

学新通

如果需要显示zk服务器的ip地址,可以使用

./bin/zkCli.sh -server [ip地址]:[端口号2181] 使用这个命令就可以连接到指定的ZK服务器

学新通

1.10.1、ls命令

ls -s [路径] 命令 查看指定路径的信息

学新通

信息项说明:

**cZxid:**创建节点的事务zxid,每次修改Zk状态都会长生一个zk事务id,事务id是zk中所有修改的总次序。每次修改都会有唯一的一个zxid,如果zxid1小于zxid2,

**ctime:**znode被创建的毫秒数,从1970年开始算

**mZxid:**znode最后更新的事务zxid

**mtime:**znode最后修改的毫秒数,从1970年开始算

**pZxid:**znode最后更新的子节点zxid

**cversion:**子节点变化号,znode子节点修改次数

**dataVersion:**znode数据变化号

**aclVersion:**znode访问控制列表的变化号

**ephemeral0wner:**如果是临时节点,这个是znode拥有者的session id,如果不是临时节点则值为0

**dataLenth:**znode的数据长度

**numChildren:**znode子节点数量

携带参数 -w:

用于创建监听器,并监听指定节点的变化

学新通

注意:该监听的通知只会发送一次,即再次更改监听节点的子节点时,不会收到通知了,若想再接收通知,必须要再注册一次监听事件。

1.10.2、get命令

用于获取指定路径下的节点的值

get [节点路径]

例如:

学新通

get的参数:

携带 -s 参数:

学新通

携带 -w 参数:

用于创建监听器,并指定监听的节点的值的变化

学新通

注意:该监听的通知只会发送一次,即再次修改/pp/p2的值时,不会收到通知了,若想再接收通知,必须要再注册一次监听事件。

1.10.3、delete命令

删除指定节点路径的节点

delete [节点路径]

例如:

学新通

**注意:**如果一个节点下还有节点,可以使用deleteall命令递归删除

1.10.4、set命令

用于修改指定节点路径中的节点的值

set [节点路径] [新的节点值]

例如:

学新通

1.10.5、create命令

用于创建节点

create [节点路径] [节点数据]

例如:

学新通

注意:

如果创建时不携带参数,那么生成的节点就是持久节点

若是创建时携带参数-s ,那么生成的节点就是持久带序号的节点,带了序号的节点不会重复

若是创建时携带参数-e ,那么生成的节点就是临时节点,断开连接后,数据就被删除了。

若是创建时,携带参数 -e -s ,那么生成的节点就是临时带序号的节点,带序号的节点不会重复,但断开连接后,也依旧会被删除。

1.10.6、quit命令

退出客户端

1.10.7、节点类型

节点类型分为:持久、临时

持久:客户端和服务器断开连接后,创建的节点不删除。

  • **持久目录节点:**客户端与zk断开连接后,该节点依旧存在。

  • **持久顺序编号目录节点:**客户端与zk断开连接后,该节点依旧存在,只是zk给该节点名称进行顺序编号。

临时:客户端和服务器断开连接后,创建的节点自己删除。

  • **临时目录节点:**客户端与zk断开连接后,该节点被删除。

  • 临时顺序编号目录节点:客户端与zk断开连接后,该节点被删除,只是zk给该节点名称进行顺序编号。

**说明:**创建znode时,设置顺序标识,znode名称后会附加一个值,顺序号时一个单词递增的计数器,由父节点维护。

**注意:**在分布式系统中,顺序号可以被用于为所有时间进行全局排序,这样客户端可以通过顺序号推断事件的顺序。

1.10.8、监听器

**监听器的作用:**用于保证zk对任何数据的任何改变都能快速的响应到监听了该节点的应用程序。

监听器原理:

1)、首先要有一个main线程

2)、在main线程中,创建zookeeper客户端,这时就会创建两个线程,一个负责网络通信(connect),一个负责监听(listener)

3)、通过connect线程将注册的监听事件发送给zookeeper,告知zookeeper,客户端需要监听的是哪个或哪几个节点的变化

4)、在zookeeper的注册监听器列表中,将注册的监听事件添加到列表中

5)、zookeeper监听到节点数据或者路径有变化,就会将这个消息发送给listener线程

6)、listener线程内部调用了process()方法,将消息告知客户端

具体操作可以参考2.11客户端API下的案例。

1.11、客户端API

案例1:使用客户端操作Zookeeper集群,创建节点,写入数据

Maven构建程序,POM文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliang</groupId>
    <artifactId>ZookeeperDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- Zookeeper依赖包-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.7</version>
        </dependency>
        <!--Junit依赖包-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
        </dependency>
    </dependencies>
</project>
学新通

Java操作代码如下:

package com.aliang.zk;
import org.apache.zookeeper.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class ZKClient {
    String ip = "192.168.1.19:2181,192.168.1.32:2181,192.168.1.33:2181";
    int ms = 20000; // 注意:默认15秒,如果报连接不上的错误信息,请将该值调大点
    ZooKeeper zk ;
    @Before
    public void init() throws IOException, InterruptedException {
        zk = new ZooKeeper(ip, ms, null);
    }
    @Test
    public void createNode() throws InterruptedException, KeeperException {
        /**
         * create方法:参数1:节点路径,参数2:节点值,参数3:节点权限,参数4:节点类型
         *  参数3:从ZooDefs.Ids.这个类中指定一个权限,常用的有:
         *                                              *OPEN_ACL_UNSAFE-->表示这是一个完全开放的 ACL
         *                                              *CREATOR_ALL_ACL-->表示此ACL授予创建者身份所有权限
         *                                              *READ_ACL_UNSAFE-->表示此ACL赋予任何人只读的权限
         *                                              AUTH_IDS-->表示此Id仅可用于设置 ACL。它将被客户端验证的 ID 替换。
         *                                              ANYONE_ID_UNSAFE-->表示此ID代表任何人。
         *  参数4:从CreateMode类中指定一个节点类型,常用的有:
         *                                          *PERSISTENT-->持久节点类型
         *                                          *PERSISTENT_SEQUENTIAL-->持久带序号的节点类型
         *                                          *EPHEMERAL-->临时节点类型
         *                                          *EPHEMERAL_SEQUENTIAL-->临时带序号的节点类型
         */
        String s = zk.create("/JavaTestCreate2", "testString2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(s); //输出/JavaTestCreate
    }
}

学新通

测试结果:

学新通

Zookeeper中查看是否新增成功:

学新通

案例2:获取子节点

POM文件依旧采用案例1的,此处不再赘述。

java代码如下:

package com.aliang.zk;
import org.apache.zookeeper.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class ZKClient {
    String ip = "192.168.1.19:2181,192.168.1.32:2181,192.168.1.33:2181";
    int ms = 20000;
    ZooKeeper zk ;
    @Before
    public void init() throws IOException, InterruptedException {
        zk = new ZooKeeper(ip, ms, new Watcher() {
            @Override
            public void process(WatchedEvent event) {

            }
        });
    }
    /**
     * 获取指定节点的数据
     */
    @Test
    public void getChild() throws InterruptedException, KeeperException {
        /**
         * getChildren方法参数,
         *      参数1:需要获取的节点路径,
         *      参数2:可以为布尔值,也可以为new Watcher()对象,
         *            使用布尔值时,会自动使用初始化时,指定的Watcher对象,
         *            或者自己在这个位置创建一个Watcher
         */
        List<String> children = zk.getChildren("/", true);
        for (String child : children) {
            System.out.println(child);
        }
    }
}

学新通

测试结果:

学新通

案例3:监听指定节点的变化

监听器Watcher的使用

POM文件依旧采用案例1的,此处不再赘述。

Java代码如下:

package com.aliang.zk;
import org.apache.zookeeper.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class ZKClient {
    String ip = "192.168.1.19:2181,192.168.1.32:2181,192.168.1.33:2181";
    int ms = 200000;
    ZooKeeper zk ;
    @Before
    public void init() throws IOException, InterruptedException {
        zk = new ZooKeeper(ip, ms, new Watcher() {
          	/**
          	 *编写需要注册的监听事件
          	 */
            @Override
            public void process(WatchedEvent event) {
               /**
                 *   重写的process方法用于注册监听事件,监听的是 "/" 节点的变化,当被监听的节点每次发生变化时,该方法就会被执行一次,
                 *	 前提是getChild()中的Thread.sleep(Integer.MAX_VALUE);够长,程序还没结束。
                 */
                List<String> children = null;
                try {
                    children = zk.getChildren("/", true);
                    System.out.println("#####################");
                    for (String child : children) {
                          System.out.println(child);
                      }
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    /**
     * 获取指定节点的数据
     */
    @Test
    public void getChild() throws InterruptedException, KeeperException {
        List<String> children = zk.getChildren("/", true);
        for (String child : children) {
            System.out.println(child);
        }
        // 用于将当前线程休眠,否则当监听的节点发生改变时,程序已经结束,就获取不到zookeeper发来的通知消息了
        Thread.sleep(Integer.MAX_VALUE);
    }
}

学新通

测试结果:

学新通

案例4:判断指定节点是否存在

java代码:

package com.aliang.zk;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class ZKClient {
    String ip = "192.168.1.19:2181,192.168.1.32:2181,192.168.1.33:2181";
    int ms = 200000;
    ZooKeeper zk ;
    @Before
    public void init() throws IOException, InterruptedException {
        zk = new ZooKeeper(ip, ms, null);
    }
  /**
  	*	判断指定节点是否存在
  	*/
    @Test
    public void isExist() throws InterruptedException, KeeperException {
        Stat exists = zk.exists("/pp", false); // false表示不使用监听器
        System.out.println(exists.getDataLength());
    }
}

学新通

测试结果:

学新通

1.12、写数据原理

场景1:3台zookeeper服务器,客户端发起对zookeeper的写操作到leader服务器
  • 客户端发送写请求到leader

  • leader开始写入znode节点

  • leader通知第一台follower服务器,告知需要写入的数据

  • 第一台follower服务器,写入完毕,返回一个ack的确认包到leader,表示已经写入完成

  • leader根据接收到的ack包数量,判断执行了写操作的zk服务器(包括leader和follower)是否已经达到了集群的半数(2),达到了就返回ack包到客户端,表示写操作已完成,这样的好处就是效率高,不用等待所有follower都写入完成,只要超过半数,就可以通知客户端了

  • leader继续通知其它的follower服务器,写入数据

  • 其它的follower服务器写入完毕,发送ack确认包,告知leader,已经写入完成

  • 至此,客户端的写入操作,执行完毕

场景2:3台zookeeper服务器,客户端发起对zookeeper的写操作到follower服务器
  • 客户端发送写请求到follower
  • follower将请求转发到leader,让leader安排
  • leader开始写入znode节点
  • leader开始通知第一台follower服务器,告知需要写入的数据
  • 第一台follower服务器,写入完毕,返回一个ack的确认包到leader,表示已经写入完成
  • leader根据接收到的ack包数量,判断执行了写操作的zk服务器(包括leader和follower)是否已经达到了集群的半数(2),达到了就返回ack包到接收写入请求的follower服务器,表示写操作已完成,可以响应客户端了
  • leader继续通知其它的follower服务器,写入数据
  • 其它的follower服务器写入完毕,发送ack确认包,告知leader,已经写入完成
  • 至此,客户端的写入操作,执行完毕

1.13、服务器动态上下线案例学习

该案例的目的主要是巩固学习临时节点与监听器。

  • 运行zk集群
  • 编写客户端的逻辑代码
    • 客户端连接zk集群(new zookeeper(IP,TimeOut,Watcher))
    • 客户端添加监听事件,监听指定节点的变化,并打印输出节点名称(zk.getChildren(NodePath,Watcher))
  • 编写服务端的逻辑代码
    • 服务端连接zk集群(new zookeeper(IP,TimeOut,Watcher))
    • 服务端在指定的节点下创建一个临时带序号的节点,模拟服务上线,程序执行完毕临时节点被删除,模拟服务下线(zkClient.create(NodeName,NodeValue,ACLType.NodeType))
  • 客户端接收到zk返回的监听器消息,再次执行监听事件,打印输出节点名称,展示当前被监听的节点下的节点内容的变化
  • 上线就是新增了节点,下线就是节点被删除

服务端代码,可在命令行输入不同主机名去测试,此处拿单个举例

package com.aliang.zk.Case1_OnlineStatusChange;
import org.apache.zookeeper.*;
import java.io.IOException;
public class Zk_Provider {
    String ip = "192.168.1.19:2181,192.168.1.34:2181,192.168.1.33:2181";
    int ms = 200000;
    ZooKeeper zk ;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        //1、获取zk连接
        Zk_Provider provider = new Zk_Provider();
        provider.getConnect();
        //2、注册服务到zk服务器
        provider.regist(args[0]); // 将命令行的主机名称参数传递到方法中
        ///3、启动业务逻辑
        provider.service();
    }
    private void service() throws InterruptedException {
        Thread.sleep(Integer.MAX_VALUE);
    }
    /**
     * 用于注册服务到zk服务器
     * @param hostName  服务名称
     */
    private void regist(String hostName) throws InterruptedException, KeeperException {
        //1、创建一个节点,表示服务上线,用于消费端订阅
        zk.create("/servers/" hostName,hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println("主机【" hostName "】已上线!");
    }
    private void getConnect() throws IOException {
        zk = new ZooKeeper(ip, ms, new Watcher() {
            @Override
            public void process(WatchedEvent event) {

            }
        });
    }
}
学新通

消费端代码:

package com.aliang.zk.Case1_OnlineStatusChange;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.List;
public class Zk_Consumer {
    String ip = "192.168.1.19:2181,192.168.1.34:2181,192.168.1.33:2181";
    int ms = 200000;
    ZooKeeper zk ;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        //1、获取zk连接
        Zk_Consumer provider = new Zk_Consumer();
        provider.getConnect();
        //2、订阅指定节点
        provider.distribute();
        ///3、启动业务逻辑
        provider.service();
    }
    private void service() throws InterruptedException {
        Thread.sleep(Integer.MAX_VALUE);
    }
    /**
     * 用于获取指定节点下的子节点
     */
    private void distribute() throws InterruptedException, KeeperException {
        //1、获取指定节点下的节点列表
        zk.getChildren("/servers", true);
    }
    private void getConnect() throws IOException {
        zk = new ZooKeeper(ip, ms, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 注册一个监听指定节点的事件
                List<String> children = null;
                try {
                    children = zk.getChildren("/servers", true);
                    if (children.size()!=0){
                        System.out.println("********被监听节点的当前子节点列表********");
                        for (String child : children) {
                            System.out.println("child:【" child "】");
                        }
                        System.out.println("**************************************");
                    }else{
                        System.out.println("状态码:" event.getState() ",表示服务已全部下线...");
                    }
                    }else{
                        System.out.println("被监听节点的子节点已清空");
                    }
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
学新通

测试1:当前zk的/servers节点下子节点

学新通

测试2:启动服务端开启注册一个节点

学新通

学新通

测试3:启动消费端监听节点上下线

学新通

测试4:再注册一个服务端,查看消费端的打印信息

学新通

测试5:服务下线

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhhhehcc
系列文章
更多 icon
同类精品
更多 icon
继续加载