欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)

程序员文章站 2022-07-12 22:31:22
...

flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)


一、启动zookeeper+hadoop+Hbase

  1. zookeeper(三台)
cd /opt/zookeeper/zookeeper-3.4.12/bin/
./zkServer.sh start

flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)
2. hadoop(主)

cd /opt/hadoop/hadoop-2.7.3/sbin/
./start-all.sh 

flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)

  • 可能出现zhiyou003的ResourceManager未正常启动
  • zhiyou003启动yarn
cd /opt/hadoop/hadoop-2.7.3/sbin/
./start-yarn.sh

flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)

  1. Hbase(主)
cd /opt/hbase/hbase-2.0.0/bin/
./start-hbase.sh 
./hbase shell

flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)
flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)


二、启动flume+kafka+strom

  1. 创建配置flume
//flume_kafka.conf此时配置kafka的主题为total
##########################################################
##
##主要作用是监听目录中的新增数据,采集到数据之后,输出到kafka
##    注意:Flume agent的运行,主要就是配置source channel sink
##  下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#具体定义source
a1.sources.r1.type = exec
#文件
a1.sources.r1.command =tail -F /orders.log

#sink到kafka里面
a1.sinks.k1.channel = c1
a1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的Topic
a1.sinks.k1.kafka.topic = total
#设置Kafka的broker地址和端口号
a1.sinks.k1.kafka.bootstrap.servers = zhiyou001:9092,zhiyou002:9092,zhiyou003:9092
#配置批量提交的数量
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type= snappy

#对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)

  1. 启动kafka(三台)
cd /opt/kafka/kafka_2.12-1.1.0
./bin/kafka-server-start.sh -daemon config/server.properties 
  1. 创建主题(主机)(启动几台factor、partitions后写几)
cd /opt/kafka/kafka_2.12-1.1.0/bin/
./kafka-topics.sh --create --zookeeper zhiyou001:2181,zhiyou002:2181,zhiyou003:2181 --replication-factor 3 --partitions 3 --topic total
//启动消费者
./kafka-console-consumer.sh --bootstrap-server zhiyou001:9092,zhiyou002:9092,zhiyou003:9092 --from-beginning --topic total

flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)
flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)

  1. 启动flume(主机)
cd /opt/flume/apache-flume-1.8.0-bin/
bin/flume-ng agent --conf conf --conf-file conf/flume_kafka.conf  --name a1  -Dflume.root.logger=INFO,console

flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)

  1. 测试(新窗口)
cd /
echo "1">>orders.log

flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)

三、代码测试

  1. HbaseUtil
package com.zhiyou.total;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

public class HbaseUtil {
	private static Configuration conf;
	private static Connection conn;
	private static Table table; 
	
	static{
		//1.连接zookeeper
		conf=new Configuration();
		conf.set("hbase.zookeeper.quorum", "zhiyou001:2181,zhiyou002:2181,zhiyou002:2180");
		//2.建立连接
		 try {
			conn=ConnectionFactory.createConnection(conf);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	public static Table getTable(String tableName) {
		
				//3.获取表
				 try {
					table=conn.getTable(TableName.valueOf(tableName));
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				return table;
		
	}
	
}
  1. Spout
package com.zhiyou.total;

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;

import kafka.api.OffsetRequest;

public class Spout {
	public KafkaSpout createKafkSpout(){
		//配置hosts
		String borkerZKStr="zhiyou001:2181,zhiyou002:2181,zhiyou003:2181";
		
		BrokerHosts brokerHosts=new ZkHosts(borkerZKStr);
		
		String topic="total";//选择主题
		String zkRoot="/"+topic; //zkRoot
		String id="id002"; //标识符
		
		SpoutConfig config=new SpoutConfig(brokerHosts, topic, zkRoot, id);
		//最新的消息
		config.startOffsetTime=OffsetRequest.LatestTime();
		
		return new KafkaSpout(config);
		
	}
}

  1. Bolt1
package com.zhiyou.total;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class Bolt1 extends BaseBasicBolt{

	public void execute(Tuple input, BasicOutputCollector collector) {
		// TODO Auto-generated method stub
		byte[]bytes=input.getBinary(0);  //字节码
		//byte->String
		//192.168.53.123	0-001	100	手机	小米8	99	2	20181112(timeStamp)
		String line =new String(bytes);
		String[] str=line.split(" ");
		long price=Long.valueOf(str[5]);
		String date=str[7];
		
		System.out.println(line);
		collector.emit(new Values(date,price));
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("date","price"));
	}
	
}
  1. Bolt2
package com.zhiyou.total;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Table;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class Bolt2 extends BaseBasicBolt{

	public void execute(Tuple input, BasicOutputCollector collector) {
		String date = input.getStringByField("date");
		long price = input.getLongByField("price");
		
		Table table = HbaseUtil.getTable("sum_money_date");
		try {
			table.incrementColumnValue(("sum_money_"+date).getBytes(), "info".getBytes(), "summoney".getBytes(), price);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
	}
	
}
  1. Topology
package com.zhiyou.total;

import java.util.HashMap;

import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

public class Topology {

	public static void main(String[] args) {
		TopologyBuilder bulider=new TopologyBuilder();
		bulider.setSpout("Spout", new Spout().createKafkSpout());
		bulider.setBolt("Bolt1", new Bolt1()).shuffleGrouping("Spout");
		bulider.setBolt("Bolt2", new Bolt2()).shuffleGrouping("Bolt1");
		
		StormTopology stormTopology=bulider.createTopology();
		
		LocalCluster localCluster=new LocalCluster();
		localCluster.submitTopology("a", new HashMap(), stormTopology);
	}
}

四、测试Topology

  1. 修改日志
cd /
echo "192.168.53.123 0-001 100 手机 小米8 99 2 20181112">>orders.log
  1. 后台打印
    flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)
  2. hbase查看
    flume+kafka+strom;zookeeper+hadoop+hbase(整合启动)
相关标签: 整合