Curator是Netflix公司開源的一個(gè)Zookeeper客戶端,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡(jiǎn)化了Zookeeper客戶端的開發(fā)量。1.Zookeeper安裝部署
Zookeeper的部署很簡(jiǎn)單,如果已經(jīng)有java運(yùn)行環(huán)境的話,下載tarball解壓后即可運(yùn)行。
[html] view plain copy PRint?
[root@vm Temp]$ wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz [root@vm Temp]$ tar zxvf zookeeper-3.4.6.tar.gz [root@vm Temp]$ cd zookeeper-3.4.6 [root@vm zookeeper-3.4.6]$ cp conf/zoo_sample.cfg conf/zoo.cfg [root@vm zookeeper-3.4.6]$ export ZOOKEEPER_HOME=/usr/local/src/zookeeper-3.4.5 [root@vm zookeeper-3.4.6]$ export PATH=$ZOOKEEPER_HOME/bin:$PATH [root@vm zookeeper-3.4.6]$ bin/zkServer.sh start [root@vm zookeeper-3.4.6]$ bin/zkCli.sh -server 127.0.0.1:2181 ![]()
[root@vm Temp]$ wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz[root@vm Temp]$ tar zxvf zookeeper-3.4.6.tar.gz[root@vm Temp]$ cd zookeeper-3.4.6[root@vm zookeeper-3.4.6]$ cp conf/zoo_sample.cfg conf/zoo.cfg[root@vm zookeeper-3.4.6]$ export ZOOKEEPER_HOME=/usr/local/src/zookeeper-3.4.5[root@vm zookeeper-3.4.6]$ export PATH=$ZOOKEEPER_HOME/bin:$PATH[root@vm zookeeper-3.4.6]$ bin/zkServer.sh start[root@vm zookeeper-3.4.6]$ bin/zkCli.sh -server 127.0.0.1:21812.客戶端常用操作
用zkCli.sh連接上Zookeeper服務(wù)后,用help能列出所有命令:
[html] view plain copy print?
[root@BC-VM-edce4ac67d304079868c0bb265337bd4 zookeeper-3.4.6]# bin/zkCli.sh -127.0.0.1:2181 Connecting to localhost:2181 2015-06-11 10:55:14,387 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT ... [zk: localhost:2181(CONNECTED) 5] help ZooKeeper -server host:port cmd args connect host:port get path [watch] ls path [watch] set path data [version] rmr path delquota [-n|-b] path quit printwatches on|off create [-s] [-e] path data acl stat path [watch] close ls2 path [watch] history listquota path setAcl path acl getAcl path sync path redo cmdno addauth scheme auth delete path [version] setquota -n|-b val path ![]()
[root@BC-VM-edce4ac67d304079868c0bb265337bd4 zookeeper-3.4.6]# bin/zkCli.sh -127.0.0.1:2181Connecting to localhost:21812015-06-11 10:55:14,387 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT ...[zk: localhost:2181(CONNECTED) 5] helpZooKeeper -server host:port cmd args connect host:port get path [watch] ls path [watch] set path data [version] rmr path delquota [-n|-b] path quit printwatches on|off create [-s] [-e] path data acl stat path [watch] close ls2 path [watch] history listquota path setAcl path acl getAcl path sync path redo cmdno addauth scheme auth delete path [version] setquota -n|-b val path下面就試驗(yàn)一下常用的命令:
create:創(chuàng)建路徑結(jié)點(diǎn)。ls:查看路徑下的所有結(jié)點(diǎn)。get:獲得結(jié)點(diǎn)上的值。set:修改結(jié)點(diǎn)上的值。delete:刪除結(jié)點(diǎn)。[html] view plain copy print?
[zk: localhost:2181(CONNECTED) 6] create /zktest mydata Created /zktest [zk: localhost:2181(CONNECTED) 12] ls / [zktest, zookeeper] [zk: localhost:2181(CONNECTED) 7] ls /zktest [] [zk: localhost:2181(CONNECTED) 13] get /zktest mydata cZxid = 0x1c ctime = Thu Jun 11 10:58:06 CST 2015 mZxid = 0x1c mtime = Thu Jun 11 10:58:06 CST 2015 pZxid = 0x1c cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 6 numChildren = 0 [zk: localhost:2181(CONNECTED) 14] set /zktest junk cZxid = 0x1c ctime = Thu Jun 11 10:58:06 CST 2015 mZxid = 0x1f mtime = Thu Jun 11 10:59:08 CST 2015 pZxid = 0x1c cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 4 numChildren = 0 [zk: localhost:2181(CONNECTED) 15] delete /zktest [zk: localhost:2181(CONNECTED) 16] ls / [zookeeper] ![]()
[zk: localhost:2181(CONNECTED) 6] create /zktest mydataCreated /zktest[zk: localhost:2181(CONNECTED) 12] ls /[zktest, zookeeper][zk: localhost:2181(CONNECTED) 7] ls /zktest[][zk: localhost:2181(CONNECTED) 13] get /zktestmydatacZxid = 0x1cctime = Thu Jun 11 10:58:06 CST 2015mZxid = 0x1cmtime = Thu Jun 11 10:58:06 CST 2015pZxid = 0x1ccversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0x0dataLength = 6numChildren = 0[zk: localhost:2181(CONNECTED) 14] set /zktest junkcZxid = 0x1cctime = Thu Jun 11 10:58:06 CST 2015mZxid = 0x1fmtime = Thu Jun 11 10:59:08 CST 2015pZxid = 0x1ccversion = 0dataVersion = 1aclVersion = 0ephemeralOwner = 0x0dataLength = 4numChildren = 0[zk: localhost:2181(CONNECTED) 15] delete /zktest[zk: localhost:2181(CONNECTED) 16] ls /[zookeeper]3.用Curator管理Zookeeper
Curator的Maven依賴如下,一般直接使用curator-recipes就行了,如果需要自己封裝一些底層些的功能的話,例如增加連接管理重試機(jī)制等,則可以引入curator-framework包。
[html] view plain copy print?
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.7.0</version> </dependency> ![]()
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.7.0</version> </dependency>3.1 Client操作
利用Curator提供的客戶端API,可以完全實(shí)現(xiàn)上面原生客戶端的功能。值得注意的是,Curator采用流式風(fēng)格API。
[java] view plain copy print?
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; /** * Curator framework's client test. * Output: * $ create /zktest hello * $ ls / * [zktest, zookeeper] * $ get /zktest * hello * $ set /zktest world * $ get /zktest * world * $ delete /zktest * $ ls / * [zookeeper] */ public class CuratorClientTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws Exception { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); // 2.Client API test // 2.1 Create node String data1 = "hello"; print("create", ZK_PATH, data1); client.create(). creatingParentsIfNeeded(). forPath(ZK_PATH, data1.getBytes()); // 2.2 Get node and data print("ls", "/"); print(client.getChildren().forPath("/")); print("get", ZK_PATH); print(client.getData().forPath(ZK_PATH)); // 2.3 Modify data String data2 = "world"; print("set", ZK_PATH, data2); client.setData().forPath(ZK_PATH, data2.getBytes()); print("get", ZK_PATH); print(client.getData().forPath(ZK_PATH)); // 2.4 Remove node print("delete", ZK_PATH); client.delete().forPath(ZK_PATH); print("ls", "/"); print(client.getChildren().forPath("/")); } private static void print(String... cmds) { StringBuilder text = new StringBuilder("$ "); for (String cmd : cmds) { text.append(cmd).append(" "); } System.out.println(text.toString()); } private static void print(Object result) { System.out.println( result instanceof byte[] ? new String((byte[]) result) : result); } } ![]()
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.RetryNTimes;/** * Curator framework's client test. * Output: * $ create /zktest hello * $ ls / * [zktest, zookeeper] * $ get /zktest * hello * $ set /zktest world * $ get /zktest * world * $ delete /zktest * $ ls / * [zookeeper] */public class CuratorClientTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws Exception { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); // 2.Client API test // 2.1 Create node String data1 = "hello"; print("create", ZK_PATH, data1); client.create(). creatingParentsIfNeeded(). forPath(ZK_PATH, data1.getBytes()); // 2.2 Get node and data print("ls", "/"); print(client.getChildren().forPath("/")); print("get", ZK_PATH); print(client.getData().forPath(ZK_PATH)); // 2.3 Modify data String data2 = "world"; print("set", ZK_PATH, data2); client.setData().forPath(ZK_PATH, data2.getBytes()); print("get", ZK_PATH); print(client.getData().forPath(ZK_PATH)); // 2.4 Remove node print("delete", ZK_PATH); client.delete().forPath(ZK_PATH); print("ls", "/"); print(client.getChildren().forPath("/")); } private static void print(String... cmds) { StringBuilder text = new StringBuilder("$ "); for (String cmd : cmds) { text.append(cmd).append(" "); } System.out.println(text.toString()); } private static void print(Object result) { System.out.println( result instanceof byte[] ? new String((byte[]) result) : result); }}3.2 監(jiān)聽器
Curator提供了三種Watcher(Cache)來監(jiān)聽結(jié)點(diǎn)的變化:
Path Cache:監(jiān)視一個(gè)路徑下1)孩子結(jié)點(diǎn)的創(chuàng)建、2)刪除,3)以及結(jié)點(diǎn)數(shù)據(jù)的更新。產(chǎn)生的事件會(huì)傳遞給注冊(cè)的PathChildrenCacheListener。Node Cache:監(jiān)視一個(gè)結(jié)點(diǎn)的創(chuàng)建、更新、刪除,并將結(jié)點(diǎn)的數(shù)據(jù)緩存在本地。Tree Cache:Path Cache和Node Cache的“合體”,監(jiān)視路徑下的創(chuàng)建、更新、刪除事件,并緩存路徑下所有孩子結(jié)點(diǎn)的數(shù)據(jù)。下面就測(cè)試一下最簡(jiǎn)單的Path Watcher:
[java] view plain copy print?
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.retry.RetryNTimes; /** * Curator framework watch test. */ public class CuratorWatcherTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws Exception { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); // 2.Register watcher PathChildrenCache watcher = new PathChildrenCache( client, ZK_PATH, true // if cache data ); watcher.getListenable().addListener((client1, event) -> { ChildData data = event.getData(); if (data == null) { System.out.println("No data in event[" + event + "]"); } else { System.out.println("Receive event: " + "type=[" + event.getType() + "]" + ", path=[" + data.getPath() + "]" + ", data=[" + new String(data.getData()) + "]" + ", stat=[" + data.getStat() + "]"); } }); watcher.start(StartMode.BUILD_INITIAL_CACHE); System.out.println("Register zk watcher successfully!"); Thread.sleep(Integer.MAX_VALUE); } } ![]()
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.cache.ChildData;import org.apache.curator.framework.recipes.cache.PathChildrenCache;import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;import org.apache.curator.retry.RetryNTimes;/** * Curator framework watch test. */public class CuratorWatcherTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws Exception { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); // 2.Register watcher PathChildrenCache watcher = new PathChildrenCache( client, ZK_PATH, true // if cache data ); watcher.getListenable().addListener((client1, event) -> { ChildData data = event.getData(); if (data == null) { System.out.println("No data in event[" + event + "]"); } else { System.out.println("Receive event: " + "type=[" + event.getType() + "]" + ", path=[" + data.getPath() + "]" + ", data=[" + new String(data.getData()) + "]" + ", stat=[" + data.getStat() + "]"); } }); watcher.start(StartMode.BUILD_INITIAL_CACHE); System.out.println("Register zk watcher successfully!"); Thread.sleep(Integer.MAX_VALUE); }}下面是在zkCli.sh中操作時(shí)Java程序的輸出:[html] view plain copy print?
Java: zk client start successfully! Java: Register zk watcher successfully! zkCli: [zk: localhost:2181(CONNECTED) 11] create /zktest/hello mydata Java: Receive event: type=[CHILD_ADDED], path=[/zktest/hello], data=[mydata], stat=[121,121,1434001221097,1434001221097,0,0,0,0,6,0,121] zkCli: [zk: localhost:2181(CONNECTED) 12] set /zktest/hello otherdata Java: Receive event: type=[CHILD_UPDATED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121] zkCli: [zk: localhost:2181(CONNECTED) 13] delete /zktest/hello Java: Receive event: type=[CHILD_REMOVED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121] ![]()
Java: zk client start successfully!Java: Register zk watcher successfully!zkCli: [zk: localhost:2181(CONNECTED) 11] create /zktest/hello mydataJava: Receive event: type=[CHILD_ADDED], path=[/zktest/hello], data=[mydata], stat=[121,121,1434001221097,1434001221097,0,0,0,0,6,0,121]zkCli: [zk: localhost:2181(CONNECTED) 12] set /zktest/hello otherdataJava: Receive event: type=[CHILD_UPDATED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]zkCli: [zk: localhost:2181(CONNECTED) 13] delete /zktest/helloJava: Receive event: type=[CHILD_REMOVED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]4.Curator“菜譜”
既然Maven包叫做curator-recipes,那說明Curator有它獨(dú)特的“菜譜”:
鎖:包括共享鎖、共享可重入鎖、讀寫鎖等。選舉:Leader選舉算法。Barrier:阻止分布式計(jì)算直至某個(gè)條件被滿足的“柵欄”,可以看做JDK Concurrent包中Barrier的分布式實(shí)現(xiàn)。緩存:前面提到過的三種Cache及監(jiān)聽機(jī)制。持久化結(jié)點(diǎn):連接或session終止后仍然在Zookeeper中存在的結(jié)點(diǎn)。隊(duì)列:分布式隊(duì)列、分布式優(yōu)先級(jí)隊(duì)列等。4.1 分布式鎖
分布式編程時(shí),比如最容易碰到的情況就是應(yīng)用程序在線上多機(jī)部署,于是當(dāng)多個(gè)應(yīng)用同時(shí)訪問某一資源時(shí),就需要某種機(jī)制去協(xié)調(diào)它們。例如,現(xiàn)在一臺(tái)應(yīng)用正在rebuild緩存內(nèi)容,要臨時(shí)鎖住某個(gè)區(qū)域暫時(shí)不讓訪問;又比如調(diào)度程序每次只想一個(gè)任務(wù)被一臺(tái)應(yīng)用執(zhí)行等等。
下面的程序會(huì)啟動(dòng)兩個(gè)線程t1和t2去爭(zhēng)奪鎖,拿到鎖的線程會(huì)占用5秒。運(yùn)行多次可以觀察到,有時(shí)是t1先拿到鎖而t2等待,有時(shí)又會(huì)反過來。Curator會(huì)用我們提供的lock路徑的結(jié)點(diǎn)作為全局鎖,這個(gè)結(jié)點(diǎn)的數(shù)據(jù)類似這種格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次獲得鎖時(shí)會(huì)生成這種串,釋放鎖時(shí)清空數(shù)據(jù)。
[java] view plain copy print?
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryNTimes; import java.util.concurrent.TimeUnit; /** * Curator framework's distributed lock test. */ public class CuratorDistrLockTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_LOCK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); Thread t1 = new Thread(() -> { doWithLock(client); }, "t1"); Thread t2 = new Thread(() -> { doWithLock(client); }, "t2"); t1.start(); t2.start(); } private static void doWithLock(CuratorFramework client) { InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH); try { if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + " hold lock"); Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " release lock"); } } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } } ![]()
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.curator.retry.RetryNTimes;import java.util.concurrent.TimeUnit;/** * Curator framework's distributed lock test. */public class CuratorDistrLockTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_LOCK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); Thread t1 = new Thread(() -> { doWithLock(client); }, "t1"); Thread t2 = new Thread(() -> { doWithLock(client); }, "t2"); t1.start(); t2.start(); } private static void doWithLock(CuratorFramework client) { InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH); try { if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + " hold lock"); Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " release lock"); } } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } }}4.2 Leader選舉
當(dāng)集群里的某個(gè)服務(wù)down機(jī)時(shí),我們可能要從slave結(jié)點(diǎn)里選出一個(gè)作為新的master,這時(shí)就需要一套能在分布式環(huán)境中自動(dòng)協(xié)調(diào)的Leader選舉方法。Curator提供了LeaderSelector監(jiān)聽器實(shí)現(xiàn)Leader選舉功能。同一時(shí)刻,只有一個(gè)Listener會(huì)進(jìn)入takeLeadership()方法,說明它是當(dāng)前的Leader。注意:當(dāng)Listener從takeLeadership()退出時(shí)就說明它放棄了“Leader身份”,這時(shí)Curator會(huì)利用Zookeeper再從剩余的Listener中選出一個(gè)新的Leader。autoRequeue()方法使放棄Leadership的Listener有機(jī)會(huì)重新獲得Leadership,如果不設(shè)置的話放棄了的Listener是不會(huì)再變成Leader的。
[java] view plain copy print?
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.EnsurePath; /** * Curator framework's leader election test. * Output: * LeaderSelector-2 take leadership! * LeaderSelector-2 relinquish leadership! * LeaderSelector-1 take leadership! * LeaderSelector-1 relinquish leadership! * LeaderSelector-0 take leadership! * LeaderSelector-0 relinquish leadership! * ... */ public class CuratorLeaderTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { LeaderSelectorListener listener = new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { System.out.println(Thread.currentThread().getName() + " take leadership!"); // takeLeadership() method should only return when leadership is being relinquished. Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " relinquish leadership!"); } @Override public void stateChanged(CuratorFramework client, ConnectionState state) { } }; new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); Thread.sleep(Integer.MAX_VALUE); } private static void registerListener(LeaderSelectorListener listener) { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); // 2.Ensure path try { new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient()); } catch (Exception e) { e.printStackTrace(); } // 3.Register listener LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener); selector.autoRequeue(); selector.start(); } } ![]()
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.leader.LeaderSelector;import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;import org.apache.curator.framework.state.ConnectionState;import org.apache.curator.retry.RetryNTimes;import org.apache.curator.utils.EnsurePath;/** * Curator framework's leader election test. * Output: * LeaderSelector-2 take leadership! * LeaderSelector-2 relinquish leadership! * LeaderSelector-1 take leadership! * LeaderSelector-1 relinquish leadership! * LeaderSelector-0 take leadership! * LeaderSelector-0 relinquish leadership! * ... */public class CuratorLeaderTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { LeaderSelectorListener listener = new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { System.out.println(Thread.currentThread().getName() + " take leadership!"); // takeLeadership() method should only return when leadership is being relinquished. Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " relinquish leadership!"); } @Override public void stateChanged(CuratorFramework client, ConnectionState state) { } }; new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); Thread.sleep(Integer.MAX_VALUE); } private static void registerListener(LeaderSelectorListener listener) { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); // 2.Ensure path try { new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient()); } catch (Exception e) { e.printStackTrace(); } // 3.Register listener LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener); selector.autoRequeue(); selector.start(); }}