欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

Java-PandaFollowerMonitor

程序员文章站 2022-07-07 22:32:38
架构 Server,Spider CrawlId package Peerslee.HotMonitor.Server.Spider; /* * 1. 定时抓取id *...

架构

Java-PandaFollowerMonitor

Server,Spider

CrawlId

package Peerslee.HotMonitor.Server.Spider;
/*
 * 1. 定时抓取id
 * 2. 上传zk
 * https://www.panda.tv/live_lists?pageno=1&pagenum=120
 */

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import com.fasterxml.jackson.databind.ObjectMapper;

import Peerslee.HotMonitor.Bean.Info;
import Peerslee.HotMonitor.Bean.Info.Room;
import Peerslee.HotMonitor.Server.Centralize.Centralizer;
import Peerslee.HotMonitor.Utils.MysqlHandler;

public class CrawlId {
	private static final int BUCKET_COUNT = 3;
	private static final String TABLE_NAME = "PandaTv";
	
	private static HttpClient client = HttpClients.createDefault();
	private static MysqlHandler db = MysqlHandler.getMHandler(new String[] {
			"jdbc:mysql://localhost:3306/HotMonitor", "root", "123"});
	private static Centralizer centralizer = new Centralizer(
			"localhost:2181,localhost:2182,localhost:2183");
	
	int hash(String id) { return Integer.parseInt(id) % 3; }
	
	List crawl() {
		List iList = new ArrayList();
		for (int i = 1; ; i++) {
			String url = String.format("https://www.panda.tv/live_lists?pageno=%d&pagenum=120",i);
			System.out.println("CrawId: [" + url + "]");
			HttpGet hGet = new HttpGet(url);
			try {
				HttpResponse response = client.execute(hGet);
				HttpEntity entity = response.getEntity();  
	            String json = EntityUtils.toString(entity); 
	            ObjectMapper mapper = new ObjectMapper();
	            Info info = mapper.readValue(json, Info.class);
	            if (info.getData().getItems().size() == 0) break;
	            iList.add(info);
			} catch (Exception e) { 
				System.out.println("CrawlId: [Crawl '" + url + "' error.]"); 
			}
		}
		return iList;
	}
	
	void divideBucket(List rItems) {
		/*
		 *  JVM将泛型存储的东西都视为Object, 底层的数组类型,它只能是Object[]
		 */
		Map buckets = new HashMap();
		for (Info.Room room: rItems) {
			String roomId = room.getId();
			System.out.println(room);
			
			// db
			if(db.isNullRecord(TABLE_NAME, roomId)) { // 验证
				String Columns = "roomId, roomName, classifyCname, classifyEname, userName, nickName";
				String Values = (new StringBuilder("'")).
						append(room.getId()).append("', '").
						append(room.getName()).append("', '").
						append(room.getClassification().getCname()).append("', '").
						append(room.getClassification().getEname()).append("', '").
						append(room.getUserinfo().getUserName()).append("', '").
						append(room.getUserinfo().getNickName()).append("'").
						toString();
				db.insertRecord(TABLE_NAME, Columns, Values);
			}
		}
		
		// 分桶
		String []roomIds = db.selectAll(TABLE_NAME, "roomId", 0);
		for (String roomId: roomIds) {
			int bId = hash(roomId);
			if (!buckets.containsKey(bId)) buckets.put(bId, new StringBuilder()) ;
			buckets.get(bId).append(roomId).append(",");
		}
		
		// 注册watcher
		centralizer.registerNode(new String[]{
				"/ZNodeId/ZNodeId_0",
				"/ZNodeId/ZNodeId_1",
				"/ZNodeId/ZNodeId_2"});
		
		// 上传zk
		Map ZNode = new HashMap<>();
		for (int i = 0; i < BUCKET_COUNT; i++) {
			if (buckets.get(i) != null) {
				int len = buckets.get(i).length();
				ZNode.put("/ZNodeId/ZNodeId_" + i,
						buckets.get(i).deleteCharAt(len - 1).toString().getBytes());
			}
		}
		centralizer.setBytesToNode(ZNode);
	}
	
	public static void main(String[] args) {
		CrawlId ci = new CrawlId();
		while (true) {
			List rList= new ArrayList();
			for (Info i: ci.crawl()) rList.addAll(i.getData().getItems()); 
			ci.divideBucket(rList);
			try {
				Thread.sleep(1000*60*30);
			} catch (InterruptedException e) { } // 30 分钟一次
		}
	}
}

CrawlInfo0,1,2

package Peerslee.HotMonitor.Server.Spider;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import com.fasterxml.jackson.databind.ObjectMapper;

import Peerslee.HotMonitor.Bean.Follower;
import Peerslee.HotMonitor.Calculate.Calculater;
import Peerslee.HotMonitor.Server.Centralize.Centralizer;

/*
 * https://www.panda.tv/room_followinfo?roomid=266019
 */
public class CrawlInfo0 {
	private static String ZK_NODE; // 根据socket 设置,"/ZNodeId/ZNodeId_"
	private static final int PORT = 5200; // 5201,5202
	
	private static HttpClient client = HttpClients.createDefault();
	private static Centralizer centralizer = new Centralizer(
			"localhost:2181,localhost:2182,localhost:2183");
	private static Calculater calculater = new Calculater();
	
	public void getFollowerByIds() {
		String []ids = centralizer.getStringFromNode(ZK_NODE).split(",");

		Map recentFollower = new HashMap();
		System.out.println("CrawlInfo: [Crawling follower, waiting...]");
		Stream.of(ids).forEach(id -> {
			String fCount = getFollower(id);
			if (fCount != null) {
				recentFollower.put(id, fCount);
			}
		});
		calculater.calculate(recentFollower);
	}
	
	public String getFollower(String roomId) {
		String url = String.format("https://www.panda.tv/room_followinfo?roomid=%s", roomId);
		HttpGet hGet = new HttpGet(url);
		try {
			HttpResponse response = client.execute(hGet);
			HttpEntity entity = response.getEntity();  
            String json = EntityUtils.toString(entity); 
            ObjectMapper mapper = new ObjectMapper();
            return mapper.readValue(json, Follower.class).getData().getFans();
		} catch (Exception e) { 
			System.out.println("CrawlInfo: [Crawl '" + url + "' error.]"); 
			return null;
		}
	}
	
	public static void main(String[] args) {
		CrawlInfo0 cInfo = new CrawlInfo0();
		Socket socket = null;
		try {
			ServerSocket sSocket = new ServerSocket(PORT);
			while (true) {
				socket = sSocket.accept();
				System.out.println("等待连接...");
				InputStreamReader isr = new InputStreamReader(
						socket.getInputStream(), "UTF-8");
				BufferedReader reader = new BufferedReader(isr);
				ZK_NODE = reader.readLine(); // 读一行
				System.out.println("CrawlInfo: [Get follower by " + ZK_NODE + " ids.]"); 
				cInfo.getFollowerByIds();
			}
		} catch (Exception e) {
			System.out.println("CrawlInfo: [CrawlInfo error.]"); 
		} finally {
			try {
				socket.close();
			} catch (IOException e) { }
		}
	}
}

Server,Centralizer

package Peerslee.HotMonitor.Server.Centralize;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;

//要记得初始化 ZKNode
public class Centralizer implements Watcher{
	private static final int SESSION_TIME_OUT = 5000; // 5 second
	private static CountDownLatch cdLatch = new CountDownLatch(1);
	private ZooKeeper zk;
	
	public Centralizer(String hostPort) {
		try {
			this.zk = new ZooKeeper(hostPort, SESSION_TIME_OUT, this);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	/*
	 * 创建node
	 */
	void initZKNode(Map ZNode) {
		for (Entry node: ZNode.entrySet()) {
			try {
				String path = zk.create(node.getKey(),
						node.getValue(),
						Ids.OPEN_ACL_UNSAFE,
						CreateMode.PERSISTENT);
				System.out.println("Success create znode:" + path);
			} catch (Exception e) {}
		}
	}
	
	/*
	 * 注册watcher
	 */
	public void registerNode(String []node) {
		for (String n: node) {
			try {
				zk.exists(n, true);
				System.out.println("ZK: [Exist " + n + " successful.]");
			} catch (Exception e) { 
				System.out.println("ZK: [Exist " + n + " error.]");
			}
		}
	}
	
	/*
	 * 获取string
	 */
	public String getStringFromNode(String node) {
		try {
			return new String(zk.getData(node, true, new Stat()));
		} catch (Exception e) {
			System.out.println("ZK: [Get bytes error.]");
			return null;
		}
	}
	
	/*
	 * 更新
	 */
	public void setBytesToNode(Map ZNode) {
		for (Entry node: ZNode.entrySet()) {
			try { 
				Thread.sleep(5000); // 伪分布式下,sleep 一下
				Stat stat = zk.setData(node.getKey(), node.getValue(), -1);
				System.out.println("ZK: [Set Node:" + node.getKey() + ".]");
				System.out.println(stat.getCzxid() + "/" + stat.getMzxid() +
						"/" + stat.getVersion());
			} catch (Exception e) { 
				System.out.println("ZK: [Set bytes error.]");
			}
		}
	}
	
	@Override
	public void process(WatchedEvent event) {
		System.out.println("---- zk process ----");
		cdLatch.countDown(); // 连接成功,之后不会再 -1
		if (KeeperState.SyncConnected == event.getState() ) {
			switch (event.getType().getIntValue()) {
			case -1: //EventType.None
				System.out.println("ZK: [process none.]");
				break;
			case 1: //EventType.NodeCreated
				System.out.println("ZK: [process node created.]");
				break;
			case 3: //EventType.NodeDataChanged
				System.out.println("ZK: [process node data changed.]");
//				System.out.println(event.getPath());
				/*
				 * Socket
				 */
				Socket socket = null;
				try {
					String hostName;
					int port = -1;
					System.out.println(event.getPath());
					switch (event.getPath()) {
					case "/ZNodeId/ZNodeId_0":
						hostName = "127.0.0.1";
						port = 5200;
						break;
					case "/ZNodeId/ZNodeId_1":
						hostName = "127.0.0.1";
						port = 5201;
						break;
					case "/ZNodeId/ZNodeId_2":
						hostName = "127.0.0.1";
						port = 5202;
						break;
					default:
						hostName = null;
						break;
					}
					System.out.println(hostName);
					if (hostName == null) return;
					socket = new Socket(hostName, port);
					socket.setSoTimeout(15000);
					PrintWriter writer = new PrintWriter(socket.getOutputStream());
					writer.print(event.getPath());
					writer.flush();
				} catch (Exception e) { 
					System.out.println("ZK: [Socket error.]");
				} finally {
					try {
						socket.close();
					} catch (IOException e) { }
				}
				break;
			case 4: //EventType.NodeChildrenChanged
				System.out.println("ZK: [process children list changed.]");
				break;
			default:
				System.out.println("ZK: [process nothing.]");
				break;
			}
		}
	}
	
	public static void main(String[] args) {
		final String HOST_PORT = "localhost:2181,localhost:2182,localhost:2183";
		Centralizer centralizer = new Centralizer(HOST_PORT);
		
		System.out.println(centralizer.zk.getState());
		
		try {
			cdLatch.await();
		} catch (InterruptedException e) {}
		finally {
			cdLatch.countDown();
		}
		
		Map ZNode = new HashMap();
//		ZNode.put("/ZNodeId", "".getBytes());
		ZNode.put("/ZNodeId/ZNodeId_0", "".getBytes());
//		ZNode.put("/ZNodeId/ZNodeId_1", "".getBytes());
//		ZNode.put("/ZNodeId/ZNodeId_2", "".getBytes());
//		ZNode.put("/ZNodeData", "".getBytes());
//		centralizer.initZKNode(ZNode);
		centralizer.setBytesToNode(ZNode);
		System.out.println(centralizer.getStringFromNode("/ZNodeId/ZNodeId_0"));
		try {
			Thread.sleep(Integer.MAX_VALUE);
		} catch (InterruptedException e) {}
	}

}

Calculate

package Peerslee.HotMonitor.Calculate;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;


import Peerslee.HotMonitor.Utils.MysqlHandler;

/*
 * 负责在每台机器上运算
 */
public class Calculater {
	private static final String TABLE_NAME = "PandaTv";
	private static final int COLUMN_NUM = 12;
	private static String BASE_PATH = "E:\\HOT_MONITOR\\";
	
	private static MysqlHandler db = MysqlHandler.getMHandler(new String[] {
			"jdbc:mysql://localhost:3306/HotMonitor", "root", "123"});
	private static Date today = null ;
	private static SimpleDateFormat sdFormat =  new SimpleDateFormat("yyyy/MM/dd HH:mm");
	/*
	 * 根据id,选择性计算
	 */
	public void calculate(Map recentFollower) {
		today = new Date(); // 更新时间
		String roomId = recentFollower.keySet().toString().
				replace("[", "(").replace("]", ")");
		String []columns = {"roomId", "flowerCount",
				"flowerMax", "flowerMatime",
				"flowerMin", "flowerMitime",
				"flowerDist"};
		for (Map previous: db.
				selectRecord(TABLE_NAME, roomId, columns)) {
			String rId = previous.get("roomId");
			update(recentFollower.get(rId), previous, columns);
			db.updateRecord(TABLE_NAME, rId, previous);
			System.out.println(previous);
		}
		System.out.println("Calculater: [Fault-Tolerant --->.]");
		saveToLocal(getRecentRecord());
	}
	
	// 更新
	void update(String rFollower, Map previous, String[] columns) {
		/*
		 * 如果 record 中元素为空,则赋值,recentFollower中元素
		 */
		for (String column: columns) {
			switch (column) {
			case "flowerCount":
				String fCount = previous.get(column);
				if (fCount == null || fCount == rFollower) {
					previous.put(column, rFollower);
					previous.put("flowerDist", "0");
				} else {
					previous.put("flowerDist",
							String.valueOf(Long.parseLong(rFollower) - Long.parseLong(fCount)));
				}
				break;
			case "flowerMax":
				String fMax = previous.get(column);
				// null直接 put,否则比较
				if (fMax == null || Long.parseLong(fMax) < Long.parseLong(rFollower)) {
					previous.put(column, rFollower);
					previous.put("flowerMatime", sdFormat.format(today));
				}
				break;
			case "flowerMin":
				String fMin = previous.get(column);
				if (fMin == null || Long.parseLong(fMin) > Long.parseLong(rFollower)) {
					previous.put(column, rFollower);
					previous.put("flowerMitime", sdFormat.format(today));
				}
				break;
			default:
				break;
			}
		}
	}
	
	//冗余
	String []getRecentRecord() {
		return db.selectAll(TABLE_NAME, "*", COLUMN_NUM);
	}
	
	void saveToLocal(String []strings) {  
//		String tsString = (new SimpleDateFormat("yyyy/MM/dd [HH:mm:ss]")).
//				format((new Date()).getTime());
		long ts = (new Date()).getTime();
        File dir = new File(BASE_PATH);    
        if (!dir.exists()) {    
            dir.mkdirs();    
        }    
        String filePath = String.format("%s%d.dat", BASE_PATH, ts);
		try {
			//文件输入流
			FileOutputStream fos = new FileOutputStream(new File(filePath));
			//输入writer
			OutputStreamWriter opw = new OutputStreamWriter(fos, "UTF-8");
			//缓冲writer
			BufferedWriter bw = new BufferedWriter(opw);
			for (String str: strings) {
				bw.write(str + "\n");
			}
			fos.close();
		} catch (Exception e) {  
            System.out.println("Calculater: [Save to local error.]");
        }
        System.out.println("Calculater: [Save to " + filePath + " successful.]");  
    }
}

Bean

package Peerslee.HotMonitor.Bean;

import java.util.List;
/*
 * Json
 */
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties({"errno", "errmsg"})
public class Info {
	private int errno;
	private String errmsg;
	private Data data;
	
	@JsonIgnoreProperties({"total", "highLightNum", "sliderdata"})
	public static class Data {
		private List items;
		private int total;
		private int highLightNum;
		private List sliderdata;
		public List getItems() {
			return items;
		}
		public void setItems(List items) {
			this.items = items;
		}
		@Override
		public String toString() {
			return "Data [items=" + items + "]";
		}
	}
	
	@JsonIgnoreProperties({
		"person_num", "pictures", "tag_switch",
		"tag", "tag_color", "room_type",
		"rtype_value", "status", "roomkey",
		"room_key", "ishighlight", "top_description",
		"is_top", "label", "host_level_info",
		"ticket_rank_info", "top_icon", "medalNum",
		"rollinfo", "pkinfo"
	})
	public static class Room {
		private String id; // room_id
		private String name; // room_name
		private String person_num;
		private Classify classification; // classify
		private Object pictures;
		private String tag_switch;
		private String tag;
		private String tag_color;
		private String room_type; 
		private String rtype_value;
		private String status;
		private User userinfo; // user
		private String roomkey;
		private String room_key;
		private String ishighlight;
		private String top_description;
		private int is_top;
		private List label;
		private Object host_level_info;
		private Object ticket_rank_info;
		private String top_icon;
		private int medalNum;
		private List rollinfo;
		private String pkinfo;
		public String getId() {
			return id;
		}
		public void setId(String id) {
			this.id = id;
		}
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public Classify getClassification() {
			return classification;
		}
		public void setClassification(Classify classification) {
			this.classification = classification;
		}
		public User getUserinfo() {
			return userinfo;
		}
		public void setUserinfo(User userinfo) {
			this.userinfo = userinfo;
		}
		@Override
		public String toString() {
			return "Room [id=" + id + ", name=" + name + ", classification=" + classification + ", userinfo=" + userinfo
					+ "]";
		}
	}
	
	public static class Classify {
		private String cname; 
		private String ename;
		public String getCname() {
			return cname;
		}
		public void setCname(String cname) {
			this.cname = cname;
		}
		public String getEname() {
			return ename;
		}
		public void setEname(String ename) {
			this.ename = ename;
		}
		@Override
		public String toString() {
			return "Classify [cname=" + cname + ", ename=" + ename + "]";
		}
	}
	
	@JsonIgnoreProperties(ignoreUnknown = true)
	public static class User {
		private String rid;
		private String userName;
		private String nickName;
		@JsonIgnore
		private String avatar;
		public String getRid() {
			return rid;
		}
		public void setRid(String rid) {
			this.rid = rid;
		}
		public String getUserName() {
			return userName;
		}
		public void setUserName(String userName) {
			this.userName = userName;
		}
		public String getNickName() {
			return nickName;
		}
		public void setNickName(String nickName) {
			this.nickName = nickName;
		}
		@Override
		public String toString() {
			return "User [userName=" + userName + ", nickName=" + nickName + "]";
		}
	}
	
	public Data getData() {
		return data;
	}
	public void setData(Data data) {
		this.data = data;
	}
	@Override
	public String toString() {
		return "Info [data=" + data + "]";
	}
}
package Peerslee.HotMonitor.Bean;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;


@JsonIgnoreProperties({"errno", "errmsg"})
public class Follower {
	private int errno;
	private String errmsg;
	private Data data;
	
	@JsonIgnoreProperties(ignoreUnknown = true)
	public static class Data {
		private String fans;
		@JsonIgnore
		private String is_followed;
		public String getFans() {
			return fans;
		}
		public void setFans(String fans) {
			this.fans = fans;
		}
	}
	public Data getData() {
		return data;
	}
	public void setData(Data data) {
		this.data = data;
	}
}

Client

package Peerslee.HotMonitor.Client;

import java.awt.BorderLayout;
import java.awt.Font;
import java.awt.event.ItemEvent;
import java.awt.event.ItemListener;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import javax.swing.JComboBox;
import javax.swing.JComponent;
import javax.swing.JFrame;
import javax.swing.JLabel;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTabbedPane;
import javax.swing.JTable;
import javax.swing.SwingConstants;
import javax.swing.WindowConstants;

import Peerslee.HotMonitor.Utils.MysqlHandler;
import jdk.nashorn.internal.scripts.JS;

public class Index {
	private static MysqlHandler db = MysqlHandler.getMHandler(new String[] {
			"jdbc:mysql://localhost:3306/HotMonitor", "root", "123"});
	
	void index() {
		JFrame jf = new JFrame();
//		jf.setExtendedState(JFrame.MAXIMIZED_BOTH);
		jf.setSize(600, 800);
		jf.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);
		jf.setLocationRelativeTo(null);
		jf.setLayout(null); 
		
		JTabbedPane jtp = new JTabbedPane();
		
		jtp.add("类别", createTab1("1"));
		
		jtp.add("主播", createTab2("2"));
		
		jtp.setSelectedIndex(0);
		jf.setContentPane(jtp);
		jf.setVisible(true);
	}
	
	JComponent createTab2(String text) {
		String[] columnNames = {"roomId", "nickName", "roomName",
				"flowercount", "flowerDist", "flowerMax", "flowerMatime", "flowerMin", "flowerMitime"};
		String[] header = {"直播间ID", "主播名称", "直播间名称", 
				"订阅人数", "订阅变化量", "订阅最高值", "高峰时间点", "订阅最低值", "低谷时间点"};
		JPanel jp = new JPanel(new BorderLayout());
		List> rList = null;
		rList = db.selectType(0, null);
		Map jbMap = new LinkedHashMap();
		for (Map r: rList) {
			if (r.get("classifycname").length() != 0) 
				jbMap.put(r.get("classifycname"), r.get("classifyename"));;
		}
		JComboBox jb = new JComboBox(jbMap.keySet().toArray(new String[0]));
		JScrollPane jsp = new JScrollPane();
		jp.add(jsp, BorderLayout.CENTER);
		JLabel jl = new JLabel();
		jl.setFont(new Font(null, Font.PLAIN, 15));
		jl.setHorizontalAlignment(SwingConstants.RIGHT);
		jp.add(jl, BorderLayout.SOUTH);
		jb.addItemListener(new ItemListener() {
            @Override
            public void itemStateChanged(ItemEvent e) {
                if (e.getStateChange() == ItemEvent.SELECTED) {
                	String kString = e.getItem().toString();
                	System.out.println("选中:" + kString);
                	String [][]rowData = getRowData(3, columnNames, jbMap, kString);
                	JTable jt = new JTable(rowData, header);
                	jsp.setViewportView(jt);
                	jsp.repaint();
                	jl.removeAll();
            		jl.setText("["+ kString+ "] 有 ["+ rowData.length+ "] 条数据.");
            		jl.repaint();
                }
            }
        });
		jb.setSelectedIndex(2); // 默认选 熊猫星秀
		jp.add(jb, BorderLayout.NORTH);
		return jp;
	}
	
	JComponent createTab1(String text) {
		JPanel jp = new JPanel(new BorderLayout());
		JComboBox jb = new JComboBox(new String[] {"订阅量", "直播数"});
		
		JScrollPane jsp = new JScrollPane();
		jp.add(jsp, BorderLayout.CENTER);
		JLabel jl = new JLabel();
		jl.setFont(new Font(null, Font.PLAIN, 15));
		jl.setHorizontalAlignment(SwingConstants.RIGHT);
		jp.add(jl, BorderLayout.SOUTH);
		jb.addItemListener(new ItemListener() {
            @Override
            public void itemStateChanged(ItemEvent e) {
                if (e.getStateChange() == ItemEvent.SELECTED) {
                	String kString = e.getItem().toString();
                	System.out.println("选中:" + kString);
                	int type = kString == "订阅量"? 1: 2;
                	String[] columnNames = null;
                	String[] header = null;
                	if (type == 1) {
                		columnNames = new String[] {"fSum", "classifycname"};;
                		header = new String[] {"订阅总人数", "直播类别"};
                	} else {
                		columnNames = new String[] {"RSum", "classifycname"};
                		header = new String[] {"直播间总数", "直播类别"};
                	}
                	String [][]rowData = getRowData(type, columnNames, null, null);
                    JTable jt = new JTable(rowData, header);
                    jsp.setViewportView(jt);
                	jsp.repaint();
                	jl.removeAll();
            		jl.setText("共 " + rowData.length + " 条数据.");
            		jl.repaint();
                }
            }
        });
		jb.setSelectedIndex(1); // 默认选
		jp.add(jb, BorderLayout.NORTH);
		return jp;
	}
	
	String[][] getRowData(int type, String[] columnNames, Map jbMap, String kString) {
		List> iList = db.selectType(type,
				jbMap == null? null: jbMap.get(kString));
		String[][] rowData = new String[iList.size()][columnNames.length];
    	for (int i = 0; i < iList.size(); i++) {
    		for (int j = 0; j < columnNames.length; j++) {
    			rowData[i][j] = iList.get(i).get(columnNames[j]);
    		}
    	}
    	return rowData;
	}
}

Utils

package Peerslee.HotMonitor.Utils;
/*
 * mysql 增删改查
 */

import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import com.mysql.jdbc.Connection;

// 要记得初始化 table
public class MysqlHandler {
	public static Connection db;
	public static Statement opt;
	
	/*
	 * 单例模式:双重检验锁(lazy)
	 * db -> 唯一
	 */
	private volatile static MysqlHandler mHandler; // 禁止指令重排序,保证创建实例为原子动作
	public MysqlHandler(String []params) { //url, username, password
		try {
			Class.forName("com.mysql.jdbc.Driver");
			db = (Connection) DriverManager.getConnection(
					params[0], params[1], params[2]);
			System.out.println("MySQL: [Connected database successfully.]");
			opt = db.createStatement();
		} catch (Exception e) {
			System.out.println("MySQL: [Connected database error.]");
		}
	}
	
	public static MysqlHandler getMHandler(String []params) {
		if (mHandler == null) {
			synchronized (MysqlHandler.class) { // class 锁
				if (mHandler == null) {
					mHandler = new MysqlHandler(params);
				}
			}
		}
		return mHandler;
	}
	
	public void createTable(String tName, Map params, String pKey) {
		StringBuilder sql = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
		sql.append(tName).append(" (");
		for (Entry p: params.entrySet()) {
			sql.append(p.getKey()).append(" ").append(p.getValue()).append(",");
		}
		sql.deleteCharAt(sql.length() - 1);
		sql.append(pKey.isEmpty()? ")": ",PRIMARY KEY ("+ pKey +"))");
		System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
		try {
			opt.executeUpdate(sql.toString());
		} catch (Exception e) { 
			System.out.println("MySQL: [Create table error.]");
			return;
		}
		System.out.println("MySQL: [Create table successfully.]");
	}
	
	public boolean isNullRecord(String tName, String roomId) {
		StringBuilder sql = new StringBuilder("SELECT COUNT(*) FROM ");
		sql.append(tName).append(" WHERE roomId = ").append(roomId);
		System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
		try {
			ResultSet res = opt.executeQuery(sql.toString());
			res.next();	// if the value is SQL NULL, the value returned is 0
			return res.getInt(1) == 0? true: false;
		} catch (Exception e) {
			System.out.println("MySQL: ["+ e.getMessage() +"]");
			return false;
		}
	}
	
	public List> selectType(int type, String cEname) {
		List> resList = new ArrayList>();
		String sql = null;
		String []columns = null;
		switch (type) {
		case 0: // 类别,根据订阅人数
			columns = new String[] {"classifycname", "classifyename"};
			sql = "SELECT classifycname, classifyename "
					+ "FROM pandatv GROUP BY classifyename ORDER BY SUM(flowerCount) DESC";
			break;
		case 1: // 类别,根据订阅人数
			columns = new String[] {"fSum", "classifycname"};
			sql = "SELECT SUM(flowerCount) AS fSum, classifycname "
					+ "FROM pandatv GROUP BY classifycname ORDER BY fSum DESC";
			break;
		case 2: // 类别,根据主播人数
			columns = new String[] {"RSum", "classifycname"};
			sql = "SELECT COUNT(classifycname) AS rSum, classifycname "
					+ "FROM pandatv GROUP BY classifycname ORDER BY rSum DESC";
			break;
		case 3: // 主播,根据类别
			columns = new String[] {"roomId", "nickName", "roomName",
					"flowercount", "flowerDist", "flowerMax", "flowerMatime", "flowerMin", "flowerMitime"};
			sql = "SELECT roomId, nickName, roomName, flowercount, flowerDist, flowerMax, flowerMatime, flowerMin, flowerMitime "
					+ "FROM pandatv "
					+ "WHERE classifyename = '"+ cEname+ "' AND flowerCount > 0 "
					+ "GROUP BY CAST(flowercount AS DECIMAL) DESC;";
			break;
		default:
			break;
		}
		System.out.println(sql);
		if (sql != null) {
			try {
				ResultSet rSet = opt.executeQuery(sql.toString());
				while (rSet.next()) {
					Map res = new HashMap<>();
					for (String c: columns) { res.put(c, rSet.getString(c)); }
					resList.add(res);
				}
			} catch (Exception e) {
				System.out.println("MySQL: [Selecting record error.]");
				return null;
			}
			System.out.println("MySQL: [Select record successfully.]");
		} else {
			System.out.println("MySQL: [Selecting record null.]");
		}
		return resList;
	}
	
	public String[] selectAll(String tName, String columns, int cNum) { 
		String sql = "SELECT " + columns + " FROM " + tName; // *, 则需要传入 column 数量
		System.out.println("MySQL: [Sql -> " + sql + "]");
		try {
			ResultSet rSet = opt.executeQuery(sql.toString());
			if (rSet.last()) {
				String []res = new String[rSet.getRow()];
				rSet.beforeFirst();
				int i = 0;
				StringBuilder sBuilder = new StringBuilder();
				while(rSet.next()) {
					if (cNum == 0) res[i++] = rSet.getString(columns);
					else {
						sBuilder.delete(0, sBuilder.length()); // 清空
						for (int j = 1; j <= cNum; j++) {
							sBuilder.append(rSet.getString(j)).append("::");
						}
						res[i++] = sBuilder.toString();
					}
				}
				System.out.println("MySQL: [Select all successful.]");
				return res;
			} else {
				System.out.println("MySQL: [There are no rows in the result set.]");
				return null;
			}
		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("MySQL: [Select all error.]");
			return null;
		}
	}
	
	public List> selectRecord(String tName, String roomId, String []Columns) {
		List> resList = new ArrayList>();
		
		StringBuilder sql = new StringBuilder("SELECT ");
		for (String c: Columns) sql.append(c).append(", ");
		sql.deleteCharAt(sql.length() - 2).append("FROM ").append(tName);
		if (!roomId.isEmpty()) sql.append(" WHERE roomId IN ").append(roomId);
		System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
		try {
			ResultSet rSet = opt.executeQuery(sql.toString());
			while (rSet.next()) {
				Map res = new HashMap<>();
				for (String c: Columns) res.put(c, rSet.getString(c));
				resList.add(res);
			}
		} catch (Exception e) {
			System.out.println("MySQL: [Selecting record error.]");
			return null;
		}
		System.out.println("MySQL: [Select record successfully.]");
		return resList;
	}
	
	public void insertRecord(String tName, String Columns, String Values) {
		StringBuilder sql = new StringBuilder("INSERT INTO ");
		sql.append(tName).append(" (").append(Columns).append(") VALUES (").
		append(Values).append(")");
		System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
		try {
			opt.executeUpdate(sql.toString());
		} catch (Exception e) { 
			System.out.println("MySQL: [Inserting records error.]");
			return;
		}
		System.out.println("MySQL: [Insert records successfully.]");
	}
	
	public void updateRecord(String tName, String roomId, Map records) {
		if (records.isEmpty()) return;
		
		StringBuilder sql = new StringBuilder("UPDATE ");
		sql.append(tName).append(" SET ");
		
		for (Entry entry: records.entrySet()) {
			sql.append(entry.getKey()).append(" = '").append(entry.getValue()).append("', ");
		}
		sql.deleteCharAt(sql.length() - 2).append("WHERE roomId = ").append(roomId);
		System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
		try {
			opt.executeUpdate(sql.toString());
		} catch (Exception e) { 
			System.out.println("MySQL: [Updating records error.]");
			return;
		}
		System.out.println("MySQL: [Update records successfully.]");
	}
	
	public static void main(String[] args) {
		String url = "jdbc:mysql://localhost:3306/HotMonitor";
		String username = "root";
	    String password = "123";
		
	    String tName = "PandaTv";
	    Map params = new HashMap();
		params.put("roomId", "VARCHAR(16) not NULL");
		params.put("roomName", "VARCHAR(255)");
		params.put("classifyCname", "VARCHAR(16)");
		params.put("classifyEname", "VARCHAR(16)");
		params.put("userName", "VARCHAR(32)");
		params.put("nickName", "VARCHAR(32)");
		params.put("flowerCount", "VARCHAR(8)");
		params.put("flowerMax", "VARCHAR(8)");
		params.put("flowerMatime", "VARCHAR(16)");
		params.put("flowerMin", "VARCHAR(8)");
		params.put("flowerMitime", "VARCHAR(16)");
		params.put("flowerDist", "VARCHAR(8)");
		String pKey ="roomId";
		MysqlHandler db = MysqlHandler.getMHandler(new String[]{url, username, password});
//		db.selectAll("PandaTv", "roomId");
		db.createTable(tName, params, pKey);
//		System.out.println(db.isNullRecord(tName, "123321"));
//		String Columns = "roomId, roomName, classifyCname, classifyEname";
//		String Values = "'123321', 'hello', 'eng', 'eng'";
//		db.insertRecord(tName, Columns, Values);
//		String roomId = "'123321'";
//		String []Columns = {"classifyCname", "classifyEname"};
//		String []Values = {"'chn'", "'chn'"};
//		db.updateRecord(tName, roomId, Columns, Values);
//		System.out.println("Result: " + db.selectRecords(tName, roomId, Columns)); // roomId=""
	}
}

pom


  4.0.0

  Peerslee
  HotMonitor
  0.0.1-SNAPSHOT
  jar

  HotMonitor
  http://maven.apache.org

  UTF-8

  
  	
	
	    org.apache.zookeeper
	    zookeeper
	    3.4.11
	
	
	
	    org.apache.httpcomponents
	    httpclient
	    4.5.2
	
	
	
	    com.fasterxml.jackson.core
	    jackson-databind
	    2.8.11.1
	
	
	
	    com.fasterxml.jackson.core
	    jackson-core
	    2.8.11
	
	
	
	
	    com.fasterxml.jackson.core
	    jackson-annotations
	    2.8.11
	
	
	
	    mysql
	    mysql-connector-java
	    5.1.46
	
    
	
	    junit
	    junit
	    4.12
	    test
	
  

zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=D:\\Tools\\zk\\s001\\data
dataLogDir=D:\\Tools\\zk\\s001\\logs
clientPort=2181
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
tickTime=2000
initLimit=10
syncLimit=5
dataDir=D:\\Tools\\zk\\s002\\data
dataLogDir=D:\\Tools\\zk\\s002\\logs
clientPort=2182
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
tickTime=2000
initLimit=10
syncLimit=5
dataDir=D:\\Tools\\zk\\s003\\data
dataLogDir=D:\\Tools\\zk\\s003\\logs
clientPort=2183
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890

安装包

链接:https://pan.baidu.com/s/1ZnTFX8e8L4t5WerNQN0d4g 密码:lx8r

演示流程

 

1. 启动 zk 集群
2. 启动CrawlInfo 0,1,2
3. 启动CrawlId
4. Client