欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spring Boot整合FTPClient线程池的实现示例

程序员文章站 2024-03-02 21:15:58
最近在写一个ftp上传工具,用到了apache的ftpclient,但是每个线程频繁的创建和销毁ftpclient对象对服务器的压力很大,因此,此处最好使用一个ftpcli...

最近在写一个ftp上传工具,用到了apache的ftpclient,但是每个线程频繁的创建和销毁ftpclient对象对服务器的压力很大,因此,此处最好使用一个ftpclient连接池。仔细翻了一下apache的api,发现它并没有一个ftpclientpool的实现,所以,不得不自己写一个ftpclientpool。下面就大体介绍一下开发连接池的整个过程,供大家参考。

我们可以利用apache提供的common-pool包来协助我们开发连接池。而开发一个简单的对象池,仅需要实现common-pool 包中的objectpool和poolableobjectfactory两个接口即可。

线程池的意义

为了减少频繁创建、销毁对象带来的性能消耗,我们可以利用对象池的技术来实现对象的复用。对象池提供了一种机制,它可以管理对象池中对象的生命周期,提供了获取和释放对象的方法,可以让客户端很方便的使用对象池中的对象。

pom引入依赖

 <!-- ftpclient依赖包-->
    <dependency>
      <groupid>commons-net</groupid>
      <artifactid>commons-net</artifactid>
      <version>3.5</version>
    </dependency>

    <!-- 线程池-->
    <dependency>
      <groupid>commons-pool</groupid>
      <artifactid>commons-pool</artifactid>
      <version>1.6</version>
    </dependency>

    <dependency>
      <groupid>org.apache.commons</groupid>
      <artifactid>commons-pool2</artifactid>
      <version>2.0</version>
    </dependency>

创建ftp配置信息

在resources目录下创建ftp.properties配置文件,目录结构如下:

Spring Boot整合FTPClient线程池的实现示例

添加如下的配置信息:

########### ftp用户名称 ###########
ftp.username=hrabbit
########### ftp用户密码 ###########
ftp.password=123456
########### ftp主机ip ###########
ftp.host=127.0.0.1
########### ftp主机端口号 ###########
ftp.port=21
########### 保存根路径 ###########
ftp.baseurl=/

创建ftpproperties.java配置文件

加载配置内容到spring中,配置信息基本延用我的就可以。

/**
 * ftp的配置信息
 * @auther: hrabbit
 * @date: 2018-12-03 2:06 pm
 * @description:
 */
@data
@component
@propertysource("classpath:ftp.properties")
@configurationproperties(prefix = "ftp")
public class ftpproperties {

  private string username;

  private string password;

  private string host;

  private integer port;

  private string baseurl;

  private integer passivemode = ftp.binary_file_type;

  private string encoding="utf-8";

  private int clienttimeout=120000;

  private int buffersize;

  private int transferfiletype=ftp.binary_file_type;

  private boolean renameuploaded;

  private int retrytime;
}

创建ftpclientpool线程池

/**
 * 自定义实现ftp连接池
 * @auther: hrabbit
 * @date: 2018-12-03 3:40 pm
 * @description:
 */
@slf4j
@suppresswarnings("all")
public class ftpclientpool implements objectpool<ftpclient> {

  private static final int default_pool_size = 10;

  public blockingqueue<ftpclient> blockingqueue;

  private ftpclientfactory factory;

  public ftpclientpool(ftpclientfactory factory) throws exception {
    this(default_pool_size, factory);
  }

  public ftpclientpool(int poolsize, ftpclientfactory factory) throws exception {
    this.factory = factory;
    this.blockingqueue = new arrayblockingqueue<ftpclient>(poolsize);
    initpool(poolsize);
  }

  /**
   * 初始化连接池
   * @param maxpoolsize
   *         最大连接数
   * @throws exception
   */
  private void initpool(int maxpoolsize) throws exception {
    int count = 0;
    while(count < maxpoolsize) {
      this.addobject();
      count++;
    }
  }

  /**
   * 从连接池中获取对象
   */
  @override
  public ftpclient borrowobject() throws exception {
    ftpclient client = blockingqueue.take();
    if(client == null) {
      client = factory.makeobject();
    } else if(!factory.validateobject(client)) {
      invalidateobject(client);
      client = factory.makeobject();
    }
    return client;
  }

  /**
   * 返还一个对象(链接)
   */
  @override
  public void returnobject(ftpclient client) throws exception {
    if ((client != null) && !blockingqueue.offer(client,2,timeunit.minutes)) {
      try {
        factory.destroyobject(client);
      } catch (exception e) {
        throw e;
      }
    }
  }

  /**
   * 移除无效的对象(ftp客户端)
   */
  @override
  public void invalidateobject(ftpclient client) throws exception {
    blockingqueue.remove(client);
  }

  /**
   * 增加一个新的链接,超时失效
   */
  @override
  public void addobject() throws exception {
    blockingqueue.offer(factory.makeobject(), 2, timeunit.minutes);
  }

  /**
   * 重新连接
   */
  public ftpclient reconnect() throws exception {
    return factory.makeobject();
  }

  /**
   * 获取空闲链接数(这里暂不实现)
   */
  @override
  public int getnumidle() {
    return blockingqueue.size();
  }

  /**
   * 获取正在被使用的链接数
   */
  @override
  public int getnumactive() {
    return default_pool_size - getnumidle();
  }

  @override
  public void clear() throws exception {

  }

  /**
   * 关闭连接池
   */
  @override
  public void close() {
    try {
      while(blockingqueue.iterator().hasnext()) {
        ftpclient client = blockingqueue.take();
        factory.destroyobject(client);
      }
    } catch(exception e) {
      log.error("close ftp client pool failed...{}", e);
    }
  }

  /**
   * 增加一个新的链接,超时失效
   */
  public void addobject(ftpclient ftpclient) throws exception {
    blockingqueue.put(ftpclient);
  }
}

创建一个ftpclientfactory工厂类

创建ftpclientfactory实现poolableobjectfactory的接口,ftpclient工厂类,通过ftpclient工厂提供ftpclient实例的创建和销毁

/**
 * ftpclient 工厂
 * @auther: hrabbit
 * @date: 2018-12-03 3:41 pm
 * @description:
 */
@slf4j
@suppresswarnings("all")
public class ftpclientfactory implements poolableobjectfactory<ftpclient> {

  private ftpproperties ftpproperties;

  public ftpclientfactory(ftpproperties ftpproperties) {
    this.ftpproperties = ftpproperties;
  }

  @override
  public ftpclient makeobject() throws exception {
    ftpclient ftpclient = new ftpclient();
    ftpclient.setcontrolencoding(ftpproperties.getencoding());
    ftpclient.setconnecttimeout(ftpproperties.getclienttimeout());
    try {
      ftpclient.connect(ftpproperties.gethost(), ftpproperties.getport());
      int reply = ftpclient.getreplycode();
      if (!ftpreply.ispositivecompletion(reply)) {
        ftpclient.disconnect();
        log.warn("ftpserver refused connection");
        return null;
      }
      boolean result = ftpclient.login(ftpproperties.getusername(), ftpproperties.getpassword());
      ftpclient.setfiletype(ftpproperties.gettransferfiletype());
      if (!result) {
        log.warn("ftpclient login failed... username is {}", ftpproperties.getusername());
      }
    } catch (exception e) {
      log.error("create ftp connection failed...{}", e);
      throw e;
    }

    return ftpclient;
  }

  @override
  public void destroyobject(ftpclient ftpclient) throws exception {
    try {
      if(ftpclient != null && ftpclient.isconnected()) {
        ftpclient.logout();
      }
    } catch (exception e) {
      log.error("ftp client logout failed...{}", e);
      throw e;
    } finally {
      if(ftpclient != null) {
        ftpclient.disconnect();
      }
    }

  }

  @override
  public boolean validateobject(ftpclient ftpclient) {
    try {
      return ftpclient.sendnoop();
    } catch (exception e) {
      log.error("failed to validate client: {}");
    }
    return false;
  }

  @override
  public void activateobject(ftpclient obj) throws exception {
    //do nothing

  }

  @override
  public void passivateobject(ftpclient obj) throws exception {
    //do nothing

  }
}

创建ftputils.java的工具类

ftputils.java中封装了上传、下载等方法,在项目启动的时候,在@postconstruct注解的作用下通过执行init()的方法,创建ftpclientfactory工厂中,并初始化了ftpclientpool线程池,这样每次调用方法的时候,都直接从ftpclientpool中取出一个ftpclient对象

/**
 * @auther: hrabbit
 * @date: 2018-12-03 3:47 pm
 * @description:
 */
@slf4j
@component
public class ftputils {

  /**
   * ftp的连接池
   */
  @autowired
  public static ftpclientpool ftpclientpool;
  /**
   * ftpclient对象
   */
  public static ftpclient ftpclient;


  private static ftputils ftputils;

  @autowired
  private ftpproperties ftpproperties;

  /**
   * 初始化设置
   * @return
   */
  @postconstruct
  public boolean init() {
    ftpclientfactory factory = new ftpclientfactory(ftpproperties);
    ftputils = this;
    try {
      ftpclientpool = new ftpclientpool(factory);
    } catch (exception e) {
      e.printstacktrace();
      return false;
    }
    return true;
  }


  /**
   * 获取连接对象
   * @return
   * @throws exception
   */
  public static ftpclient getftpclient() throws exception {
    //初始化的时候从队列中取出一个连接
    if (ftpclient==null) {
      synchronized (ftpclientpool) {
        ftpclient = ftpclientpool.borrowobject();
      }
    }
    return ftpclient;
  }


  /**
   * 当前命令执行完成命令完成
   * @throws ioexception
   */
  public void complete() throws ioexception {
    ftpclient.completependingcommand();
  }

  /**
   * 当前线程任务处理完成,加入到队列的最后
   * @return
   */
  public void disconnect() throws exception {
    ftpclientpool.addobject(ftpclient);
  }

  /**
   * description: 向ftp服务器上传文件
   *
   * @version1.0
   * @param remotefile
   *      上传到ftp服务器上的文件名
   * @param input
   *      本地文件流
   * @return 成功返回true,否则返回false
   */
  public static boolean uploadfile(string remotefile, inputstream input) {
    boolean result = false;
    try {
      getftpclient();
      ftpclient.enterlocalpassivemode();
      result = ftpclient.storefile(remotefile, input);
      input.close();
      ftpclient.disconnect();
    } catch (exception e) {
      e.printstacktrace();
    }
    return result;
  }

  /**
   * description: 向ftp服务器上传文件
   *
   * @version1.0
   * @param remotefile
   *      上传到ftp服务器上的文件名
   * @param localfile
   *      本地文件
   * @return 成功返回true,否则返回false
   */
  public static boolean uploadfile(string remotefile, string localfile){
    fileinputstream input = null;
    try {
      input = new fileinputstream(new file(localfile));
    } catch (filenotfoundexception e) {
      e.printstacktrace();
    }
    return uploadfile(remotefile, input);
  }

  /**
   * 拷贝文件
   * @param fromfile
   * @param tofile
   * @return
   * @throws exception
   */
  public boolean copyfile(string fromfile, string tofile) throws exception {
    inputstream in=getfileinputstream(fromfile);
    getftpclient();
    boolean flag = ftpclient.storefile(tofile, in);
    in.close();
    return flag;
  }

  /**
   * 获取文件输入流
   * @param filename
   * @return
   * @throws ioexception
   */
  public static inputstream getfileinputstream(string filename) throws exception {
    bytearrayoutputstream fos=new bytearrayoutputstream();
    getftpclient();
    ftpclient.retrievefile(filename, fos);
    bytearrayinputstream in=new bytearrayinputstream(fos.tobytearray());
    fos.close();
    return in;
  }

  /**
   * description: 从ftp服务器下载文件
   *
   * @version1.0
   * @return
   */
  public static boolean downfile(string remotefile, string localfile){
    boolean result = false;
    try {
      getftpclient();
      outputstream os = new fileoutputstream(localfile);
      ftpclient.retrievefile(remotefile, os);
      ftpclient.logout();
      ftpclient.disconnect();
      result = true;
    } catch (exception e) {
      e.printstacktrace();
    } finally {
      try {
      } catch (exception e) {
        e.printstacktrace();
      }
    }
    return result;
  }

  /**
   * 从ftp中获取文件流
   * @param filepath
   * @return
   * @throws exception
   */
  public static inputstream getinputstream(string filepath) throws exception {
    getftpclient();
    inputstream inputstream = ftpclient.retrievefilestream(filepath);
    return inputstream;
  }

  /**
   * ftp中文件重命名
   * @param fromfile
   * @param tofile
   * @return
   * @throws exception
   */
  public boolean rename(string fromfile,string tofile) throws exception {
    getftpclient();
    boolean result = ftpclient.rename(fromfile,tofile);
    return result;
  }

  /**
   * 获取ftp目录下的所有文件
   * @param dir
   * @return
   */
  public ftpfile[] getfiles(string dir) throws exception {
    getftpclient();
    ftpfile[] files = new ftpfile[0];
    try {
      files = ftpclient.listfiles(dir);
    }catch (throwable thr){
      thr.printstacktrace();
    }
    return files;
  }

  /**
   * 获取ftp目录下的某种类型的文件
   * @param dir
   * @param filter
   * @return
   */
  public ftpfile[] getfiles(string dir, ftpfilefilter filter) throws exception {
    getftpclient();
    ftpfile[] files = new ftpfile[0];
    try {
      files = ftpclient.listfiles(dir, filter);
    }catch (throwable thr){
      thr.printstacktrace();
    }
    return files;
  }

  /**
   * 创建文件夹
   * @param remotedir
   * @return 如果已经有这个文件夹返回false
   */
  public boolean makedirectory(string remotedir) throws exception {
    getftpclient();
    boolean result = false;
    try {
      result = ftpclient.makedirectory(remotedir);
    } catch (ioexception e) {
      e.printstacktrace();
    }
    return result;
  }

  public boolean mkdirs(string dir) throws exception {
    boolean result = false;
    if (null == dir) {
      return result;
    }
    getftpclient();
    ftpclient.changeworkingdirectory("/");
    stringtokenizer dirs = new stringtokenizer(dir, "/");
    string temp = null;
    while (dirs.hasmoreelements()) {
      temp = dirs.nextelement().tostring();
      //创建目录
      ftpclient.makedirectory(temp);
      //进入目录
      ftpclient.changeworkingdirectory(temp);
      result = true;
    }
    ftpclient.changeworkingdirectory("/");
    return result;
  }
}

创建ftpclienttest.java测试类

上传一张图片到ftp服务器,并将文件重新命名为hrabbit.jpg,代码如下:

/**
 * ftpclient测试
 * @auther: hrabbit
 * @date: 2018-12-21 9:14 pm
 * @description:
 */
@runwith(springrunner.class)
@springboottest
public class ftpclienttest {

  /**
   * 测试上传
   */
  @test
  public void uploadfile(){
    boolean flag = ftputils.uploadfile("hrabbit.jpg", "/users/mrotaku/downloads/klklklkl_4x.jpg");
    assert.assertequals(true, flag);
  }

}

程序完美运行,这时候我们查看我们的ftp服务器,http://localhost:8866/hrabbit.jpg

Spring Boot整合FTPClient线程池的实现示例

码云地址:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。