zookeeper
之前完成的项目作用zookeeper用作服务发现,抽象出注册和监听功能,供其他项目使用。对zookeeper的一些细节作个记录吧。
我们知道通过Zookeeper对象实例实现对zookeeper服务的调用。首先的问题就是消息的发送和接受,以及事件的回调。这里我们把目光转向ClientCnxn
创建实例
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {..../*** send线程完成发送request,接受response,生成event* event线程派发event*/sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();
}
`SendThread线程`
public void run() {//最大心跳间隔final int MAX_SEND_PING_INTERVAL = 10000; while (state.isAlive()) {try {//1、如果末连接,随机休眠1~1000,根据closing(用户请求)决定是跳出还是重连//2、如果需要登录,进行登录请求,根据结果生成事件if (zooKeeperSaslClient != null) {。。。if (sendAuthEvent == true) {eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,authState,null));}}//3、如果waitTimeOut超时,抛出SessionTimeoutExceptionif (to <= 0) {throw new SessionTimeoutException(...);}//4、心跳sendPing()//5、如果是只读模式pingRwServer();//6、开始读、写IOclientCnxnSocket.doTransport(to, pendingQ, outgoingQ, ClientCnxn);} catch (Throwable e) {//发送Disconnected事件if (state.isAlive()) {eventThread.queueEvent(new WatchedEvent(Event.EventType.None,Event.KeeperState.Disconnected,null));}}}//如果alive状态发送Disconnected事件if (state.isAlive()) {eventThread.queueEvent(new WatchedEvent(Event.EventType.None,Event.KeeperState.Disconnected, null));}
}
`SendThread线程`
void readResponse(ByteBuffer incomingBuffer) throws IOException {//心跳响应if (replyHdr.getXid() == -2) {// -2 is the xid for pingsif (LOG.isDebugEnabled()) {LOG.debug("Got ping response for sessionid: 0x"+ Long.toHexString(sessionId)+ " after "+ ((System.nanoTime() - lastPingSentNs) / 1000000)+ "ms");}return;}//认证响应if (replyHdr.getXid() == -4) {// -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); }if (LOG.isDebugEnabled()) {LOG.debug("Got auth sessionid:0x"+ Long.toHexString(sessionId));}return;}//通知if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}eventThread.queueEvent( we );return;}...finishPacket(packet);
}
`EventThread`
public void run() {try {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {processEvent(event);}if (wasKilled)synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}} catch (InterruptedException e) {LOG.error("Event thread exiting due to interruption", e);}LOG.info("EventThread shut down");
}
zookeeper
之前完成的项目作用zookeeper用作服务发现,抽象出注册和监听功能,供其他项目使用。对zookeeper的一些细节作个记录吧。
我们知道通过Zookeeper对象实例实现对zookeeper服务的调用。首先的问题就是消息的发送和接受,以及事件的回调。这里我们把目光转向ClientCnxn
创建实例
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {..../*** send线程完成发送request,接受response,生成event* event线程派发event*/sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();
}
`SendThread线程`
public void run() {//最大心跳间隔final int MAX_SEND_PING_INTERVAL = 10000; while (state.isAlive()) {try {//1、如果末连接,随机休眠1~1000,根据closing(用户请求)决定是跳出还是重连//2、如果需要登录,进行登录请求,根据结果生成事件if (zooKeeperSaslClient != null) {。。。if (sendAuthEvent == true) {eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,authState,null));}}//3、如果waitTimeOut超时,抛出SessionTimeoutExceptionif (to <= 0) {throw new SessionTimeoutException(...);}//4、心跳sendPing()//5、如果是只读模式pingRwServer();//6、开始读、写IOclientCnxnSocket.doTransport(to, pendingQ, outgoingQ, ClientCnxn);} catch (Throwable e) {//发送Disconnected事件if (state.isAlive()) {eventThread.queueEvent(new WatchedEvent(Event.EventType.None,Event.KeeperState.Disconnected,null));}}}//如果alive状态发送Disconnected事件if (state.isAlive()) {eventThread.queueEvent(new WatchedEvent(Event.EventType.None,Event.KeeperState.Disconnected, null));}
}
`SendThread线程`
void readResponse(ByteBuffer incomingBuffer) throws IOException {//心跳响应if (replyHdr.getXid() == -2) {// -2 is the xid for pingsif (LOG.isDebugEnabled()) {LOG.debug("Got ping response for sessionid: 0x"+ Long.toHexString(sessionId)+ " after "+ ((System.nanoTime() - lastPingSentNs) / 1000000)+ "ms");}return;}//认证响应if (replyHdr.getXid() == -4) {// -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); }if (LOG.isDebugEnabled()) {LOG.debug("Got auth sessionid:0x"+ Long.toHexString(sessionId));}return;}//通知if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}eventThread.queueEvent( we );return;}...finishPacket(packet);
}
`EventThread`
public void run() {try {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {processEvent(event);}if (wasKilled)synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}} catch (InterruptedException e) {LOG.error("Event thread exiting due to interruption", e);}LOG.info("EventThread shut down");
}