简单实现C#异步操作
在.net4.0以后异步操作,并行计算变得异常简单,但是由于公司项目开发基于.net3.5所以无法用到4.0的并行计算以及task等异步编程。因此,为了以后更方便的进行异步方式的开发,我封装实现了异步编程框架,通过begininvoke、endinvoke的方式实现异步编程。
一、框架结构
整个框架包括四个部分
1、基类抽象opeartor
我把每个异步执行过程称为一个operate,因此需要一个opeartor去执行
2、funcasync
异步的func
3、actionasync
异步的action
4、asynchorus
对actionasync和funcasync的封装
operator
operator是一个抽象类,实现了ioperationasync和icontinuewithasync两个接口。
ioperationasync实现了异步操作,icontinuewithasync实现了类似于task的continuewith方法,在当前异步操作完成后继续进行的操作
ioperationasync接口详解
public interface ioperationasync { iasyncresult invoke(); void wait(); void completedcallback(iasyncresult ar); void catchexception(exception exception); }
- invoke():异步方法的调用
- wait():等待异步操作执行
- completedcallback():操作完成回调
- catchexception():抓取异常
icontinuewithasync接口详情
public interface icontinuewithasync { operator previous { get; set; } operator next { get; set; } operator continuewithasync(action action); operator continuewithasync<tparameter>(action<tparameter> action, tparameter parameter); }
previous:前一个操作
next:下一个操作
continuewithasync():异步继续操作
public abstract class operator : ioperationasync, icontinuewithasync { public iasyncresult middle; public readonly string id; public exception exception { get; private set; } public operator previous { get; set; } public operator next { get; set; } protected operator() { id = guid.newguid().tostring(); } public abstract iasyncresult invoke(); protected void setasyncresult(iasyncresult result) { this.middle = result; } public virtual void wait() { if (!middle.iscompleted) middle.asyncwaithandle.waitone(); } public virtual void completedcallback(iasyncresult ar) { } public void catchexception(exception exception) { this.exception = exception; } protected operator continueasync() { if (next != null) next.invoke(); return next; } public virtual operator continuewithasync(action action) { next = new actionasync(action); next.previous = this; return next; } public virtual operator continuewithasync<tparameter>(action<tparameter> action, tparameter parameter) { next = new actionasync<tparameter>(action, parameter); next.previous = this; return next; } public virtual operator continuewithasync<tresult>(func<tresult> func) { next = new funcasync<tresult>(); next.previous = this; return next; } public virtual operator continuewithasync<tparameter, tresult>(func<tparameter, tresult> func, tparameter parameter) { next = new funcasync<tparameter, tresult>(func, parameter); next.previous = this; return next; } }
无返回异步操作
actionasync
public class actionasync : operator { private readonly action _action; protected actionasync() { } public actionasync(action action) : this() { this._action = action; } public override iasyncresult invoke() { var middle = _action.begininvoke(completedcallback, null); setasyncresult(middle); return middle; } public override void completedcallback(iasyncresult ar) { try { _action.endinvoke(ar); } catch (exception exception) { this.catchexception(exception); } continueasync(); } } public class actionasync<t> : actionasync { public t result; private readonly action<t> _action1; protected readonly t parameter1; public actionasync() { } public actionasync(t parameter) { this.parameter1 = parameter; } public actionasync(action<t> action, t parameter) { this._action1 = action; this.parameter1 = parameter; } public override iasyncresult invoke() { var result = _action1.begininvoke(parameter1, completedcallback, null); setasyncresult(result); return result; } public override void completedcallback(iasyncresult ar) { try { _action1.endinvoke(ar); } catch (exception exception) { this.catchexception(exception); } continueasync(); } }
有返回异步
funcasync实现了ifuncoperationasync接口
ifuncoperationasync
public interface ifuncoperationasync<t> { void setresult(t result); t getresult(); }
- setresult(t result):异步操作完成设置返回值
- getresult():获取返回值
1)、funcasync
public class funcasync<tresult> : operator, ifuncoperationasync<tresult> { private tresult _result; public tresult result { get { if (!middle.iscompleted || _result == null) { _result = getresult(); } return _result; } } private readonly func<tresult> _func1; public funcasync() { } public funcasync(func<tresult> func) { this._func1 = func; } public override iasyncresult invoke() { var result = _func1.begininvoke(completedcallback, null); setasyncresult(result); return result; } public override void completedcallback(iasyncresult ar) { try { var result = _func1.endinvoke(ar); setresult(result); } catch (exception exception) { this.catchexception(exception); setresult(default(tresult)); } continueasync(); } public virtual tresult getresult() { wait(); return this._result; } public void setresult(tresult result) { _result = result; } } public class funcasync<t1, tresult> : funcasync<tresult> { protected readonly t1 parameter1; private readonly func<t1, tresult> _func2; public funcasync(func<t1, tresult> action, t1 parameter1) : this(parameter1) { this._func2 = action; } protected funcasync(t1 parameter1) : base() { this.parameter1 = parameter1; } public override iasyncresult invoke() { var result = _func2.begininvoke(parameter1, completedcallback, null); setasyncresult(result); return result; } public override void completedcallback(iasyncresult ar) { try { var result = _func2.endinvoke(ar); setresult(result); } catch (exception exception) { catchexception(exception); setresult(default(tresult)); } continueasync(); } }
asynchronous 异步操作封装
actionasync和funcasync为异步操作打下了基础,接下来最重要的工作就是通过这两个类执行我们的异步操作,为此我封装了一个异步操作类
主要封装了以下几个部分:
- waitall(ienumerable<operator> operations):等待所有操作执行完毕
- waitany(ienumerable<operator> operations):等待任意操作执行完毕
- actionasync
- funcasync
- continuewithaction
- continuewithfunc
后面四个包含若干个重载,这里只是笼统的代表一个类型的方法
waitall
public static void waitall(ienumerable<operator> operations) { foreach (var @operator in operations) { @operator.wait(); } }
waitany
public static void waitany(ienumerable<operator> operations) { while (operations.all(o => !o.middle.iscompleted)) thread.sleep(100); }
等待时间可以自定义
actioninvoke
public static operator invoke(action action) { operator operation = new actionasync(action); operation.invoke(); return operation; } public static operator invoke<t>(action<t> action, t parameter) { operator operation = new actionasync<t>(action, parameter); operation.invoke(); return operation; } public static operator invoke<t1, t2>(action<t1, t2> action, t1 parameter1, t2 parameter2) { operator operation = new actionasync<t1, t2>(action, parameter1, parameter2); operation.invoke(); return operation; }
funcinvoke
public static operator invoke<tresult>(func<tresult> func) { operator operation = new funcasync<tresult>(func); operation.invoke(); return operation; } public static operator invoke<tparameter, tresult>(func<tparameter, tresult> func, tparameter parameter) { tparameter param = parameter; operator operation = new funcasync<tparameter, tresult>(func, param); operation.invoke(); return operation; } public static operator invoke<t1, t2, tresult>(func<t1, t2, tresult> func, t1 parameter1, t2 parameter2) { operator operation = new funcasync<t1, t2, tresult>(func, parameter1, parameter2); operation.invoke(); return operation; }
continuewithaction
public static operator continuewithasync(ienumerable<operator>operators, action action) { return invoke(waitall, operators) .continuewithasync(action); } public static operator continuewithasync<tparameter>(ienumerable<operator> operators, action<tparameter> action, tparameter parameter) { return invoke(waitall, operators) .continuewithasync(action, parameter); }
continuewithfunc
public static operator continuewithasync<tresult>(ienumerable<operator> operators,func<tresult> func) { return invoke(waitall, operators) .continuewithasync(func); } public static operator continuewithasync<tparameter, tresult>(ienumerable<operator> operators, func<tparameter, tresult> func, tparameter parameter) { return invoke(waitall, operators) .continuewithasync(func, parameter); }
这里有个bug当调用continuewithasync后无法调用wait等待,本来wait需要从前往后等待每个异步操作,但是测试了下不符合预期结果。不过理论上来说应该无需这样操作,continuewithasync只是为了当上一个异步操作执行完毕时继续执行的异步操作,若要等待,那不如两个操作放到一起,最后再等待依然可以实现。
前面的都是单步异步操作的调用,若需要对某集合进行某个方法的异步操作,可以foreach遍历
public void foreachasync(ienumerbale<string> parameters) { foreach(string p in parameters) { asynchronous.invoke(tast,p); } } public void test(string parameter) { //todo:做一些事 }
每次都需要去手写foreach,比较麻烦,因此实现类似于plinq的并行计算方法实在有必要,不过有一点差别,plinq是采用多核cpu进行并行计算,而我封装的仅仅遍历集合进行异步操作而已
foreachaction
public static ienumerable<operator> foreach<tparameter>(ienumerable<tparameter> items, action<tparameter> action) { return items.select(t => invoke(action, t)).tolist(); }
foreachfunc
public static ienumerable<operator> foreach<tparameter, tresult>(ienumerable<tparameter> items, func<tparameter, tresult> func) { return items.select(parameter => invoke(func, parameter)).tolist(); }
如何使用
无返回值异步方法调用
public void dosomething() { //todo: }
通过asynchronous.invoke(dosomething) 执行
public void dosomething(string parameter) { //todo: }
通过asynchronous.invoke(dosomething,parameter) 执行
有返回值异步方法调用
public string dosomething() { //todo: }
通过asynchronous.invoke(()=>dosomething())执行
public string dosomething(string parameter) { //todo: }
通过asynchronous.invoke(()=>dosomething(parameter))执行,或者也可以传入参数通过asynchronous.invoke(p=>dosomething(p),parameter)
无返回值foreach
public void test { int[] parameters = {1,2,3,4,5}; asynchronous.foreach(parameters,console.writeline); }
有返回值foreach
public void test { int[] parameters = {1,2,3,4,5}; var operators = asynchronous.foreach(parameters,p=> p*2); asynchrous.waitall(operators); asynchronous.foreach(operators.cast<funcasync<int,int>>(), p=> console.writeline(p.result)); }
首先将集合每个值扩大2倍,然后输出
异步执行完再执行
public void test { int[] parameters = {1,2,3,4,5}; var operators = asynchronous.foreach(parameters,p=> p*2); asynchrous.continuewithasync(operators,console.writeline,"执行完成"); }
每次执行完继续执行
可能有时候我们需要遍历一个集合,每个元素处理完成后我们需要输出xx处理完成
public void test { int[] parameters = {1,2,3,4,5}; var operators = asynchronous.foreach(parameters,p=> p*2); asynchronous.foreach(operators,o=>{ o.continuewithasync(()={ //每个元素执行完时执行 if(o.exception != null) { //之前执行时产生未处理的异常,这里可以捕获到 } }); }); }
可以实现链式异步操作
public void chain() { asynchronous.invoke(console.writeline,1) .continuewithasync(console.writeline,2) .continuewithasync(console.writeline,3) }
这样会按步骤输出1,2,3
结束语
以上只是列出了部分重载方法,其他重载方法无非就是加参数,本质实际是一样的。
希望对大家的学习有所帮助,在这祝大家新年快乐,新的一年大家一起努力。