欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C#异步案例一则

程序员文章站 2022-08-08 13:34:20
场景 生产者和消费者队列, 生产者有多个, 消费者也有多个, 生产到消费需要异步. 下面用一个Asp.NetCore Web-API项目来模拟 创建两个API, 一个Get(), 一个Set(), Get返回一个字符串, Set放入一个字符串, Get返回的就是Set进去的字符串. 实现如下: 接着 ......

场景

  生产者和消费者队列, 生产者有多个, 消费者也有多个, 生产到消费需要异步.

下面用一个asp.netcore web-api项目来模拟

  创建两个api, 一个get(), 一个set(), get返回一个字符串, set放入一个字符串, get返回的就是set进去的字符串.

  实现如下:  

[route("api/[controller]/[action]")]
public class foocontroller : control
{
    imessagequeue _mq;
    public foocontroller(imessagequeue mq)
    {
        _mq = mq;
    }

    [httpget]
    public string get()
    {
        string str = _mq.readone<string>();
        return str;
    }

    [httpget]
    public void set(string v)
    {
        _mq.writeone(v);
    }
}

public interface imessagequeue
{
    t readone<t>();
    void writeone<t>(t value);
}

public class messagequeue: imessagequeue
{
    private object _value;

    public t readone<t>()
    {
        return (t)_value;
    }

    public void writeone<t>(t value)
    {
        _value = value;

    }
}

接着在startup中把imessagequeue给注入了.

services.addsingleton<imessagequeue, messagequeue>();

运行后, 先调用/api/foo/set/?v=xxx, 再调用/api/foo/get/

可以看到成功返回了xxx

第二步, value字段改为队列:

使set进去的值不会被下一个覆盖, get取队列最前的值

为了线程安全, 这里使用了concurrentqueue<t>

代码如下:

public class messagequeue: imessagequeue
{
    private readonly concurrentqueue<object> _queue = new concurrentqueue<object>();

    public t readone<t>()
    {
        _queue.trydequeue(out object str);
        return (t)str ;
    }

    public void writeone<t>(tvalue)
    {
        _queue.enqueue(value);
    }
}

那么此时, 只要get不断地轮询, 就可以取到set生产出来的数据了.

调用/api/foo/set/

三, 异步阻塞

再增加需求, 调换get和set的顺序,先get后set模拟异步, (我这里的demo是个web-api会有http请求超时之类的...假装不存在)我想要get调用等待有数据时才返回.

也就是说我想要在浏览器地址栏输入http://localhost:5000/api/foo/get/之后会不断地转圈直到我用set接口放入一个值

方案a: while(true), 根本无情简直无敌, 死等read() != null时break; 为防单核满转加个thread.sleep();

方案b: monitor, 一个wait()一个exit/release();

但是以上两个方案都是基于thread的, .net4.0之后伴随concurrentqueue一起来的还有个blockingcollection<t>相当好用

方案c: 修改后代码如下:

public class messagequeue : imessagequeue
{
    private readonly blockingcollection<object> _queue = new blockingcollection<object>(new concurrentqueue<object>());

    public t readone<t>()
    {
        var obj = _queue.take();
        return (t)obj;
    }

    public void writeone<t>(t value)
    {
        _queue.add(value);
    }
}

此时, 如果先get, 会阻塞等待set; 如果已经有set过数据就会直接返回队列中的数据. get不会无功而返了. 基于这个类型, 可以实现更像样的订阅模型.

扩展rpc

这里的set是生产者, get是消费者, 那如果我的这个生产者并不单纯产生数据返回void而是需要等待一个结果的呢? 此时订阅模型不够用了, 我需要一个异步的rpc .

比如有个ask请求会携带参数发起请求, 并等待, 知道另外有个地方处理了这个任务产生结果, ask结束等待返回这个结果answer. 

我可以回头继续用方案a或b, 但连.net4.0都已经过去很久了, 所以应该用更好的基于task的异步方案.

代码如下, 首先新增两个接口:

public interface imessagequeue
{
    void respond<trequest, tresponse>(func<trequest, tresponse> func);
    task<tresponse> rpc<trequest, tresponse>(trequest req);

    t readone<t>();
    void writeone<t>(t data);
}

接着定义一个特殊的任务类:

public class rpctask<trequest, tresponse>
{
    public taskcompletionsource<tresponse> tcs { get; set; }
    public trequest request { get; set; }
}

实现刚才新加的两个接口:

public task<tresponse> rpc<trequest, tresponse>(trequest req)
{
    taskcompletionsource<tresponse> tcs = new taskcompletionsource<tresponse>();
    _queue.add(new rpctask<trequest, tresponse> { request = req, tcs = tcs});
    return tcs.task;
}

public void respond<trequest, tresponse>(func<trequest, tresponse> func)
{
    var obj = _queue.take();
    if(obj is rpctask<trequest, tresponse> t)
    {
        var response = func(t.request);
        t.tcs.setresult(response);
    }
}

同样的, 写两个web api接口, 一个请求等待结果 一个负责处理工作

[httpget]
public async task<string> ask(string v)
{
    var response = await _mq.rpc<myrequest, myresponse>(new myrequest { id = v });
    return $"[{response.donetime}] {response.id}";
}

[httpget]
public void answer()
{
    _mq.respond<myrequest, myresponse>((req)=> new myresponse { id = req.id, donetime = datetime.now });
}

上面还随便写了两个class作为请求和返回

public class myrequest
{
    public string id { get; set; }
}
public class myresponse
{
    public string id { get; set; }
    public datetime donetime { get; set; }
}

测试一下, 用浏览器或postman打开三个选项卡, 各发起一个ask接口的请求, 参数v分别为1 2 3, 三个选项卡都开始转圈等待

然后再打开一个选项卡访问answer接口, 处理刚才放进队列的任务, 发起一次之前的三个选项卡之中就有一个停止等待并显示返回数据. 需求实现.

这里用到的关键类型是taskcompletionsource<t>.

再扩展

如果是个分布式系统, 请求和处理逻辑不是在一个程序里呢? 那么这个队列可能也是一个单独的服务. 此时就要再加个返回队列了, 给队列中传输的每一个任务打上id, 返回队列中取出返回之后再找到id对于的tcs.setresult()