flume1.7TailDirsource重复获取数据集不释放资源解决办法
flume1.7TailDirsource重复获取数据集不释放资源解决办法。
背景:银行日志生产方式一般有两种
1)按大小切分:xxx.logxxx.log1 xxx.log2,及最新日志写入.log,原来的.log mv为.log1,.log1 mv为.log2,依次类推,每个日志固定大小(10M、50M之类)。
2)按天切分:xxx.log xxx.log-20171224(xxx.log-日期),最新日志写入.log,后面的按照日期备份,基本为每天一个日志。
目前缺陷:flume1.7增加了TAILDIR source,可以用于tail日志一个目录下的多个文件。但是 1)、出现数据重复(应用场景是用flume抓取应用日志,根据应用日志实时算交易量,所以会导致交易量增加),如果文件a.log达到定大小后,会被重命名为a.log.1,新建一个a.log,那么此时a.log和a.log.1的inode一样,文件名不一样,默认的flume taildir会认为这是都是一个新文件,就会重新读一遍,所以导致数据重复;2)、不释放资源,如果a.log.11,a.log.12之前被flume监控过,现在把这两个文件删除了,flume不停止,资源不会被释放。
修改思路:文件如果更名后文件没有关闭,先关闭原文件的句柄,之后判断文件是否更新,若更新,重新打开文件;没有更新,只修改文件路径。
代码修改:taildir 包下 ReliableTaildirEventReader类的ReliableTaildirEventReader方法修改,
原代码为:
if(tf==null||!tf.getPath().equals(f.getAbsolutePath())){
long startPos=skipToEnd?f.length():0;
tf=openFile(f,headers,inode,startPos);
}else{…}
修改为:
if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
//自定义代码开始
long startPos;
if(tf!=null){
boolean updated = tf.getLastUpdated() < f.lastModified();
if(tf.getRaf()!=null){//得到文件句柄
tf.close();
}
startPos = tf.getPos();
if(updated){ //更新
tf = openFile(f, headers, inode, startPos);
}else{//没更新
tf.updateFilePath(f.getAbsolutePath());
}
}else{
startPos = skipToEnd ? f.length() : 0;
tf = openFile(f, headers, inode, startPos);
}
//自定义代码结束
//源代码
} else {
然后将新代码打jar包,放入flume lib文件里,并在配置文件里面调用该source,测试结果,上面两个问题都得到了解决。
上一篇: 用python实现九九乘法表实例