欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

基于Flume做FTP文件实时同步到本地磁盘的windows服务。

程序员文章站 2022-06-04 07:49:52
需求:做一个windows服务,实现从ftp服务器实时下载或者更新文件到本地磁盘。 功能挺简单的。直接写个ftp工具类用定时器跑就能搞定,那我为什么不用呢? 别问,问就是我无聊啊,然后研究一下Flume打发时间。哈哈~ 一、Flume部分 Source组件和Sink组件用的都是第三方。 source ......

需求:做一个windows服务,实现从ftp服务器实时下载或者更新文件到本地磁盘。

功能挺简单的。直接写个ftp工具类用定时器跑就能搞定,那我为什么不用呢?

别问,问就是我无聊啊,然后研究一下flume打发时间。哈哈~

一、flume部分

source组件和sink组件用的都是第三方。

source组件:

sink组件用的谁的目前已经找不到了,网上搜到了一个升级版的。

sink组件:

因为一些个性化的需求,所以我对他们源代码做了些变动。

具体代码参考:https://gitee.com/syher/spring-boot-project/tree/master/spring-boot-flume

ftp-source组件的关键技术是apache ftpclient,而taildir-sink则用的randomaccessfile。

junit测试类我已经写好了,如果不想安装服务又有兴趣了解的朋友,可以自己改下配置跑一下看看。

基于Flume做FTP文件实时同步到本地磁盘的windows服务。
package com.syher.flume;

import com.google.common.collect.lists;
import com.urey.flume.sink.taildir.saferollingfilesink;
import org.apache.flume.*;
import org.apache.flume.channel.channelprocessor;
import org.apache.flume.channel.memorychannel;
import org.apache.flume.channel.replicatingchannelselector;
import org.apache.flume.conf.configurables;
import org.apache.flume.sink.defaultsinkprocessor;
import org.apache.flume.sink.rollingfilesink;
import org.apache.flume.source.pollablesourcerunner;
import org.junit.before;
import org.junit.test;
import org.junit.runner.runwith;
import org.keedio.flume.source.ftp.source.source;
import org.springframework.boot.test.context.springboottest;
import org.springframework.test.context.junit4.springrunner;

import java.util.arrays;
import java.util.hashmap;
import java.util.map;
import java.util.properties;

//@runwith(springrunner.class)
//@springboottest
public class springbootflumeapplicationtests {
    context defaultcontext = new context();

    @before
    public void init() throws exception {
        map<string, string> prop = new hashmap<>();
        prop.put("channel.capacity", "1000");
        prop.put("channel.transactioncapacity", "1000");

        prop.put("source.client.source", "ftp");
        prop.put("source.name.server", "192.168.1.150");
        prop.put("source.port", "21");
        prop.put("source.user", "username");
        prop.put("source.password", "secret");
        prop.put("source.working.directory", "/ftp/source");
        prop.put("source.filter.pattern", ".+\\.pdf");
       // prop.put("source.folder", "/ftp");
        prop.put("source.flushlines", "false");

        prop.put("sink.sink.directory", "g:/ftp/target/rolling");
        prop.put("sink.sink.movefile", "false");
        prop.put("sink.sink.targetdirectory", "g:/ftp/target/pdffiles");
        prop.put("sink.sink.usecopy", "true");
        prop.put("sink.sink.copydirectory", "g:/ftp/target/copy");
        prop.put("sink.sink.usefilesuffix", "false");
        prop.put("sink.sink.filesuffix", ".log");
        defaultcontext.putall(prop);
    }

    public memorychannel getchannel() {
        memorychannel channel = new memorychannel();
        channel.setname("channel");

        configure(channel, "channel.");
        return channel;
    }

    public source getsource(channel channel) {
        source source = new source();
        source.setname("source");

        channelselector selector = new replicatingchannelselector();
        selector.setchannels(lists.newarraylist(channel));

        channelprocessor processor = new channelprocessor(selector);
        source.setchannelprocessor(processor);

        configure(source, "source.");
        return source;
    }

    public sink getsink(channel channel) {
        saferollingfilesink sink = new saferollingfilesink();
        sink.setname("sink");

        sink.setchannel(channel);

        configure(sink, "sink.");
        return sink;
    }

    public void configure(object target, string prefixproperty) {
        context context = new context();
        context.putall(defaultcontext.getsubproperties(prefixproperty));
        configurables.configure(target, context);
    }

    @test
    public void contextloads() throws exception {
        memorychannel channel = getchannel();
        source source = getsource(channel);
        sink sink = getsink(channel);

        pollablesourcerunner sourcerunner = new pollablesourcerunner();
        sourcerunner.setsource(source);
        channel.start();
        sourcerunner.start();

        sinkprocessor sinkprocessor = new defaultsinkprocessor();
        sinkprocessor.setsinks(arrays.<sink>aslist(sink));
        sinkrunner sinkrunner = new sinkrunner(sinkprocessor);

        channel.start();
        sourcerunner.start();
        sinkrunner.start();

        while (!thread.interrupted()) {
            thread.sleep(200);
        }
    }

}
view code

 

二、jsw服务部分

用的java service wrapper把java程序做成了windows服务。

工具包已经上传在我上面提到的gitee码云项目上。flume-wrapper.zip。

解压后在conf目录可以看到两个配置文件。一个是flume的,一个是jsw的。

bin目录里面是一些装卸启停的批命令。

lib目录里面有项目运行依赖的jar包。

lib.d目录没啥用,是我备份了从flume拷出来的一些无用的jar包。可删。

具体的配置和用法可以看压缩包里的使用说明文档。

注意,jsw的logfile的日志级别最好指定error级别的,不然听说、可能会造成内存不足。

基于Flume做FTP文件实时同步到本地磁盘的windows服务。

 

三、采集结果

基于Flume做FTP文件实时同步到本地磁盘的windows服务。

 可以看到,采集效率还是很稳的。一分钟不到就搞定了。