flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)
程序员文章站
2022-07-12 22:31:22
...
flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)
一、启动zookeeper+hadoop+Hbase
- zookeeper(三台)
cd /opt/zookeeper/zookeeper-3.4.12/bin/
./zkServer.sh start
2. hadoop(主)
cd /opt/hadoop/hadoop-2.7.3/sbin/
./start-all.sh
- 可能出现zhiyou003的ResourceManager未正常启动
- zhiyou003启动yarn
cd /opt/hadoop/hadoop-2.7.3/sbin/
./start-yarn.sh
- Hbase(主)
cd /opt/hbase/hbase-2.0.0/bin/
./start-hbase.sh
./hbase shell
二、启动flume+kafka+strom
- 创建配置flume
//flume_kafka.conf此时配置kafka的主题为total
##########################################################
##
##主要作用是监听目录中的新增数据,采集到数据之后,输出到kafka
## 注意:Flume agent的运行,主要就是配置source channel sink
## 下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#具体定义source
a1.sources.r1.type = exec
#文件
a1.sources.r1.command =tail -F /orders.log
#sink到kafka里面
a1.sinks.k1.channel = c1
a1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的Topic
a1.sinks.k1.kafka.topic = total
#设置Kafka的broker地址和端口号
a1.sinks.k1.kafka.bootstrap.servers = zhiyou001:9092,zhiyou002:9092,zhiyou003:9092
#配置批量提交的数量
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type= snappy
#对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data
#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 启动kafka(三台)
cd /opt/kafka/kafka_2.12-1.1.0
./bin/kafka-server-start.sh -daemon config/server.properties
- 创建主题(主机)(启动几台factor、partitions后写几)
cd /opt/kafka/kafka_2.12-1.1.0/bin/
./kafka-topics.sh --create --zookeeper zhiyou001:2181,zhiyou002:2181,zhiyou003:2181 --replication-factor 3 --partitions 3 --topic total
//启动消费者
./kafka-console-consumer.sh --bootstrap-server zhiyou001:9092,zhiyou002:9092,zhiyou003:9092 --from-beginning --topic total
- 启动flume(主机)
cd /opt/flume/apache-flume-1.8.0-bin/
bin/flume-ng agent --conf conf --conf-file conf/flume_kafka.conf --name a1 -Dflume.root.logger=INFO,console
- 测试(新窗口)
cd /
echo "1">>orders.log
三、代码测试
- HbaseUtil
package com.zhiyou.total;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
public class HbaseUtil {
private static Configuration conf;
private static Connection conn;
private static Table table;
static{
//1.连接zookeeper
conf=new Configuration();
conf.set("hbase.zookeeper.quorum", "zhiyou001:2181,zhiyou002:2181,zhiyou002:2180");
//2.建立连接
try {
conn=ConnectionFactory.createConnection(conf);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static Table getTable(String tableName) {
//3.获取表
try {
table=conn.getTable(TableName.valueOf(tableName));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return table;
}
}
- Spout
package com.zhiyou.total;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import kafka.api.OffsetRequest;
public class Spout {
public KafkaSpout createKafkSpout(){
//配置hosts
String borkerZKStr="zhiyou001:2181,zhiyou002:2181,zhiyou003:2181";
BrokerHosts brokerHosts=new ZkHosts(borkerZKStr);
String topic="total";//选择主题
String zkRoot="/"+topic; //zkRoot
String id="id002"; //标识符
SpoutConfig config=new SpoutConfig(brokerHosts, topic, zkRoot, id);
//最新的消息
config.startOffsetTime=OffsetRequest.LatestTime();
return new KafkaSpout(config);
}
}
- Bolt1
package com.zhiyou.total;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class Bolt1 extends BaseBasicBolt{
public void execute(Tuple input, BasicOutputCollector collector) {
// TODO Auto-generated method stub
byte[]bytes=input.getBinary(0); //字节码
//byte->String
//192.168.53.123 0-001 100 手机 小米8 99 2 20181112(timeStamp)
String line =new String(bytes);
String[] str=line.split(" ");
long price=Long.valueOf(str[5]);
String date=str[7];
System.out.println(line);
collector.emit(new Values(date,price));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("date","price"));
}
}
- Bolt2
package com.zhiyou.total;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
public class Bolt2 extends BaseBasicBolt{
public void execute(Tuple input, BasicOutputCollector collector) {
String date = input.getStringByField("date");
long price = input.getLongByField("price");
Table table = HbaseUtil.getTable("sum_money_date");
try {
table.incrementColumnValue(("sum_money_"+date).getBytes(), "info".getBytes(), "summoney".getBytes(), price);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
- Topology
package com.zhiyou.total;
import java.util.HashMap;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
public class Topology {
public static void main(String[] args) {
TopologyBuilder bulider=new TopologyBuilder();
bulider.setSpout("Spout", new Spout().createKafkSpout());
bulider.setBolt("Bolt1", new Bolt1()).shuffleGrouping("Spout");
bulider.setBolt("Bolt2", new Bolt2()).shuffleGrouping("Bolt1");
StormTopology stormTopology=bulider.createTopology();
LocalCluster localCluster=new LocalCluster();
localCluster.submitTopology("a", new HashMap(), stormTopology);
}
}
四、测试Topology
- 修改日志
cd /
echo "192.168.53.123 0-001 100 手机 小米8 99 2 20181112">>orders.log
- 后台打印
- hbase查看
推荐阅读
-
flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)
-
idea中 ssm整合完 启动tomcat报错
-
整合ssm框架的时候出现错误org.apache.catalina.LifecycleException: 无法启动组件[StandardEngine[Catalina].StandardHost[l
-
关于springboot2整合lettuce启动卡住问题的解决方法
-
SpringBoot整合MybatisPlus启动时报错: org.springframework.beans.factory.UnsatisfiedDependencyException
-
springboot整合shiro启动报错Consider renaming one of the beans or enabling overriding by setting spring.mai
-
java-Java的框架SSH整合的项目,项目启动时总是会报一个莫名其妙的错误?
-
servlet整合quartz:servlet中使用quartz,服务器启动时加载任务
-
java-Java的框架SSH整合的项目,项目启动时总是会报一个莫名其妙的错误?
-
idea中 ssm整合完 启动tomcat报错