欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

flume1.7TailDirsource重复获取数据集不释放资源解决办法

程序员文章站 2022-06-18 16:16:54
flume1.7TailDirsource重复获取数据集不释放资源解决办法。 背景:银行日志生产方式一般有两种 1)按大小切分:xxx.logxxx.log1 xxx.log2...

flume1.7TailDirsource重复获取数据集不释放资源解决办法。

背景:银行日志生产方式一般有两种

1)按大小切分:xxx.logxxx.log1 xxx.log2,及最新日志写入.log,原来的.log mv为.log1,.log1 mv为.log2,依次类推,每个日志固定大小(10M、50M之类)。

2)按天切分:xxx.log xxx.log-20171224(xxx.log-日期),最新日志写入.log,后面的按照日期备份,基本为每天一个日志。

目前缺陷:flume1.7增加了TAILDIR source,可以用于tail日志一个目录下的多个文件。但是 1)、出现数据重复(应用场景是用flume抓取应用日志,根据应用日志实时算交易量,所以会导致交易量增加),如果文件a.log达到定大小后,会被重命名为a.log.1,新建一个a.log,那么此时a.log和a.log.1的inode一样,文件名不一样,默认的flume taildir会认为这是都是一个新文件,就会重新读一遍,所以导致数据重复;2)、不释放资源,如果a.log.11,a.log.12之前被flume监控过,现在把这两个文件删除了,flume不停止,资源不会被释放。

修改思路:文件如果更名后文件没有关闭,先关闭原文件的句柄,之后判断文件是否更新,若更新,重新打开文件;没有更新,只修改文件路径。

代码修改:taildir 包下 ReliableTaildirEventReader类的ReliableTaildirEventReader方法修改,

原代码为:

if(tf==null||!tf.getPath().equals(f.getAbsolutePath())){

long startPos=skipToEnd?f.length():0;

tf=openFile(f,headers,inode,startPos);

}else{…}

修改为:

if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {

//自定义代码开始

long startPos;

if(tf!=null){

boolean updated = tf.getLastUpdated() < f.lastModified();

if(tf.getRaf()!=null){//得到文件句柄

tf.close();

}

startPos = tf.getPos();

if(updated){ //更新

tf = openFile(f, headers, inode, startPos);

}else{//没更新

tf.updateFilePath(f.getAbsolutePath());

}

}else{

startPos = skipToEnd ? f.length() : 0;

tf = openFile(f, headers, inode, startPos);

}

//自定义代码结束

//源代码

} else {

然后将新代码打jar包,放入flume lib文件里,并在配置文件里面调用该source,测试结果,上面两个问题都得到了解决。