统计租房访问量
程序员文章站
2022-04-28 08:29:37
...
启动zookeeper:
zjgm01,zjgm02,zjgm03
zkServer.sh start
启动kafka
zjgm01,zjgm02,zjgm03
kafka-server-start.sh /home/hadoop/app/kafka_2.11-0.11.0.2/config/server.properties
启动storm
zjgm01
storm nimbus (如果失败就删掉hadoop/app/apache-storm-0.9.2-incubating/bin/storm-local 里的文件)
zjgm02,zjgm03
storm supervisor (如果失败就删掉hadoop/app/apache-storm-0.9.2-incubating/bin/storm-local 里的文件)
CountToMain代码
package com.zhongruan.count;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import com.zhongruan.strom.MessageScheme;
import com.zhongruan.strom.WordBolt;
import com.zhongruan.strom.WriteBolt;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
public class CountToMain {
public static void main(String[] args) {
String topic ="dsj03";
String zkRoot="/dsj";
String spountId="kafkaSpout";
BrokerHosts zkHosts = new ZkHosts("zjgm01:2181,zjgm02:2181,zjgm03:2181");
TopologyBuilder builder=new TopologyBuilder();
SpoutConfig conf=new SpoutConfig(zkHosts,topic,zkRoot,spountId);
conf.forceFromStart=true;
conf.scheme=new SchemeAsMultiScheme(new MessageScheme());
builder.setSpout(spountId,new KafkaSpout(conf));
builder.setBolt("readBolt",new ReadBolt()).shuffleGrouping(spountId);
builder.setBolt("writeCountBolt",new WriteCountBolt()).shuffleGrouping("readBolt");
LocalCluster cluster=new LocalCluster();
Config conf1=new Config();
conf1.setNumWorkers(4);
cluster.submitTopology("count",conf1,builder.createTopology());
}
}
ReadBolt代码
package com.zhongruan.count;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class ReadBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String s = tuple.getString(0);
int index = s.indexOf("user phone :");
if(index!=-1 && s.length()>=index + 23){
String s1 = s.substring(index + 12, index + 23);
basicOutputCollector.emit(new Values(s1));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("s"));
}
}
WriteCountBolt 代码
package com.zhongruan.count;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class WriteCountBolt extends BaseBasicBolt {
FileWriter fileWriter=null;
Map<String,Integer> map=null;
@Override
public void prepare(Map stormConf, TopologyContext context) {
try {
fileWriter=new FileWriter("d:\\storm\\yijulog"+ UUID.randomUUID().toString());
map=new HashMap<>();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String s = tuple.getString(0);
Integer integer = map.get(s);
if(integer==null){
map.put(s,1);
}else {
Integer i=map.get(s);
map.put(s,i+1);
}
try {
fileWriter.write(s+"登入了"+map.get(s)+"次");
fileWriter.write("\n");
fileWriter.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
结果
CountToMain代码
public class CountToMain {
public static void main(String[] args) {
String topic ="dsj03";
String zkRoot="/dsj";
String spountId="kafkaSpout";
BrokerHosts zkHosts = new ZkHosts("zjgm01:2181,zjgm02:2181,zjgm03:2181");
TopologyBuilder builder=new TopologyBuilder();
SpoutConfig conf=new SpoutConfig(zkHosts,topic,zkRoot,spountId);
conf.forceFromStart=true;
conf.scheme=new SchemeAsMultiScheme(new MessageScheme());
builder.setSpout(spountId,new KafkaSpout(conf));
builder.setBolt("readBolt",new ReadBolt()).shuffleGrouping(spountId);
builder.setBolt("writeCountBolt",new WriteCountBolt()).shuffleGrouping("readBolt");
LocalCluster cluster=new LocalCluster();
Config conf1=new Config();
conf1.setNumWorkers(4);
cluster.submitTopology("count",conf1,builder.createTopology());
}
}
ReadBolt代码
public class ReadBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String s = tuple.getString(0);
/* int index = s.indexOf("user phone :");
if(index!=-1 && s.length()>=index + 23){
String s1 = s.substring(index + 12, index + 23);
basicOutputCollector.emit(new Values(s1));
}*/
int index = s.indexOf(" house be searched:");
if(index!=-1 && s.length()>=index + 23){
String s1 = s.substring(index + 18, index + 23);
basicOutputCollector.emit(new Values(s1));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("s"));
}
}
WriteCountBolt 代码
public class WriteCountBolt extends BaseBasicBolt {
Jedis jedis=null;
@Override
public void prepare(Map stormConf, TopologyContext context) {
jedis=new Jedis("127.0.0.1",6379);
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String s=tuple.getString(0);
jedis.hincrBy("houseId",s,1);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
/* FileWriter fileWriter=null;
Map<String,Integer> map=null;
@Override
public void prepare(Map stormConf, TopologyContext context) {
try {
fileWriter=new FileWriter("d:\\storm\\yijulog"+ UUID.randomUUID().toString());
map=new HashMap<>();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String s = tuple.getString(0);
Integer integer = map.get(s);
if(integer==null){
map.put(s,1);
}else {
Integer i=map.get(s);
map.put(s,i+1);
}
try {
fileWriter.write(s+"登入了"+map.get(s)+"次");
fileWriter.write("\n");
fileWriter.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}*/
}
结果
上一篇: C/C++静态代码安全检查工具
下一篇: 一个比较老的笑话