
Integrating Flume with Kafka


Why pair Flume with Kafka?

In production, most incremental data arrives as log files, and Flume is well suited to monitoring them in (near) real time.
What Kafka adds: the same stream can feed multiple business lines (data is categorized and sent to different topics), and new business lines can be added dynamically without keeping extra copies of the data (decoupling).

Case 1: a simple netcat-to-Kafka pipeline using the KafkaSink

1. Flume configuration file

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.bootstrap.servers = 192.168.56.20:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
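
Under the hood the KafkaSink is essentially a Kafka producer that publishes each Flume event body as a message. For reference, a rough standalone equivalent using the plain kafka-clients API (a minimal sketch mirroring the broker and producer settings above; it is not part of the Flume setup itself):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSinkEquivalent {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same broker and producer settings as the sink configuration above
        props.put("bootstrap.servers", "192.168.56.20:9092");
        props.put("acks", "1");      // mirrors kafka.producer.acks = 1
        props.put("linger.ms", "1"); // mirrors kafka.producer.linger.ms = 1
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each Flume event body becomes the message value, sent to the configured topic
            producer.send(new ProducerRecord<>("first", "hello from flume"));
        }
    }
}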

2. Start Flume

flume-ng agent --conf-file ../config/flume_kafka.conf --name a1

3. Start a Kafka console consumer

./kafka-console-consumer.sh --bootstrap-server 192.168.56.20:9092 --topic first
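
The console tool is all this walkthrough needs; if you prefer to verify programmatically, here is a minimal Java consumer sketch (assuming the kafka-clients dependency is on the classpath; the group id flume-demo is an arbitrary choice for this demo):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FirstTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.20:9092");
        props.put("group.id", "flume-demo"); // arbitrary group id for this demo
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("first"));
            while (true) {
                // Print every event body the KafkaSink publishes
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}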

4. Start nc and type a few lines; each line becomes one event and should appear in the consumer

nc localhost 44444


Case 2: netcat source plus a custom Flume interceptor that routes events to different Kafka topics

The KafkaSink honors a topic header on each event: when that header is present, it overrides the topic set in the configuration file, which is what makes per-event routing possible.

Dependencies (pin the version to match the Flume release you are running):

<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.6.0</version>
    </dependency>

1. Interceptor code

package com.atnjbdqn.flume_kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class TypeInterceptor implements Interceptor {

    // 2.1 A reusable list for the events returned by the batch intercept
    private List<Event> addHeaderEvents;

    @Override
    public void initialize() {
        // 2.2 Initialize the list
        addHeaderEvents = new ArrayList<>();
    }

    // 1. Intercept a single event
    @Override
    public Event intercept(Event event) {
        // Get the event headers
        Map<String, String> header = event.getHeaders();
        // Get the event body
        String body = new String(event.getBody());
        // Route on content: bodies containing "hello" are tagged for topic first,
        // everything else for topic second (the KafkaSink reads the "topic" header)
        if (body.contains("hello")) {
            header.put("topic", "first");
        } else {
            header.put("topic", "second");
        }
        return event;
    }

    // 2. Intercept a batch of events
    @Override
    public List<Event> intercept(List<Event> list) {
        // 2.3 Clear the shared list
        addHeaderEvents.clear();
        // 2.4 Apply the single-event logic to every event in the batch
        for (Event event : list) {
            addHeaderEvents.add(intercept(event));
        }
        return addHeaderEvents;
    }

    @Override
    public void close() {

    }

    // Builder inner class referenced from the Flume configuration file
    public static class Builder implements Interceptor.Builder {
        // Construct the interceptor instance
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
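
Before deploying, the routing logic can be checked locally without a running Flume agent. A minimal sketch using Flume's EventBuilder, placed in the same package (the class name TypeInterceptorDemo is just for illustration):

package com.atnjbdqn.flume_kafka;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class TypeInterceptorDemo {
    public static void main(String[] args) {
        TypeInterceptor interceptor = new TypeInterceptor();
        interceptor.initialize();
        Event hello = EventBuilder.withBody("hello world".getBytes());
        Event other = EventBuilder.withBody("goodbye".getBytes());
        // Expect {topic=first} for the first event and {topic=second} for the second
        System.out.println(interceptor.intercept(hello).getHeaders());
        System.out.println(interceptor.intercept(other).getHeaders());
        interceptor.close();
    }
}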

2. Build the class into a JAR and copy it into the lib directory of the Flume installation on the Linux host
3. Flume configuration file

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atnjbdqn.flume_kafka.TypeInterceptor$Builder
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.bootstrap.servers = 192.168.56.20:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4. Start one console consumer on topic first and another on topic second, then start Flume
5. Start nc; lines containing "hello" should arrive at the first consumer, and all other lines at the second


Tags: Big Data