Zookeeper极快入门学习笔记
第 3 章 服务器动态上下线监听案例
需求分析
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知 到主节点服务器的上下线
具体实现
-
先在集群上创建/servers 节点
[zk: localhost:2181(CONNECTED) 10] create /servers "servers" Created /servers
-
服务器端向 Zookeeper 注册代码
/** * @author cVzhanshi * @create 2021-11-23 16:58 */ public class DistributeServer { // 注意:逗号前后不能有空格 private String connectString = "centos7.9_1:2181,centos7.9_2:2181,centos7.9_3:2181"; private int sessionTimeout = 2000; private ZooKeeper zk; private String parentNode = "/servers"; public static void main(String[] args) throws Exception { // 1 获取 zk 连接 DistributeServer server = new DistributeServer(); server.getConnect(); // 2 利用 zk 连接注册服务器信息 server.registerServer(args[0]); // 3 启动业务功能 server.business(args[0]); } // 业务功能 public void business(String hostname) throws Exception{ System.out.println(hostname " is working ..."); Thread.sleep(Long.MAX_VALUE); } // 注册服务器 public void registerServer(String hostname) throws Exception{ String create = zk.create(parentNode "/" hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname " is online " create); } // 创建到 zk 的客户端连接 public void getConnect() throws IOException { zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { } }); } }
-
客户端实现代码
/** * @author cVzhanshi * @create 2021-11-23 17:03 */ public class DistributeClient { // 注意:逗号前后不能有空格 private String connectString = "centos7.9_1:2181,centos7.9_2:2181,centos7.9_3:2181"; private int sessionTimeout = 2000; private ZooKeeper zk; private String parentNode = "/servers"; public static void main(String[] args) throws Exception { // 1 获取 zk 连接 DistributeClient client = new DistributeClient(); client.getConnect(); // 2 获取 servers 的子节点信息,从中获取服务器信息列表 client.getServerList(); // 3 业务进程启动 client.business(); } // 创建到 zk 的客户端连接 public void getConnect() throws IOException { zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 因为注册一次 监听一次,所有放入这里面可以一直监听 try { getServerList(); } catch (Exception e) { e.printStackTrace(); } } }); } // 业务功能 public void business() throws Exception{ System.out.println("client is working ..."); Thread.sleep(Long.MAX_VALUE); } // 获取服务器列表信息 public void getServerList() throws Exception { // 1 获取服务器子节点信息,并且对父节点进行监听 List<String> children = zk.getChildren(parentNode, true); // 2 存储服务器信息列表 ArrayList<String> servers = new ArrayList<>(); // 3 遍历所有节点,获取节点中的主机名称信息 for (String child : children) { byte[] data = zk.getData(parentNode "/" child, false, null); servers.add(new String(data)); } // 4 打印服务器列表信息 System.out.println(servers); } }
测试
在 Linux 命令行上操作增加减少服务器
-
启动 DistributeClient 客户端
-
在 centos101 上 zk 的客户端/servers 目录上创建临时带序号节点
[zk: localhost:2181(CONNECTED) 0] create -e -s /servers/hadoop101 "hadoop101" Created /servers/hadoop1010000000007 [zk: localhost:2181(CONNECTED) 1] create -e -s /servers/hadoop102 "hadoop102" Created /servers/hadoop1020000000008
-
观察 Idea 控制台变化
[] [] client is working ... [hadoop101] [hadoop101, hadoop102]
-
执行删除操作
[zk: localhost:2181(CONNECTED) 4] delete /servers/hadoop1010000000007
-
观察 Idea 控制台变化
[] [] client is working ... [hadoop101] [hadoop101, hadoop102] [hadoop102]
在 Idea 上操作增加减少服务器
-
启动 DistributeClient 客户端(如果已经启动过,不需要重启)
-
通过Edit Configurations传入参数
-
启动 DistributeServer 服务
-
观察 DistributeServer 控制台
hadoop103 is online /servers/hadoop1030000000009 hadoop103 is working .
-
观察 DistributeClient 控制台
[] [] client is working ... [hadoop101] [hadoop101, hadoop102] [hadoop102] [hadoop102, hadoop103]
第 4 章 ZooKeeper 分布式锁案例
分布式锁概述
比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其 他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的 访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
分布式锁案例分析
- 接收到请求后,在/locks节点下创建一个临时顺序节点
- 判断自己是不是当前节点下最小的节点:是,获取到锁;不是,对前一个节点进行监听
- 获取到锁,处理完业务后,delete节点释放锁,然后 下面的节点将收到通知,重复第二步判断
4.1 原生 Zookeeper 实现分布式锁案例
分布式锁实现
/**
* @author cVzhanshi
* @create 2021-11-24 10:21
*/
public class DistributeLock {
private final String connectString = "centos7.9_1:2181,centos7.9_2:2181,centos7.9_3:2181";
private final int sessionTimeout = 2000;
private final ZooKeeper zk;
private String waitPath;
private String currentMode;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
public DistributeLock() throws IOException, InterruptedException, KeeperException {
// 获取连接
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// connectLatch 如果连接上zk 可以释放
if(Event.KeeperState.SyncConnected == event.getState()){
//如果收到了服务端的响应事件,连接成功
connectLatch.countDown();
}
if(event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)){
// waitLatch 需要释放
waitLatch.countDown();
}
}
});
// 等待zk正常连接后,往下走程序
connectLatch.await();
//CONNECTED
System.out.println(zk.getState());
// 判断根节点/locks是否存在
Stat stat = zk.exists("/locks", false);
if(stat == null){
// 不存在 创建根节点
zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
}
// 对zk加锁
public void zkLock(){
// 创建对应的临时带序号节点
try {
// /locks/seq-0000000002
currentMode = zk.create("/locks/" "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//System.out.println(currentMode);
// 判断创建的节点是否是最小的序号节点,如果是获取到锁,如果不是,监听他序号前一个节点
List<String> children = zk.getChildren("/locks", false);
// 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小
if(children.size() == 1){
return;
}else{
Collections.sort(children);
// 获取节点名称 seq-00000000
String thisNode = currentMode.substring("/locks/".length());
// 通过seq-00000000获取该节点在children集合的位置
int index = children.indexOf(thisNode);
if(index == -1){
System.out.println("数据异常");
}else if(index == 0) {
// 就一个节点
return;
}else{
// 需要监听 他前一个节点变化
waitPath = "/locks/" children.get(index - 1);
zk.getData(waitPath,true,new Stat());
// 等待监听
waitLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 解锁
public void unZkLock(){
// 删除节点
try {
zk.delete(this.currentMode,-1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
测试代码:
/**
* @author cVzhanshi
* @create 2021-11-24 11:04
*/
public class DistributeLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
final DistributeLock lock1 = new DistributeLock();
final DistributeLock lock2 = new DistributeLock();
new Thread(() -> {
try {
lock1.zkLock();
System.out.println("线程1 启动 获取到了锁");
Thread.sleep(5000);
lock1.unZkLock();
System.out.println("线程1 释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
lock2.zkLock();
System.out.println("线程2 启动 获取到了锁");
Thread.sleep(5000);
lock2.unZkLock();
System.out.println("线程2 释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
4.2 Curator 框架实现分布式锁案例
原生的 Java API 开发存在的问题
- 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
- Watch 需要重复注册,不然就不能生效
- 开发的复杂性还是比较高的
- 不支持多节点删除和创建。需要自己去递归
Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题
- 官方文档:https://curator.apache.org/index.html
Curator 案例实操
-
添加依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> </dependency>
-
代码实现
/** * @author cVzhanshi * @create 2021-11-24 11:42 */ public class CuratorLockTest { private static final String connectString = "centos7.9_1:2181,centos7.9_2:2181,centos7.9_3:2181"; private static final int sessionTimeout = 2000; public static void main(String[] args) { // 创建分布式锁1 InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks"); // 创建分布式锁2 InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks"); new Thread(new Runnable() { @Override public void run() { try { lock1.acquire(); System.out.println("线程1 获取到锁"); lock1.acquire(); System.out.println("线程1 再次获取到锁"); Thread.sleep(5 * 1000); lock1.release(); System.out.println("线程1 释放锁"); lock1.release(); System.out.println("线程1 再次释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { lock2.acquire(); System.out.println("线程2 获取到锁"); lock2.acquire(); System.out.println("线程2 再次获取到锁"); Thread.sleep(5 * 1000); lock2.release(); System.out.println("线程2 释放锁"); lock2.release(); System.out.println("线程2 再次释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } private static CuratorFramework getCuratorFramework() { ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3); CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectString) .connectionTimeoutMs(2000) .sessionTimeoutMs(sessionTimeout) .retryPolicy(policy).build(); // 启动客户端 client.start(); System.out.println("zookeeper 启动成功"); return client; } }
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhhhefbh
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
微信运动停用后别人还能看到步数吗
PHP中文网 07-22 -
excel打印预览压线压字怎么办
PHP中文网 06-22