zookeeper Watch Java Example
Watches
zookeeper는 watch의 개념을 지원합니다 . 클라이언트는 znode에서 watch를 설정할 수 있습니다.
znode가 변경되면 watch가 트리거되고 제거됩니다.
watch가 트리거되면 클라이언트는 z 노드가 변경되었음을 알리는 패킷을 수신합니다.
클라이언트와 zookeeper 서버 중 하나의 연결이 끊어지면 클라이언트는 로컬 알림을받습니다.
참조 : http://zookeeper.apache.org/doc/r3.4.10/javaExample.html
zookeeper Java API를 소개하기 위해 아주 간단한 watch클라이언트를 개발했습니다.
이 zookeeper 클라이언트는 zookeeper 노드가 변경되는지 감시하고 프로그램을 시작하거나 중지하여 이에 응답합니다.
일반적으로 zookeeper 응용 프로그램은 연결을 유지하는 장치와 데이터를 모니터링하는 장치의 두 가지로 나뉩니다. 이 응용 프로그램에서 Executor 라는 클래스 는 zookeeper 연결을 유지하고 DataMonitor 라는 클래스 는 zookeeper 트리의 데이터를 모니터링합니다. 또한 Executor는 메인 스레드와 실행 로직을 포함합니다. znode의 상태에 따라 사용자 상호 작용이 거의없고 인수로 전달한 실행 가능한 프로그램과의 상호 작용 및 요구 사항에 따라 샘플이 종료 및 재시작되는 것도 담당합니다.
아래의 JAVA 예제는 zookeeper node에 /znode 을 생성된 상태에서 /znode의 data값이 수정되었을때 watch 모니터링에 대한 구현입니다.
Executor Class
package com.sample; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener { String znode; DataMonitor dm; ZooKeeper zk; Process child; public Executor(String hostPort, String znode) throws KeeperException, IOException { zk = new ZooKeeper(hostPort, 3000, this); dm = new DataMonitor(zk, znode, null, this); } /** * @param args */ public static void main(String[] args) { String hostPort = "localhost"; String znode = "/znode"; try { new Executor(hostPort, znode).run(); } catch (Exception e) { e.printStackTrace(); } } /*************************************************************************** * We do process any events ourselves, we just need to forward them on. * * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent) */ public void process(WatchedEvent event) { dm.process(event); } public void run() { try { synchronized (this) { while (!dm.dead) { wait(); } } } catch (InterruptedException e) { } } public void closing(int rc) { synchronized (this) { notifyAll(); } } static class StreamWriter extends Thread { OutputStream os; InputStream is; StreamWriter(InputStream is, OutputStream os) { this.is = is; this.os = os; start(); } public void run() { byte b[] = new byte[80]; int rc; try { while ((rc = is.read(b)) > 0) { os.write(b, 0, rc); } } catch (IOException e) { } } } public void exists(String node, byte[] data) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } try { String dataStr = new String(data); System.out.println("■ node getData : " + dataStr); } catch (Exception e) { e.printStackTrace(); } } } }
DataMonitor Class
package com.sample; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class DataMonitor implements Watcher, StatCallback { ZooKeeper zk; String znode; Watcher chainedWatcher; boolean dead; DataMonitorListener listener; byte prevData[]; public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); } /** * Other classes use the DataMonitor by implementing this method */ public interface DataMonitorListener { /** * The existence status of the node has changed. */ void exists(String node, byte data[]); /** * The ZooKeeper session is no longer valid. * * @param rc * the ZooKeeper reason code */ void closing(int rc); } public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: // In this particular example we don't need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: // It's all over dead = true; listener.closing(KeeperException.Code.SessionExpired); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let's find out zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } } public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; } byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don't need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if (b != null) { listener.exists(path, b); prevData = b; } } }
아래는 Executor.java 를 실행한 화면입니다.
Executor.java를 실행후 zkCli 를 이용하여 /znode의 값을 아래와 같이 수정합니다.
zkCli 를 이용하여 /znode의 값을 수정후 아래와 같이 감시 이벤트를 발생하여 /znode의 값을 출력합니다.