Integrating Flume with Kafka
程序员文章站
2022-06-21 18:28:44
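Why connect Flume to Kafka
In production, most incremental data comes from log files, so Flume is a good fit for monitoring it in real time.
What adding Kafka provides: it can feed multiple business lines (by sorting data into different topics), and new business lines can be added dynamically without duplicating the data for each one (decoupling).
Case 1: a simple hookup using a netcat source and the Kafka sink
1. Flume configuration file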
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.bootstrap.servers = 192.168.56.20:9092
a1.sinks.k1.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Start Flume
flume-ng agent --conf-file ../confing/flume_kafka.conf --name a1
3. Start a Kafka consumer
./kafka-console-consumer.sh --bootstrap-server 192.168.56.20:9092 --topic first
4. Start nc
nc localhost 44444
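When the test needs to be repeatable, the nc step can also be driven from code. The following is a minimal sketch and not part of the original setup: the class name `NetcatSender` is hypothetical, and it assumes the netcat source keeps its default behavior of answering every received line with a single `OK` line (without that acknowledgement, the read would block).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: sends lines to a Flume netcat source the way `nc` would.
final class NetcatSender {

    // Writes each line followed by '\n' and collects the reply line for each one.
    // Assumes the source acknowledges every event; otherwise readLine() blocks.
    static List<String> send(String host, int port, List<String> lines) throws IOException {
        List<String> replies = new ArrayList<>();
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            for (String line : lines) {
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
                out.flush();
                replies.add(in.readLine());
            }
        }
        return replies;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("usage: NetcatSender <host> <port> [line ...]");
            return;
        }
        // Everything after host and port is sent as one event per argument.
        List<String> lines = Arrays.asList(args).subList(2, args.length);
        System.out.println(send(args[0], Integer.parseInt(args[1]), lines));
    }
}
```

With the agent from step 2 running, `java NetcatSender.java localhost 44444 "hello flume"` sends one event to the same address that `nc localhost 44444` uses, and the line should then show up in the console consumer for topic `first`.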
Case 2: nc plus a custom Flume interceptor that processes the data and sends it to different Kafka topics
Dependencies
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
</dependency>
1. Flume interceptor code
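package com.atnjbdqn.flume_kafka;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class TypeInterceptor implements Interceptor {
    // 2.1 Declare a list that holds the processed events
    private List<Event> addHeaderEvents;

    @Override
    public void initialize() {
        // 2.2 Initialize the list
        addHeaderEvents = new ArrayList<>();
    }

    // 1. Intercept a single event
    @Override
    public Event intercept(Event event) {
        // Get the event headers
        Map<String, String> header = event.getHeaders();
        // Get the event body, decoded as UTF-8 instead of the platform default charset
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // Routing rule: choose the topic header by whether the body contains "hello"
        if (body.contains("hello")) {
            header.put("topic", "first");
        } else {
            header.put("topic", "second");
        }
        return event;
    }

    // 2. Intercept a batch of events
    @Override
    public List<Event> intercept(List<Event> list) {
        // 2.3 Clear the shared list left over from the previous batch
        addHeaderEvents.clear();
        // 2.4 Add the topic header to every event
        for (Event event : list) {
            addHeaderEvents.add(intercept(event));
        }
        return addHeaderEvents;
    }

    @Override
    public void close() {
    }

    // Static nested Builder class; the Flume configuration file refers to it
    public static class Builder implements Interceptor.Builder {
        // Build the interceptor instance
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}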
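Because the interceptor only compiles with Flume on the classpath, it can help to check the routing rule on its own before packaging the jar. The sketch below is dependency-free and purely illustrative: `TopicRule`, `topicFor`, and `tag` are hypothetical names, and a plain `Map` stands in for the Flume event headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the interceptor's rule, with no Flume dependency.
final class TopicRule {

    // Same decision as TypeInterceptor.intercept(Event): bodies containing
    // "hello" are routed to topic "first", everything else to "second".
    static String topicFor(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        return text.contains("hello") ? "first" : "second";
    }

    // Mirrors header.put("topic", ...) on a plain map instead of event headers.
    static Map<String, String> tag(Map<String, String> headers, byte[] body) {
        headers.put("topic", topicFor(body));
        return headers;
    }

    public static void main(String[] args) {
        byte[] greeting = "hello flume".getBytes(StandardCharsets.UTF_8);
        byte[] other = "plain line".getBytes(StandardCharsets.UTF_8);
        System.out.println(tag(new HashMap<>(), greeting)); // prints {topic=first}
        System.out.println(tag(new HashMap<>(), other));    // prints {topic=second}
    }
}
```

Note that the match is case-sensitive, so a body such as `Hello` would land in `second`.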
2. Package the code as a jar and upload it to the lib directory of the Flume installation on the Linux host
3. Flume configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atnjbdqn.flume_kafka.TypeInterceptor$Builder
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.bootstrap.servers = 192.168.56.20:9092
a1.sinks.k1.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
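The sink still sets `kafka.topic = first` even though the interceptor picks the topic. According to the Flume Kafka sink documentation, a `topic` header on the event overrides the configured topic, so the configured value only serves as a fallback. The sketch below models that documented rule for illustration; it is not the sink's actual code, and `SinkTopicModel` and `effectiveTopic` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how the topic for one event is chosen.
final class SinkTopicModel {

    // A "topic" header wins; the configured kafka.topic is used only when
    // the event carries no such header.
    static String effectiveTopic(Map<String, String> headers, String configuredTopic) {
        String fromHeader = headers.get("topic");
        return fromHeader != null ? fromHeader : configuredTopic;
    }

    public static void main(String[] args) {
        Map<String, String> tagged = new HashMap<>();
        tagged.put("topic", "second"); // what the interceptor sets for non-"hello" bodies
        Map<String, String> untagged = new HashMap<>();

        System.out.println(effectiveTopic(tagged, "first"));   // prints second
        System.out.println(effectiveTopic(untagged, "first")); // prints first
    }
}
```

Since the interceptor in this case tags every event, the fallback is never reached here; it would matter only for events that bypass the interceptor.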
4. Start two consumers, one for topic first and one for topic second, then start Flume
5. Start nc