Big Data Pipeline Walkthrough (Study Notes)
Pipeline: requests to the frame project go through Nginx, which writes logs to logs/log.frame.access.log --> Flume collects the log directory and sinks the records to Kafka --> the data read from Kafka is saved to HDFS --> Hive processes it.
1: Package the working frame project into a jar file
How to package: in the Maven panel on the right side of IDEA, click package; when the console reports success, the jar has been built.
Note: before packaging, replace localhost in the URL of the frame project's application configuration with the computer's Wi-Fi IPv4 address.
On hdp-4, press Alt+P to open the upload window and drag the packaged jar into it, then on hdp-4 run java -jar a (where a stands for the name of the jar that frame was packaged into).
2: Nginx (features covered: load balancing, reverse proxy server)
Purpose: allow other virtual machines to access the frame project running on hdp-4.
Signs that Nginx is installed correctly: opening hdp-1:80 shows the Nginx welcome page, and opening hdp-1:8080 also reaches the frame project on hdp-4 (8080 is the frame project's port).
How to start Nginx:
cd /usr/local/nginx/sbin
ll
./nginx
How to reload Nginx (from the same sbin directory):
./nginx -s reload
Nginx configuration file
Location of the configuration file:
/usr/local/nginx/conf/nginx.conf
#user  nobody;
worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;

    ## what each access-log line records (the "main" log format)
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

    upstream frame-tomcat {
        ## the address Nginx forwards requests to
        server hdp-4:8881;
    }

    server {
        listen       80;
        ## the host name this Nginx server block answers to
        server_name  hdp-1;

        #charset koi8-r;

        ## path and format of the Nginx access log
        access_log  logs/log.frame.access.log  main;

        location / {
            # root   html;
            # index  index.html index.htm;
            ## proxy pass (forward) to the upstream defined above
            proxy_pass http://frame-tomcat;
        }

        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }

    server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        location / {
            root   html;
            index  index.html index.htm;
        }

        #error_page  404              /404.html;

        # redirect server error pages to the static page /50x.html
        #
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }

        # proxy the PHP scripts to Apache listening on 127.0.0.1:80
        #
        #location ~ \.php$ {
        #    proxy_pass   http://127.0.0.1;
        #}

        # pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
        #
        #location ~ \.php$ {
        #    root           html;
        #    fastcgi_pass   127.0.0.1:9000;
        #    fastcgi_index  index.php;
        #    fastcgi_param  SCRIPT_FILENAME  /scripts$fastcgi_script_name;
        #    include        fastcgi_params;
        #}

        # deny access to .htaccess files, if Apache's document root
        # concurs with nginx's one
        #
        #location ~ /\.ht {
        #    deny  all;
        #}
    }

    # another virtual host using mix of IP-, name-, and port-based configuration
    #
    #server {
    #    listen       8000;
    #    listen       somename:8080;
    #    server_name  somename  alias  another.alias;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}

    # HTTPS server
    #
    #server {
    #    listen       443;
    #    server_name  localhost;

    #    ssl                  on;
    #    ssl_certificate      cert.pem;
    #    ssl_certificate_key  cert.key;

    #    ssl_session_timeout  5m;

    #    ssl_protocols  SSLv2 SSLv3 TLSv1;
    #    ssl_ciphers  HIGH:!aNULL:!MD5;
    #    ssl_prefer_server_ciphers   on;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}

}
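For orientation, each request handled by the hdp-1 server block appends one line in the main format to logs/log.frame.access.log. A purely illustrative (made-up) line would look like:

192.168.1.101 - - [07/Aug/2020:10:15:32 +0800] "GET /index HTTP/1.1" 200 612 "-" "Mozilla/5.0" "-"

These are the lines Flume tails in the next step.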
3: Flume
Purpose: collect the log file generated by Nginx and sink it to Kafka.
Flume's three main components: Source (collects data), Sink (delivers data downstream), Channel (the buffer that connects them).
Start command:
bin/flume-ng agent -c conf/ -f tail-kafka.conf -n a1 -Dflume.root.logger=INFO,console
tail-kafka.conf is a configuration file we write ourselves; it tails the log produced by Nginx and sinks the records to Kafka:
a1.sources = source1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
## exec means the data source is the output of a shell command
a1.sources.source1.type = exec
## the command tails the log generated by Nginx; /root/log/access.log must be the same path the Nginx configuration writes its log to
a1.sources.source1.command = tail -F /root/log/access.log

# Describe the sink
## configuration for sinking to Kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
## Kafka topic to write to
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = hdp-1:9092,hdp-2:9092,hdp-3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.source1.channels = c1
a1.sinks.k1.channel = c1
4: Kafka
Note: ZooKeeper must be started before Kafka. Kafka is not the final resting place of the data here; my current understanding is that it acts as an ordered buffer for incoming records, like cigarettes being slotted into a cigarette pack.
Reason: this version of Kafka depends on ZooKeeper to store broker and topic metadata and to coordinate the cluster.
Starting Kafka with a script: in the root directory, run vi kafka.sh and enter the code below (pass start or stop as the first argument, e.g. sh kafka.sh start):
#! /bin/bash
for host in hdp-1 hdp-2 hdp-3
do
echo "===========$host==========="
ssh $host "source /etc/profile;/root/apps/kafka_2.12-2.2.0/bin/kafka-server-$1.sh -daemon /root/apps/kafka_2.12-2.2.0/config/server.properties"
done
Starting Kafka directly from its bin directory:
./kafka-server-start.sh ../config/server.properties
Creating a topic named test:
./kafka-topics.sh --create --zookeeper hdp-1:2181 --replication-factor 1 --partitions 1 --topic test
Starting a console producer (note that 9092 is Kafka's default port):
./kafka-console-producer.sh --broker-list hdp-1:9092 --topic test
Starting a console consumer:
./kafka-console-consumer.sh --bootstrap-server hdp-1:9092 --topic test --from-beginning
Once both are running, whatever is typed into the producer is received by the consumer.
What we actually want is for Flume to sink the collected data into Kafka, so a Kafka consumer running on the topic will see the new records as they arrive.
Saving the data from Kafka into a local file:
package com.zpark.onekafka;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Properties;

public class ConsumerDemo {

    public static void main(String[] args) {
        // call the method that receives the messages
        receiveMsg();
    }

    /**
     * Consume the data on the Kafka topic "test" and write it to a local log file.
     */
    private static void receiveMsg() {
        Logger logger = Logger.getLogger("logRollingFile");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "hdp-3:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "aaaa");
        properties.put("enable.auto.commit", true);
        // create the consumer and subscribe to the same topic the Flume sink writes to
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singleton("test"));

        URI uri = null;
        Configuration conf = null;
        String user = "root";
        try {
            uri = new URI("hdfs://hdp-1:9000");
            conf = new Configuration();
            // dfs.replication: number of replicas kept by HDFS
            conf.set("dfs.replication", "2");
            // dfs.blocksize: HDFS block size
            conf.set("dfs.blocksize", "64m");
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        try {
            FileOutputStream fos = new FileOutputStream("D:/shuju.log");
            OutputStreamWriter osw = new OutputStreamWriter(fos);
//            FileSystem fs = FileSystem.get(uri, conf, user);
//            FSDataOutputStream fdos = fs.create(new Path("/cf.txt"));
            while (true) {
                // poll Kafka for new records
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    String msg = "key:" + record.key() + ",value:" + record.value() + ",offset:" + record.offset() + ",topic:" + record.topic() + "\r\n";
                    System.out.printf("key=%s,value=%s,offset=%s,topic=%s%n", record.key(), record.value(), record.offset(), record.topic());
                    logger.debug(record.value());
//                    BufferedWriter bw = new BufferedWriter(osw);
//                    bw.write(msg);
//                    bw.flush();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
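The commented-out FileSystem lines above hint at writing the records straight to HDFS instead of going through a local file first. Below is a minimal sketch of that variant; it reuses the hdp-1:9000 NameNode address and the test topic from these notes, while the class name HdfsConsumerSketch, the consumer group id, the output path /frame/kafka-records.txt, and the poll interval are made up for illustration.

package com.zpark.onekafka;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class HdfsConsumerSketch {

    public static void main(String[] args) throws Exception {
        // same consumer settings as ConsumerDemo above
        Properties props = new Properties();
        props.put("bootstrap.servers", "hdp-3:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "hdfs-writer");   // illustrative group id
        props.put("enable.auto.commit", true);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("test"));

        // open one HDFS output stream and append every consumed value to it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hdp-1:9000"), conf, "root");
        FSDataOutputStream out = fs.create(new Path("/frame/kafka-records.txt")); // illustrative path

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    out.write((record.value() + "\n").getBytes("UTF-8"));
                }
                // flush so the data becomes visible to readers without closing the file
                out.hflush();
            }
        } finally {
            out.close();
            fs.close();
            consumer.close();
        }
    }
}

Writing directly to HDFS would skip the intermediate local file and the separate upload step shown next; the two-step route in these notes is simply easier to debug while learning.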
Then upload the data from the local folder into the Hive table on HDFS.
Create the table in Hive: create external table flumetable2 (ip string) row format delimited location '/usr/';
package com.zpark.onekafka;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HdfsTest {

    public static void main(String[] args) {
        URI uri = null;
        Configuration conf = null;
        String user = "root";
        FileSystem fs = null;
        try {
            uri = new URI("hdfs://hdp-1:9000");
            conf = new Configuration();
            // dfs.replication: number of replicas kept by HDFS
            conf.set("dfs.replication", "2");
            // dfs.blocksize: HDFS block size
            conf.set("dfs.blocksize", "64m");
            fs = FileSystem.get(uri, conf, user);
            // upload the local log file into the directory the Hive external table points at
            fs.copyFromLocalFile(new Path("d:/testlog/access.log"), new Path("/usr/a.txt"));
            fs.close();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
5: Hive analysis
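With the log data sitting under /usr/ and exposed through the flumetable2 external table, the analysis step can be done in the Hive shell or over JDBC. Below is a minimal JDBC sketch that counts requests per ip; it assumes HiveServer2 is running on hdp-1 at the default port 10000 and that the hive-jdbc dependency is on the classpath (all of these are assumptions, not part of the original notes), and is only meant to show the shape of this step.

package com.zpark.onekafka;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveQuerySketch {

    public static void main(String[] args) throws Exception {
        // HiveServer2 JDBC driver (from the hive-jdbc dependency)
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // assumption: HiveServer2 runs on hdp-1:10000 and flumetable2 lives in the default database
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://hdp-1:10000/default", "root", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "select ip, count(*) as cnt from flumetable2 group by ip order by cnt desc")) {
            while (rs.next()) {
                // print each ip and how many requests it made
                System.out.println(rs.getString("ip") + "\t" + rs.getLong("cnt"));
            }
        }
    }
}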