zookeeper Watch Java Example
Watches
zookeeper는 watch의 개념을 지원합니다 . 클라이언트는 znode에서 watch를 설정할 수 있습니다.
znode가 변경되면 watch가 트리거되고 제거됩니다.
watch가 트리거되면 클라이언트는 z 노드가 변경되었음을 알리는 패킷을 수신합니다.
클라이언트와 zookeeper 서버 중 하나의 연결이 끊어지면 클라이언트는 로컬 알림을받습니다.
참조 : http://zookeeper.apache.org/doc/r3.4.10/javaExample.html
zookeeper Java API를 소개하기 위해 아주 간단한 watch클라이언트를 개발했습니다.
이 zookeeper 클라이언트는 zookeeper 노드가 변경되는지 감시하고 프로그램을 시작하거나 중지하여 이에 응답합니다.
일반적으로 zookeeper 응용 프로그램은 연결을 유지하는 장치와 데이터를 모니터링하는 장치의 두 가지로 나뉩니다. 이 응용 프로그램에서 Executor 라는 클래스 는 zookeeper 연결을 유지하고 DataMonitor 라는 클래스 는 zookeeper 트리의 데이터를 모니터링합니다. 또한 Executor는 메인 스레드와 실행 로직을 포함합니다. znode의 상태에 따라 사용자 상호 작용이 거의없고 인수로 전달한 실행 가능한 프로그램과의 상호 작용 및 요구 사항에 따라 샘플이 종료 및 재시작되는 것도 담당합니다.
아래의 JAVA 예제는 zookeeper node에 /znode 을 생성된 상태에서 /znode의 data값이 수정되었을때 watch 모니터링에 대한 구현입니다.
Executor Class
package com.sample;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
String znode;
DataMonitor dm;
ZooKeeper zk;
Process child;
public Executor(String hostPort, String znode) throws KeeperException, IOException {
zk = new ZooKeeper(hostPort, 3000, this);
dm = new DataMonitor(zk, znode, null, this);
}
/**
* @param args
*/
public static void main(String[] args) {
String hostPort = "localhost";
String znode = "/znode";
try {
new Executor(hostPort, znode).run();
} catch (Exception e) {
e.printStackTrace();
}
}
/***************************************************************************
* We do process any events ourselves, we just need to forward them on.
*
* @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
*/
public void process(WatchedEvent event) {
dm.process(event);
}
public void run() {
try {
synchronized (this) {
while (!dm.dead) {
wait();
}
}
} catch (InterruptedException e) {
}
}
public void closing(int rc) {
synchronized (this) {
notifyAll();
}
}
static class StreamWriter extends Thread {
OutputStream os;
InputStream is;
StreamWriter(InputStream is, OutputStream os) {
this.is = is;
this.os = os;
start();
}
public void run() {
byte b[] = new byte[80];
int rc;
try {
while ((rc = is.read(b)) > 0) {
os.write(b, 0, rc);
}
} catch (IOException e) {
}
}
}
public void exists(String node, byte[] data) {
if (data == null) {
if (child != null) {
System.out.println("Killing process");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
}
}
child = null;
} else {
if (child != null) {
System.out.println("Stopping child");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
String dataStr = new String(data);
System.out.println("■ node getData : " + dataStr);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
DataMonitor Class
package com.sample;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class DataMonitor implements Watcher, StatCallback {
ZooKeeper zk;
String znode;
Watcher chainedWatcher;
boolean dead;
DataMonitorListener listener;
byte prevData[];
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) {
this.zk = zk;
this.znode = znode;
this.chainedWatcher = chainedWatcher;
this.listener = listener;
// Get things started by checking if the node exists. We are going
// to be completely event driven
zk.exists(znode, true, this, null);
}
/**
* Other classes use the DataMonitor by implementing this method
*/
public interface DataMonitorListener {
/**
* The existence status of the node has changed.
*/
void exists(String node, byte data[]);
/**
* The ZooKeeper session is no longer valid.
*
* @param rc
* the ZooKeeper reason code
*/
void closing(int rc);
}
public void process(WatchedEvent event) {
String path = event.getPath();
if (event.getType() == Event.EventType.None) {
// We are are being told that the state of the
// connection has changed
switch (event.getState()) {
case SyncConnected:
// In this particular example we don't need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
break;
case Expired:
// It's all over
dead = true;
listener.closing(KeeperException.Code.SessionExpired);
break;
}
} else {
if (path != null && path.equals(znode)) {
// Something has changed on the node, let's find out
zk.exists(znode, true, this, null);
}
}
if (chainedWatcher != null) {
chainedWatcher.process(event);
}
}
public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
case Code.NoNode:
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
// Retry errors
zk.exists(znode, true, this, null);
return;
}
byte b[] = null;
if (exists) {
try {
b = zk.getData(znode, false, null);
} catch (KeeperException e) {
// We don't need to worry about recovering now. The watch
// callbacks will kick off any exception handling
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
if (b != null) {
listener.exists(path, b);
prevData = b;
}
}
}
아래는 Executor.java 를 실행한 화면입니다.
Executor.java를 실행후 zkCli 를 이용하여 /znode의 값을 아래와 같이 수정합니다.
zkCli 를 이용하여 /znode의 값을 수정후 아래와 같이 감시 이벤트를 발생하여 /znode의 값을 출력합니다.




