欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C# FileStream实现多线程断点续传

程序员文章站 2023-12-16 17:18:40
一、前言        网上有许多的多线程断点续传操作,但总是写的很云里雾里,或者写的比较坑长。由于这几个月要...

一、前言

       网上有许多的多线程断点续传操作,但总是写的很云里雾里,或者写的比较坑长。由于这几个月要负责公司的在线升级项目,所以正好顺便写了一下

代码如下:

using system;
using system.collections.generic;
using system.io;
using system.threading.tasks;

namespace testcenter
{
 class program
 {
  static void main(string[] args)
  {
   string localsavepath = @"e:\test\testfile\local\1.msi"; //本地目标文件路径

   fileinfo severfilepath = new fileinfo(@"e:\test\testfile\server\1.msi"); //服务器待文件路径
   long filelength = severfilepath.length; //待下载文件大小


   console.writeline("start configuration");
   int packcount = 0; //初始化数据包个数

   long packsize = 1024000; //数据包大小

   if (filelength % packsize > 0)
   {
    packcount = (int)(filelength / packsize) + 1;
   }

   else
   {
    packcount = (int)(filelength / packsize);
   }


   console.writeline("start recieve");
   var tasks = new task[packcount]; //多线程任务

   for (int index = 0; index < packcount; index++)
   {


    int threadindex = index; //这步很关键,在task()里的绝对不能直接使用index
    var task = new task(() =>
    {
     string tempfilepath = @"e:\test\testfile\temp\" + "qs_" + threadindex + "_" + packcount; //临时文件路径

     using (filestream tempstream = new filestream(tempfilepath, filemode.create, fileaccess.write, fileshare.write))
     {
      int length = (int)math.min(packsize, filelength - threadindex * packsize);

      var bytes = getfile(threadindex*packcount, length);

      tempstream.write(bytes, 0, length);
      tempstream.flush();
      tempstream.close();
      tempstream.dispose();
     }
    });
    tasks[threadindex] = task;
    task.start();
   }

   task.waitall(tasks); //等待所有线程完成
   console.writeline("recieve end");


   //检测有哪些数据包未下载
   console.writeline("start compare");
   directoryinfo tempdir = new directoryinfo(@"e:\test\testfile\temp"); //临时文件夹路径
   list<string> comparefiles = new list<string>();

   for (int i = 0; i < packcount; i++)
   {
    bool hasfile = false;
    foreach (fileinfo tempfile in tempdir.getfiles())
    {
     if (tempfile.name.split('_')[1] == i.tostring())
     {
      hasfile = true;
      break;
     }
    }
    if (hasfile == false)
    {
     comparefiles.add(i.tostring());
    }
   }

   //最后补上这些缺失的文件
   if (comparefiles.count > 0)
   {
    foreach (string com_index in comparefiles)
    {
     string tempfilepath = @"e:\test\testfile\temp\" + "qs_" + com_index+ "_" + packcount;
     using (filestream compstream = new filestream(tempfilepath, filemode.create, fileaccess.write, fileshare.write))
     {
      int length = (int)math.min(packsize, filelength - convert.toint32(com_index) * packsize);
      var bytes = getfile(convert.toint32(com_index)*packcount, length);
      compstream.write(bytes, 0, length);
      compstream.flush();
      compstream.close();
      compstream.dispose();
     }
    }

   }
   console.writeline("compare end");


   //准备将临时文件融合并写到1.msi中
   console.writeline("start write");
   using (filestream writestream = new filestream(localsavepath, filemode.create, fileaccess.write, fileshare.write))
   {
    foreach (fileinfo tempfile in tempdir.getfiles())
    {
     using (filestream readtempstream = new filestream(tempfile.fullname, filemode.open, fileaccess.read, fileshare.readwrite))
     {
      long onefilelength = tempfile.length;
      byte[] buffer = new byte[convert.toint32(onefilelength)];
      readtempstream.read(buffer, 0, convert.toint32(onefilelength));
      writestream.write(buffer, 0, convert.toint32(onefilelength));
     }
    }
    writestream.flush();
    writestream.close();
    writestream.dispose();
   }
   console.writeline("write end");



   //删除临时文件
   console.writeline("start delete temp files");
   foreach (fileinfo tempfile in tempdir.getfiles())
   {
    tempfile.delete();
   }
   console.writeline("delete success");
   console.readkey();
  }


  //这个方法可以放到remoting或者wcf服务中去,然后本地调用该方法即可实现多线程断点续传
  public static byte[] getfile(int start, int length)
  {
   string severfilepath = @"e:\test\testfile\server\1.msi";
   using (filestream serverstream = new filestream(severfilepath, filemode.open, fileaccess.read, fileshare.readwrite, 1024*80, true))
   {
    byte[] buffer = new byte[length];
    serverstream.position = start;
    //serverstream.seek(start, seekorigin.begin);
    serverstream.read(buffer, 0, length);
    return buffer;
   }
  }
 }
}

二、讨论     

1)需要注意的是第44行,不能直接使用index变量在task()里进行操作,而是要将它赋给threadindex,让threadindex在task()里,不然会直接报错,为什么呢?

2)70至108行代码可以在外面再套一层while循环,循环检测临时文件是否下完整了,然后再定义一个检测最大上限,超过这个上限就放弃本次更新,当用户的网络恢复正常后下次再做更新操作。所以说放临时文件的文件夹最好要包含版本信息,不会把2.0.0的临时文件和1.0.0的临时文件搞混。

3) filestream.position 与 filestream.seek(long offset, seekorigin seekorigin) 的作用都是获取流的指针位置,当文件路径使用绝对路径时使用position;相对路径时使用seek方法,

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

上一篇:

下一篇: