User Log Analysis System in Practice (Part 2)
1 Log Collection
1.1 Common Data Sources
- Unstructured data
  - Machine-generated data
  - User access logs
  - User purchase logs
- Structured data
  - RDBMS: MySQL, Oracle, etc.
    - Full import
    - Incremental import
1.2 Requirements for Data Collection and Ingestion
- Distributed
  - Data sources are diverse
  - Data sources are scattered across machines
- Reliable
  - Some scenarios must guarantee that no data is lost
  - Others can tolerate losing part of the data (see the channel sketch after this list)
- Scalable
  - The number of data sources may keep growing
  - Performance can be improved through parallelism
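In Flume, for instance, this reliability trade-off maps directly onto the channel type. A minimal sketch, with illustrative agent and channel names:
# Durable: events are checkpointed to disk and survive an agent crash
agent.channels.fileChannel.type = file
# Fast but lossy: in-flight events are gone if the agent dies
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000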
1.3 Flume
- Flume configuration for collecting data into HDFS
flume-conf-logAnalysis.properties:
logAgent.sources = logSource
logAgent.channels = fileChannel
logAgent.sinks = hdfsSink
# For each one of the sources, the type is defined
logAgent.sources.logSource.type = exec
logAgent.sources.logSource.command = tail -F /home/bigdata/datasource/record.list
# The channel can be defined as follows.
logAgent.sources.logSource.channels = fileChannel
# Each sink's type must be defined
logAgent.sinks.hdfsSink.type = hdfs
logAgent.sinks.hdfsSink.hdfs.path = hdfs://bigdata:9000/flume/record/%Y-%m-%d/%H%M
logAgent.sinks.hdfsSink.hdfs.filePrefix = transaction_log
logAgent.sinks.hdfsSink.hdfs.rollInterval = 600
logAgent.sinks.hdfsSink.hdfs.rollCount = 10000
logAgent.sinks.hdfsSink.hdfs.rollSize = 0
logAgent.sinks.hdfsSink.hdfs.round = true
logAgent.sinks.hdfsSink.hdfs.roundValue = 10
logAgent.sinks.hdfsSink.hdfs.roundUnit = minute
logAgent.sinks.hdfsSink.hdfs.fileType = DataStream
logAgent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
#Specify the channel the sink should use
logAgent.sinks.hdfsSink.channel = fileChannel
# Each channel's type is defined.
logAgent.channels.fileChannel.type = file
logAgent.channels.fileChannel.checkpointDir = /home/bigdata/apache-flume-1.7.0-bin/dataCheckpointDir
logAgent.channels.fileChannel.dataDirs = /home/bigdata/apache-flume-1.7.0-bin/dataDir
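With round = true, roundValue = 10 and roundUnit = minute, the %H%M part of the HDFS path is rounded down to 10-minute buckets. A quick way to check what the agent has written (the date and bucket below are just examples):
hdfs dfs -ls /flume/record/2022-04-28/0850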
- Flume configuration for collecting data into Kafka
flume-conf-logAnalysis-kafka.properties:
logAgent.sources = logSource
logAgent.channels = fileChannel
logAgent.sinks = kafkaSink
# For each one of the sources, the type is defined
logAgent.sources.logSource.type = exec
logAgent.sources.logSource.command = tail -F /home/bigdata/datasource/record.list
# The channel can be defined as follows.
logAgent.sources.logSource.channels = fileChannel
# Each sink's type must be defined
logAgent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
logAgent.sinks.kafkaSink.topic = log
logAgent.sinks.kafkaSink.brokerList = bigdata:9092
logAgent.sinks.kafkaSink.batchSize = 10
#Specify the channel the sink should use
logAgent.sinks.kafkaSink.channel = fileChannel
# Each channel's type is defined.
logAgent.channels.fileChannel.type = file
logAgent.channels.fileChannel.checkpointDir = /home/bigdata/apache-flume-1.7.0-bin/dataCheckpointDir_realtime
logAgent.channels.fileChannel.dataDirs = /home/bigdata/apache-flume-1.7.0-bin/dataDir_realtime
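Once this agent is running, a quick way to confirm that events actually reach the log topic is Kafka's console consumer (assuming a broker at bigdata:9092 and a reasonably recent Kafka release; older releases used --zookeeper instead):
kafka-console-consumer.sh --bootstrap-server bigdata:9092 --topic log --from-beginning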
- Flume startup commands
start_flume_batch.sh:
#!/bin/bash
flume-ng agent --conf /home/bigdata/apache-flume-1.7.0-bin/conf --conf-file /home/bigdata/apache-flume-1.7.0-bin/conf/flume-conf-logAnalysis.properties --name logAgent -Dflume.root.logger=DEBUG,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
start_flume_realtime.sh:
#!/bin/bash
flume-ng agent --conf /home/bigdata/apache-flume-1.7.0-bin/conf --conf-file /home/bigdata/apache-flume-1.7.0-bin/conf/flume-conf-logAnalysis-kafka.properties --name logAgent -Dflume.root.logger=DEBUG,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34546
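Both scripts also enable Flume's built-in HTTP monitoring, so channel and sink counters can be polled as JSON while the agents run:
curl http://localhost:34545/metrics   # batch agent
curl http://localhost:34546/metrics   # realtime agent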
1.4 Sqoop
- Sqoop: SQL-to-Hadoop
  - A bridge between traditional relational databases and Hadoop
  - Uses MapReduce to speed up data transfer
  - Transfers data in batch mode
- Sqoop advantages:
  - Efficient, controllable use of resources
    - Task concurrency, timeouts, etc.
  - Data type mapping and conversion
    - Handled automatically, and customizable by the user
  - Support for many databases
    - MySQL, Oracle, PostgreSQL, etc.
- Sqoop commands for importing data into HDFS
user_dimension_sqoop.sh:
#!/bin/bash
sqoop import --connect jdbc:mysql://bigdata:3306/log --username root --password root --table user_dimension --driver com.mysql.jdbc.Driver -m 10 --target-dir /warehouse/user_dimension
brand_dimension_sqoop.sh:
#!/bin/bash
sqoop import --connect jdbc:mysql://bigdata:3306/log --username root --password root --table brand_dimension --driver com.mysql.jdbc.Driver -m 10 --target-dir /warehouse/brand_dimension
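The two scripts above do full imports. For the incremental import mentioned in section 1.1, Sqoop can append only the rows whose check column has advanced past the last imported value. A sketch, assuming user_dimension has an auto-increment id column (the --last-value shown is illustrative):
#!/bin/bash
sqoop import --connect jdbc:mysql://bigdata:3306/log --username root --password root --table user_dimension --driver com.mysql.jdbc.Driver -m 10 --target-dir /warehouse/user_dimension --incremental append --check-column id --last-value 10000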
2 File Storage (HDFS)
2.1 HDFS Architecture
2.2 File Storage
- Data partitioning
  - Directories organized by year/month/day
- Data compression
  - Takes up less storage space (see the sketch below)
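A minimal sketch of both ideas, with illustrative paths: a year/month/day directory layout, plus gzip compression before upload:
hdfs dfs -mkdir -p /warehouse/record/2022/04/28
gzip -c record.list > record.list.gz
hdfs dfs -put record.list.gz /warehouse/record/2022/04/28/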
3 HDFS Optimization Tips
3.1 Small File Optimization
- Merge small files into large ones
  - SequenceFile
  - Hadoop Archive (see the sketch after this list)
- Store them in a key/value system
  - HBase
  - TFS (Taobao File System)
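For example, Hadoop Archive can pack a day's worth of small Flume output files into a single HAR file (the paths follow the Flume configuration in section 1.3):
hadoop archive -archiveName record.har -p /flume/record 2022-04-28 /archive
hdfs dfs -ls -R har:///archive/record.har   # files stay readable through the har:// scheme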
3.2 Compression and Archiving
- Flume (compression can be enabled directly on the HDFS sink; see the sketch below)
- MR/Hive/Spark (for MR compression, see my blog post "Shuffle揭秘")
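On the Flume side, the HDFS sink from section 1.3 can write compressed output by switching the file type and naming a codec, e.g.:
logAgent.sinks.hdfsSink.hdfs.fileType = CompressedStream
logAgent.sinks.hdfsSink.hdfs.codeC = gzip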
3.3 Erasure Coding
Erasure coding saves storage space: compared with 3x replication (200% storage overhead), the built-in RS-6-3 policy needs only 50% overhead, roughly halving total storage use.
Available starting with Hadoop 3.0.
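In Hadoop 3.x the policy is applied per directory with the hdfs ec subcommand; a sketch using the built-in RS-6-3 policy on an illustrative path:
hdfs ec -enablePolicy -policy RS-6-3-1024k
hdfs ec -setPolicy -path /warehouse -policy RS-6-3-1024k
hdfs ec -getPolicy -path /warehouse   # confirm the policy took effect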