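1. Requirement: in a distributed system there may be several master nodes (servers) that come online and go offline dynamically, and every client must be able to detect these changes in real time.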
2. Server-side logic
package com.cevent.zookeeper.cases;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;

/**
 * ZooKeeper server side
 * @author cevent
 * @date 2020-04-19
 */
public class ZookeeperServer {
    private String connectString = "hadoop202.cevent.com:2181,hadoop203.cevent.com:2181,hadoop204.cevent.com:2181";
    private int sessionTimeout = 2000;
    // From ZooKeeper's point of view this server is just another client: it registers itself through zk
    private ZooKeeper zkServer = null;
    // Parent znode under which servers register
    private String parentNode = "/ceventServers";

    // 1. Open the connection
    public void getConnect() throws Exception {
        zkServer = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // Watch callback
                System.out.println("Watcher started...");
            }
        });
    }

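    // 2. Register this server
    public void register(String hostName) throws Exception {
        // EPHEMERAL_SEQUENTIAL: a sequence-numbered znode that ZooKeeper deletes when this
        // server's session ends, so clients see the server go offline. A PERSISTENT_SEQUENTIAL
        // znode would outlive the server and never signal that it went down.
        String createRegister = zkServer.create(parentNode + "/ceventServer",
                hostName.getBytes(),
                Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostName + " is online " + createRegister);
    }
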
    // 3. Business logic (simulated by sleeping so the session stays alive)
    public void business() throws Exception {
        System.out.println("ZooKeeper server is doing its business work...");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        ZookeeperServer zookeeperServer = new ZookeeperServer();
        // 1. Open the connection
        zookeeperServer.getConnect();
        // 2. Register (create the znode); each run takes one hostName as its argument
        zookeeperServer.register(args[0]);
        // 3. Business logic
        zookeeperServer.business();
    }
}
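
The names that register() prints, such as /ceventServers/ceventServer0000000000, come from ZooKeeper's sequential create modes: the server appends a per-parent counter to the requested path as a 10-digit, zero-padded number. The following is only a plain-Java sketch of that naming rule; the class SequentialNameSketch and its method are hypothetical and are not part of the ZooKeeper API.

```java
// Hypothetical helper that only mimics ZooKeeper's sequential naming rule.
final class SequentialNameSketch {
    // ZooKeeper keeps one such counter per parent znode.
    private int counter = 0;

    // Appends the counter as a 10-digit, zero-padded decimal suffix.
    String nextName(String prefix) {
        return String.format("%s%010d", prefix, counter++);
    }

    public static void main(String[] args) {
        SequentialNameSketch parent = new SequentialNameSketch();
        // prints /ceventServers/ceventServer0000000000
        System.out.println(parent.nextName("/ceventServers/ceventServer"));
        // prints /ceventServers/ceventServer0000000001
        System.out.println(parent.nextName("/ceventServers/ceventServer"));
    }
}
```

Because the suffix is assigned by ZooKeeper, several servers can register with the same prefix without colliding.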
3. Client-side logic
package com.cevent.zookeeper.cases;

import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ZookeeperServerClient {
    private String connectString = "hadoop202.cevent.com:2181,hadoop203.cevent.com:2181,hadoop204.cevent.com:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zkClient = null;
    private String parentNode = "/ceventServers";

    // 1. Open the connection
    private void getConnectClient() throws Exception {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // A watch fired
                System.out.println(event.getType() + "...client watcher triggered..." + event.getPath());
                // Keep watching: re-read the server list, which registers the watch again
                try {
                    // Runs on every event and fetches the current server data
                    getServers();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

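    // 2. Watch the parent znode for changes and fetch the server data
    public void getServers() throws Exception {
        // List all children and leave a watch on the parent (watch = true)
        List<String> serverChildren = zkClient.getChildren(parentNode, true);
        // Holds the data of each child (the host name or IP address a server registered)
        ArrayList<String> servers = new ArrayList<>();
        for (String child : serverChildren) {
            // child is the znode name; its data is the host name passed to register()
            byte[] data = zkClient.getData(parentNode + "/" + child, false, null);
            // Collect it
            servers.add(new String(data));
        }
        // Print the current server list
        System.out.println(servers);
    }
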
    // 3. Business logic (simulated by sleeping so the watcher keeps running)
    public void businessClient() throws Exception {
        System.out.println("Client is doing its business work...");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        // 1. Open the connection
        ZookeeperServerClient zkServerClient = new ZookeeperServerClient();
        zkServerClient.getConnectClient();
        // 2. Watch for node changes
        zkServerClient.getServers();
        // 3. Run the client business logic
        zkServerClient.businessClient();
    }
}
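
A watch left by getChildren(parentNode, true) fires only once, which is why process() calls getServers() again on every event: each call re-registers the watch. The in-memory sketch below imitates that one-shot behaviour without a ZooKeeper connection. The class OneShotWatchSketch and its methods are hypothetical stand-ins, not ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for a parent znode; hypothetical, not the ZooKeeper API.
final class OneShotWatchSketch {
    private final List<String> children = new ArrayList<>();
    private Runnable watch; // at most one pending watch, cleared when it fires

    // Like getChildren(path, true): return a snapshot and arm the watch.
    List<String> getChildren(Runnable watcher) {
        this.watch = watcher;
        return new ArrayList<>(children);
    }

    // A child change fires the pending watch once, then forgets it.
    void addChild(String name) {
        children.add(name);
        Runnable pending = watch;
        watch = null;
        if (pending != null) {
            pending.run();
        }
    }

    public static void main(String[] args) {
        OneShotWatchSketch node = new OneShotWatchSketch();
        int[] fired = {0};
        node.getChildren(() -> fired[0]++); // armed once, never re-armed
        node.addChild("a");                 // fires the watch
        node.addChild("b");                 // missed: the watch was already consumed
        System.out.println(fired[0]);       // prints 1
    }
}
```

If the callback called getChildren again, as the client above does through getServers(), the second change would be observed as well.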
4. Run
5. Add a node directly from the ZooKeeper command-line client on Linux
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[sanguo, ceventServers, cevent, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /ceventServers
[ceventServer0000000001, ceventServer0000000000]
[zk: localhost:2181(CONNECTED) 2] create /ceventServers/ceventServer hadoop301
Created /ceventServers/ceventServer
Listener response (the watcher picks up the newly added node)
Delete the node that was created for this test
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[sanguo, ceventServers, cevent, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /ceventServers
[ceventServer, ceventServer0000000001, ceventServer0000000000]
[zk: localhost:2181(CONNECTED) 2] delete /ceventServers/ceventServer
[zk: localhost:2181(CONNECTED) 3] ls /ceventServers
[ceventServer0000000001, ceventServer0000000000]
Listener response after the deletion
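
After each create or delete, the client simply prints the full server list again. If a client needs explicit "went online" and "went offline" events, one possible approach is to compare consecutive snapshots of that list. The sketch below assumes the snapshots are plain lists of host names; the class ServerListDiff and the sample values are hypothetical and are not part of the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: compares two snapshots of the server list.
final class ServerListDiff {
    // Entries present in 'current' but not in 'previous' (servers that came online).
    static List<String> joined(List<String> previous, List<String> current) {
        List<String> result = new ArrayList<>(current);
        result.removeAll(previous);
        return result;
    }

    // Entries present in 'previous' but not in 'current' (servers that went offline).
    static List<String> left(List<String> previous, List<String> current) {
        return joined(current, previous);
    }

    public static void main(String[] args) {
        // Sample snapshots, assumed for illustration only.
        List<String> before = Arrays.asList("hadoop202", "hadoop203");
        List<String> after = Arrays.asList("hadoop202", "hadoop203", "hadoop301");
        System.out.println("online: " + joined(before, after)); // prints online: [hadoop301]
        System.out.println("offline: " + left(after, before));  // prints offline: [hadoop301]
    }
}
```

In the client above, the previous snapshot could be kept in a field and replaced at the end of each getServers() call.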
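Copyright notice: this is an original article by weixin_37056888, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.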