欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

大数据实训笔记Day04

程序员文章站 2024-01-15 19:32:10
...

Flume

Source

AVRO Source

  1. AVRO是Apache提供的一套序列化反序列化机制,AVRO的序列化机制能够跨平台跨语言的

  2. AVRO Source实际上是用于接收被AVRO序列化之后的数据,结合AVRO Sink可以实现多级、扇入以及扇出流动

  3. 配置案例

    1. 格式文件

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      
      # 配置AVRO Source
      a1.sources.s1.type = avro
      # 要监听的主机
      a1.sources.s1.bind = 0.0.0.0
      # 要监听的端口
      a1.sources.s1.port = 8090
      
      a1.channels.c1.type = memory
      
      a1.sinks.k1.type = logger
      
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
    2. 启动命令

      flume-ng agent -n a1 -c $FLUME_HOME/conf -f avrosource.conf -Dflume.root.logger=INFO,console
      
    3. 利用AVRO客户端将文件序列化之后发送给Flume

      flume-ng avro-client -H hadoop -p 8090 -F a.txt
      

Exec Source

  1. Exec Source会监听指定的命令,会将这个命令的执行结果作为日志来进行收集

  2. 案例:监听文件,如果文件中新添了数据,自动收集文件中新添的数据

    1. 格式文件

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      
      # 配置Exec Source
      a1.sources.s1.type = exec
      # 指定监听的命令
      a1.sources.s1.command = tail -F /opt/a.txt
      # 指定命令类型
      a1.sources.s1.shell = /bin/bash -c
      
      a1.channels.c1.type = memory
      
      a1.sinks.k1.type = logger
      
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
    2. 启动Flume

      flume-ng agent -n a1 -c $FLUME_HOME/conf -f execsource.conf -Dflume.root.logger=INFO,console
      

Sequence Generator Source

  1. Sequence Generator Source是一个序列产生器,会从0开始递增,递增到totalEvents

  2. totalEvents如果不指定,则默认是Long.MAX_VALUE,即263-1

  3. 案例

    1. 格式文件

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      
      # 配置Sequence Generator Source
      a1.sources.s1.type = seq
      # 指定递增的最大值
      a1.sources.s1.totalEvents = 1000
      
      a1.channels.c1.type = memory
      
      a1.sinks.k1.type = logger
      
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
    2. 启动命令

      flume-ng agent -n a1 -c $FLUME_HOME/conf -f seqsource.conf -Dflume.root.logger=INFO,console
      

Spooling Directory Source

  1. Spooling Directory Source会监听指定的目录,如果指定的目录下产生了新的文件,那么会将这个新文件中的内容自动的进行按行收集

  2. 被收集完的文件会自动添加一个后缀.COMPLETED

  3. 案例

    1. 格式文件

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      
      # 配置Spooling Directory Source
      a1.sources.s1.type = spooldir
      # 要监听的目录
      a1.sources.s1.spoolDir = /opt/flumedata
      
      a1.channels.c1.type = memory
      
      a1.sinks.k1.type = logger
      
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
    2. 启动命令

      flume-ng agent -n a1 -c $FLUME_HOME/conf -f spooldirsource.conf -Dflume.root.logger=INFO,console
      

HTTP Source

  1. HTTP Source监听HTTP请求,只能用于监听GET和POST请求。其中对于GET请求的监听只处于实验阶段,所以实际过程中只用这个Source来监听POST请求

  2. 案例

    1. 格式文件

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      
      # 配置HTTP Source
      a1.sources.s1.type = http
      # 要监听的端口
      a1.sources.s1.port = 8090
      
      a1.channels.c1.type = memory
      
      a1.sinks.k1.type = logger
      
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
    2. 启动命令

      flume-ng agent -n a1 -c $FLUME_HOME/conf -f httpsource.conf -Dflume.root.logger=INFO,console
      
    3. 发送POST请求

      curl -X POST -d '[{"headers":{"date":"2021-07-14"},"body":"hello class"}]' http://hadoop:8090
      

Custom Source

  1. 在Flume中,如果Flume原生提供的Source不能适用指定场景,那么此时就可以考虑自定义Source

  2. 格式文件

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    # 配置自定义Source - 指定全路径名
    a1.sources.s1.type = cn.tedu.flume.source.AuthSource
    # 指定步长
    a1.sources.s1.step = 5
    # 指定终止范围
    a1.sources.s1.end = 100
    
    a1.channels.c1.type = memory
    
    a1.sinks.k1.type = logger
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

Channel

Memory Channel

  1. Memory Channel将Source收集来的数据临时放到内存的队列中,因此这个Channel的读写速度相对较快但是不可靠

  2. 通过capacity属性来定义队列的容量。如果队列被放慢,那么后来的数据就会被阻塞。capactiy如果不指定则默认值为为100,实际开发中,一般会将这个值调节为300000~500000

  3. 属性transactionCapacity表示事务容量,实际上表示了每次发送或者接收的数据量。transactionCapacity默认也是100,实际开发过程中,会考虑将这个值调节为3000~5000

  4. 案例

    1. 格式文件

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      
      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090
      
      # 配置Memory Channel
      a1.channels.c1.type = memory
      # 指定容量
      a1.channels.c1.capacity = 100000
      # 指定数据的批量
      a1.channels.c1.transactionCapacity = 1000
      
      a1.sinks.k1.type = logger
      
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
    2. 启动命令

      flume-ng agent -n a1 -c $FLUME_HOME/bin -f memory.conf -Dflume.root.logger=INFO,console
      
    3. 发送数据

      nc hadoop 8090
      

File Channel

  1. File Channel将Source收集来的数据以文件形式存储到本地磁盘上,所以这个Channel的读写速度慢但是可靠

  2. 在存储的时候,如果不指定,那么会放在~/.flume/file-channel/data路径下

  3. 案例

    1. 格式文件

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      
      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090
      
      # 配置File Channel
      a1.channels.c1.type = file
      # 指定在磁盘上的临时存储路径
      a1.channels.c1.dataDirs = /opt/flumedata
      
      a1.sinks.k1.type = logger
      
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
    2. 启动命令

      flume-ng agent -n a1 -c $FLUME_HOME/bin -f filechannel.conf -Dflume.root.logger=INFO,console
      
    3. 发送数据

      nc hadoop 8090
      

JDBC Channel

  1. JDBC Channel是将Source收集的数据临时存储到数据库中,因为数据库存在索引的问题,所以理论上这个JDBC Channel的效率要高于File Channel但是低于Memory Channel
  2. JDBC Channel到目前为止仅仅支持Derby(微型数据库、单连接)。正因为采用的是Derby库,所以实际开发中不用这个JDBC Channel

Sink

HDFS Sink

  1. HDFS Sink将数据写到HDFS上。在写数据的时候,默认是每隔30s在HDFS上生成一个新的文件,那么这会导致HDFS上生成大量的小文件,所以实际过程中需要改变这个值

  2. HDFS Sink在讲数据写到HDFS上的时候,还需要考虑文件类型。Flume支持三种文件类型:SequenceFil(序列文件)、DataStream(文本文件)、CompressedStream(压缩文件)

  3. 案例

    1. 启动HDFS

      start-dfs.sh
      
    2. 格式文件

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      
      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090
      
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 100
      
      # 配置HDFS Sink
      a1.sinks.k1.type = hdfs
      # 指定在HDFS上的存储路径
      a1.sinks.k1.hdfs.path = hdfs://hadoop:9000/flumedata/test
      # 指定文件的滚动间隔时间
      a1.sinks.k1.hdfs.rollInterval = 3600
      # 指定文件的存储类型
      a1.sinks.k1.hdfs.fileType = DataStream
      
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
    3. 启动命令

      flume-ng agent -n a1 -c $FLUME_HOME/bin -f hdfssink.conf -Dflume.root.logger=INFO,console
      
    4. 发送数据

      nc hadoop 8090
      

Logger Sink

  1. Logger Sink表示将数据打印到指定位置上,一般是控制台上

  2. 在打印的时候,为了防止过多的数据将控制台占满,所以默认打印body部分的数据不超过16个字节,可以通过maxBytesToLog来配置

  3. Logger Sink在打印数据的时候,对中文支持不好

  4. 案例

    1. 格式文件

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      
      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090
      
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 1000
      
      # 配置Logger Sink
      a1.sinks.k1.type = logger
      # 指定body部分打印的字节个数
      a1.sinks.k1.maxBytesToLog = 20
      
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
    2. 启动命令

      flume-ng agent -n a1 -c $FLUME_HOME/bin -f loggersink.conf -Dflume.root.logger=INFO,console
      
    3. 发送数据

      nc hadoop 8090
      

File Roll Sink

  1. File Roll Sink将数据最终写到执行的目录下。在写的时候同样是每隔30s会生成一个小文件,所以实际过程中需要调节大小

  2. 案例

    1. 格式文件

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      
      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090
      
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 100
      
      # 配置File Roll Sink
      a1.sinks.k1.type = file_roll
      # 指定数据的存储目录
      a1.sinks.k1.sink.directory = /opt/flumedata
      # 指定文件滚动的间隔时间
      a1.sinks.k1.sink.rollInterval = 3600
      
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
    2. 启动命令

      flume-ng agent -n a1 -c $FLUME_HOME/bin -f filerollsink.conf -Dflume.root.logger=INFO,console
      
    3. 发送数据

      nc hadoop 8090