Java-PandaFollowerMonitor
程序员文章站
2022-07-07 22:32:38
架构
Server,Spider
CrawlId
package Peerslee.HotMonitor.Server.Spider;
/*
* 1. 定时抓取id
*...
架构
Server,Spider
CrawlId
package Peerslee.HotMonitor.Server.Spider;

/*
 * 1. Periodically crawl the ids of all live rooms.
 * 2. Upload the ids, split into buckets, to ZooKeeper.
 * https://www.panda.tv/live_lists?pageno=1&pagenum=120
 */
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import com.fasterxml.jackson.databind.ObjectMapper;

import Peerslee.HotMonitor.Bean.Info;
import Peerslee.HotMonitor.Bean.Info.Room;
import Peerslee.HotMonitor.Server.Centralize.Centralizer;
import Peerslee.HotMonitor.Utils.MysqlHandler;

/**
 * Crawls the room list page by page, stores rooms that are not yet in the
 * database, and publishes all known room ids to the bucket znodes.
 */
public class CrawlId {
    private static final int BUCKET_COUNT = 3;
    private static final String TABLE_NAME = "PandaTv";
    /** Give up paging after this many failed pages in a row (e.g. network down). */
    private static final int MAX_CONSECUTIVE_FAILURES = 3;
    /** Pause between two crawl rounds: 30 minutes. */
    private static final long CRAWL_INTERVAL_MILLIS = 1000L * 60 * 30;
    /** Reused for every page instead of being rebuilt per request. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private static HttpClient client = HttpClients.createDefault();
    private static MysqlHandler db = MysqlHandler.getMHandler(new String[] {
            "jdbc:mysql://localhost:3306/HotMonitor", "root", "123"});
    private static Centralizer centralizer = new Centralizer(
            "localhost:2181,localhost:2182,localhost:2183");

    /**
     * Maps a numeric room id to a bucket index in {@code [0, BUCKET_COUNT)}.
     *
     * @param id decimal room id
     * @return bucket index, never negative
     * @throws NumberFormatException if {@code id} is not a decimal number
     */
    int hash(String id) {
        // floorMod keeps the result non-negative; long parsing tolerates ids above Integer.MAX_VALUE.
        return (int) Math.floorMod(Long.parseLong(id), (long) BUCKET_COUNT);
    }

    /**
     * Fetches room-list pages starting at page 1 until a page without items is
     * returned, or until {@code MAX_CONSECUTIVE_FAILURES} pages in a row fail.
     *
     * @return the successfully parsed pages, possibly empty
     */
    List<Info> crawl() {
        List<Info> pages = new ArrayList<Info>();
        int failures = 0;
        for (int pageNo = 1; failures < MAX_CONSECUTIVE_FAILURES; pageNo++) {
            String url = String.format(
                    "https://www.panda.tv/live_lists?pageno=%d&pagenum=120", pageNo);
            System.out.println("CrawId: [" + url + "]");
            HttpGet hGet = new HttpGet(url);
            try {
                HttpResponse response = client.execute(hGet);
                HttpEntity entity = response.getEntity();
                // UTF-8 is only the fallback when the response declares no charset.
                String json = EntityUtils.toString(entity, StandardCharsets.UTF_8);
                Info info = MAPPER.readValue(json, Info.class);
                if (info.getData().getItems().isEmpty()) {
                    break; // past the last page
                }
                pages.add(info);
                failures = 0;
            } catch (Exception e) {
                // A failed page is skipped; repeated failures end the round instead of looping forever.
                failures++;
                System.out.println("CrawlId: [Crawl '" + url + "' error.]");
            }
        }
        return pages;
    }

    /**
     * Inserts rooms that are not in the table yet, then distributes every room
     * id stored in the table over the bucket znodes.
     *
     * @param rItems rooms found by the latest crawl
     */
    void divideBucket(List<Info.Room> rItems) {
        for (Info.Room room : rItems) {
            String roomId = room.getId();
            System.out.println(room);
            if (roomId == null) {
                continue; // nothing to key the record on
            }
            // Only rooms that are not stored yet are inserted.
            if (db.isNullRecord(TABLE_NAME, roomId)) {
                String columns = "roomId, roomName, classifyCname, classifyEname, userName, nickName";
                String values = String.join(", ",
                        sqlQuote(room.getId()),
                        sqlQuote(room.getName()),
                        sqlQuote(room.getClassification().getCname()),
                        sqlQuote(room.getClassification().getEname()),
                        sqlQuote(room.getUserinfo().getUserName()),
                        sqlQuote(room.getUserinfo().getNickName()));
                db.insertRecord(TABLE_NAME, columns, values);
            }
        }

        // Split all stored ids into buckets.
        Map<Integer, List<String>> buckets = new HashMap<Integer, List<String>>();
        String[] roomIds = db.selectAll(TABLE_NAME, "roomId", 0);
        if (roomIds != null) {
            for (String roomId : roomIds) {
                int bucketId;
                try {
                    bucketId = hash(roomId);
                } catch (NumberFormatException e) {
                    System.out.println("CrawlId: [Skip non-numeric room id '" + roomId + "'.]");
                    continue;
                }
                buckets.computeIfAbsent(bucketId, k -> new ArrayList<String>()).add(roomId);
            }
        }

        // Register the watchers before the data changes so the update is observed.
        centralizer.registerNode(new String[] {
                "/ZNodeId/ZNodeId_0", "/ZNodeId/ZNodeId_1", "/ZNodeId/ZNodeId_2"});

        // Upload each non-empty bucket as a comma-separated id list.
        Map<String, byte[]> zNodes = new HashMap<String, byte[]>();
        for (int i = 0; i < BUCKET_COUNT; i++) {
            List<String> ids = buckets.get(i);
            if (ids != null) {
                zNodes.put("/ZNodeId/ZNodeId_" + i,
                        String.join(",", ids).getBytes(StandardCharsets.UTF_8));
            }
        }
        centralizer.setBytesToNode(zNodes);
    }

    /**
     * Wraps a value in single quotes for use in the VALUES list, escaping
     * backslashes and single quotes so names containing them cannot break
     * (or alter) the statement. A null value is stored as the text "null",
     * as before.
     */
    private static String sqlQuote(String value) {
        String text = String.valueOf(value);
        return "'" + text.replace("\\", "\\\\").replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        CrawlId ci = new CrawlId();
        while (true) {
            List<Room> rooms = new ArrayList<Room>();
            for (Info page : ci.crawl()) {
                rooms.addAll(page.getData().getItems());
            }
            ci.divideBucket(rooms);
            try {
                Thread.sleep(CRAWL_INTERVAL_MILLIS); // one round every 30 minutes
            } catch (InterruptedException e) {
                // Interrupted means "stop": restore the flag and leave the loop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
CrawlInfo0,1,2
package Peerslee.HotMonitor.Server.Spider;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import com.fasterxml.jackson.databind.ObjectMapper;

import Peerslee.HotMonitor.Bean.Follower;
import Peerslee.HotMonitor.Calculate.Calculater;
import Peerslee.HotMonitor.Server.Centralize.Centralizer;

/*
 * https://www.panda.tv/room_followinfo?roomid=266019
 */
/**
 * Worker that waits on a socket for the path of a bucket znode, reads the room
 * ids stored in that znode, fetches the follower count of each room and hands
 * the counts to the {@link Calculater}.
 */
public class CrawlInfo0 {
    /** Znode path received over the socket, e.g. "/ZNodeId/ZNodeId_0". */
    private static String ZK_NODE;
    private static final int PORT = 5200; // the sibling workers use 5201 and 5202
    /** Reused for every response instead of being rebuilt per request. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private static HttpClient client = HttpClients.createDefault();
    private static Centralizer centralizer = new Centralizer(
            "localhost:2181,localhost:2182,localhost:2183");
    private static Calculater calculater = new Calculater();

    /**
     * Reads the comma-separated id list from the current znode, crawls the
     * follower count for every id and passes the result to the calculator.
     * Does nothing when the znode is unreadable, empty, or no count could be
     * fetched.
     */
    public void getFollowerByIds() {
        String idList = centralizer.getStringFromNode(ZK_NODE);
        if (idList == null || idList.trim().isEmpty()) {
            System.out.println("CrawlInfo: [No ids found in " + ZK_NODE + ".]");
            return;
        }
        Map<String, String> recentFollower = new HashMap<String, String>();
        System.out.println("CrawlInfo: [Crawling follower, waiting...]");
        Stream.of(idList.split(","))
                .map(String::trim)
                .filter(id -> !id.isEmpty()) // "" would otherwise be crawled as a room id
                .forEach(id -> {
                    String fCount = getFollower(id);
                    if (fCount != null) {
                        recentFollower.put(id, fCount);
                    }
                });
        if (recentFollower.isEmpty()) {
            System.out.println("CrawlInfo: [No follower count crawled.]");
            return;
        }
        calculater.calculate(recentFollower);
    }

    /**
     * Fetches the follower count of one room.
     *
     * @param roomId id of the room
     * @return the "fans" value of the response, or {@code null} if the request
     *         or the parsing failed
     */
    public String getFollower(String roomId) {
        String url = String.format("https://www.panda.tv/room_followinfo?roomid=%s", roomId);
        HttpGet hGet = new HttpGet(url);
        try {
            HttpResponse response = client.execute(hGet);
            HttpEntity entity = response.getEntity();
            // UTF-8 is only the fallback when the response declares no charset.
            String json = EntityUtils.toString(entity, StandardCharsets.UTF_8);
            return MAPPER.readValue(json, Follower.class).getData().getFans();
        } catch (Exception e) {
            System.out.println("CrawlInfo: [Crawl '" + url + "' error.]");
            return null;
        }
    }

    /**
     * Reads the first line sent by the peer and closes the connection.
     *
     * @return the trimmed line, or {@code null} if nothing usable was received
     */
    private static String readNodePath(Socket accepted) {
        try (Socket socket = accepted;
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line = reader.readLine(); // one line: the znode path
            if (line == null || line.trim().isEmpty()) {
                return null;
            }
            return line.trim();
        } catch (IOException e) {
            System.out.println("CrawlInfo: [Read znode path error.]");
            return null;
        }
    }

    public static void main(String[] args) {
        CrawlInfo0 cInfo = new CrawlInfo0();
        try (ServerSocket sSocket = new ServerSocket(PORT)) {
            while (true) {
                System.out.println("等待连接...");
                // A failing accept() means the server socket is unusable: leave the loop.
                String node = readNodePath(sSocket.accept());
                if (node == null) {
                    continue;
                }
                ZK_NODE = node;
                System.out.println("CrawlInfo: [Get follower by " + ZK_NODE + " ids.]");
                try {
                    cInfo.getFollowerByIds();
                } catch (RuntimeException e) {
                    // One failed round must not stop the worker from serving the next one.
                    System.out.println("CrawlInfo: [CrawlInfo error.]");
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            System.out.println("CrawlInfo: [CrawlInfo error.]");
        }
    }
}
Server,Centralizer
package Peerslee.HotMonitor.Server.Centralize;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;

// Remember to create the znodes first (initZKNode) before using the other methods.
/**
 * Thin wrapper around a ZooKeeper client. It stores id lists in znodes and,
 * acting as the watcher, tells the matching worker over a socket when the
 * data of its znode changed.
 */
public class Centralizer implements Watcher {
    private static final int SESSION_TIME_OUT = 5000; // 5 seconds
    private static final String WORKER_HOST = "127.0.0.1";
    private static final int WORKER_SO_TIMEOUT = 15000;
    /** Released once the first SyncConnected event has been seen. */
    private static CountDownLatch cdLatch = new CountDownLatch(1);
    private ZooKeeper zk;

    /**
     * Opens a session to the given ensemble. If the client cannot be created
     * the error is reported and the instance is left without a session, so
     * later calls only log errors.
     *
     * @param hostPort comma-separated host:port pairs
     */
    public Centralizer(String hostPort) {
        try {
            this.zk = new ZooKeeper(hostPort, SESSION_TIME_OUT, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /*
     * Create the znodes.
     */
    void initZKNode(Map<String, byte[]> nodes) {
        for (Entry<String, byte[]> node : nodes.entrySet()) {
            try {
                String path = zk.create(node.getKey(), node.getValue(),
                        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                System.out.println("Success create znode:" + path);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // Typically the node already exists; report instead of failing silently.
                System.out.println("ZK: [Create " + node.getKey() + " error: " + e.getMessage() + "]");
            }
        }
    }

    /*
     * Register this instance as watcher on the given znodes.
     */
    public void registerNode(String[] node) {
        for (String n : node) {
            try {
                zk.exists(n, true);
                System.out.println("ZK: [Exist " + n + " successful.]");
            } catch (Exception e) {
                System.out.println("ZK: [Exist " + n + " error.]");
            }
        }
    }

    /*
     * Read the znode data as a string; returns null on any error.
     */
    public String getStringFromNode(String node) {
        try {
            return new String(zk.getData(node, true, new Stat()), StandardCharsets.UTF_8);
        } catch (Exception e) {
            System.out.println("ZK: [Get bytes error.]");
            return null;
        }
    }

    /*
     * Overwrite the data of every given znode, whatever its current version.
     */
    public void setBytesToNode(Map<String, byte[]> nodes) {
        for (Entry<String, byte[]> node : nodes.entrySet()) {
            try {
                Thread.sleep(5000); // pseudo-distributed setup: pause between updates
                Stat stat = zk.setData(node.getKey(), node.getValue(), -1);
                System.out.println("ZK: [Set Node:" + node.getKey() + ".]");
                System.out.println(stat.getCzxid() + "/" + stat.getMzxid() + "/" + stat.getVersion());
            } catch (InterruptedException e) {
                // Stop updating and keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                System.out.println("ZK: [Set bytes error.]");
                return;
            } catch (Exception e) {
                System.out.println("ZK: [Set bytes error.]");
            }
        }
    }

    /**
     * Watcher callback. Only events received while connected are handled; a
     * data change of a bucket znode is forwarded to the worker of that bucket.
     */
    @Override
    public void process(WatchedEvent event) {
        System.out.println("---- zk process ----");
        if (KeeperState.SyncConnected != event.getState()) {
            return;
        }
        cdLatch.countDown(); // connected; further calls have no effect
        switch (event.getType()) {
        case None:
            System.out.println("ZK: [process none.]");
            break;
        case NodeCreated:
            System.out.println("ZK: [process node created.]");
            break;
        case NodeDataChanged:
            System.out.println("ZK: [process node data changed.]");
            notifyWorker(event.getPath());
            break;
        case NodeChildrenChanged:
            System.out.println("ZK: [process children list changed.]");
            break;
        default:
            System.out.println("ZK: [process nothing.]");
            break;
        }
    }

    /** Returns the worker port for a bucket znode path, or -1 for any other path. */
    private static int workerPort(String path) {
        if (path == null) {
            return -1;
        }
        switch (path) {
        case "/ZNodeId/ZNodeId_0":
            return 5200;
        case "/ZNodeId/ZNodeId_1":
            return 5201;
        case "/ZNodeId/ZNodeId_2":
            return 5202;
        default:
            return -1;
        }
    }

    /** Sends the changed znode path to the worker responsible for it. */
    private void notifyWorker(String path) {
        System.out.println(path);
        int port = workerPort(path);
        String hostName = port < 0 ? null : WORKER_HOST;
        System.out.println(hostName);
        if (hostName == null) {
            return;
        }
        // try-with-resources: nothing to close (and no NPE) when the connect itself fails.
        try (Socket socket = new Socket(hostName, port);
                PrintWriter writer = new PrintWriter(
                        new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8))) {
            socket.setSoTimeout(WORKER_SO_TIMEOUT);
            writer.print(path);
            writer.flush();
        } catch (Exception e) {
            System.out.println("ZK: [Socket error.]");
        }
    }

    public static void main(String[] args) {
        final String HOST_PORT = "localhost:2181,localhost:2182,localhost:2183";
        Centralizer centralizer = new Centralizer(HOST_PORT);
        System.out.println(centralizer.zk.getState());
        try {
            cdLatch.await(); // wait for the session to be connected
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // First-time setup: put "/ZNodeId", "/ZNodeId/ZNodeId_0..2" and "/ZNodeData"
        // (each with empty data) into the map and call centralizer.initZKNode(ZNode).
        Map<String, byte[]> ZNode = new HashMap<String, byte[]>();
        ZNode.put("/ZNodeId/ZNodeId_0", new byte[0]);
        centralizer.setBytesToNode(ZNode);
        System.out.println(centralizer.getStringFromNode("/ZNodeId/ZNodeId_0"));
        try {
            Thread.sleep(Integer.MAX_VALUE); // keep the process (and its watcher) alive
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Calculate
package Peerslee.HotMonitor.Calculate;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;

import Peerslee.HotMonitor.Utils.MysqlHandler;

/*
 * Runs the calculation on each worker machine.
 */
public class Calculater {
    private static final String TABLE_NAME = "PandaTv";
    private static final int COLUMN_NUM = 12;
    private static String BASE_PATH = "E:\\HOT_MONITOR\\";
    private static MysqlHandler db = MysqlHandler.getMHandler(new String[] {
            "jdbc:mysql://localhost:3306/HotMonitor", "root", "123"});
    /** Time of the calculation round in progress; set by {@link #calculate}. */
    private static Date today = null;
    private static SimpleDateFormat sdFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm");

    /*
     * Recalculate the statistics of exactly the rooms whose ids are given.
     */
    /**
     * Updates the stored statistics of every room in {@code recentFollower}
     * and then writes a snapshot of the whole table to a local file.
     *
     * @param recentFollower room id to freshly crawled follower count
     */
    public void calculate(Map<String, String> recentFollower) {
        if (recentFollower == null || recentFollower.isEmpty()) {
            // An empty id list would produce "IN ()", which is not valid SQL.
            System.out.println("Calculater: [No follower counts to calculate.]");
            return;
        }
        today = new Date(); // refresh the timestamp of this round
        // "[1, 2, 3]" -> "(1, 2, 3)" for the IN clause
        String roomId = recentFollower.keySet().toString().
                replace("[", "(").replace("]", ")");
        String[] columns = {"roomId", "flowerCount", "flowerMax", "flowerMatime",
                "flowerMin", "flowerMitime", "flowerDist"};
        List<Map<String, String>> records = db.selectRecord(TABLE_NAME, roomId, columns);
        if (records == null) {
            System.out.println("Calculater: [Select previous records error.]");
        } else {
            for (Map<String, String> previous : records) {
                String rId = previous.get("roomId");
                update(recentFollower.get(rId), previous, columns);
                db.updateRecord(TABLE_NAME, rId, previous);
                System.out.println(previous);
            }
        }
        System.out.println("Calculater: [Fault-Tolerant --->.]");
        saveToLocal(getRecentRecord());
    }

    /**
     * Merges a fresh follower count into a stored record, in place.
     * <ul>
     * <li>flowerCount becomes the fresh count and flowerDist the difference to
     * the previously stored count ("0" if there was none);</li>
     * <li>flowerMax / flowerMin and their timestamps are replaced when missing
     * or exceeded.</li>
     * </ul>
     * Only the columns named in {@code columns} are considered. The record is
     * left untouched when {@code rFollower} is missing or not a number.
     *
     * @param rFollower fresh follower count as a decimal string
     * @param previous  stored record, column name to value; modified in place
     * @param columns   columns to recalculate
     */
    void update(String rFollower, Map<String, String> previous, String[] columns) {
        Long recent = parseCount(rFollower);
        if (recent == null) {
            System.out.println("Calculater: [Skip invalid follower count '" + rFollower + "'.]");
            return;
        }
        // update() may be called without calculate(); fall back to the current time.
        String now = sdFormat.format(today != null ? today : new Date());
        for (String column : columns) {
            switch (column) {
            case "flowerCount":
                Long fCount = parseCount(previous.get(column));
                // Always store the latest count so the next round measures against it.
                previous.put(column, rFollower);
                previous.put("flowerDist",
                        fCount == null ? "0" : String.valueOf(recent - fCount));
                break;
            case "flowerMax":
                Long fMax = parseCount(previous.get(column));
                // no maximum yet, or a new maximum
                if (fMax == null || fMax < recent) {
                    previous.put(column, rFollower);
                    previous.put("flowerMatime", now);
                }
                break;
            case "flowerMin":
                Long fMin = parseCount(previous.get(column));
                if (fMin == null || fMin > recent) {
                    previous.put(column, rFollower);
                    previous.put("flowerMitime", now);
                }
                break;
            default:
                break;
            }
        }
    }

    /** Parses a decimal count; returns null for a missing or non-numeric value. */
    private static Long parseCount(String value) {
        if (value == null) {
            return null;
        }
        try {
            return Long.valueOf(value.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // Redundant copy of the table, used as a local backup.
    String[] getRecentRecord() {
        return db.selectAll(TABLE_NAME, "*", COLUMN_NUM);
    }

    /**
     * Writes the given lines to {@code BASE_PATH/<current millis>.dat} as
     * UTF-8, one entry per line.
     *
     * @param strings lines to write; nothing is written when {@code null}
     */
    void saveToLocal(String[] strings) {
        if (strings == null) {
            System.out.println("Calculater: [Save to local error.]");
            return;
        }
        long ts = (new Date()).getTime();
        File dir = new File(BASE_PATH);
        if (!dir.exists()) {
            dir.mkdirs();
        }
        String filePath = String.format("%s%d.dat", BASE_PATH, ts);
        // Closing the BufferedWriter flushes it; closing only the underlying
        // stream would silently drop whatever is still buffered.
        try (FileOutputStream fos = new FileOutputStream(new File(filePath));
                OutputStreamWriter opw = new OutputStreamWriter(fos, "UTF-8");
                BufferedWriter bw = new BufferedWriter(opw)) {
            for (String str : strings) {
                bw.write(str + "\n");
            }
        } catch (Exception e) {
            System.out.println("Calculater: [Save to local error.]");
            return; // do not report success after a failure
        }
        System.out.println("Calculater: [Save to " + filePath + " successful.]");
    }
}
Bean
package Peerslee.HotMonitor.Bean;

import java.util.List;

/*
 * Json
 */
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * Jackson binding for one page of the room-list response. Only the properties
 * the application reads are mapped; every other property of the response is
 * ignored, so additional fields in the response do not break parsing.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Info {
    private Data data;

    /** The "data" object of the response: the rooms of the page. */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Data {
        // The element type is required: without it the items are not bound to Room.
        private List<Room> items;

        public List<Room> getItems() {
            return items;
        }

        public void setItems(List<Room> items) {
            this.items = items;
        }

        @Override
        public String toString() {
            return "Data [items=" + items + "]";
        }
    }

    /** One entry of "items". */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Room {
        private String id; // room id
        private String name; // room name
        private Classify classification; // category
        private User userinfo; // broadcaster

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Classify getClassification() {
            return classification;
        }

        public void setClassification(Classify classification) {
            this.classification = classification;
        }

        public User getUserinfo() {
            return userinfo;
        }

        public void setUserinfo(User userinfo) {
            this.userinfo = userinfo;
        }

        @Override
        public String toString() {
            return "Room [id=" + id + ", name=" + name + ", classification="
                    + classification + ", userinfo=" + userinfo + "]";
        }
    }

    /** The "classification" object of a room: category names. */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Classify {
        private String cname;
        private String ename;

        public String getCname() {
            return cname;
        }

        public void setCname(String cname) {
            this.cname = cname;
        }

        public String getEname() {
            return ename;
        }

        public void setEname(String ename) {
            this.ename = ename;
        }

        @Override
        public String toString() {
            return "Classify [cname=" + cname + ", ename=" + ename + "]";
        }
    }

    /** The "userinfo" object of a room. */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class User {
        private String rid;
        private String userName;
        private String nickName;

        public String getRid() {
            return rid;
        }

        public void setRid(String rid) {
            this.rid = rid;
        }

        public String getUserName() {
            return userName;
        }

        public void setUserName(String userName) {
            this.userName = userName;
        }

        public String getNickName() {
            return nickName;
        }

        public void setNickName(String nickName) {
            this.nickName = nickName;
        }

        @Override
        public String toString() {
            return "User [userName=" + userName + ", nickName=" + nickName + "]";
        }
    }

    public Data getData() {
        return data;
    }

    public void setData(Data data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "Info [data=" + data + "]";
    }
}
package Peerslee.HotMonitor.Bean;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * Jackson binding for the room follow-info response. Only "data.fans" is
 * mapped; every other property is ignored on both levels, so additional
 * fields in the response do not break parsing.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Follower {
    private Data data;

    /** The "data" object of the response. */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Data {
        private String fans; // follower count, kept as the text sent by the server

        public String getFans() {
            return fans;
        }

        public void setFans(String fans) {
            this.fans = fans;
        }
    }

    public Data getData() {
        return data;
    }

    public void setData(Data data) {
        this.data = data;
    }
}
Client
package Peerslee.HotMonitor.Client;

import java.awt.BorderLayout;
import java.awt.Font;
import java.awt.event.ItemEvent;
import java.awt.event.ItemListener;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import javax.swing.JComboBox;
import javax.swing.JComponent;
import javax.swing.JFrame;
import javax.swing.JLabel;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTabbedPane;
import javax.swing.JTable;
import javax.swing.SwingConstants;
import javax.swing.WindowConstants;

import Peerslee.HotMonitor.Utils.MysqlHandler;

/**
 * Swing client with two tabs: statistics per category and the rooms of a
 * selected category. All data comes from {@link MysqlHandler#selectType}.
 */
public class Index {
    private static MysqlHandler db = MysqlHandler.getMHandler(new String[] {
            "jdbc:mysql://localhost:3306/HotMonitor", "root", "123"});

    /** Builds and shows the main window. */
    void index() {
        JFrame jf = new JFrame();
        jf.setSize(600, 800);
        jf.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);
        jf.setLocationRelativeTo(null);

        JTabbedPane jtp = new JTabbedPane();
        jtp.add("类别", createTab1("1"));
        jtp.add("主播", createTab2("2"));
        jtp.setSelectedIndex(0);

        jf.setContentPane(jtp);
        jf.setVisible(true);
    }

    /**
     * Builds the room tab: a category selector on top, the rooms of the
     * selected category in the centre and a row count at the bottom.
     *
     * @param text unused
     */
    JComponent createTab2(String text) {
        String[] columnNames = {"roomId", "nickName", "roomName", "flowercount",
                "flowerDist", "flowerMax", "flowerMatime", "flowerMin", "flowerMitime"};
        String[] header = {"直播间ID", "主播名称", "直播间名称", "订阅人数",
                "订阅变化量", "订阅最高值", "高峰时间点", "订阅最低值", "低谷时间点"};
        JPanel jp = new JPanel(new BorderLayout());

        // Category display name -> category key used for the room query.
        Map<String, String> jbMap = new LinkedHashMap<String, String>();
        List<Map<String, String>> rList = db.selectType(0, null);
        if (rList != null) {
            for (Map<String, String> r : rList) {
                String cname = r.get("classifycname");
                if (cname != null && cname.length() != 0) {
                    jbMap.put(cname, r.get("classifyename"));
                }
            }
        }
        JComboBox<String> jb = new JComboBox<String>(jbMap.keySet().toArray(new String[0]));

        JScrollPane jsp = new JScrollPane();
        jp.add(jsp, BorderLayout.CENTER);
        JLabel jl = newStatusLabel();
        jp.add(jl, BorderLayout.SOUTH);

        // Pick the default entry first and load it explicitly, so the table is
        // filled even when the default entry is already the selected one.
        if (jb.getItemCount() > 0) {
            jb.setSelectedIndex(Math.min(2, jb.getItemCount() - 1)); // third category by default
            showRooms(String.valueOf(jb.getSelectedItem()), columnNames, header, jbMap, jsp, jl);
        }
        ItemListener onSelect = e -> {
            if (e.getStateChange() == ItemEvent.SELECTED) {
                showRooms(e.getItem().toString(), columnNames, header, jbMap, jsp, jl);
            }
        };
        jb.addItemListener(onSelect);
        jp.add(jb, BorderLayout.NORTH);
        return jp;
    }

    /**
     * Builds the category tab: a selector for the ranking criterion on top,
     * the ranking in the centre and a row count at the bottom.
     *
     * @param text unused
     */
    JComponent createTab1(String text) {
        JPanel jp = new JPanel(new BorderLayout());
        JComboBox<String> jb = new JComboBox<String>(new String[] {"订阅量", "直播数"});

        JScrollPane jsp = new JScrollPane();
        jp.add(jsp, BorderLayout.CENTER);
        JLabel jl = newStatusLabel();
        jp.add(jl, BorderLayout.SOUTH);

        jb.setSelectedIndex(1); // default criterion
        showCategories(String.valueOf(jb.getSelectedItem()), jsp, jl);
        ItemListener onSelect = e -> {
            if (e.getStateChange() == ItemEvent.SELECTED) {
                showCategories(e.getItem().toString(), jsp, jl);
            }
        };
        jb.addItemListener(onSelect);
        jp.add(jb, BorderLayout.NORTH);
        return jp;
    }

    /**
     * Runs the query of the given type and arranges the result as table rows.
     *
     * @param type        query type understood by {@code MysqlHandler.selectType}
     * @param columnNames result columns, in display order
     * @param jbMap       category display name to category key, or {@code null}
     * @param kString     selected category display name, used with {@code jbMap}
     * @return one row per record; empty if the query returned nothing
     */
    String[][] getRowData(int type, String[] columnNames,
            Map<String, String> jbMap, String kString) {
        List<Map<String, String>> iList =
                db.selectType(type, jbMap == null ? null : jbMap.get(kString));
        if (iList == null) {
            return new String[0][columnNames.length];
        }
        String[][] rowData = new String[iList.size()][columnNames.length];
        for (int i = 0; i < iList.size(); i++) {
            for (int j = 0; j < columnNames.length; j++) {
                rowData[i][j] = iList.get(i).get(columnNames[j]);
            }
        }
        return rowData;
    }

    /** Creates the right-aligned label shown below a table. */
    private static JLabel newStatusLabel() {
        JLabel jl = new JLabel();
        jl.setFont(new Font(null, Font.PLAIN, 15));
        jl.setHorizontalAlignment(SwingConstants.RIGHT);
        return jl;
    }

    /** Loads the rooms of one category into the table of the room tab. */
    private void showRooms(String kString, String[] columnNames, String[] header,
            Map<String, String> jbMap, JScrollPane jsp, JLabel jl) {
        System.out.println("选中:" + kString);
        String[][] rowData = getRowData(3, columnNames, jbMap, kString);
        showTable(jsp, jl, rowData, header,
                "["+ kString+ "] 有 ["+ rowData.length+ "] 条数据.");
    }

    /** Loads the category ranking for the chosen criterion into the category tab. */
    private void showCategories(String kString, JScrollPane jsp, JLabel jl) {
        System.out.println("选中:" + kString);
        // equals(): identity comparison only works by accident of string interning.
        int type = "订阅量".equals(kString) ? 1 : 2;
        String[] columnNames;
        String[] header;
        if (type == 1) {
            columnNames = new String[] {"fSum", "classifycname"};
            header = new String[] {"订阅总人数", "直播类别"};
        } else {
            columnNames = new String[] {"RSum", "classifycname"};
            header = new String[] {"直播间总数", "直播类别"};
        }
        String[][] rowData = getRowData(type, columnNames, null, null);
        showTable(jsp, jl, rowData, header, "共 " + rowData.length + " 条数据.");
    }

    /** Replaces the table shown in the scroll pane and updates the status label. */
    private static void showTable(JScrollPane jsp, JLabel jl, String[][] rowData,
            String[] header, String summary) {
        jsp.setViewportView(new JTable(rowData, header));
        jsp.repaint();
        jl.setText(summary);
        jl.repaint();
    }
}
Utils
package Peerslee.HotMonitor.Utils;

/*
 * MySQL create / read / update helper.
 */
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import com.mysql.jdbc.Connection;

// Remember to create the table first (see main).
/**
 * Process-wide access point to the MySQL database.
 *
 * <p>Values are bound through prepared statements wherever the method
 * signature allows it. Table and column names, and the pre-formatted
 * arguments of {@link #insertRecord} and {@link #selectAll}, are still
 * concatenated into the statement and must only come from trusted code.
 */
public class MysqlHandler {
    public static Connection db;
    public static Statement opt;
    /** Table queried by {@link #selectType}; same spelling as used when creating it. */
    private static final String PANDA_TABLE = "PandaTv";

    /*
     * Lazily created single instance, guarded by a check inside and outside
     * the class lock; volatile so a half-constructed instance is never seen.
     */
    private volatile static MysqlHandler mHandler;

    /**
     * Connects to the database. On failure the error is reported and the
     * handler stays unconnected, so later calls only log errors.
     *
     * @param params url, user name, password
     */
    public MysqlHandler(String[] params) {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            db = (Connection) DriverManager.getConnection(
                    params[0], params[1], params[2]);
            System.out.println("MySQL: [Connected database successfully.]");
            opt = db.createStatement();
        } catch (Exception e) {
            reportError("MySQL: [Connected database error.]", e);
        }
    }

    /**
     * Returns the shared handler, creating it on first use. {@code params} is
     * only used by the call that creates the instance.
     *
     * @param params url, user name, password
     */
    public static MysqlHandler getMHandler(String[] params) {
        if (mHandler == null) {
            synchronized (MysqlHandler.class) {
                if (mHandler == null) {
                    mHandler = new MysqlHandler(params);
                }
            }
        }
        return mHandler;
    }

    /**
     * Creates the table if it does not exist.
     *
     * @param tName  table name
     * @param params column name to column definition, in column order
     * @param pKey   primary key column, or "" for none
     */
    public void createTable(String tName, Map<String, String> params, String pKey) {
        if (params.isEmpty()) {
            System.out.println("MySQL: [Create table error.]");
            return;
        }
        List<String> definitions = new ArrayList<String>();
        for (Entry<String, String> p : params.entrySet()) {
            definitions.add(p.getKey() + " " + p.getValue());
        }
        StringBuilder sql = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
        sql.append(tName).append(" (").append(String.join(",", definitions));
        sql.append(pKey.isEmpty() ? ")" : ",PRIMARY KEY (" + pKey + "))");
        System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
        try {
            opt.executeUpdate(sql.toString());
        } catch (Exception e) {
            reportError("MySQL: [Create table error.]", e);
            return;
        }
        System.out.println("MySQL: [Create table successfully.]");
    }

    /**
     * Tells whether no record with the given room id exists.
     *
     * @return {@code true} if the table holds no such record; {@code false} if
     *         it does or if the query failed
     */
    public boolean isNullRecord(String tName, String roomId) {
        String sql = "SELECT COUNT(*) FROM " + tName + " WHERE roomId = ?";
        System.out.println("MySQL: [Sql -> " + sql + "]");
        try (PreparedStatement ps = db.prepareStatement(sql)) {
            ps.setString(1, unquote(roomId));
            try (ResultSet res = ps.executeQuery()) {
                res.next();
                return res.getInt(1) == 0;
            }
        } catch (Exception e) {
            System.out.println("MySQL: ["+ e.getMessage() +"]");
            return false;
        }
    }

    /**
     * Runs one of the predefined client queries.
     * <ul>
     * <li>0: categories, ordered by total follower count</li>
     * <li>1: total follower count per category</li>
     * <li>2: number of rooms per category</li>
     * <li>3: rooms of the category {@code cEname}, ordered by follower count</li>
     * </ul>
     *
     * @param type   query type, see above
     * @param cEname category key; only used by type 3
     * @return one map (column name to value) per row; empty for an unknown
     *         type or after an error
     */
    public List<Map<String, String>> selectType(int type, String cEname) {
        List<Map<String, String>> resList = new ArrayList<Map<String, String>>();
        String sql = null;
        String[] columns = null;
        boolean byCategory = false;
        switch (type) {
        case 0: // categories, by follower count
            columns = new String[] {"classifycname", "classifyename"};
            // Both selected columns are grouped so the query is valid under ONLY_FULL_GROUP_BY.
            sql = "SELECT classifycname, classifyename "
                    + "FROM " + PANDA_TABLE + " GROUP BY classifycname, classifyename "
                    + "ORDER BY SUM(flowerCount) DESC";
            break;
        case 1: // per category: total follower count
            columns = new String[] {"fSum", "classifycname"};
            sql = "SELECT SUM(flowerCount) AS fSum, classifycname "
                    + "FROM " + PANDA_TABLE + " GROUP BY classifycname ORDER BY fSum DESC";
            break;
        case 2: // per category: number of rooms
            columns = new String[] {"RSum", "classifycname"};
            sql = "SELECT COUNT(classifycname) AS rSum, classifycname "
                    + "FROM " + PANDA_TABLE + " GROUP BY classifycname ORDER BY rSum DESC";
            break;
        case 3: // rooms of one category
            columns = new String[] {"roomId", "nickName", "roomName", "flowercount",
                    "flowerDist", "flowerMax", "flowerMatime", "flowerMin", "flowerMitime"};
            // ORDER BY, not GROUP BY: grouping would merge rooms that share a follower count.
            sql = "SELECT roomId, nickName, roomName, flowercount, flowerDist, flowerMax, "
                    + "flowerMatime, flowerMin, flowerMitime "
                    + "FROM " + PANDA_TABLE + " "
                    + "WHERE classifyename = ? AND flowerCount > 0 "
                    + "ORDER BY CAST(flowercount AS DECIMAL) DESC";
            byCategory = true;
            break;
        default:
            break;
        }
        System.out.println(sql);
        if (sql == null) {
            System.out.println("MySQL: [Selecting record null.]");
            return resList;
        }
        try (PreparedStatement ps = db.prepareStatement(sql)) {
            if (byCategory) {
                // valueOf keeps matching the text "null" for a missing key, as the concatenated SQL did.
                ps.setString(1, String.valueOf(cEname));
            }
            try (ResultSet rSet = ps.executeQuery()) {
                while (rSet.next()) {
                    Map<String, String> res = new HashMap<String, String>();
                    for (String c : columns) {
                        res.put(c, rSet.getString(c));
                    }
                    resList.add(res);
                }
            }
        } catch (Exception e) {
            reportError("MySQL: [Selecting record error.]", e);
            return new ArrayList<Map<String, String>>();
        }
        System.out.println("MySQL: [Select record successfully.]");
        return resList;
    }

    /**
     * Reads all rows of a table.
     *
     * @param tName   table name
     * @param columns column expression; a single column name when {@code cNum} is 0
     * @param cNum    0 to return the value of the single column {@code columns};
     *                otherwise the number of leading columns to join per row,
     *                each followed by "::"
     * @return one entry per row; empty when there are no rows or after an error
     */
    public String[] selectAll(String tName, String columns, int cNum) {
        String sql = "SELECT " + columns + " FROM " + tName;
        System.out.println("MySQL: [Sql -> " + sql + "]");
        // Rows are collected in a list so a forward-only result set is sufficient.
        List<String> rows = new ArrayList<String>();
        try (ResultSet rSet = opt.executeQuery(sql)) {
            StringBuilder sBuilder = new StringBuilder();
            while (rSet.next()) {
                if (cNum == 0) {
                    rows.add(rSet.getString(columns));
                } else {
                    sBuilder.setLength(0);
                    for (int j = 1; j <= cNum; j++) {
                        sBuilder.append(rSet.getString(j)).append("::");
                    }
                    rows.add(sBuilder.toString());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("MySQL: [Select all error.]");
            return new String[0];
        }
        if (rows.isEmpty()) {
            System.out.println("MySQL: [There are no rows in the result set.]");
        } else {
            System.out.println("MySQL: [Select all successful.]");
        }
        return rows.toArray(new String[0]);
    }

    /**
     * Reads the given columns of the records whose room id is in the list.
     *
     * @param tName   table name
     * @param roomId  id list such as "(1, 2, 3)"; "" (or null) selects all records
     * @param Columns columns to read
     * @return one map (column name to value) per record; empty when nothing
     *         matches, the id list contains no id, or after an error
     */
    public List<Map<String, String>> selectRecord(String tName, String roomId, String[] Columns) {
        List<Map<String, String>> resList = new ArrayList<Map<String, String>>();
        boolean filtered = roomId != null && !roomId.isEmpty();
        List<String> ids = parseIdList(roomId);
        if (filtered && ids.isEmpty()) {
            System.out.println("MySQL: [Selecting record null.]");
            return resList; // "IN ()" is not valid SQL and could match nothing anyway
        }
        StringBuilder sql = new StringBuilder("SELECT ");
        sql.append(String.join(", ", Columns)).append(" FROM ").append(tName);
        if (filtered) {
            sql.append(" WHERE roomId IN (");
            for (int i = 0; i < ids.size(); i++) {
                sql.append(i == 0 ? "?" : ", ?");
            }
            sql.append(")");
        }
        System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
        try (PreparedStatement ps = db.prepareStatement(sql.toString())) {
            for (int i = 0; i < ids.size(); i++) {
                ps.setString(i + 1, ids.get(i));
            }
            try (ResultSet rSet = ps.executeQuery()) {
                while (rSet.next()) {
                    Map<String, String> res = new HashMap<String, String>();
                    for (String c : Columns) {
                        res.put(c, rSet.getString(c));
                    }
                    resList.add(res);
                }
            }
        } catch (Exception e) {
            reportError("MySQL: [Selecting record error.]", e);
            return new ArrayList<Map<String, String>>();
        }
        System.out.println("MySQL: [Select record successfully.]");
        return resList;
    }

    /**
     * Inserts one record.
     *
     * @param tName   table name
     * @param Columns comma-separated column list
     * @param Values  comma-separated, already quoted and escaped SQL values;
     *                NOTE(review): this text becomes part of the statement
     *                as-is, so callers must escape untrusted content
     */
    public void insertRecord(String tName, String Columns, String Values) {
        StringBuilder sql = new StringBuilder("INSERT INTO ");
        sql.append(tName).append(" (").append(Columns).append(") VALUES (").
                append(Values).append(")");
        System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
        try {
            opt.executeUpdate(sql.toString());
        } catch (Exception e) {
            reportError("MySQL: [Inserting records error.]", e);
            return;
        }
        System.out.println("MySQL: [Insert records successfully.]");
    }

    /**
     * Updates the record of one room.
     *
     * @param tName   table name
     * @param roomId  id of the room to update
     * @param records column name to new value; a {@code null} value is stored
     *                as SQL NULL
     */
    public void updateRecord(String tName, String roomId, Map<String, String> records) {
        if (records.isEmpty()) {
            return;
        }
        StringBuilder sql = new StringBuilder("UPDATE ");
        sql.append(tName).append(" SET ");
        List<String> values = new ArrayList<String>();
        for (Entry<String, String> entry : records.entrySet()) {
            if (!values.isEmpty()) {
                sql.append(", ");
            }
            sql.append(entry.getKey()).append(" = ?");
            values.add(entry.getValue());
        }
        sql.append(" WHERE roomId = ?");
        System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
        try (PreparedStatement ps = db.prepareStatement(sql.toString())) {
            int index = 1;
            for (String value : values) {
                ps.setString(index++, value);
            }
            ps.setString(index, unquote(roomId));
            ps.executeUpdate();
        } catch (Exception e) {
            reportError("MySQL: [Updating records error.]", e);
            return;
        }
        System.out.println("MySQL: [Update records successfully.]");
    }

    /** Splits "(1, 2, '3')" into its ids; returns an empty list for null or no ids. */
    private static List<String> parseIdList(String roomId) {
        List<String> ids = new ArrayList<String>();
        if (roomId == null) {
            return ids;
        }
        String body = roomId.trim();
        if (body.startsWith("(")) {
            body = body.substring(1);
        }
        if (body.endsWith(")")) {
            body = body.substring(0, body.length() - 1);
        }
        for (String part : body.split(",")) {
            String id = unquote(part.trim());
            if (!id.isEmpty()) {
                ids.add(id);
            }
        }
        return ids;
    }

    /** Removes one pair of surrounding single quotes, for callers that pass a pre-quoted id. */
    private static String unquote(String value) {
        if (value != null && value.length() >= 2
                && value.startsWith("'") && value.endsWith("'")) {
            return value.substring(1, value.length() - 1);
        }
        return value;
    }

    /** Prints the message followed by the stack trace of the cause. */
    private static void reportError(String message, Exception e) {
        System.out.println(message);
        e.printStackTrace();
    }

    /** Creates the PandaTv table. */
    public static void main(String[] args) {
        String url = "jdbc:mysql://localhost:3306/HotMonitor";
        String username = "root";
        String password = "123";
        String tName = "PandaTv";
        // LinkedHashMap keeps the columns in the order they are listed here.
        Map<String, String> params = new LinkedHashMap<String, String>();
        params.put("roomId", "VARCHAR(16) not NULL");
        params.put("roomName", "VARCHAR(255)");
        params.put("classifyCname", "VARCHAR(16)");
        params.put("classifyEname", "VARCHAR(16)");
        params.put("userName", "VARCHAR(32)");
        params.put("nickName", "VARCHAR(32)");
        params.put("flowerCount", "VARCHAR(8)");
        params.put("flowerMax", "VARCHAR(8)");
        params.put("flowerMatime", "VARCHAR(16)");
        params.put("flowerMin", "VARCHAR(8)");
        params.put("flowerMitime", "VARCHAR(16)");
        params.put("flowerDist", "VARCHAR(8)");
        String pKey = "roomId";
        MysqlHandler db = MysqlHandler.getMHandler(new String[] {url, username, password});
        db.createTable(tName, params, pKey);
    }
}
pom
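<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Peerslee</groupId>
  <artifactId>HotMonitor</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>HotMonitor</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.11</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.5.2</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.8.11.1</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.8.11</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.8.11</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.46</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
</project>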
zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=D:\\Tools\\zk\\s001\\data
dataLogDir=D:\\Tools\\zk\\s001\\logs
clientPort=2181
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
tickTime=2000
initLimit=10
syncLimit=5
dataDir=D:\\Tools\\zk\\s002\\data
dataLogDir=D:\\Tools\\zk\\s002\\logs
clientPort=2182
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
tickTime=2000
initLimit=10
syncLimit=5
dataDir=D:\\Tools\\zk\\s003\\data
dataLogDir=D:\\Tools\\zk\\s003\\logs
clientPort=2183
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
安装包
链接:https://pan.baidu.com/s/1ZnTFX8e8L4t5WerNQN0d4g 密码:lx8r
演示流程
1. 启动 zk 集群
2. 启动 CrawlInfo0、CrawlInfo1、CrawlInfo2
3. 启动 CrawlId
4. 启动 Client
上一篇: 二叉排序树及其C代码
下一篇: HBase负载均衡问题分析