由于服務器經常會維護,把服務實現服務端的地址直接寫在客戶端(包括配置文件里)都是不太好的。比較好的辦法是服務端啟動的時候把自己的地址注冊到ZooKeeper集群中,然后客戶端啟動時即可獲得所有服務器列表,并選擇一個合適的服務器為自己提供服務。
這樣解決了服務端地址的獲取問題,同時也解決了服務端的負載均衡問題。
來看一下相應的代碼。項目源碼已經上傳到http://download.csdn.net/detail/mrbcy/9747568
package tech.mrbcy.mrpc.demo.demo2;import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.data.Stat;public class ServerAddrHelper { public static final String DEFAULT_GROUP_NAME = "/MrpcServers"; PRivate String connString; private String groupName; private ZooKeeper zk; private ServerChangeListener listener; private static int sessionTimeout = 2000; public ServerAddrHelper(String connString){ this.connString = connString; this.groupName = DEFAULT_GROUP_NAME; } /** * * @param connString zk連接字符串 * @param groupName 父節點路徑,位于/下,需要帶/ 示例值:"/MrpcServers" */ public ServerAddrHelper(String connString, String groupName){ this.connString = connString; if(!groupName.startsWith("/")){ groupName = "/" + groupName; } this.groupName = groupName; } /** * 向ZooKeeper集群注冊服務器 * @param registPath 服務器節點路徑,示例值"server" * @param address 服務器地址及端口號,用于客戶端連接 * @throws Exception 連接服務器失敗 */ public void registServer(String registPath, String address) throws Exception{ zk = new ZooKeeper(connString,sessionTimeout,null); // 判斷父目錄是否存在,不存在則創建 Stat groupStat = zk.exists(groupName, false); if(groupStat == null){ zk.create(groupName, "Mrpc server list".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 注冊服務器 if(!registPath.startsWith("/")){ registPath = "/" + registPath; } String registAddr = zk.create(groupName+registPath, address.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("Server is starting, reg addr:" + registAddr); } /** * 發現服務器 * @param listener 監聽器,如果不是null,等到服務器列表發生變化時,監聽器會收到通知 * @return * @throws Exception */ public List<String> discoverServers(ServerChangeListener listener) throws Exception{ this.listener = listener; zk = new ZooKeeper(connString,sessionTimeout,new Watcher(){ public void process(WatchedEvent event) { if(event.getType() == EventType.NodeChildrenChanged){ // 服務器列表發生變化 try { List<String> servers = getServerList(); if(ServerAddrHelper.this.listener != null){ ServerAddrHelper.this.listener.onChange(servers); } } catch (Exception e) { e.printStackTrace(); } } } }); return getServerList(); } private List<String> getServerList() throws Exception { zk.getChildren(groupName, true); List<String> children = zk.getChildren(groupName, true); List<String> servers = new ArrayList<String>(); for(String child : children) { byte[] data = zk.getData(groupName+"/"+child, null, null); servers.add(new String(data)); } return servers; }}一般來講registServer由服務端框架調用,向zk集群注冊自己。discoverServer用客戶端框架調用。取得當前可用的服務器列表。并且提供了服務器列表發生變化時的監聽接口。
另外還實現了一個負載均衡器,使用最簡單的隨機選擇算法來挑選服務器。后續還可以進行改進,比如根據服務端當前的負載來選擇服務器。代碼如下:
package tech.mrbcy.mrpc.demo.demo2;import java.net.InetSocketAddress;import java.util.List;import java.util.Random;public class ServerLoadBalancer { /** * 選擇一個服務器 * @param servers 服務器列表 示例值:133.122.5.88:8888 或 anode2:5884 * @return 連接服務器地址 */ public static InetSocketAddress chooseServer(List<String> servers){ if(servers == null || servers.size() == 0){ return null; } // 隨機選擇一個服務器 String serverAddr = servers.get(0); if(servers.size() > 1){ int index = new Random().nextInt(servers.size()); serverAddr = servers.get(index); } String[] addrAndPort = serverAddr.split(":"); if(addrAndPort.length != 2){ throw new RuntimeException("不合法的server地址:" + serverAddr); } return new InetSocketAddress(addrAndPort[0], Integer.parseInt(addrAndPort[1])); }}最后給出一個測試類的代碼。其他的詳細信息請看上傳的項目代碼吧。
package tech.mrbcy.mrpc.demo.demo2;import java.net.InetSocketAddress;import java.util.List;import org.junit.Test;public class MockClient { private InetSocketAddress serverAddress; @Test public void testClient(){ ServerAddrHelper serverHelper = new ServerAddrHelper("amaster:2181,anode1:2181,anode2:2181"); ServerAddrHelper helper = new ServerAddrHelper("amaster:2181,anode1:2181,anode2:2181"); try { serverHelper.registServer("ServiceImplServer", "localhost:10000"); List<String> serverList = helper.discoverServers(new ServerChangeListener() { public void onChange(List<String> servers) { System.out.println("服務器列表發生變化,當前服務器列表為:"); System.out.println(servers); changeToServer(servers); } }); System.out.println(serverList); if(serverList == null || serverList.size() == 0){ System.out.println("沒有可用的服務器"); } changeToServer(serverList); Thread.sleep(1000); serverHelper.registServer("ServiceImplServer", "localhost:10001"); Thread.sleep(1000); serverHelper.registServer("ServiceImplServer", "localhost:10002"); // 這期間可以手動刪除已經連接的服務器,測試服務器斷線的情況 Thread.sleep(50000); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void changeToServer(List<String> servers) { if(servers == null || servers.size() == 0){ return; } // 未指定服務器地址或原服務器已失效,遷移到新的服務器 boolean valid = false; if(servers.size() > 0 && serverAddress != null){ for(String server:servers){ if(server.equals(serverAddress.getHostString() + ":" + serverAddress.getPort())){ valid = true; break; } } } if(serverAddress == null || !valid){ serverAddress = ServerLoadBalancer.chooseServer(servers); System.out.println("未指定服務器地址或原服務器已失效,遷移到新的服務器:" + serverAddress.getHostString() + ":" + serverAddress.getPort()); } }}輸出結果如下:
Server is starting, reg addr:/MrpcServers/ServiceImplServer0000000030[localhost:10000]未指定服務器地址或原服務器已失效,遷移到新的服務器:localhost:10000Server is starting, reg addr:/MrpcServers/ServiceImplServer0000000031服務器列表發生變化,當前服務器列表為:[localhost:10001, localhost:10000]Server is starting, reg addr:/MrpcServers/ServiceImplServer0000000032服務器列表發生變化,當前服務器列表為:[localhost:10001, localhost:10002, localhost:10000]服務器列表發生變化,當前服務器列表為:[localhost:10001, localhost:10002]未指定服務器地址或原服務器已失效,遷移到新的服務器:localhost:10002新聞熱點
疑難解答