월요일, 3월 17, 2025
HomeLinuxzookeeper Watch Java Example

zookeeper Watch Java Example

zookeeper Watch Java Example

Watches

zookeeper는 watch의 개념을 지원합니다 . 클라이언트는 znode에서 watch를 설정할 수 있습니다.
znode가 변경되면 watch가 트리거되고 제거됩니다.
watch가 트리거되면 클라이언트는 z 노드가 변경되었음을 알리는 패킷을 수신합니다.
클라이언트와 zookeeper 서버 중 하나의 연결이 끊어지면 클라이언트는 로컬 알림을받습니다. 

참조 : http://zookeeper.apache.org/doc/r3.4.10/javaExample.html

zookeeper Java API를 소개하기 위해 아주 간단한 watch클라이언트를 개발했습니다.
이 zookeeper 클라이언트는 zookeeper 노드가 변경되는지 감시하고 프로그램을 시작하거나 중지하여 이에 응답합니다.

 

일반적으로 zookeeper 응용 프로그램은 연결을 유지하는 장치와 데이터를 모니터링하는 장치의 두 가지로 나뉩니다. 이 응용 프로그램에서 Executor 라는 클래스 는 zookeeper 연결을 유지하고 DataMonitor 라는 클래스 는 zookeeper 트리의 데이터를 모니터링합니다. 또한 Executor는 메인 스레드와 실행 로직을 포함합니다. znode의 상태에 따라 사용자 상호 작용이 거의없고 인수로 전달한 실행 가능한 프로그램과의 상호 작용 및 요구 사항에 따라 샘플이 종료 및 재시작되는 것도 담당합니다.

 

아래의 JAVA 예제는 zookeeper node에 /znode 을 생성된 상태에서 /znode의 data값이 수정되었을때 watch 모니터링에 대한 구현입니다.

Executor Class

package com.sample;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
    String znode;

    DataMonitor dm;

    ZooKeeper zk;

    Process child;

    public Executor(String hostPort, String znode) throws KeeperException, IOException {
        zk = new ZooKeeper(hostPort, 3000, this);
        dm = new DataMonitor(zk, znode, null, this);
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        String hostPort = "localhost";
        String znode = "/znode";
        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /***************************************************************************
     * We do process any events ourselves, we just need to forward them on.
     *
     * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
     */
    public void process(WatchedEvent event) {
        dm.process(event);
    }

    public void run() {
        try {
            synchronized (this) {
                while (!dm.dead) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

    public void closing(int rc) {
        synchronized (this) {
            notifyAll();
        }
    }

    static class StreamWriter extends Thread {
        OutputStream os;

        InputStream is;

        StreamWriter(InputStream is, OutputStream os) {
            this.is = is;
            this.os = os;
            start();
        }

        public void run() {
            byte b[] = new byte[80];
            int rc;
            try {
                while ((rc = is.read(b)) > 0) {
                    os.write(b, 0, rc);
                }
            } catch (IOException e) {
            }

        }
    }

    public void exists(String node, byte[] data) {
        if (data == null) {
            if (child != null) {
                System.out.println("Killing process");
                child.destroy();
                try {
                    child.waitFor();
                } catch (InterruptedException e) {
                }
            }
            child = null;
        } else {
            if (child != null) {
                System.out.println("Stopping child");
                child.destroy();
                try {
                    child.waitFor();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            try {
            	String dataStr = new String(data);
            	System.out.println("■ node getData : " + dataStr);
                
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

DataMonitor Class

package com.sample;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class DataMonitor implements Watcher, StatCallback {

    ZooKeeper zk;

    String znode;

    Watcher chainedWatcher;

    boolean dead;

    DataMonitorListener listener;

    byte prevData[];

    public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) {
        this.zk = zk;
        this.znode = znode;
        this.chainedWatcher = chainedWatcher;
        this.listener = listener;
        // Get things started by checking if the node exists. We are going
        // to be completely event driven
        zk.exists(znode, true, this, null);
    }

    /**
     * Other classes use the DataMonitor by implementing this method
     */
    public interface DataMonitorListener {
        /**
         * The existence status of the node has changed.
         */
        void exists(String node, byte data[]);

        /**
         * The ZooKeeper session is no longer valid.
         *
         * @param rc
         *                the ZooKeeper reason code
         */
        void closing(int rc);
    }

    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() == Event.EventType.None) {
            // We are are being told that the state of the
            // connection has changed
            switch (event.getState()) {
            case SyncConnected:
                // In this particular example we don't need to do anything
                // here - watches are automatically re-registered with 
                // server and any watches triggered while the client was 
                // disconnected will be delivered (in order of course)
                break;
            case Expired:
                // It's all over
                dead = true;
                listener.closing(KeeperException.Code.SessionExpired);
                break;
            }
        } else {
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                zk.exists(znode, true, this, null);
            }
        }
        if (chainedWatcher != null) {
            chainedWatcher.process(event);
        }
    }

    public void processResult(int rc, String path, Object ctx, Stat stat) {
        boolean exists;
        switch (rc) {
        case Code.Ok:
            exists = true;
            break;
        case Code.NoNode:
            exists = false;
            break;
        case Code.SessionExpired:
        case Code.NoAuth:
            dead = true;
            listener.closing(rc);
            return;
        default:
            // Retry errors
            zk.exists(znode, true, this, null);
            return;
        }

        byte b[] = null;
        if (exists) {
            try {
                b = zk.getData(znode, false, null);
            } catch (KeeperException e) {
                // We don't need to worry about recovering now. The watch
                // callbacks will kick off any exception handling
                e.printStackTrace();
            } catch (InterruptedException e) {
                return;
            }
        }
        if (b != null) {
            listener.exists(path, b);
            prevData = b;
        }
    }
}

아래는 Executor.java 를 실행한 화면입니다.

Executor.java를 실행후 zkCli 를 이용하여 /znode의 값을 아래와 같이 수정합니다.

zkCli 를 이용하여 /znode의 값을 수정후 아래와 같이 감시 이벤트를 발생하여 /znode의 값을 출력합니다.

 

RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Most Popular