
Collecting logs into HDFS with Flume and importing them into a Hive table


Introduction to Flume

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application

Put simply, Flume is a distributed log collection system: easy to use and highly fault tolerant.

This walkthrough uses a spooldir source, a Kafka channel (KafkaChannel), and an HDFS sink, laid out production-style with two Flume agents.
The first agent pushes data into the Kafka channel.
The second agent drains data from the Kafka channel into HDFS.
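The end-to-end flow, using the names from the configs below:

log files -> spooldir source -> Kafka topic "top" -> HDFS sink -> /user/xzc/logss8/ds=YYYYMMDD -> Hive table soft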

flume1

a1.sources = r1
# no sink is needed in this agent: the Kafka channel itself persists events to Kafka
a1.channels = c1 

# spooldir source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /home/test10
#a1.sources.r1.fileHeader = true
# interceptors: i1 does simple regex filtering, i2 extracts the timestamp
a1.sources.r1.interceptors=i1 i2
a1.sources.r1.interceptors.i1.type=regex_filter
a1.sources.r1.interceptors.i1.regex=(.*)installed(.*)
a1.sources.r1.interceptors.i2.type = regex_extractor
a1.sources.r1.interceptors.i2.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i2.serializers = s1
a1.sources.r1.interceptors.i2.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i2.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i2.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss
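# To make the chain concrete: given an input line such as
#   2018-08-04 12:17:01 status,half-installed,debconf:all 1.5.66
# i1 keeps the event (its body matches (.*)installed(.*)), and i2 captures
# the leading date-time and stores it, converted to epoch milliseconds, in
# the event's "timestamp" header. The HDFS sink in the second agent uses
# that header to resolve %Y%m%d in its path.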

# kafka channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = localhost:9092
a1.channels.c1.kafka.topic = top
a1.channels.c1.kafka.consumer.group.id = top-consumer
# older Flume versions used these properties instead; see the official docs
#a1.channels.c1.brokerList = localhost:9092
#a1.channels.c1.zookeeperConnect = localhost:2181
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.parseAsFlumeEvent = true
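
Before starting either agent, make sure the channel's topic exists (broker-side auto-creation may be disabled). A minimal sketch, assuming a Kafka release from this setup's era that still takes the --zookeeper flag (newer CLIs use --bootstrap-server localhost:9092 instead):

$ kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic top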

flume2

# no source is needed in this agent: events arrive via the Kafka channel
a1.sinks = k1 
a1.channels = c1 


# kafka channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = localhost:9092
a1.channels.c1.kafka.topic = top
a1.channels.c1.kafka.consumer.group.id = top-consumer
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.parseAsFlumeEvent = true

# sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/xzc/logss8/ds=%Y%m%d
a1.sinks.k1.hdfs.fileType = DataStream
#a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.channel = c1    
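
By default the HDFS sink rolls files very aggressively (every 30 seconds, 1 KB, or 10 events, whichever comes first), which yields many small files. The roll properties below are standard HDFS sink settings; the values are only illustrative and should be tuned per workload:

# optional: roll by size only (~128 MB), disable time/count-based rolling
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0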
Start flume1

$ flume-ng agent --conf conf --conf-file flume1 --name a1 -Dflume.root.logger=INFO,console

Start flume2

$ flume-ng agent --conf conf --conf-file flume2 --name a1 -Dflume.root.logger=INFO,console

Then feed some data in, by dropping a log file into the spooling directory, as shown below.
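For example (test.log is a hypothetical file in the same format as the sample data at the end of this article):

$ cp test.log /home/test10/

Once a file is fully ingested, the spooldir source renames it with a .COMPLETED suffix.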


Some of the results as they land in HDFS:

[Screenshot: files written by the HDFS sink under /user/xzc/logss8/]


Importing into a Hive table

  • First, create the Hive external table:

    create external table if not exists soft(
    time string,
    status string,
    version string
    )
    partitioned by (ds string)
    row format delimited fields terminated by ' '
    lines terminated by '\n'
    stored as textfile
    location '/user/xzc/logss8/';

  • Add the partition; the partition spec must match the directory name written by the HDFS sink:

    $ alter table soft add partition (ds = '20180806');
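
    Alternatively, because the sink already writes directories named ds=...,
    Hive can discover all such partitions in one pass:

    $ msck repair table soft;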

  • Query the data:

    $ select * from soft;
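
    On larger tables, filtering on the partition column keeps the scan to a
    single directory:

    $ select * from soft where ds = '20180806' limit 10;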

[Screenshot: results of the select query]

Finally, here is some of the test data:

2018-08-04 12:17:01 status,half-configured,debconf:all 1.5.66
2018-08-04 12:17:01 status,unpacked,debconf:all 1.5.66
2018-08-04 12:17:01 status,half-installed,debconf:all 1.5.66
2018-08-04 12:17:01 status,half-installed,debconf:all 1.5.66
2018-08-04 12:17:01 status,unpacked,debconf:all 1.5.66
2018-08-04 12:17:01 status,unpacked,debconf:all 1.5.66
2018-08-04 12:17:01 install,debianutils:amd64,<none> 4.8.4
2018-08-04 12:17:01 status,half-installed,debianutils:amd64 4.8.4
2018-08-04 12:17:01 status,unpacked,debianutils:amd64 4.8.4
2018-08-04 12:17:01 status,unpacked,debianutils:amd64 4.8.4
2018-08-04 12:17:01 install,diffutils:amd64,<none> 1:3.6-1
2018-08-04 12:17:01 status,half-installed,diffutils:amd64 1:3.6-1
2018-08-04 12:17:01 status,unpacked,diffutils:amd64 1:3.6-1
2018-08-04 12:17:01 status,unpacked,diffutils:amd64 1:3.6-1
2018-08-04 12:17:01 upgrade,dpkg:amd64,1.19.0.5ubuntu2 1.19.0.5ubuntu2
2018-08-04 12:17:01 status,half-configured,dpkg:amd64 1.19.0.5ubuntu2
2018-08-04 12:17:01 status,unpacked,dpkg:amd64 1.19.0.5ubuntu2
2018-08-04 12:17:01 status,half-installed,dpkg:amd64 1.19.0.5ubuntu2
2018-08-04 12:17:01 status,half-installed,dpkg:amd64 1.19.0.5ubuntu2
2018-08-04 12:17:01 status,unpacked,dpkg:amd64 1.19.0.5ubuntu2
2018-08-04 12:17:01 status,unpacked,dpkg:amd64 1.19.0.5ubuntu2
2018-08-04 12:17:01 install,e2fsprogs:amd64,<none> 1.44.1-1
2018-08-04 12:17:01 status,half-installed,e2fsprogs:amd64 1.44.1-1
2018-08-04 12:17:01 status,unpacked,e2fsprogs:amd64 1.44.1-1