欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

简单实现C#异步操作

程序员文章站 2022-06-03 15:21:15
在.net4.0以后异步操作,并行计算变得异常简单,但是由于公司项目开发基于.net3.5所以无法用到4.0的并行计算以及task等异步编程。因此,为了以后更方便的进行异步...

在.net4.0以后异步操作,并行计算变得异常简单,但是由于公司项目开发基于.net3.5所以无法用到4.0的并行计算以及task等异步编程。因此,为了以后更方便的进行异步方式的开发,我封装实现了异步编程框架,通过begininvoke、endinvoke的方式实现异步编程。

一、框架结构

整个框架包括四个部分

1、基类抽象opeartor
我把每个异步执行过程称为一个operate,因此需要一个opeartor去执行
2、funcasync
异步的func
3、actionasync
异步的action
4、asynchorus
对actionasync和funcasync的封装

operator
operator是一个抽象类,实现了ioperationasync和icontinuewithasync两个接口。
ioperationasync实现了异步操作,icontinuewithasync实现了类似于task的continuewith方法,在当前异步操作完成后继续进行的操作

ioperationasync接口详解

public interface ioperationasync
{
  iasyncresult invoke();
  void wait();
  void completedcallback(iasyncresult ar);
  void catchexception(exception exception);
}
  • invoke():异步方法的调用
  • wait():等待异步操作执行
  • completedcallback():操作完成回调
  • catchexception():抓取异常

icontinuewithasync接口详情

public interface icontinuewithasync
{
  operator previous { get; set; }
  operator next { get; set; }
  operator continuewithasync(action action);
  operator continuewithasync<tparameter>(action<tparameter> action, tparameter parameter);
}

previous:前一个操作
next:下一个操作
continuewithasync():异步继续操作

public abstract class operator : ioperationasync, icontinuewithasync
{
  public iasyncresult middle;
  public readonly string id;
  public exception exception { get; private set; }
  public operator previous { get; set; }
  public operator next { get; set; }
  protected operator()
  {
    id = guid.newguid().tostring();
  }
  public abstract iasyncresult invoke();
  protected void setasyncresult(iasyncresult result)
  {
    this.middle = result;
  }
  public virtual void wait()
  {
    if (!middle.iscompleted) middle.asyncwaithandle.waitone();
  }
  public virtual void completedcallback(iasyncresult ar)
  {
  }
  public void catchexception(exception exception)
  {
    this.exception = exception;
  }
  protected operator continueasync()
  {
    if (next != null) next.invoke();
    return next;
  }
  public virtual operator continuewithasync(action action)
  {
    next = new actionasync(action);
    next.previous = this;
    return next;
  }
  public virtual operator continuewithasync<tparameter>(action<tparameter> action, tparameter parameter)
  {
    next = new actionasync<tparameter>(action, parameter);
    next.previous = this;
    return next;
  }
  public virtual operator continuewithasync<tresult>(func<tresult> func)
  {
    next = new funcasync<tresult>();
    next.previous = this;
    return next;
  }
  public virtual operator continuewithasync<tparameter, tresult>(func<tparameter, tresult> func,
    tparameter parameter)
  {
    next = new funcasync<tparameter, tresult>(func, parameter);
    next.previous = this;
    return next;
  }
}

无返回异步操作
actionasync

public class actionasync : operator
{
  private readonly action _action;
  protected actionasync()
  {
  }
  public actionasync(action action)
    : this()
  {
    this._action = action;
  }
  public override iasyncresult invoke()
  {
    var middle = _action.begininvoke(completedcallback, null);
    setasyncresult(middle);
    return middle;
  }
  public override void completedcallback(iasyncresult ar)
  {
    try
    {
      _action.endinvoke(ar);
    }
    catch (exception exception)
    {
      this.catchexception(exception);
    }
    continueasync();
  }
}
public class actionasync<t> : actionasync
{
  public t result;
  private readonly action<t> _action1;
  protected readonly t parameter1;
  public actionasync()
  {
  }
  public actionasync(t parameter)
  {
    this.parameter1 = parameter;
  }
  public actionasync(action<t> action, t parameter)
  {
    this._action1 = action;
    this.parameter1 = parameter;
  }
  public override iasyncresult invoke()
  {
    var result = _action1.begininvoke(parameter1, completedcallback, null);
    setasyncresult(result);
    return result;
  }
  public override void completedcallback(iasyncresult ar)
  {
    try
    {
      _action1.endinvoke(ar);
    }
    catch (exception exception)
    {
      this.catchexception(exception);
    }
    continueasync();
  }
}

有返回异步
funcasync实现了ifuncoperationasync接口

ifuncoperationasync

public interface ifuncoperationasync<t>
{
  void setresult(t result);
  t getresult();
}
  • setresult(t result):异步操作完成设置返回值
  • getresult():获取返回值

1)、funcasync

public class funcasync<tresult> : operator, ifuncoperationasync<tresult>
{
private tresult _result;

public tresult result
{
  get
  {
    if (!middle.iscompleted || _result == null)
    {
      _result = getresult();
    }
    return _result;
  }
}
private readonly func<tresult> _func1;
public funcasync()
{
}
public funcasync(func<tresult> func)
{
  this._func1 = func;
}
public override iasyncresult invoke()
{
  var result = _func1.begininvoke(completedcallback, null);
  setasyncresult(result);
  return result;
}
public override void completedcallback(iasyncresult ar)
{
  try
  {
    var result = _func1.endinvoke(ar);
    setresult(result);
  }
  catch (exception exception)
  {
    this.catchexception(exception);
    setresult(default(tresult));
  }
  continueasync();
}
public virtual tresult getresult()
{
  wait();
  return this._result;
}
public void setresult(tresult result)
{
  _result = result;
}
}
public class funcasync<t1, tresult> : funcasync<tresult>
{
protected readonly t1 parameter1;
private readonly func<t1, tresult> _func2;
public funcasync(func<t1, tresult> action, t1 parameter1)
  : this(parameter1)
{
  this._func2 = action;
}
protected funcasync(t1 parameter1)
  : base()
{
  this.parameter1 = parameter1;
}
public override iasyncresult invoke()
{
  var result = _func2.begininvoke(parameter1, completedcallback, null);
  setasyncresult(result);
  return result;
}
public override void completedcallback(iasyncresult ar)
{
  try
  {
    var result = _func2.endinvoke(ar);
    setresult(result);
  }
  catch (exception exception)
  {
    catchexception(exception);
    setresult(default(tresult));
  }
  continueasync();
}
}

asynchronous 异步操作封装
actionasync和funcasync为异步操作打下了基础,接下来最重要的工作就是通过这两个类执行我们的异步操作,为此我封装了一个异步操作类
主要封装了以下几个部分:

  • waitall(ienumerable<operator> operations):等待所有操作执行完毕
  • waitany(ienumerable<operator> operations):等待任意操作执行完毕
  • actionasync
  • funcasync
  • continuewithaction
  • continuewithfunc

后面四个包含若干个重载,这里只是笼统的代表一个类型的方法
waitall

public static void waitall(ienumerable<operator> operations)
{
foreach (var @operator in operations)
{
  @operator.wait();
}
}

waitany

public static void waitany(ienumerable<operator> operations)
{
while (operations.all(o => !o.middle.iscompleted))
  thread.sleep(100);
}

等待时间可以自定义
actioninvoke

public static operator invoke(action action)
{
operator operation = new actionasync(action);
operation.invoke();
return operation;
}
public static operator invoke<t>(action<t> action, t parameter)
{
operator operation = new actionasync<t>(action, parameter);
operation.invoke();
return operation;
}
public static operator invoke<t1, t2>(action<t1, t2> action, t1 parameter1, t2 parameter2)
{
operator operation = new actionasync<t1, t2>(action, parameter1, parameter2);
operation.invoke();
return operation;
}

funcinvoke

public static operator invoke<tresult>(func<tresult> func)
{
operator operation = new funcasync<tresult>(func);
operation.invoke();
return operation;
}
public static operator invoke<tparameter, tresult>(func<tparameter, tresult> func, tparameter parameter)
{
tparameter param = parameter;
operator operation = new funcasync<tparameter, tresult>(func, param);
operation.invoke();
return operation;
}
public static operator invoke<t1, t2, tresult>(func<t1, t2, tresult> func, t1 parameter1, t2 parameter2)
{
operator operation = new funcasync<t1, t2, tresult>(func, parameter1, parameter2);
operation.invoke();
return operation;
}

continuewithaction

public static operator continuewithasync(ienumerable<operator>operators, action action)
{
return invoke(waitall, operators)
  .continuewithasync(action);
}
public static operator continuewithasync<tparameter>(ienumerable<operator> operators, action<tparameter> action, tparameter parameter)
{
return invoke(waitall, operators)
  .continuewithasync(action, parameter);
}

continuewithfunc

public static operator continuewithasync<tresult>(ienumerable<operator> operators,func<tresult> func)
{
return invoke(waitall, operators)
  .continuewithasync(func);
}
public static operator continuewithasync<tparameter, tresult>(ienumerable<operator> operators, 
func<tparameter, tresult> func, tparameter parameter)
{
return invoke(waitall, operators)
  .continuewithasync(func, parameter);
}

这里有个bug当调用continuewithasync后无法调用wait等待,本来wait需要从前往后等待每个异步操作,但是测试了下不符合预期结果。不过理论上来说应该无需这样操作,continuewithasync只是为了当上一个异步操作执行完毕时继续执行的异步操作,若要等待,那不如两个操作放到一起,最后再等待依然可以实现。
前面的都是单步异步操作的调用,若需要对某集合进行某个方法的异步操作,可以foreach遍历

public void foreachasync(ienumerbale<string> parameters)
{
  foreach(string p in parameters)
  {
    asynchronous.invoke(tast,p);
  }
}
public void test(string parameter)
{
  //todo:做一些事
}

每次都需要去手写foreach,比较麻烦,因此实现类似于plinq的并行计算方法实在有必要,不过有一点差别,plinq是采用多核cpu进行并行计算,而我封装的仅仅遍历集合进行异步操作而已
foreachaction

public static ienumerable<operator> foreach<tparameter>(ienumerable<tparameter> items, action<tparameter> action)
{
  return items.select(t => invoke(action, t)).tolist();
}

foreachfunc

public static ienumerable<operator> foreach<tparameter, tresult>(ienumerable<tparameter> items, func<tparameter, tresult> func)
{
  return items.select(parameter => invoke(func, parameter)).tolist();
}

如何使用
无返回值异步方法调用

public void dosomething()
{
//todo:
}

通过asynchronous.invoke(dosomething) 执行

public void dosomething(string parameter)
{
//todo:
}

通过asynchronous.invoke(dosomething,parameter) 执行

有返回值异步方法调用

public string dosomething()
{
//todo:
}

通过asynchronous.invoke(()=>dosomething())执行

public string dosomething(string parameter)
{
//todo:
}

通过asynchronous.invoke(()=>dosomething(parameter))执行,或者也可以传入参数通过asynchronous.invoke(p=>dosomething(p),parameter)

无返回值foreach

public void test
{
int[] parameters = {1,2,3,4,5};
asynchronous.foreach(parameters,console.writeline);
}

有返回值foreach

public void test
{
int[] parameters = {1,2,3,4,5};
var operators = asynchronous.foreach(parameters,p=> p*2);
asynchrous.waitall(operators);
asynchronous.foreach(operators.cast<funcasync<int,int>>(),
  p=> console.writeline(p.result));
}

首先将集合每个值扩大2倍,然后输出
异步执行完再执行

public void test
{
int[] parameters = {1,2,3,4,5};
var operators = asynchronous.foreach(parameters,p=> p*2);
asynchrous.continuewithasync(operators,console.writeline,"执行完成");
}

每次执行完继续执行
可能有时候我们需要遍历一个集合,每个元素处理完成后我们需要输出xx处理完成

public void test
{
int[] parameters = {1,2,3,4,5};
var operators = asynchronous.foreach(parameters,p=> p*2);
asynchronous.foreach(operators,o=>{
  o.continuewithasync(()={
    //每个元素执行完时执行
    if(o.exception != null)
    {
      //之前执行时产生未处理的异常,这里可以捕获到 
    }
  });
});
}

可以实现链式异步操作

public void chain()
{
asynchronous.invoke(console.writeline,1)
.continuewithasync(console.writeline,2)
.continuewithasync(console.writeline,3)
}

这样会按步骤输出1,2,3
结束语

以上只是列出了部分重载方法,其他重载方法无非就是加参数,本质实际是一样的。

希望对大家的学习有所帮助,在这祝大家新年快乐,新的一年大家一起努力。