使用ZooKeeper来为你的程序加上Leader Election的功能。 博客分类: java基础 zk / Hadoop / maven / Apache / .net
程序员文章站
2024-02-21 08:40:10
...
ZooKeeper是Hadoop的正式子项目,它是一个针对大型分布式系统的可靠协调系统,提供的功能包括:配置维护、名字服务、分布式同步、组服务等。ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
废话不多说,
附件是完整的maven项目。
Reference: http://hadoop.apache.org/zookeeper/docs/r3.3.1/recipes.html#sc_leaderElection
废话不多说,直接上代码:
package com.ericsson.threef.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Participates in a ZooKeeper-based leader election and reports the current leader.
 *
 * <p>On construction the instance connects to ZooKeeper and registers itself as a
 * candidate by creating an ephemeral sequential node {@code <rootPath>/ELECTION/n_<seq>}
 * whose data is the local host address. {@link #run()} then repeatedly looks up the
 * candidate with the lowest sequence number (the leader), prints it, and blocks until a
 * watch on that node fires.
 *
 * <p>{@link #process(WatchedEvent)} is invoked on the ZooKeeper event thread while
 * {@link #run()} executes on the caller's thread; the client handle is therefore
 * published through a {@code volatile} field.
 *
 * @author EDENYIN
 */
public class LeaderElection implements Watcher, Runnable {

    /** Session timeout handed to the ZooKeeper client, in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 10000;

    /** Pause between attempts after a failed leader lookup, to avoid a hot retry loop. */
    private static final long RETRY_DELAY_MS = 1000L;

    /** Name prefix of candidate nodes; ZooKeeper appends the sequence number to it. */
    private static final String CANDIDATE_PREFIX = "n_";

    /** Explicit charset so node data does not depend on the platform default. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private final String zookeeperConnectionString;
    private final String rootPath;
    private final byte[] hostAddress;

    /** Current client handle; replaced from the event thread when the session expires. */
    private volatile ZooKeeper zk;

    /**
     * Connects to ZooKeeper and registers this process as an election candidate.
     *
     * @param zookeeperConnectionString connection string passed to the ZooKeeper client
     * @param rootPath                  parent znode under which the {@code ELECTION} node lives
     */
    public LeaderElection(String zookeeperConnectionString, String rootPath) {
        this.zookeeperConnectionString = zookeeperConnectionString;
        this.rootPath = rootPath;

        byte[] address = null;
        try {
            address = InetAddress.getLocalHost().getHostAddress().getBytes(UTF_8);
        } catch (UnknownHostException e) {
            // Best effort: the candidate node is then created without data.
            e.printStackTrace();
        }
        this.hostAddress = address;

        buildZK();
    }

    /** Returns the path of the znode that holds all candidate nodes. */
    private String electionPath() {
        return rootPath + "/ELECTION";
    }

    /**
     * Creates a new client session, makes sure the election parent nodes exist and
     * registers this process as a candidate. Failures are reported but not propagated.
     */
    private void buildZK() {
        System.out.println("Build zk client");
        try {
            ZooKeeper client = new ZooKeeper(zookeeperConnectionString, SESSION_TIMEOUT_MS, this);
            zk = client;
            // Check both levels independently: the root may already exist without ELECTION.
            ensurePersistentNode(client, rootPath);
            ensurePersistentNode(client, electionPath());
            client.create(electionPath() + "/" + CANDIDATE_PREFIX, hostAddress,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            reportConnectFailure(e);
        } catch (Exception e) {
            reportConnectFailure(e);
        }
    }

    /**
     * Creates {@code path} as an empty persistent node unless it already exists.
     * A concurrent creation by another participant is not treated as an error.
     */
    private static void ensurePersistentNode(ZooKeeper client, String path)
            throws KeeperException, InterruptedException {
        Stat stat = client.exists(path, false);
        if (stat != null) {
            return;
        }
        try {
            client.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException ignored) {
            // Another participant created the node between exists() and create().
        }
    }

    /** Prints a connection/registration failure in the same form the class always used. */
    private static void reportConnectFailure(Exception e) {
        e.printStackTrace();
        System.err.println("Error connect to zoo keeper");
    }

    /** Closes a client handle, tolerating {@code null} and preserving the interrupt flag. */
    private static void closeQuietly(ZooKeeper client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Handles session-level events from the default watcher.
     *
     * <p>Only {@code Expired} triggers a rebuild. On {@code Disconnected} the session is
     * still alive and the client library reconnects on its own; opening a second session
     * there would leak the old handle and register a duplicate candidate node.
     *
     * @param event the event delivered by the ZooKeeper client
     */
    public void process(WatchedEvent event) {
        System.out.println(event);
        if (event.getState() == Event.KeeperState.Expired) {
            System.out.println("Zookeeper connection timeout.");
            closeQuietly(zk);
            buildZK();
        }
    }

    /**
     * Repeatedly determines and prints the current leader, then waits until a watch on
     * the leader node fires. The loop ends when the running thread is interrupted.
     */
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                ZooKeeper client = zk;
                if (client == null) {
                    System.err.println("Error get the leader." + "no zookeeper client available");
                    Thread.sleep(RETRY_DELAY_MS);
                    continue;
                }

                List<String> children = client.getChildren(electionPath(), false);
                String leaderPath = findLeader(children);
                if (leaderPath == null) {
                    System.err.println("Error get the leader." + "no candidate registered");
                    Thread.sleep(RETRY_DELAY_MS);
                    continue;
                }

                LatchChildWatcher latchChildWatcher = new LatchChildWatcher();
                byte[] data = client.getData(electionPath() + "/" + leaderPath, latchChildWatcher, null);
                String address = (data == null) ? "unknown" : new String(data, UTF_8);
                System.out.println("find the leader on the path:" + leaderPath
                        + " whose host address is " + address);
                latchChildWatcher.await();
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition terminates the thread.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println("Error get the leader." + e.getMessage());
                try {
                    Thread.sleep(RETRY_DELAY_MS);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Picks the candidate node name with the lowest sequence number.
     *
     * @param children child names of the election node
     * @return the leader's node name, or {@code null} when no well-formed candidate exists
     */
    private static String findLeader(List<String> children) {
        String leader = null;
        int lowest = -1;
        for (String child : children) {
            if (!child.startsWith(CANDIDATE_PREFIX)) {
                continue;
            }
            int sequence;
            try {
                sequence = Integer.parseInt(child.substring(CANDIDATE_PREFIX.length()));
            } catch (NumberFormatException ignored) {
                // Not a candidate node created by this recipe; skip it.
                continue;
            }
            if (leader == null || sequence < lowest) {
                lowest = sequence;
                leader = child;
            }
        }
        return leader;
    }

    /** One-shot watcher that releases a waiting thread when any event is delivered. */
    private static class LatchChildWatcher implements Watcher {

        private final CountDownLatch latch = new CountDownLatch(1);

        public LatchChildWatcher() {
        }

        public void process(WatchedEvent event) {
            System.out.println("Watcher fired on path: " + event.getPath() + " state: "
                    + event.getState() + " type " + event.getType());
            latch.countDown();
        }

        /** Blocks until {@link #process(WatchedEvent)} has been called once. */
        public void await() throws InterruptedException {
            latch.await();
        }
    }
}
附件是完整的maven项目。
Reference: http://hadoop.apache.org/zookeeper/docs/r3.3.1/recipes.html#sc_leaderElection