欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

统计租房访问量

程序员文章站 2022-04-28 08:29:37
...

启动zookeeper:
zjgm01,zjgm02,zjgm03
zkServer.sh start

启动kafka
zjgm01,zjgm02,zjgm03
kafka-server-start.sh /home/hadoop/app/kafka_2.11-0.11.0.2/config/server.properties

启动storm
zjgm01
storm nimbus (如果失败就删掉hadoop/app/apache-storm-0.9.2-incubating/bin/storm-local 里的文件)

zjgm02,zjgm03
storm supervisor (如果失败就删掉hadoop/app/apache-storm-0.9.2-incubating/bin/storm-local 里的文件)

CountToMain代码

package com.zhongruan.count;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import com.zhongruan.strom.MessageScheme;
import com.zhongruan.strom.WordBolt;
import com.zhongruan.strom.WriteBolt;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class CountToMain {
    public static void main(String[] args) {
        String topic ="dsj03";
        String zkRoot="/dsj";
        String spountId="kafkaSpout";
        BrokerHosts zkHosts = new ZkHosts("zjgm01:2181,zjgm02:2181,zjgm03:2181");

        TopologyBuilder builder=new TopologyBuilder();

        SpoutConfig conf=new SpoutConfig(zkHosts,topic,zkRoot,spountId);
        conf.forceFromStart=true;
        conf.scheme=new SchemeAsMultiScheme(new MessageScheme());

        builder.setSpout(spountId,new KafkaSpout(conf));

        builder.setBolt("readBolt",new ReadBolt()).shuffleGrouping(spountId);
        builder.setBolt("writeCountBolt",new WriteCountBolt()).shuffleGrouping("readBolt");

        LocalCluster cluster=new LocalCluster();
        Config conf1=new Config();
        conf1.setNumWorkers(4);
        cluster.submitTopology("count",conf1,builder.createTopology());
    }
}

ReadBolt代码

package com.zhongruan.count;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ReadBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String s = tuple.getString(0);

        int index = s.indexOf("user phone :");
        if(index!=-1 && s.length()>=index + 23){
            String s1 = s.substring(index + 12, index + 23);
            basicOutputCollector.emit(new Values(s1));
        }


    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("s"));
    }
}

WriteCountBolt 代码

package com.zhongruan.count;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class WriteCountBolt extends BaseBasicBolt {
    FileWriter fileWriter=null;
    Map<String,Integer> map=null;
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            fileWriter=new FileWriter("d:\\storm\\yijulog"+ UUID.randomUUID().toString());
            map=new HashMap<>();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String s = tuple.getString(0);
        Integer integer = map.get(s);
        if(integer==null){
            map.put(s,1);
        }else {
            Integer i=map.get(s);
            map.put(s,i+1);
        }
        try {
            fileWriter.write(s+"登入了"+map.get(s)+"次");
            fileWriter.write("\n");
            fileWriter.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

结果

统计租房访问量

CountToMain代码

public class CountToMain {
    public static void main(String[] args) {
        String topic ="dsj03";
        String zkRoot="/dsj";
        String spountId="kafkaSpout";
        BrokerHosts zkHosts = new ZkHosts("zjgm01:2181,zjgm02:2181,zjgm03:2181");

        TopologyBuilder builder=new TopologyBuilder();

        SpoutConfig conf=new SpoutConfig(zkHosts,topic,zkRoot,spountId);
        conf.forceFromStart=true;
        conf.scheme=new SchemeAsMultiScheme(new MessageScheme());

        builder.setSpout(spountId,new KafkaSpout(conf));

        builder.setBolt("readBolt",new ReadBolt()).shuffleGrouping(spountId);
        builder.setBolt("writeCountBolt",new WriteCountBolt()).shuffleGrouping("readBolt");

        LocalCluster cluster=new LocalCluster();
        Config conf1=new Config();
        conf1.setNumWorkers(4);
        cluster.submitTopology("count",conf1,builder.createTopology());
    }
}

ReadBolt代码

public class ReadBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String s = tuple.getString(0);

     /*   int index = s.indexOf("user phone :");
        if(index!=-1 && s.length()>=index + 23){
            String s1 = s.substring(index + 12, index + 23);
            basicOutputCollector.emit(new Values(s1));
        }*/
        int index = s.indexOf(" house be searched:");
        if(index!=-1 && s.length()>=index + 23){
            String s1 = s.substring(index + 18, index + 23);
            basicOutputCollector.emit(new Values(s1));
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("s"));
    }
}

WriteCountBolt 代码

public class WriteCountBolt extends BaseBasicBolt {
    Jedis jedis=null;
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        jedis=new Jedis("127.0.0.1",6379);
    }
        @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String s=tuple.getString(0);
        jedis.hincrBy("houseId",s,1);

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
  /*  FileWriter fileWriter=null;
    Map<String,Integer> map=null;
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            fileWriter=new FileWriter("d:\\storm\\yijulog"+ UUID.randomUUID().toString());
            map=new HashMap<>();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String s = tuple.getString(0);
        Integer integer = map.get(s);
        if(integer==null){
            map.put(s,1);
        }else {
            Integer i=map.get(s);
            map.put(s,i+1);
        }
        try {
            fileWriter.write(s+"登入了"+map.get(s)+"次");
            fileWriter.write("\n");
            fileWriter.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }*/
}

结果

统计租房访问量