有了上一篇博客中给出的测试用例(详见http://www.jb51.cc/article/p-qlcyeczg-pm.html),我们现在可以进行开发了。示例代码已上传到http://download.csdn.net/detail/mrbcy/9753008
先写ServerRegister类的第一个测试。
public class ServerRegisterTest { private String zkConnetionString = "amaster:2181,anode1:2181,anode2:2181"; private int sessionTimeout = 2000; private int waitTimeout = 2100; private String groupName = "/MrpcServer"; private String serverNode = "/ServiceImplServer"; @Test public void testRegistAndUnRegist() throws Exception{ try { ServerRegister serverRegister = new ServerRegister(zkConnetionString,groupName); serverRegister.registServer(serverNode,"localhost:8000"); ServerRegister serverRegister2 = new ServerRegister(zkConnetionString,groupName); serverRegister2.registServer(serverNode,"localhost:8001"); ServerRegister serverRegister3 = new ServerRegister(zkConnetionString,groupName); serverRegister3.registServer(serverNode,"localhost:8002"); // 获取服务器地址,检查是否包含所有的3个服务器地址 List<String> serverList = getServerList(); checkServerList(serverList,new String[]{"localhost:8000","localhost:8001","localhost:8002"}); serverRegister2.unregist(); serverList = getServerList(); checkServerList(serverList,"localhost:8002"}); serverRegister.unregist(); serverRegister3.unregist(); serverList = getServerList(); checkServerList(serverList,new String[]{}); } catch (Exception e) { e.printStackTrace(); throw e; } } private void checkServerList(List<String> serverList,String[] expectList) { Assert.assertEquals(serverList.size(),expectList.length); if(expectList.length == 0){ return; } for(String server : expectList){ boolean findEqual = false; for(int i = 0; i < serverList.size(); i++){ if(server.equals(serverList.get(i))){ findEqual = true; break; } } Assert.assertEquals(findEqual,true); } } private List<String> getServerList() throws Exception { ZooKeeper zkClient = new ZooKeeper(zkConnetionString,sessionTimeout,null); Thread.sleep(waitTimeout); List<String> serverNodes = zkClient.getChildren(groupName,false); List<String> serverAddrs = new ArrayList<String>(); for(String serverNode : serverNodes){ serverAddrs.add(new String(zkClient.getData(groupName+"/" + serverNode,null,null))); } return serverAddrs; } }
运行测试,观察它通不过
然后编写代码,直到它通过
public class ServerRegister { private int sessionTimeout = 2000; private ZooKeeper zk; private String groupName; private CountDownLatch latch = new CountDownLatch(1); /** * 创建一个新的服务器地址注册器 * @param zkConnetionString ZooKeeper集群的连接地址 * @param groupName 存放服务器地址的父路径 * @throws IOException */ public ServerRegister(String zkConnetionString,String groupName) throws Exception { this.groupName = groupName; this.zk = new ZooKeeper(zkConnetionString,new Watcher(){ public void process(WatchedEvent event) { if(event.getState() == Event.KeeperState.SyncConnected){ latch.countDown(); } } }); latch.await(sessionTimeout + 100,TimeUnit.MILLISECONDS); if(latch.getCount() > 0){ throw new RuntimeException("Can not connect to ZooKeeper cluster " + zkConnetionString + ",please check and try again later"); } } /** * 注册服务器地址到ZooKeeper集群 * @param nodePath 注册节点的路径地址,真实注册时会在之前拼接groupName * @param serverAddr * @throws Exception */ public void registServer(String nodePath,String serverAddr) throws Exception { // 检查父节点是否一致 Stat groupStat = zk.exists(this.groupName,null); if(groupStat == null){ zk.create(groupName,new String("Mrpc framework server list").getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } zk.create(groupName + nodePath,serverAddr.getBytes(),CreateMode.EPHEMERAL_SEQUENTIAL); } /** * 注销之前通过此对象注册的所有服务器路径 * @throws Exception */ public void unregist() throws Exception { zk.close(); } }
到目前为止代码不需要重构,我们继续。再添加一个测试用例来测试。
@Test public void testZkReconnect() throws Exception{ try { // 连接到服务器,停止ZooKeeper集群 String ip = "amaster"; ServerRegister serverRegister = new ServerRegister(zkConnetionString,groupName); serverRegister.registServer(serverNode,"localhost:8000"); ServerRegister serverRegister2 = new ServerRegister(zkConnetionString,groupName); serverRegister2.registServer(serverNode,"localhost:8001"); ServerRegister serverRegister3 = new ServerRegister(zkConnetionString,groupName); serverRegister3.registServer(serverNode,"localhost:8002"); Thread.sleep(50000); // 50秒后重新启动ZooKeeper集群,本来想用代码自动重启的,但是失败了,所有手动重启ZooKeeper Thread.sleep(sessionTimeout); // 获取服务器地址,检查是否包含所有的3个服务器地址 List<String> serverList = getServerList(); checkServerList(serverList,"localhost:8002"}); } catch (Exception e) { e.printStackTrace(); throw e; } }
这个测试直接成功了。倒不是我写了多余的代码,是ZooKeeper本身有重连的功能。
不过我查阅了网上的资料,说如果客户端与服务端断开超过一定的时间,重连后提交的临时数据就会丢失。所以我把断开的时间延长了一些,果然,测试失败了。
为了解决这个问题,需要客户端这边记录下创建的临时数据,重新连接时看需要重新创建这些数据。核心的代码如下:
// 处理zk事件 public void process(WatchedEvent event) { System.out.println(event); if(event.getState() == Event.KeeperState.SyncConnected){ latch.countDown(); try { // 逐个查找之前创建过的临时节点,不存在的重新创建 Map<String,String> newTempNodes = new HashMap<String,String>(); for(Map.Entry<String,String> entry:createdTempNodes.entrySet()){ Stat stat = zk.exists(entry.getKey(),null); if(stat == null){ // 重新创建 String newNode = zk.create(entry.getKey(),entry.getValue().getBytes(),CreateMode.EPHEMERAL_SEQUENTIAL); newTempNodes.put(newNode,entry.getValue()); }else{ // 还存在,不需要重新创建 newTempNodes.put(entry.getKey(),entry.getValue()); } } createdTempNodes = newTempNodes; } catch (Exception e) { // TODO: handle exception } } if(event.getState() == Event.KeeperState.Expired){ try { // 会话过期,重新连接ZooKeeper集群 initZk(); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } }
已经创建的服务器地址列表用Map保存:
private Map<String,String> createdTempNodes = new HashMap<String,String>();
解释一下,如果客户端重新连接了zk集群,就检查之前创建的服务器列表是否还存在,如果不存在了就重新创建。如果收到zk通知会话过期,就重启一个zk客户端进行连接,连接成功以后再检查之前创建的服务器列表是否还存在。这样测试就通过了。
继续添加新测试,如果zk集群不可用,则抛出异常:
@Test(expected=RuntimeException.class) public void testUnavailableZK() throws Exception{ new ServerRegister("anode3:2181",groupName); }
直接通过了,看来我之前的代码写的太多了。
继续添加新测试,验证服务器地址为空字符串时的情况:
@Test(expected=RuntimeException.class) public void testEmptyServerAddr() throws Exception{ ServerRegister serverRegister = new ServerRegister(zkConnetionString,groupName); serverRegister.registServer(serverNode,""); } @Test(expected=RuntimeException.class) public void testOverLengthServerAddr() throws Exception{ String serverAddr = ""; for(int i = 0; i < 256; i++){ serverAddr = "a" + serverAddr; } ServerRegister serverRegister = new ServerRegister(zkConnetionString,serverAddr); }
测试失败了。
if(serverAddr.length() == 0 || serverAddr.length() > 255){ throw new RuntimeException("the serverAddr's length should between 1 and 255"); }
后面又添加了上篇文章中的几个测试用例,最终一一通过了测试用例。
补充一个测试用例。我希望ServerRegister能够对不存在的多级父目录进行循环创建。
@Test public void testParentPathNotExist() throws Exception{ ServerRegister serverRegister = new ServerRegister(zkConnetionString,"/test/ddd/MrpcServer"); serverRegister.registServer(serverNode,"localhost:8000"); // 获取服务器地址,检查是否包含指定的服务器地址 List<String> serverList = getServerList("/test/ddd/MrpcServer"); checkServerList(serverList,new String[]{"localhost:8000"}); }
测试失败。
// 检查父节点是否存在,不存在就循环创建 createParentPath(); // 创建必要的父节点 private void createParentPath() throws Exception{ Stat groupStat = zk.exists(this.groupName,null); if(groupStat == null){ String[] parents = groupName.split("/"); String curPath = ""; for(int i = 1; i < parents.length; i++){ curPath = curPath + "/" + parents[i]; Stat stat = zk.exists(curPath,null); if(stat == null){ zk.create(curPath,new String("Mrpc framework server list node").getBytes(),CreateMode.PERSISTENT); } } } }
总结:经过TDD开发,我确实觉得这个类靠谱了许多,尤其是在各种网络不稳定的条件下能够正常运行,感觉敢拿来用了。
然而我也发现使用TDD的门槛好像挺高。如果不能在一开始就弄清楚需求是很难用TDD的,毕竟那样根本没法写测试。而且对测试用例的设计要求很高,毕竟代码就是为了通过测试,如果测试写得达不到实际使用的要求,那么显然产品代码也不能符合要求。