欢迎您访问程序员文章站,本站旨在为大家提供和分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java负载均衡服务器实现上传文件同步

程序员文章站 2022-06-15 14:16:18
负载服务器z,应用服务器a 和b ,从a上传的附件,如何在b上下载下来?这个问题我的解决思路如下(后来被一个装逼的面试官给批评了这种做法,不过我瞧不起他)服务器a、b 上传附件的时候,将这个附件备份到...

负载服务器z,应用服务器a 和b ,从a上传的附件,如何在b上下载下来?

这个问题我的解决思路如下(后来被一个装逼的面试官给批评了这种做法,不过我瞧不起他)

服务器 a、b 在接收上传的附件时,同时将该附件备份到服务器 z;当 a、b 需要下载文件时,首先在自己服务器的存储目录下查找,如果找不到,再从服务器 z 上下载一份到当前服务器。

服务器之间的文件备份通过 sftp 完成,参考:(原文此处的链接缺失;下文中的 sftpcustom 类就是该链接里 “sftp上传下载文件例子” 中的类)

这里主要介绍一下重写上传、下载的方法时应该添加的代码

上传文件,异步操作

 // Back the saved file up asynchronously so the caller does not wait for the transfer.
 new thread(() -> {
   sftpcustom fu = new sftpcustom();
   try {
     fu.upload(file.getabsolutepath(), getfilename(filedescr));
   } finally {
     // Always release the channel, even when the upload fails.
     fu.closechannel();
   }
 }).start();

下载文件,先从当前服务器寻找

// Serve the copy under the primary root if it is already present on this server.
string tmppath = roots[0].getpath() + '/' + getfilename(filedescr);
file file2 = new file(tmppath);
if (file2.exists()) {
  return fileutils.openinputstream(file2);
}

// Otherwise download the file to tmppath through sftpcustom and serve that local copy.
sftpcustom fu = new sftpcustom();
try {
  fu.download(getfilename(filedescr), tmppath);
  return fileutils.openinputstream(file2);
} finally {
  // Always release the channel, even when the download or the open fails.
  fu.closechannel();
}

CUBA 框架中,重写后的文件上传类 filestorage.java 的完整代码如下:

package com.haulmont.cuba.core.app.custom;

import com.google.common.util.concurrent.threadfactorybuilder;
import com.haulmont.cuba.core.app.filestorageapi;
import com.haulmont.cuba.core.app.serverconfig;
import com.haulmont.cuba.core.entity.filedescriptor;
import com.haulmont.cuba.core.global.*;
import com.haulmont.cuba.core.sys.appcontext;
import com.haulmont.cuba.core.sys.securitycontext;
import org.apache.commons.io.fileutils;
import org.apache.commons.io.ioutils;
import org.apache.commons.lang.stringutils;
import org.slf4j.logger;
import org.slf4j.loggerfactory;

import javax.annotation.postconstruct;
import javax.annotation.predestroy;
import javax.inject.inject;
import java.io.*;
import java.nio.charset.standardcharsets;
import java.text.simpledateformat;
import java.util.arraylist;
import java.util.calendar;
import java.util.list;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;

import static com.haulmont.bali.util.preconditions.checknotnullargument;

/**
 * File storage that keeps files in date-based directories under one or more storage roots and,
 * in addition, copies every saved file to another server through {@code sftpcustom}. When a file
 * is not found in any local root, {@link #openstream} tries to download it through
 * {@code sftpcustom} before giving up.
 *
 * <p>NOTE(review): identifiers throughout this listing are all-lowercase (e.g. {@code string},
 * {@code file}, {@code thread}); presumably the original casing was lost when the listing was
 * published - restore it before compiling.
 */
public class filestorage implements filestorageapi {

  private final logger log = loggerfactory.getlogger(filestorage.class);

  @inject
  protected usersessionsource usersessionsource;

  @inject
  protected timesource timesource;

  @inject
  protected configuration configuration;

  /** When true, saving over an already existing file is rejected (see {@link #checkfileexists}). */
  protected boolean isimmutablefilestorage;

  /** Runs the asynchronous work: copies to secondary roots and uploads to the backup server. */
  protected executorservice writeexecutor = executors.newfixedthreadpool(5,
      new threadfactorybuilder().setnameformat("filestoragewriter-%d").build());

  /** Lazily resolved storage roots; see {@link #getstorageroots()}. */
  protected volatile file[] storageroots;

  /** Reads the immutable-storage flag from the server configuration. */
  @postconstruct
  public void init() {
    this.isimmutablefilestorage = configuration.getconfig(serverconfig.class).getimmutablefilestorage();
  }

  /**
   * internal. don't use in application code.
   *
   * <p>Resolves the storage roots on first call. When the configured storage dir is blank, a
   * single {@code filestorage} directory under the data dir is used (and created). Otherwise the
   * configured value is split on commas; entries are trimmed, empty entries are skipped and
   * duplicates are dropped, keeping the configured order.
   *
   * @return the storage roots; the first element is the primary storage
   */
  public file[] getstorageroots() {
    if (storageroots == null) {
      string conf = configuration.getconfig(serverconfig.class).getfilestoragedir();
      if (stringutils.isblank(conf)) {
        string datadir = configuration.getconfig(globalconfig.class).getdatadir();
        file dir = new file(datadir, "filestorage");
        dir.mkdirs();
        storageroots = new file[]{dir};
      } else {
        list<file> list = new arraylist<>();
        for (string str : conf.split(",")) {
          str = str.trim();
          if (!stringutils.isempty(str)) {
            file file = new file(str);
            if (!list.contains(file)) {
              list.add(file);
            }
          }
        }
        storageroots = list.toarray(new file[list.size()]);
      }
    }
    return storageroots;
  }

  /**
   * Writes the stream into the primary storage, then schedules the backup upload and the copies
   * to every secondary storage root.
   *
   * @param filedescr descriptor of the file; must have its create date populated
   * @param inputstream content to store
   * @return number of bytes written to the primary storage
   * @throws filestorageexception if no storage is defined or accessible, if the file already
   *     exists in an immutable storage, or if writing to the primary storage fails
   */
  @override
  public long savestream(final filedescriptor filedescr, final inputstream inputstream) throws filestorageexception {
    checkfiledescriptor(filedescr);

    file[] roots = getstorageroots();

    // store to primary storage

    checkstoragedefined(roots, filedescr);
    checkprimarystorageaccessible(roots, filedescr);

    file dir = getstoragedir(roots[0], filedescr);
    dir.mkdirs();
    checkdirectoryexists(dir);

    final file file = new file(dir, getfilename(filedescr));
    checkfileexists(file);

    long size = 0;
    outputstream os = null;
    try {
      os = fileutils.openoutputstream(file);
      size = ioutils.copylarge(inputstream, os);
      os.flush();
      writelog(file, false);
    } catch (ioexception e) {
      // Release the handle first, then remove the partially written file.
      ioutils.closequietly(os);
      fileutils.deletequietly(file);

      throw new filestorageexception(filestorageexception.type.io_exception, file.getabsolutepath(), e);
    } finally {
      ioutils.closequietly(os);
    }

    // Start the backup only after the local file has been completely written and closed.
    uploadtobackupasync(file, getfilename(filedescr));

    // copy file to secondary storages asynchronously

    final securitycontext securitycontext = appcontext.getsecuritycontext();
    for (int i = 1; i < roots.length; i++) {
      if (!roots[i].exists()) {
        log.error("error saving {} into {} : directory doesn't exist", filedescr, roots[i]);
        continue;
      }

      file copydir = getstoragedir(roots[i], filedescr);
      final file filecopy = new file(copydir, getfilename(filedescr));

      writeexecutor.submit(() -> {
        try {
          // writelog reads the current user session, so the caller's context is propagated.
          appcontext.setsecuritycontext(securitycontext);
          fileutils.copyfile(file, filecopy, true);
          writelog(filecopy, false);
        } catch (exception e) {
          // The trailing throwable makes the logger print the stack trace as well.
          log.error("error saving {} into {} : {}", filedescr, filecopy.getabsolutepath(), e.getmessage(), e);
        } finally {
          appcontext.setsecuritycontext(null);
        }
      });
    }

    return size;
  }

  /**
   * Schedules the upload of a locally saved file through {@code sftpcustom} on the write
   * executor. Failures are logged and do not affect the caller.
   */
  private void uploadtobackupasync(final file file, final string remotename) {
    writeexecutor.submit(() -> {
      sftpcustom fu = null;
      try {
        fu = new sftpcustom();
        fu.upload(file.getabsolutepath(), remotename);
      } catch (exception e) {
        log.error("error uploading {} to backup server", file.getabsolutepath(), e);
      } finally {
        closechannelquietly(fu);
      }
    });
  }

  /** Closes the channel of the given client, logging instead of propagating any failure. */
  private void closechannelquietly(sftpcustom fu) {
    if (fu == null) {
      return;
    }
    try {
      fu.closechannel();
    } catch (exception e) {
      log.warn("error closing sftp channel", e);
    }
  }

  /**
   * Rejects overwriting when the storage is immutable.
   *
   * @throws filestorageexception if the file exists and the storage is immutable
   */
  protected void checkfileexists(file file) throws filestorageexception {
    if (file.exists() && isimmutablefilestorage) {
      throw new filestorageexception(filestorageexception.type.file_already_exists, file.getabsolutepath());
    }
  }

  /**
   * @throws filestorageexception if the directory does not exist
   */
  protected void checkdirectoryexists(file dir) throws filestorageexception {
    if (!dir.exists()) {
      throw new filestorageexception(filestorageexception.type.storage_inaccessible, dir.getabsolutepath());
    }
  }

  /**
   * @throws filestorageexception if the first (primary) root does not exist
   */
  protected void checkprimarystorageaccessible(file[] roots, filedescriptor filedescr) throws filestorageexception {
    if (!roots[0].exists()) {
      log.error("inaccessible primary storage at {}", roots[0]);
      throw new filestorageexception(filestorageexception.type.storage_inaccessible, filedescr.getid().tostring());
    }
  }

  /**
   * @throws filestorageexception if no storage root is configured
   */
  protected void checkstoragedefined(file[] roots, filedescriptor filedescr) throws filestorageexception {
    if (roots.length == 0) {
      log.error("no storage directories defined");
      throw new filestorageexception(filestorageexception.type.storage_inaccessible, filedescr.getid().tostring());
    }
  }

  /**
   * Saves a byte array by delegating to {@link #savestream}.
   *
   * @param data file content; must not be null
   */
  @override
  public void savefile(final filedescriptor filedescr, final byte[] data) throws filestorageexception {
    checknotnullargument(data, "file content is null");
    savestream(filedescr, new bytearrayinputstream(data));
  }

  /**
   * Appends one line (timestamp, current user, action, absolute path) to {@code storage.log} in
   * the directory four levels above the given file. Problems are logged and never propagated.
   *
   * @param file the stored file the entry is about
   * @param remove true to record a removal, false to record a creation
   */
  protected synchronized void writelog(file file, boolean remove) {
    // MM = month, HH = 24-hour clock, SSS = milliseconds. The lowercase letters used before
    // printed minutes in the month position and seconds in the millisecond position.
    simpledateformat df = new simpledateformat("yyyy-MM-dd HH:mm:ss.SSS");

    stringbuilder sb = new stringbuilder();
    sb.append(df.format(timesource.currenttimestamp())).append(" ");
    sb.append("[").append(usersessionsource.getusersession().getuser()).append("] ");
    sb.append(remove ? "remove" : "create").append(" ");
    sb.append("\"").append(file.getabsolutepath()).append("\"\n");

    // Walk up four levels (file -> day -> month -> year -> root). A missing intermediate parent
    // means the file does not sit in the expected directory layout.
    file rootdir = file;
    for (int level = 0; level < 4; level++) {
      if (rootdir == null) {
        log.error("unable to write log: invalid file storage structure");
        return;
      }
      rootdir = rootdir.getparentfile();
    }

    file logfile = new file(rootdir, "storage.log");
    try (fileoutputstream fos = new fileoutputstream(logfile, true)) {
      ioutils.write(sb.tostring(), fos, standardcharsets.utf_8.name());
    } catch (ioexception e) {
      log.error("unable to write log", e);
    }
  }

  /**
   * Deletes the file from every storage root where it exists and records each removal in the
   * storage log. Does nothing (apart from logging) when no roots are configured.
   *
   * @throws filestorageexception if an existing file cannot be deleted
   */
  @override
  public void removefile(filedescriptor filedescr) throws filestorageexception {
    checkfiledescriptor(filedescr);

    file[] roots = getstorageroots();
    if (roots.length == 0) {
      log.error("no storage directories defined");
      return;
    }

    for (file root : roots) {
      file dir = getstoragedir(root, filedescr);
      file file = new file(dir, getfilename(filedescr));
      if (!file.exists()) {
        continue;
      }
      if (!file.delete()) {
        throw new filestorageexception(filestorageexception.type.io_exception, "unable to delete file " + file.getabsolutepath());
      }
      writelog(file, true);
    }
  }

  /**
   * @throws illegalargumentexception if the descriptor is null or has no create date
   */
  protected void checkfiledescriptor(filedescriptor fd) {
    if (fd == null || fd.getcreatedate() == null) {
      throw new illegalargumentexception("a filedescriptor instance with populated 'createdate' attribute must be provided");
    }
  }

  /**
   * Opens the file from the first storage root that has it; when none does, falls back to the
   * copy obtainable through {@code sftpcustom}.
   *
   * @return an open stream the caller is responsible for closing
   * @throws filestorageexception with type {@code file_not_found} if no roots are configured or
   *     the file cannot be obtained from any source
   */
  @override
  public inputstream openstream(filedescriptor filedescr) throws filestorageexception {
    checkfiledescriptor(filedescr);

    file[] roots = getstorageroots();
    if (roots.length == 0) {
      log.error("no storage directories available");
      throw new filestorageexception(filestorageexception.type.file_not_found, filedescr.getid().tostring());
    }

    for (file root : roots) {
      file dir = getstoragedir(root, filedescr);

      file file = new file(dir, getfilename(filedescr));
      if (!file.exists()) {
        log.error("file " + file + " not found");
        continue;
      }

      try {
        return fileutils.openinputstream(file);
      } catch (ioexception e) {
        // Try the next root.
        log.error("error opening input stream for " + file, e);
      }
    }

    return openfrombackup(roots[0], filedescr);
  }

  /**
   * Opens the copy of the file kept directly under the primary root, downloading it through
   * {@code sftpcustom} first when it is not there yet.
   *
   * <p>NOTE(review): the downloaded copy is placed directly under the primary root, not in the
   * date-based directory used by {@link #savestream}, so {@link #removefile} and
   * {@link #fileexists} do not see it.
   */
  private inputstream openfrombackup(file primaryroot, filedescriptor filedescr) throws filestorageexception {
    string filename = getfilename(filedescr);
    string tmppath = primaryroot.getpath() + '/' + filename;
    file cached = new file(tmppath);

    sftpcustom fu = null;
    try {
      if (cached.exists()) {
        return fileutils.openinputstream(cached);
      }

      fu = new sftpcustom();
      fu.download(filename, tmppath);
      return fileutils.openinputstream(cached);
    } catch (exception e) {
      // Keep the cause so the real reason (connection, missing remote file, ...) is not lost.
      throw new filestorageexception(filestorageexception.type.file_not_found, filedescr.getid().tostring(), e);
    } finally {
      closechannelquietly(fu);
    }
  }

  /**
   * Reads the whole file into memory via {@link #openstream}.
   *
   * @throws filestorageexception if the file cannot be opened or read
   */
  @override
  public byte[] loadfile(filedescriptor filedescr) throws filestorageexception {
    inputstream inputstream = openstream(filedescr);
    try {
      return ioutils.tobytearray(inputstream);
    } catch (ioexception e) {
      throw new filestorageexception(filestorageexception.type.io_exception, filedescr.getid().tostring(), e);
    } finally {
      ioutils.closequietly(inputstream);
    }
  }

  /**
   * @return true if the file is present in the date-based directory of at least one local root
   */
  @override
  public boolean fileexists(filedescriptor filedescr) {
    checkfiledescriptor(filedescr);

    file[] roots = getstorageroots();
    for (file root : roots) {
      file dir = getstoragedir(root, filedescr);
      file file = new file(dir, getfilename(filedescr));
      if (file.exists()) {
        return true;
      }
    }
    return false;
  }

  /**
   * internal. don't use in application code.
   *
   * <p>Builds the {@code year/month/day} directory (month and day left-padded to two digits)
   * under the given root from the descriptor's create date, using the default calendar.
   */
  public file getstoragedir(file rootdir, filedescriptor filedescriptor) {
    checknotnullargument(rootdir);
    checknotnullargument(filedescriptor);

    calendar cal = calendar.getinstance();
    cal.settime(filedescriptor.getcreatedate());
    int year = cal.get(calendar.year);
    int month = cal.get(calendar.month) + 1; // calendar months are zero-based
    int day = cal.get(calendar.day_of_month);

    return new file(rootdir, year + "/"
        + stringutils.leftpad(string.valueof(month), 2, '0') + "/"
        + stringutils.leftpad(string.valueof(day), 2, '0'));
  }

  /**
   * @return the stored file name: the descriptor id, a dot, and the descriptor extension
   */
  public static string getfilename(filedescriptor filedescriptor) {
    return filedescriptor.getid().tostring() + "." + filedescriptor.getextension();
  }

  /** Stops accepting new asynchronous work when the bean is destroyed. */
  @predestroy
  protected void stopwriteexecutor() {
    writeexecutor.shutdown();
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。