大数据 09 Hadoop 实战 用户行为日志分析
程序员文章站
2022-04-28 09:02:43
...
1 概述
用户行为日志:
- 用户每次访问网站时所有的行为数据(访问、浏览、搜索、点击…)
- 用户行为轨迹、流量日志
日志数据内容:
- 访问的系统属性: 操作系统、浏览器等等
- 访问特征:点击的url、从哪个url跳转过来的(referer)、页面上的停留时间等
- 访问信息:session_id、访问ip (访问城市) 等
日志信息格式:
2013-05-19 13:00:00 http://www.taobao.com/17/?tracker_u=1624169&type=1 B58W48U4WKZCJ5D1T3Z9ZY88RU7QA7B1 http://hao.360.cn/ 1.196.34.243
1.1 数据处理流程:
1)数据采集
Flume: web日志写入到HDFS
2)数据清洗
脏数据
Spark、Hive、MapReduce 或者是其他的一些分布式计算框架
清洗完之后的数据可以存放在HDFS(Hive/Spark SQL)
3)数据处理
按照我们的需要进行相应业务的统计和分析
Spark、Hive、MapReduce 或者是其他的一些分布式计算框架
4)处理结果入库
结果可以存放到RDBMS、NoSQL
5)数据的可视化
通过图形化展示的方式展现出来:饼图、柱状图、地图、折线图
ECharts、HUE、Zeppelin
1.2 离线数据处理架构
1.3 项目需求
统计日志里的不同浏览器的访问次数
- 根据日志信息抽取出浏览器信息
- 针对不同的浏览器进行统计操作
使用第三方jar包 UserAgentParser
# 1 先编译一下
mvn clean package -DskipTests
# 2 安装到maven仓库
mvn clean install -DskipTest
# 3 在pom.xml中添加依赖
<!-- 添加UserAgent解析的依赖 -->
<dependency>
<groupId>com.kumkee</groupId>
<artifactId>UserAgentParser</artifactId>
<version>0.0.1</version>
</dependency>
上传测试文件到HDFS上
2 编程
package com.lihaogn.hadoop.project;
import com.kumkee.userAgent.UserAgent;
import com.kumkee.userAgent.UserAgentParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class LogApp {
/**
* Map:读取输入的文件
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
LongWritable one = new LongWritable(1);
private UserAgentParser userAgentParser;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
userAgentParser = new UserAgentParser();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 接收到的每一行数据,一行记录信息
String line = value.toString();
String source = line.substring(getCharPosition(line, "\"", 7));
UserAgent agent = userAgentParser.parse(source);
String browser = agent.getBrowser();
//按照指定分隔符进行拆分
String[] words = line.split(" ");
context.write(new Text(browser), one);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
userAgentParser = null;
}
}
/**
* 获取指定字符串中指定标识的字符串出现的索引位置
*/
private static int getCharPosition(String value, String operator, int index) {
Matcher slashMathcer = Pattern.compile(operator).matcher(value);
int mIdx = 0;
while (slashMathcer.find()) {
mIdx++;
if (mIdx == index) {
break;
}
}
return slashMathcer.start();
}
/**
* Reduce:归并操作
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value : values) {
// 求key出现的次数总和
sum += value.get();
}
// 最终统计结果的输出
context.write(key, new LongWritable(sum));
}
}
/**
* 定义Driver:封装了MapReduce作业的所有信息
*/
public static void main(String[] args) throws Exception {
//创建Configuration
Configuration configuration = new Configuration();
// 准备清理已存在的输出目录
Path outputPath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath, true);
System.out.println("output file exists, but is has deleted");
}
//创建Job
Job job = Job.getInstance(configuration, "LogApp");
//设置job的处理类
job.setJarByClass(LogApp.class);
//设置作业处理的输入路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//设置map相关参数
job.setMapperClass(LogApp.MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//设置reduce相关参数
job.setReducerClass(LogApp.MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//通过job设置combiner处理类,其实逻辑上和我们的reduce是一模一样的
job.setCombinerClass(MyReducer.class);
//设置作业处理的输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
编译文件
mvn assembly:assembly
运行
hadoop jar hadoop-train-1.0--jar-with-dependencies.jar \
com.lihaogn.hadoop.project.LogApp \
/10000_access.log \
/browserout
结果: