基于Flume做FTP文件实时同步到本地磁盘的windows服务。
需求:做一个windows服务,实现从ftp服务器实时下载或者更新文件到本地磁盘。
功能挺简单的。直接写个ftp工具类用定时器跑就能搞定,那我为什么不用呢?
别问,问就是我无聊啊,然后研究一下flume打发时间。哈哈~
一、flume部分
source组件和sink组件用的都是第三方。
source组件:
sink组件用的谁的目前已经找不到了,网上搜到了一个升级版的。
sink组件:
因为一些个性化的需求,所以我对他们源代码做了些变动。
具体代码参考:https://gitee.com/syher/spring-boot-project/tree/master/spring-boot-flume
ftp-source组件的关键技术是apache ftpclient,而taildir-sink则用的randomaccessfile。
junit测试类我已经写好了,如果不想安装服务又有兴趣了解的朋友,可以自己改下配置跑一下看看。
package com.syher.flume; import com.google.common.collect.lists; import com.urey.flume.sink.taildir.saferollingfilesink; import org.apache.flume.*; import org.apache.flume.channel.channelprocessor; import org.apache.flume.channel.memorychannel; import org.apache.flume.channel.replicatingchannelselector; import org.apache.flume.conf.configurables; import org.apache.flume.sink.defaultsinkprocessor; import org.apache.flume.sink.rollingfilesink; import org.apache.flume.source.pollablesourcerunner; import org.junit.before; import org.junit.test; import org.junit.runner.runwith; import org.keedio.flume.source.ftp.source.source; import org.springframework.boot.test.context.springboottest; import org.springframework.test.context.junit4.springrunner; import java.util.arrays; import java.util.hashmap; import java.util.map; import java.util.properties; //@runwith(springrunner.class) //@springboottest public class springbootflumeapplicationtests { context defaultcontext = new context(); @before public void init() throws exception { map<string, string> prop = new hashmap<>(); prop.put("channel.capacity", "1000"); prop.put("channel.transactioncapacity", "1000"); prop.put("source.client.source", "ftp"); prop.put("source.name.server", "192.168.1.150"); prop.put("source.port", "21"); prop.put("source.user", "username"); prop.put("source.password", "secret"); prop.put("source.working.directory", "/ftp/source"); prop.put("source.filter.pattern", ".+\\.pdf"); // prop.put("source.folder", "/ftp"); prop.put("source.flushlines", "false"); prop.put("sink.sink.directory", "g:/ftp/target/rolling"); prop.put("sink.sink.movefile", "false"); prop.put("sink.sink.targetdirectory", "g:/ftp/target/pdffiles"); prop.put("sink.sink.usecopy", "true"); prop.put("sink.sink.copydirectory", "g:/ftp/target/copy"); prop.put("sink.sink.usefilesuffix", "false"); prop.put("sink.sink.filesuffix", ".log"); defaultcontext.putall(prop); } public memorychannel getchannel() { memorychannel channel = new memorychannel(); channel.setname("channel"); configure(channel, "channel."); return channel; } public source getsource(channel channel) { source source = new source(); source.setname("source"); channelselector selector = new replicatingchannelselector(); selector.setchannels(lists.newarraylist(channel)); channelprocessor processor = new channelprocessor(selector); source.setchannelprocessor(processor); configure(source, "source."); return source; } public sink getsink(channel channel) { saferollingfilesink sink = new saferollingfilesink(); sink.setname("sink"); sink.setchannel(channel); configure(sink, "sink."); return sink; } public void configure(object target, string prefixproperty) { context context = new context(); context.putall(defaultcontext.getsubproperties(prefixproperty)); configurables.configure(target, context); } @test public void contextloads() throws exception { memorychannel channel = getchannel(); source source = getsource(channel); sink sink = getsink(channel); pollablesourcerunner sourcerunner = new pollablesourcerunner(); sourcerunner.setsource(source); channel.start(); sourcerunner.start(); sinkprocessor sinkprocessor = new defaultsinkprocessor(); sinkprocessor.setsinks(arrays.<sink>aslist(sink)); sinkrunner sinkrunner = new sinkrunner(sinkprocessor); channel.start(); sourcerunner.start(); sinkrunner.start(); while (!thread.interrupted()) { thread.sleep(200); } } }
二、jsw服务部分
用的java service wrapper把java程序做成了windows服务。
工具包已经上传在我上面提到的gitee码云项目上。flume-wrapper.zip。
解压后在conf目录可以看到两个配置文件。一个是flume的,一个是jsw的。
bin目录里面是一些装卸启停的批命令。
lib目录里面有项目运行依赖的jar包。
lib.d目录没啥用,是我备份了从flume拷出来的一些无用的jar包。可删。
具体的配置和用法可以看压缩包里的使用说明文档。
注意,jsw的logfile的日志级别最好指定error级别的,不然听说、可能会造成内存不足。
三、采集结果
可以看到,采集效率还是很稳的。一分钟不到就搞定了。
上一篇: InterLocked学习笔记
下一篇: JSP中脚本、声明和表达式的本质区别