博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
zookeeper客户端使用第三方(Curator)封装的Api操作节点
阅读量:7227 次
发布时间:2019-06-29

本文共 10008 字,大约阅读时间需要 33 分钟。

1.为什么使用Curator?

Curator本身是Netflix公司开源的zookeeper客户端

Curator  提供了各种应用场景实现封装

curator-framework  提供了fluent风格api;

curator-replice   提供了实现封装;

2.引入依赖:

org.apache.curator
curator-framework
2.11.0
org.apache.curator
curator-recipes
2.11.0

 3.创建会话连接

1 package com.karat.cn.zookeeper.curator; 2  3 import org.apache.curator.framework.CuratorFramework; 4 import org.apache.curator.framework.CuratorFrameworkFactory; 5 import org.apache.curator.retry.ExponentialBackoffRetry; 6  7 /** 8  * 创建会话连接 9  * @author Administrator10  *11  */12 public class CuratorCreateSessionDemo {13     private final static String CONNECTSTRING="47.107.121.215:2181";14     15     public static void main(String args[]) {16         //创建会话连接的2种方式17         //正常的风格18         CuratorFramework curatorFramework1=CuratorFrameworkFactory.19                 newClient(CONNECTSTRING,5000,5000,20                         new ExponentialBackoffRetry(1000, 3));//重试机制21         curatorFramework1.start();22         //fluent风格23         CuratorFramework curatorFramework2=CuratorFrameworkFactory.builder().24                     connectString(CONNECTSTRING).25                     sessionTimeoutMs(5000).26                     retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();27         curatorFramework2.start();28         System.out.println("success");29         30     }31 }
View Code

4.curator连接的重试策略

ExponentialBackoffRetry()  衰减重试

RetryNTimes 指定最大重试次数

RetryOneTime 仅重试一次

RetryUnitilElapsed 一直重试知道规定的时间

5.节点操作

1 package com.karat.cn.zookeeper.curator;  2   3 import org.apache.curator.framework.CuratorFramework;  4 import org.apache.curator.framework.CuratorFrameworkFactory;  5 import org.apache.curator.framework.api.BackgroundCallback;  6 import org.apache.curator.framework.api.CuratorEvent;  7 import org.apache.curator.framework.api.transaction.CuratorTransactionResult;  8 import org.apache.curator.retry.ExponentialBackoffRetry;  9 import org.apache.zookeeper.CreateMode; 10 import org.apache.zookeeper.data.Stat; 11  12 import java.util.Collection; 13 import java.util.Collections; 14 import java.util.concurrent.CountDownLatch; 15 import java.util.concurrent.Executor; 16 import java.util.concurrent.ExecutorService; 17 import java.util.concurrent.Executors; 18  19 /** 20  * curator对节点的增删改查 21  * @author Administrator 22  * 23  */ 24 public class CuratorOperatorDemo { 25  26     public static void main(String[] args) throws InterruptedException { 27         CuratorFramework curatorFramework=CuratorClientUtils.getInstance(); 28         System.out.println("连接成功........."); 29  30         //fluent风格api增删改查操作 31         /** 32          * 创建节点 33          */ 34        /*try { 35             String result=curatorFramework.create() 36                     .creatingParentsIfNeeded()//创建父节点 37                     .withMode(CreateMode.PERSISTENT)//持久节点:节点创建后,会一直存在,不会因客户端会话失效而删除; 38                     .forPath("/curator/curator1/curator11","123".getBytes()); 39             System.out.println(result); 40         } catch (Exception e) { 41             e.printStackTrace(); 42         }*/ 43         /** 44          * 删除节点 45          */ 46         /*try { 47             //默认情况下,version为-1 48             curatorFramework.delete()//删除操作 49             .deletingChildrenIfNeeded()//删除子节点 50             .forPath("/node"); 51         } catch (Exception e) { 52             e.printStackTrace(); 53         }*/ 54  55         /** 56          * 查询 57          */ 58         /*Stat stat=new Stat(); 59         try { 60             byte[] bytes=curatorFramework 61                     .getData() 62                     .storingStatIn(stat) 63                     .forPath("/curator/curator1/curator11"); 64             System.out.println(new String(bytes)+"-->stat:"+stat); 65         } catch (Exception e) { 66             e.printStackTrace(); 67         }*/ 68         /** 69          * 更新 70          */ 71         /*try { 72             Stat stat=curatorFramework 73                     .setData() 74                     .forPath("/curator","lijing".getBytes()); 75             System.out.println(stat); 76         } catch (Exception e) { 77             e.printStackTrace(); 78         }*/ 79  80         /** 81          * 异步操作 82          */ 83         /*ExecutorService service= Executors.newFixedThreadPool(1);//线程池(创建节点的事件由线程池处理) 84         CountDownLatch countDownLatch=new CountDownLatch(1);//计数器 85         try { 86             curatorFramework 87             .create() 88             .creatingParentsIfNeeded() 89             .withMode(CreateMode.EPHEMERAL) 90             .inBackground(new BackgroundCallback() { 91                  @Override 92                  public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { 93                      System.out.println(Thread.currentThread().getName() 94                              +"->resultCode:" 95                              +curatorEvent.getResultCode()+"->"//响应结果 96                              +curatorEvent.getType());//当前节点操作类型 97                      countDownLatch.countDown(); 98                  } 99             },service)100             .forPath("/mic","123".getBytes());101         } catch (Exception e) {102             e.printStackTrace();103         }104         countDownLatch.await();//等待(让当前线程等待)105         service.shutdown();//关闭线程*/106         /**107          * 事务操作(curator独有的)108          */109         try {110             Collection
resultCollections=curatorFramework111 .inTransaction()//开启一个事务112 .create()113 .forPath("/trans","111".getBytes())//创建一个节点114 .and()//通过and去修改一个节点115 .setData()116 .forPath("/curator","111".getBytes())//当修改节点不存在,则一成功一失败,事务不会提交成功117 .and()118 .commit();//提交事务119 for (CuratorTransactionResult result:resultCollections){120 System.out.println(result.getForPath()+"->"+result.getType());121 }122 } catch (Exception e) {123 e.printStackTrace();124 }125 }126 }
View Code

6.将会话连接做成工具类

1 package com.karat.cn.zookeeper.curator; 2  3 import org.apache.curator.framework.CuratorFramework; 4 import org.apache.curator.framework.CuratorFrameworkFactory; 5 import org.apache.curator.retry.ExponentialBackoffRetry; 6  7 /** 8  * 会话连接工具类 9  * @author Administrator10  *11  */12 public class CuratorClientUtils {13 14     private static CuratorFramework curatorFramework;15     16     private final static String CONNECTSTRING="47.107.121.215:2181";17 18 19     public static CuratorFramework getInstance(){20         curatorFramework= CuratorFrameworkFactory.21                 newClient(CONNECTSTRING,5000,5000,22                         new ExponentialBackoffRetry(1000,3));23         curatorFramework.start();24         return curatorFramework;25     }26 }
View Code

7.监听

1 package com.karat.cn.zookeeper.curator; 2  3 import org.apache.curator.framework.CuratorFramework; 4 import org.apache.curator.framework.recipes.cache.NodeCache; 5 import org.apache.curator.framework.recipes.cache.PathChildrenCache; 6 import org.apache.zookeeper.CreateMode; 7  8 import java.util.concurrent.TimeUnit; 9 10 /**11  * 监听12  * @author Administrator13  *14  */15 public class CuratorEventDemo {16 17     /**18      * 三种watcher来做节点的监听19      * pathcache   监视一个路径下子节点的创建、删除、节点数据更新20      * NodeCache   监视一个节点的创建、更新、删除21      * TreeCache   pathcaceh+nodecache 的合体(监视路径下的创建、更新、删除事件),22      * 缓存路径下的所有子节点的数据23      */24 25     public static void main(String[] args) throws Exception {26         CuratorFramework curatorFramework=CuratorClientUtils.getInstance();27         /**28          * 节点变化NodeCache29          */30         //监听31         /*NodeCache cache=new NodeCache(curatorFramework,"/curator",false);32         cache.start(true);33         //监听事件34         cache.getListenable().addListener(()-> System.out.println("节点数据发生变化,变化后的结果" +35                 ":"+new String(cache.getCurrentData().getData())));36         //修改节点37         curatorFramework.setData().forPath("/curator","666".getBytes());*/38 39 40         /**41          * PatchChildrenCache42          */43 44         PathChildrenCache cache=new PathChildrenCache(curatorFramework,"/event",true);//参数2监听的节点,参数3是否缓存45         cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);46         // Normal/ BUILD_INITIAL_CACHE /POST_INITIALIZED_EVENT47 48         cache.getListenable().addListener((curatorFramework1,pathChildrenCacheEvent)->{49             switch (pathChildrenCacheEvent.getType()){50                 case CHILD_ADDED:51                     System.out.println("增加子节点");52                     break;53                 case CHILD_REMOVED:54                     System.out.println("删除子节点");55                     break;56                 case CHILD_UPDATED:57                     System.out.println("更新子节点");58                     break;59                 default:break;60             }61         });62         //创建节点63         curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes());64         TimeUnit.SECONDS.sleep(1);65         System.out.println("1");66         //创建子节点67         curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1","1".getBytes());68         TimeUnit.SECONDS.sleep(1);69         System.out.println("2");70         //修改节点71         curatorFramework.setData().forPath("/event/event1","222".getBytes());72         TimeUnit.SECONDS.sleep(1);73         System.out.println("3");74         //删除节点75         curatorFramework.delete().forPath("/event/event1");76         System.out.println("4");77          78         System.in.read();79     }80 }
View Code

 

转载于:https://www.cnblogs.com/LJing21/p/10543817.html

你可能感兴趣的文章
Overloading Django Form Fields
查看>>
03.MyBatis的核心配置文件SqlMapConfig.xml
查看>>
python学习笔记(9)-python编程风格
查看>>
Apache HTTP Server搭建虚拟主机
查看>>
(译).NET4.X 并行任务中Task.Start()的FAQ
查看>>
git log显示
查看>>
java中相同名字不同返回类型的方法
查看>>
Rails NameError uninitialized constant class solution
查看>>
Android 获取SDCard中某个目录下图片
查看>>
设置cookies第二天0点过期
查看>>
【转载】NIO客户端序列图
查看>>
poj_2709 贪心算法
查看>>
【程序员眼中的统计学(11)】卡方分布的应用
查看>>
文件夹工具类 - FolderUtils
查看>>
http://blog.csdn.net/huang_xw/article/details/7090173
查看>>
lua学习例子
查看>>
研究:印度气候变暖速度加剧 2040年或面临重灾
查看>>
python爬虫——爬取豆瓣TOP250电影
查看>>
C++与Rust操作裸指针的比较
查看>>
了解webpack-4.0版本(一)
查看>>