欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flume 总结(二)flume概念

程序员文章站 2022-06-15 11:26:45
...

Flume 总结(二)flume概念

1. agent 代理

  1. flume作为分布式日志采集框架,需要从各种分布式集群中进行日志文件采集
  2. 这时候,flume就需要在各个节点上运行一个程序进行数据采集,这个程序就叫做agent。(可以理解flume就是一个抽水机系统,agent就是挂在各个池塘,河流中的一个一个抽水机,agent抽水之后,通过管道将水汇聚到一个地方)
  3. flume 采集系统就是由agent互相连接组合起来的,和抽水机一样,agent也可以互相连接起来,组合成一个负载的级联网络,就跟大家在生活遇到的一级水泵,二级水泵,三级水泵一样一样的。
    Flume 总结(二)flume概念
    Flume 总结(二)flume概念
  4. agent本身内部又是由source、channel、sink三个模块组成的。
  • souce可以看作是水泵的进水口,在这里可以对数据做清洗,过滤等操作。就跟水泵的进水口会有各种滤网是一样一样的。
  • channel,可以看做是水泵的中间仓,因为有时候进水口和出水口的水流速度不一样,这时候channel就像一个缓冲器一样,可以对数据流速做缓冲。所以channel可以看成是一个缓冲池,用来缓解数据流入和数据流出之间的速度差异的。极端情况下,还会使用kafka等来做缓存,避免数据流入和数据流出速度相差过大,一般是数据流出速度小于速度流入,例如做活动时,线上用户流量暴增,日志也暴增
  • sink,可以看成是出水口。

和水泵一样,一个agent中,可以有多个source进行数据采集,可以有多个channel进行数据缓冲,可以有多个sink进行数据流出。
因为这种灵活性,可以很好适应企业开发中各种日志采集业务需求。

2. event事件

  1. 因为日志数据本身会有各种来源,如socket、文件、mysql、kafka。数据来源多种多样,同样的数据形式也多种多样,那如何让这些数据能够很方便在agent内部的source、channel、sink甚至agent之间进行很方便的传输呢?
  2. event就应运而生,使用一个对象,将这些数据一份一份包装起来。
  • event中包含header和body,header就是一个hashmap,可以使用key value形式对这些数据添加附加信息。
  • 因为需要对数据做清洗、过滤、基本数据补齐、分发等操作,所以一定的附加信息可以很方便这些清洗过滤操作。
  • event中的body就是一个字节数组,没错,就跟hbase一样,使用字节数组可以模糊掉数据类型,不管存取,都由开发者自己定义,这样灵活性可以有很好的保证,并且字节数组是最基础的数据类型,可以支持做更多数据操作。
  • event的组成可以看出,这样是很方便做序列化操作的,也就方便event在agent内部的source、channel、sink模块之间传输,也方便在agent之间,agent和输出目的地之间进行数据传输。

3. transaction事务控制

  1. 一个公司最重要的资产之一就是数据,而现在互联网公司都是采取敏捷开发的方式。
  • 敏捷开发意味着最小功能开发,然后使用迭代方式对非核心功能进行逐步迭代,核心功能进行规划性迭代。在这个时候,由于互联网受众众多,如何确保自己开发出来的产品能够得到用户喜欢,使用用户行为日志分析就是一个很好的方式。
  • 在传统软件开发中,使用问卷调查、试用、内部使用等方式开发软件越来越不适应现有的互联网用户喜好。而且目前很多用户都是身心不一的代表着,嘴里说着不要,身体很诚实的现象很普遍。这时候采用行为日志分析就是一个非常好的方式,因为行为不会说谎,喜欢就是喜欢,不喜欢就是不喜欢,都会体现在用户行为日志中。
  1. 事务控制,顾名思义,事务控制可以达到ACID四个特征,这样一来,能够确保日志数据采集时,数据能够不丢失。当然,实际上flume的事务控制并不能像mysql一样做到很精准的数据不丢失也不重复,flume的事务控制考虑了性能,针对不同source,channel,sink类型,可以比较好保证数据不丢失,但无法保证数据不重复。
  2. flume的事务控制,体现在2个流程。数据从source到channel,数据从channel到sink。
    Flume 总结(二)flume概念
  1. 以TailDirSource举例(注意不是所有的source都实现了事务控制,如exec source就没有实现事务控制)。当数据从source中采集好一批,传输给channel之后,这时候source就会标记这个event batch批次的事务完成,等到同一批数据从channel传输到sink之后,一样的会有一个事务完成标记。如果因为各种原因,导致标记无法记录,这个批次的数据就会回滚。
  2. 实际tailDirSource,是会记录文件的读取偏移量,如果事务标记失败,则文件读取偏移量不会改变,下次会从这个偏移量继续读取。这样就可以保证就算事务操作失败了,但数据不会被丢失。
  3. 极端情况下,可以看出,如果数据刚好传输成功,但是在写标记的过程中(很短,可能就0.0001秒)的过程中,电脑死机了,这时候数据已经输出到数据目的地,但事务标记还没写好,文件读取偏移量还是这个批次的数据偏移量。那下次电脑重启,flume会从同样的文件偏移量开始读取数据,这时候就造成了数据重复的问题。
  4. 应对这种极端情况下的数据重复,在大数据场景下,可以自己手写Source进行自定义,做事务强控制,但这样会带来很大的性能损耗。另外一个思路就是不管,因为发生几率很小,而且在大数据处理流程中,数据去重可以在后面缓解做如将文件从hdfs录入到hbase或者hive中时,对数据进行去重。
  1. channel的transactionCapacity,注意这就是事务容量,也就是一次事务控制最大event数量,这个参数必须大于等于source和sink的batchSize,必须小于这个channel的capacity,也就是event总缓冲量大小。

4. interceptor拦截器

  1. 顾名思义,agent就跟水泵一样,这时候,如果水泵在比较多杂物的河流中,一般都会加一个滤网,用来对水做拦截,过滤。
  2. interceptor拦截器,就跟水泵滤网一样,可以对采集回来的数据做加工,可以给每个event打标记,header就是一个hashmap,里面可以添加各种key value。也可以对数据做清洗,event中body就是一份数据转换出来的字节数组。将其按照已知格式进行反序列化,就可以进行数据读取和处理,例如检查日志记录中的字段多余、缺失、数据损坏等等。
    注意,拦截器中不能进行数据去重,去重需要有完整的数据才能去重而不是一批数据,拦截器中一般是一条一条或者一批一批数据,不适合做去重这种处理。
  3. 拦截器案例代码
public class MyIntercept implements Interceptor {

    private String sourceFromType;

    public MyIntercept(String sourceFromType) {
        this.sourceFromType = sourceFromType;
    }

    @Override
    public void initialize() {
        // 初始化的代码
    }

    @Override
    public Event intercept(Event event) {

        String bodyStr = new String(event.getBody());

        try {
            // 在这里取出json数据中事件的时间戳,并加进去
            JSONObject jsonObject = JSON.parseObject(bodyStr);

            long timeStamp = jsonObject.getLong("timeStamp");

            event.getHeaders().put("timestamp", timeStamp + "");

            // 在这里判断事件中,有openid就加wx的header,有appId就加app
            if (jsonObject.containsKey("openid")) {
                event.getHeaders().put(sourceFromType, "wx");

                return event;
            } else if (jsonObject.containsKey("appId")) {
                event.getHeaders().put(sourceFromType, "app");

                return event;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }

        return list;
    }

    @Override
    public void close() {
        // 结束前调用的代码
    }

    public static class MyInterceptBuilder implements Interceptor.Builder {

        private String sourceFromType;

        @Override
        public Interceptor build() {
            return new MyIntercept(sourceFromType);
        }

        @Override
        public void configure(Context context) {
            sourceFromType = context.getString("sourceFromType");
        }
    }
}
  1. 拦截器,就是实现一个trait,implements Interceptor
  2. 需要实现一个内部类,implements Interceptor.Builder
  3. 中间就是如何针对一个一个event进行数据拦截处理,至于一批一批的拦截,就for循环,调用一下单个event处理的方法即可。
# 定义名字 注意如果一个agent中有多个sources或者channels,多个sinks,使用空格间隔它们的名字
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# source 
# app的去c1  wx的去c2
a1.sources.r1.channels = c1 c2
a1.sources.r1.type = TAILDIR
# 文件组
a1.sources.r1.filegroups = g1
a1.sources.r1.filegroups.g1 = /root/loggen/logdata/event.*
a1.sources.r1.fileHeader = false

# 拦截器,将数据打上时间戳、wx和app的标记
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.doit.intercept.MyIntercept$MyInterceptBuilder
a1.sources.r1.interceptors.i1.sourceFromType = sourceFromType

# 选择器
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = sourceFromType
a1.sources.r1.selector.mapping.app = c1
a1.sources.r1.selector.mapping.wx = c2
a1.sources.r1.selector.default = c2



# channel 这里有2个channel,分别承载来自不同标记的event数据
a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 1000

# 注意,一个channel的数据可以发给多个sink,这时候可以使用sink的group processor来做数据输出策略,如load balance,fail over等策略
a1.channels.c2.type = memory
a1.channels.c2.capacity = 2000
a1.channels.c2.transactionCapacity = 1000

# 这里有2个sink,注意一个sink只能从一个channel读取数据,但一个channel的数据可以分发给多个sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://doitedu01:8020/loggendata_app/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = app-log-
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.rollSize = 268435456
a1.sinks.k1.hdfs.rollInterval = 120
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = false


a1.sinks.k2.channel = c2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://doitedu01:8020/loggendata_wx/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = wx-log-
a1.sinks.k2.hdfs.fileSuffix = .log
a1.sinks.k2.hdfs.rollSize = 268435456
a1.sinks.k2.hdfs.rollInterval = 120
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.batchSize = 1000
a1.sinks.k2.hdfs.fileType = DataStream 
a1.sinks.k2.hdfs.useLocalTimeStamp = false

# 3. 启动flume agent
bin/flume-ng agent -c ./conf -f ./agentconf/test1.properties -n a1 -Dflume.root.logger=DEBUG,console