1.需求:某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线

2.服务端业务




 
  
  package
  com.cevent.zookeeper.cases;
   
  import
  org.apache.zookeeper.CreateMode;
  import
  org.apache.zookeeper.WatchedEvent;
  import
  org.apache.zookeeper.Watcher;
  import
  org.apache.zookeeper.ZooKeeper;
  import
  org.apache.zookeeper.ZooDefs.Ids;
   
  /**
   *
  zookeeper服务端
   * @author cevent
   * @date 2020年4月19日
   */
  public class ZookeeperServer {
   
     private String connectString="hadoop202.cevent.com:2181,hadoop203.cevent.com:2181,hadoop204.cevent.com:2181";
     private int sessionTimeout=2000;
     //zookeeper中客户端即是服务器,都需要通过zk来注册
     private ZooKeeper zkServer=null;
     //定义父节点
     private String parentNode="/ceventServers";
     
     //1.获取连接
     public void getConnect() throws
  Exception{
        zkServer= new ZooKeeper(connectString, sessionTimeout, new Watcher(){
   
           @Override
           public void process(WatchedEvent event) {
              // 监听事件
              System.out.println("监听启动...");
           }
           
        });
     }
     
     //2.注册信息
     public void register(String hostName) throws
  Exception{
        //CreateMode.PERSISTENT_SEQUENTIAL有序的命名队列类型
        String createRegister=zkServer.create(parentNode+"/ceventServer", 
              hostName.getBytes(), 
              Ids.OPEN_ACL_UNSAFE,
              CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println(hostName+"
  is onLine "+createRegister);
     }
     
     //3.具体业务
     public void business() throws
  Exception{
        System.out.println("zookeeper开始业务作业...");
        Thread.sleep(Long.MAX_VALUE);
     }
     
     public static void main(String[] args) throws Exception
  {
        
        ZookeeperServer zookeeperServer=new ZookeeperServer();
        //1.获取连接
        zookeeperServer.getConnect();
        //2.注册(创建路径节点),运行一次传入一个hostName
        zookeeperServer.register(args[0]);
        //3.业务逻辑
        zookeeperServer.business();
     }
  }
   
  
 


3.客户端业务




 
  
  package com.cevent.zookeeper.cases;
   
  import
  java.util.ArrayList;
  import
  java.util.List;
   
  import
  org.apache.zookeeper.WatchedEvent;
  import
  org.apache.zookeeper.Watcher;
  import
  org.apache.zookeeper.ZooKeeper;
   
  public class ZookeeperServerClient {
   
     private String connectString="hadoop202.cevent.com:2181,hadoop203.cevent.com:2181,hadoop204.cevent.com:2181";
     private int sessionTimeout=2000;
     private ZooKeeper zkClient=null;
     private String parentNode="/ceventServers";
   
     //1.获取链接
     private void getConnectClient() throws
  Exception{
        zkClient=new ZooKeeper(connectString, sessionTimeout, new Watcher(){
   
           @Override
           public void process(WatchedEvent event) {
              // 触发监听
              System.out.println(event.getType()+"...客户端监听触发..."+event.getPath());
              // 一直监听数据变化
              try {
                  //客户端循环执行,获取服务端数据
                  getServers();
              }
  catch (Exception e) {
                  // TODO
  Auto-generated catch block
                  e.printStackTrace();
              }
           }
           
        });
     }
     
     //2.监听节点变化,获取服务器数据
     public void getServers() throws
  Exception{
        //获取所有节点
        List<String> serverChildren=zkClient.getChildren(parentNode, true);
        //保存节点数据:IP地址
        ArrayList<String> servers=new ArrayList<>();
        
        for(String child:serverChildren){
           //获取节点数据:模拟输出子节点路径,child=ip地址
           byte[] data=zkClient.getData(parentNode+"/"+child, false, null);
           
           //保存信息
           servers.add(new String(data));
        }
        
        //打印数据
        System.out.println(servers);
     }
     
     //3.业务需求
     public void businessClient() throws Exception
  {
        System.out.println("进行客户端业务处理中....");
        Thread.sleep(Long.MAX_VALUE);
     }
     
     public static void main(String[] args) throws Exception
  {
        //1.获取链接
        ZookeeperServerClient zkServerClient=new ZookeeperServerClient();
        zkServerClient.getConnectClient();
        
        //2.监听节点变化
        zkServerClient.getServers();
        
        //3.实现客户端业务
        zkServerClient.businessClient();
     }
  }
   
  
 


4.run

argument
服务端开启
客户端开启

5.linux中直接添加客户端




 
  
   
  WATCHER::
   
  WatchedEvent state:SyncConnected
  type:None path:null
  [zk: localhost:2181(CONNECTED) 0] ls /
  [sanguo, ceventServers, cevent,
  zookeeper]
  [zk: localhost:2181(CONNECTED) 1] ls
  /cevent
   
  ceventServers   cevent
  [zk: localhost:2181(CONNECTED) 1] ls /ceventServers
  [ceventServer0000000001, ceventServer0000000000]
  [zk: localhost:2181(CONNECTED) 2] create
  /cevent  
   
  ceventServers   cevent
  [zk: localhost:2181(CONNECTED) 2] create
  /ceventServers/ceventServer000000000
   
  ceventServer0000000001   ceventServer0000000000
  [zk: localhost:2181(CONNECTED) 2] create /ceventServers/ceventServer hadoop301
  Created /ceventServers/ceventServer
  
 


服务端响应(监听添加的客户端信息)

监听添加的客户端信息

删除临时创建的节点




 
  
  WatchedEvent state:SyncConnected
  type:None path:null
  [zk: localhost:2181(CONNECTED) 0] ls /
  [sanguo, ceventServers, cevent, zookeeper]
  [zk: localhost:2181(CONNECTED) 1] ls
  /cevent             
   
  ceventServers   cevent
  [zk: localhost:2181(CONNECTED) 1] ls /ceventServers
  [ceventServer, ceventServer0000000001,
  ceventServer0000000000]
  [zk: localhost:2181(CONNECTED) 2] delete
  /cevent
   
  ceventServers   cevent
  [zk: localhost:2181(CONNECTED) 2] delete /ceventServers/ceventServer
   
  ceventServer             ceventServer0000000001   ceventServer0000000000
  [zk: localhost:2181(CONNECTED) 2] delete /ceventServers/ceventServer
  [zk: localhost:2181(CONNECTED) 3] ls /cevent
   
  ceventServers   cevent
  [zk: localhost:2181(CONNECTED) 3] ls /ceventServers
  [ceventServer0000000001, ceventServer0000000000]
  
 


服务端监听响应

实时更新获取数据信息


版权声明:本文为weixin_37056888原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_37056888/article/details/105623506