c# 使用Task实现非阻塞式的I/O操作
在前面的《基于任务的异步编程模式(tap)》文章中讲述了.net 4.5框架下的异步操作自我实现方式,实际上,在.net 4.5中部分类已实现了异步封装。如在.net 4.5中,stream类加入了async方法,所以基于流的通信方式都可以实现异步操作。
1、异步读取文件数据
public static void taskfromiostreamasync(string filename) { int chunksize = 4096; byte[] buffer = new byte[chunksize]; filestream filestream = new filestream(filename, filemode.open, fileaccess.read, fileshare.read, chunksize, true); task<int> task = filestream.readasync(buffer, 0, buffer.length); task.continuewith((readtask) => { int amountread = readtask.result; //必须在continuewith中释放文件流 filestream.dispose(); console.writeline($"async(simple) read {amountread} bytes"); }); }
上述代码中,异步读取数据只读取了一次,完成读取后就将执行权交还主线程了。但在真实场景中,需要从流中读取多次才能获得全部的数据(如文件数据大于给定缓冲区大小,或处理来自网络流的数据(数据还没全部到达机器))。因此,为了完成异步读取操作,需要连续从流中读取数据,直到获取所需全部数据。
上述问题导致需要两级task来处理。外层的task用于全部的读取工作,供调用程序使用。内层的task用于每次的读取操作。
第一次异步读取会返回一个task。如果直接返回调用wait或者continuewith的地方,会在第一次读取结束后继续向下执行。实际上是希望调用者在完成全部读取操作后才执行。因此,不能把第一个task发布会给调用者,需要一个“伪task”在完成全部读取操作后再返回。
上述问题需要使用到taskcompletionsource<t>类解决,该类可以生成一个用于返回的“伪task”。当异步读取操作全部完成后,调用其对象的trysetresult,让wait或continuewith的调用者继续执行。
public static task<long> asynchronousread(string filename) { int chunksize = 4096; byte[] buffer = new byte[chunksize]; //创建一个返回的伪task对象 taskcompletionsource<long> tcs = new taskcompletionsource<long>(); memorystream filecontents = new memorystream();//用于保存读取的内容 filestream filestream = new filestream(filename, filemode.open, fileaccess.read, fileshare.read, chunksize, true); filecontents.capacity += chunksize;//指定缓冲区大小。好像capacity会自动增长,设置与否没关系,后续写入多少数据,就增长多少 task<int> task = filestream.readasync(buffer, 0, buffer.length); task.continuewith(readtask => continueread(readtask, filestream, filecontents, buffer, tcs)); //在continuewith中循环读取,读取完成后,再返回tcs的task return tcs.task; } /// <summary> /// 继续读取数据 /// </summary> /// <param name="task">读取数据的线程</param> /// <param name="filestream">文件流</param> /// <param name="filecontents">文件存放位置</param> /// <param name="buffer">读取数据缓存</param> /// <param name="tcs">伪task对象</param> private static void continueread(task<int> task, filestream filestream, memorystream filecontents, byte[] buffer, taskcompletionsource<long> tcs) { if (task.iscompleted) { int bytesread = task.result; filecontents.write(buffer, 0, bytesread);//写入内存区域。似乎capacity会自动增长 if (bytesread > 0) { //虽然看似是一个新的任务,但是使用了continuewith,所以使用的是同一个线程。 //没有读取完,开启另一个异步继续读取 task<int> newtask = filestream.readasync(buffer, 0, buffer.length); //此处做了一个循环 newtask.continuewith(readtask => continueread(readtask, filestream, filecontents, buffer, tcs)); } else { //已经全部读取完,所以需要返回数据 tcs.trysetresult(filecontents.length); filestream.dispose(); filecontents.dispose();//应该是在使用了数据之后才释放数据缓冲区的数据 } } }
2、适应task的异步编程模式
.net framework中的旧版异步方法都带有“begin-”和“end-”前缀。这些方法仍然有效,为了接口的一致性,它们可以被封装到task中。
fromasyn方法把流的beginread和endread方法作为参数,再加上存放数据的缓冲区。beginread和endread方法会执行,并在endread完成后调用continuation task,把控制权交回主代码。上述例子会关闭流并返回转换的数据
const int readsize = 256; /// <summary> /// 从文件中获取字符串 /// </summary> /// <param name="path">文件路径</param> /// <returns>字符串</returns> public static task<string> getstringfromfile(string path) { fileinfo file = new fileinfo(path); byte[] buffer = new byte[1024];//存放数据的缓冲区 filestream filestream = new filestream( path, filemode.open, fileaccess.read, fileshare.none, buffer.length, fileoptions.deleteonclose | fileoptions.asynchronous); task<int> task = task<int>.factory.fromasync(filestream.beginread, filestream.endread, buffer, 0, readsize, null);//此参数为beginread需要的参数 taskcompletionsource<string> tcs = new taskcompletionsource<string>(); task.continuewith(taskread => onreadbuffer(taskread, filestream, buffer, 0, tcs)); return tcs.task; } /// <summary> /// 读取数据 /// </summary> /// <param name="taskread">读取任务</param> /// <param name="filestream">文件流</param> /// <param name="buffer">读取数据存放位置</param> /// <param name="offset">读取偏移量</param> /// <param name="tcs">伪task</param> private static void onreadbuffer(task<int> taskread, filestream filestream, byte[] buffer, int offset, taskcompletionsource<string> tcs) { int readlength = taskread.result; if (readlength > 0) { int newoffset = offset + readlength; task<int> task = task<int>.factory.fromasync(filestream.beginread, filestream.endread, buffer, newoffset, math.min(buffer.length - newoffset, readsize), null); task.continuewith(callbacktask => onreadbuffer(callbacktask, filestream, buffer, newoffset, tcs)); } else { tcs.trysetresult(system.text.encoding.utf8.getstring(buffer, 0, buffer.length)); filestream.dispose(); } }
3、使用async 和 await方式读取数据
下面的示例中,使用了async和await关键字实现异步读取一个文件的同时进行压缩并写入另一个文件。所有位于await关键字之前的操作都运行于调用者线程,从await开始的操作都是在continuation task中运行。但有无法使用这两个关键字的场合:①task的结束时机不明确时;②必须用到多级task和taskcompletionsource时
/// <summary> /// 同步方法的压缩 /// </summary> /// <param name="lstfiles">文件清单</param> public static void synccompress(ienumerable<string> lstfiles) { byte[] buffer = new byte[16384]; foreach(string file in lstfiles) { using (filestream inputstream = file.openread(file)) { using (filestream outputstream = file.openwrite(file + ".compressed")) { using (system.io.compression.gzipstream compressstream = new system.io.compression.gzipstream(outputstream, system.io.compression.compressionmode.compress)) { int read = 0; while((read=inputstream.read(buffer,0,buffer.length))>0) { compressstream.write(buffer, 0,read); } } } } } } /// <summary> /// 异步方法的文件压缩 /// </summary> /// <param name="lstfiles">需要压缩的文件</param> /// <returns></returns> public static async task asynccompress(ienumerable<string> lstfiles) { byte[] buffer = new byte[16384]; foreach(string file in lstfiles) { using (filestream inputstream = file.openread(file)) { using (filestream outputstream = file.openwrite(file + ".compressed")) { using (system.io.compression.gzipstream compressstream = new system.io.compression.gzipstream(outputstream, system.io.compression.compressionmode.compress)) { int read = 0; while ((read = await inputstream.readasync(buffer, 0, buffer.length)) > 0) { await compressstream.writeasync(buffer, 0, read); } } } } } }
以上就是c# 使用task实现非阻塞式的i/o操作的详细内容,更多关于c# 实现非阻塞式的i/o操作的资料请关注其它相关文章!