欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

[Abp vNext 源码分析] - 4. 工作单元

程序员文章站 2022-07-02 16:23:33
一、简要说明 统一工作单元是一个比较重要的基础设施组件,它负责管理整个业务流程当中涉及到的数据库事务,一旦某个环节出现异常自动进行回滚处理。 在 ABP vNext 框架当中,工作单元被独立出来作为一个单独的模块( Volo.Abp.Uow )。你可以根据自己的需要,来决定是否使用统一工作单元。 二 ......

一、简要说明

统一工作单元是一个比较重要的基础设施组件,它负责管理整个业务流程当中涉及到的数据库事务,一旦某个环节出现异常自动进行回滚处理。

在 abp vnext 框架当中,工作单元被独立出来作为一个单独的模块(volo.abp.uow)。你可以根据自己的需要,来决定是否使用统一工作单元。

二、源码分析

整个 volo.abp.uow 项目的结构如下,从下图还是可以看到我们的老朋友 iunitofworkmanageriunitofwork ,不过也多了一些新东西。看一个模块的功能,首先从它的 module 入手,我们先看一下 abpunitofworkmodule 里面的实现。

[Abp vNext 源码分析] - 4. 工作单元

2.1 工作单元的初始模块

打开 abpunitofworkmodule 里面的代码,发现还是有点失望,里面就一个服务注册完成事件。

public override void preconfigureservices(serviceconfigurationcontext context)
{
    context.services.onregistred(unitofworkinterceptorregistrar.registerifneeded);
}

这里的结构和之前看的 审计日志 模块类似,就是注册拦截器的作用,没有其他特别的操作。

2.1.1 拦截器注册

继续跟进代码,其实现是通过 unitofworkhelper 来确定哪些类型应该集成 unitofwork 组件。

public static void registerifneeded(ionserviceregistredcontext context)
{
    // 根据回调传入的 context 绑定的实现类型,判断是否应该为该类型注册 unitofworkinterceptor 拦截器。
    if (unitofworkhelper.isunitofworktype(context.implementationtype.gettypeinfo()))
    {
        context.interceptors.tryadd<unitofworkinterceptor>();
    }
}

继续分析 unitofworkhelper 内部的代码,第一种情况则是实现类型 (implementationtype) 或类型的任一方法标注了 unitofwork 特性的话,都会为其注册工作单元拦截器。

第二种情况则是 abp vnext 为我们提供了一个新的 iunitofworkenabled 标识接口。只要继承了该接口的实现,都会被视为需要工作单元组件,会在系统启动的时候,自动为它绑定拦截器。

public static bool isunitofworktype(typeinfo implementationtype)
{
    // 第一种方式,即判断具体类型与其方法是否标注了 unitofwork 特性。
    if (hasunitofworkattribute(implementationtype) || anymethodhasunitofworkattribute(implementationtype))
    {
        return true;
    }

    // 第二种方式,即判断具体类型是否继承自 iunitofworkenabled 接口。
    if (typeof(iunitofworkenabled).gettypeinfo().isassignablefrom(implementationtype))
    {
        return true;
    }

    return false;
}

2.2 新的接口与抽象

在 abp vnext 当中,将一些 职责 从原有的工作单元进行了 分离。抽象出了 idatabaseapiisupportsrollbackitransactionapi 这三个接口,这三个接口分别提供了不同的功能和职责。

2.2.1 数据库统一访问接口

这里以 idatabaseapi 为例,它是提供了一个 数据库提供者(database provider) 的抽象概念,在 abp vnext 里面,是将 efcore 作为数据库概念来进行抽象的。(因为后续 mongodb 与 memorydb 与其同级)

你可以看作是 ef core 的 provider ,在 ef core 里面我们可以实现不同的 provider ,来让 ef core 支持访问不同的数据库。

[Abp vNext 源码分析] - 4. 工作单元

[Abp vNext 源码分析] - 4. 工作单元

而 abp vnext 这么做的意图就是提供一个统一的数据库访问 api,如何理解呢?这里以 efcoredatabaseapi<tdbcontext> 为例,你查看它的实现会发现它继承并实现了 isupportssavingchanges ,也就是说 efcoredatabaseapi<tdbcontext> 支持 savechanges 操作来持久化数据更新与修改。

public class efcoredatabaseapi<tdbcontext> : idatabaseapi, isupportssavingchanges
    where tdbcontext : iefcoredbcontext
{
    public tdbcontext dbcontext { get; }

    public efcoredatabaseapi(tdbcontext dbcontext)
    {
        dbcontext = dbcontext;
    }
    
    public task savechangesasync(cancellationtoken cancellationtoken = default)
    {
        return dbcontext.savechangesasync(cancellationtoken);
    }

    public void savechanges()
    {
        dbcontext.savechanges();
    }
}

也就是说 savechanges 这个操作,是 efcore 这个 databaseapi 提供了一种特殊操作,是该类型数据库的一种特殊接口。

如果针对于某些特殊的数据库,例如 influxdb 等有一些特殊的 api 操作时,就可以通过一个 databaseapi 类型进行处理。

2.2.2 数据库事务接口

通过最开始的项目结构会发现一个 itransactionapi 接口,这个接口只定义了一个 事务提交操作(commit),并提供了异步方法的定义。

public interface itransactionapi : idisposable
{
    void commit();

    task commitasync();
}

跳转到其典型实现 efcoretransactionapi 当中,可以看到该类型还实现了 isupportsrollback 接口。通过这个接口的名字,我们大概就知道它的作用,就是提供了回滚方法的定义。如果某个数据库支持回滚操作,那么就可以为其实现该接口。

其实这里按照语义,你也可以将它放在 efcoredatabaseapi<tdbcontext> 进行实现,因为回滚也是数据库提供的 api 之一,只是在 abp vnext 里面又将其归为事务接口进行处理了。

这里就不再详细赘述该类型的具体实现,后续会在单独的 ef core 章节进行说明。

2.3 工作单元的原理与实现

在 abp vnext 框架当中的工作单元实现,与原来 abp 框架有一些不一样。

2.3.1 内部工作单元 (子工作单元)

首先说内部工作单元的定义,现在是有一个新的 childunitofwork 类型作为 子工作单元。子工作单元本身并不会产生实际的业务逻辑操作,基本所有逻辑都是调用 unitofwork 的方法。

internal class childunitofwork : iunitofwork
{
    public guid id => _parent.id;

    public iunitofworkoptions options => _parent.options;

    public iunitofwork outer => _parent.outer;

    public bool isreserved => _parent.isreserved;

    public bool isdisposed => _parent.isdisposed;

    public bool iscompleted => _parent.iscompleted;

    public string reservationname => _parent.reservationname;

    public event eventhandler<unitofworkfailedeventargs> failed;
    public event eventhandler<unitofworkeventargs> disposed;

    public iserviceprovider serviceprovider => _parent.serviceprovider;

    private readonly iunitofwork _parent;

    // 只有一个带参数的构造函数,传入的就是外部的工作单元(带事务)。
    public childunitofwork([notnull] iunitofwork parent)
    {
        check.notnull(parent, nameof(parent));

        _parent = parent;

        _parent.failed += (sender, args) => { failed.invokesafely(sender, args); };
        _parent.disposed += (sender, args) => { disposed.invokesafely(sender, args); };
    }

    // 下面所有 iunitofwork 的接口方法,都是调用传入的 unitofwork 实例。
    public void setouter(iunitofwork outer)
    {
        _parent.setouter(outer);
    }

    public void initialize(unitofworkoptions options)
    {
        _parent.initialize(options);
    }

    public void reserve(string reservationname)
    {
        _parent.reserve(reservationname);
    }

    public void savechanges()
    {
        _parent.savechanges();
    }

    public task savechangesasync(cancellationtoken cancellationtoken = default)
    {
        return _parent.savechangesasync(cancellationtoken);
    }

    public void complete()
    {

    }

    public task completeasync(cancellationtoken cancellationtoken = default)
    {
        return task.completedtask;
    }

    public void rollback()
    {
        _parent.rollback();
    }

    public task rollbackasync(cancellationtoken cancellationtoken = default)
    {
        return _parent.rollbackasync(cancellationtoken);
    }

    public void oncompleted(func<task> handler)
    {
        _parent.oncompleted(handler);
    }

    public idatabaseapi finddatabaseapi(string key)
    {
        return _parent.finddatabaseapi(key);
    }

    public void adddatabaseapi(string key, idatabaseapi api)
    {
        _parent.adddatabaseapi(key, api);
    }

    public idatabaseapi getoradddatabaseapi(string key, func<idatabaseapi> factory)
    {
        return _parent.getoradddatabaseapi(key, factory);
    }

    public itransactionapi findtransactionapi(string key)
    {
        return _parent.findtransactionapi(key);
    }

    public void addtransactionapi(string key, itransactionapi api)
    {
        _parent.addtransactionapi(key, api);
    }

    public itransactionapi getoraddtransactionapi(string key, func<itransactionapi> factory)
    {
        return _parent.getoraddtransactionapi(key, factory);
    }

    public void dispose()
    {

    }

    public override string tostring()
    {
        return $"[unitofwork {id}]";
    }
}

虽然基本上所有方法的实现,都是调用的实际工作单元实例。但是有两个方法 childunitofwork 是空实现的,那就是 complete()dispose() 方法。

这两个方法一旦在内部工作单元调用了,就会导致 事务被提前提交,所以这里是两个空实现。

下面就是上述逻辑的伪代码:

using(var transactioinuow = uowmgr.begin())
{
    // 业务逻辑 1 。
    using(var childuow1 = uowmgr.begin())
    {
        // 业务逻辑 2。
        using(var childuow2 = uowmgr.begin())
        {
            // 业务逻辑 3。
            childuow2.complete();
        }
        
        childuow1.complete();
    }
    transactioinuow.complete();
}

以上结构一旦某个内部工作单元抛出了异常,到会导致最外层带事务的工作单元无法调用 complete() 方法,也就能够保证我们的 数据一致性

2.3.2 外部工作单元

首先我们查看 unitofwork 类型和 iunitofwork 的定义和属性,可以获得以下信息。

  1. 每个工作单元是瞬时对象,因为它继承了 itransientdependency 接口。

  2. 每个工作单元都会有一个 guid 作为其唯一标识信息。

  3. 每个工作单元拥有一个 iunitofworkoptions 来说明它的配置信息。

    这里的配置信息主要指一个工作单元在执行时的 超时时间是否包含一个事务,以及它的 事务隔离级别(如果是事务性的工作单元的话)。

  4. 每个工作单元存储了 idatabaseapiitransactionapi 的集合,并提供了访问/存储接口。

  5. 提供了两个操作事件 faileddisposed

    这两个事件分别在工作单元执行失败以及被释放时(调用 dispose() 方法)触发,开发人员可以挂载这两个事件提供自己的处理逻辑。

  6. 工作单元还提供了一个工作单元完成事件组。

    用于开发人员在工作单元完成时(调用complete() 方法)挂载自己的处理事件,因为是 list<func<task>> 所以你可以指定多个,它们都会在调用 complete() 方法之后执行,例如如下代码:

    using (var uow = _unitofworkmanager.begin())
    {
     uow.oncompleted(async () => completed = true);
     uow.oncompleted(async()=>console.writeline("hello abp vnext"));
    
     uow.complete();
    }

以上信息是我们查看了 unitofwork 的属性与接口能够直接得出的结论,接下来我会根据一个工作单元的生命周期来说明一遍工作单元的实现。

一个工作单元的的构造是通过工作单元管理器实现的(iunitofworkmanager),通过它的 begin() 方法我们会获得一个工作单元,至于这个工作单元是外部工作单元还是内部工作单元,取决于开发人员传入的参数。

public iunitofwork begin(unitofworkoptions options, bool requiresnew = false)
{
    check.notnull(options, nameof(options));

    // 获得当前的工作单元。
    var currentuow = current;
    // 如果当前工作单元不为空,并且开发人员明确说明不需要构建新的工作单元时,创建内部工作单元。
    if (currentuow != null && !requiresnew)
    {
        return new childunitofwork(currentuow);
    }

    // 调用 createnewunitofwork() 方法创建新的外部工作单元。
    var unitofwork = createnewunitofwork();
    // 使用工作单元配置初始化外部工作单元。
    unitofwork.initialize(options);

    return unitofwork;
}

这里需要注意的就是创建新的外部工作单元方法,它这里就使用了 ioc 容器提供的 scope 生命周期,并且在创建之后会将最外部的工作单元设置为最新创建的工作单元实例。

private iunitofwork createnewunitofwork()
{
    var scope = _serviceprovider.createscope();
    try
    {
        var outeruow = _ambientunitofwork.unitofwork;

        var unitofwork = scope.serviceprovider.getrequiredservice<iunitofwork>();

        // 设置当前工作单元的外部工作单元。
        unitofwork.setouter(outeruow);

        // 设置最外层的工作单元。
        _ambientunitofwork.setunitofwork(unitofwork);

        unitofwork.disposed += (sender, args) =>
        {
            _ambientunitofwork.setunitofwork(outeruow);
            scope.dispose();
        };

        return unitofwork;
    }
    catch
    {
        scope.dispose();
        throw;
    }
}

上述描述可能会有些抽象,结合下面这两幅图可能会帮助你的理解。

[Abp vNext 源码分析] - 4. 工作单元

[Abp vNext 源码分析] - 4. 工作单元

我们可以在任何地方注入 iambientunitofwork 来获取当前活动的工作单元,关于 iambientunitofworkiunitofworkaccessor 的默认实现,都是使用的 ambientunitofwork

在该类型的内部,通过 asynclocal<iunitofwork> 来确保在不同的 异步上下文切换 过程中,其值是正确且统一的。

构造了一个外部工作单元之后,我们在仓储等地方进行数据库操作。操作完成之后,我们需要调用 complete() 方法来说明我们的操作已经完成了。如果你没有调用 complete() 方法,那么工作单元在被释放的时候,就会产生异常,并触发 failed 事件。

public virtual void dispose()
{
    if (isdisposed)
    {
        return;
    }

    isdisposed = true;

    disposetransactions();

    // 只有调用了 complete()/completeasync() 方法之后,iscompleted 的值才为 true。
    if (!iscompleted || _exception != null)
    {
        onfailed();
    }

    ondisposed();
}

所以,我们在手动使用工作单元管理器构造工作单元的时候,一定要注意调用 complete() 方法。

既然 complete() 方法这么重要,它内部究竟做了什么事情呢?下面我们就来看一下。

public virtual void complete()
{
    // 是否已经进行了回滚操作,如果进行了回滚操作,则不提交工作单元。
    if (_isrolledback)
    {
        return;
    }

    // 防止多次调用 complete 方法,原理就是看 _iscompleting 或者 iscompleted 是不是已经为 true 了。
    preventmultiplecomplete();

    try
    {
        _iscompleting = true;
        savechanges();
        committransactions();
        iscompleted = true;
        // 数据储存了,事务提交了,则说明工作单元已经完成了,遍历完成事件集合,依次调用这些方法。
        oncompleted();
    }
    catch (exception ex)
    {
        // 一旦在持久化或者是提交事务时出现了异常,则往上层抛出。
        _exception = ex;
        throw;
    }
}

public virtual void savechanges()
{
    // 遍历集合,如果对象实现了 isupportssavingchanges 则调用相应的方法进行数据持久化。
    foreach (var databaseapi in _databaseapis.values)
    {
        (databaseapi as isupportssavingchanges)?.savechanges();
    }
}

protected virtual void committransactions()
{
    // 遍历事务 api 提供者,调用提交事务方法。
    foreach (var transaction in _transactionapis.values)
    {
        transaction.commit();
    }
}

protected virtual void rollbackall()
{
    // 回滚操作,还是从集合里面判断是否实现了 isupportsrollback 接口,来调用具体的实现进行回滚。
    foreach (var databaseapi in _databaseapis.values)
    {
        try
        {
            (databaseapi as isupportsrollback)?.rollback();
        }
        catch { }
    }

    foreach (var transactionapi in _transactionapis.values)
    {
        try
        {
            (transactionapi as isupportsrollback)?.rollback();
        }
        catch { }
    }
}

这里可以看到,abp vnext 完全剥离了具体事务或者回滚的实现方法,都是移动到具体的模块进行实现的,也就是说在调用了 complete() 方法之后,我们的事务就会被提交了。

本小节从创建、提交、释放这三个生命周期讲解了工作单元的原理和实现,关于具体的事务和回滚实现,我会放在下一篇文章进行说明,这里就不再赘述了。

为什么工作单元常常配合 using 语句块 使用,就是因为在提交工作单元之后,就可以自动调用 dispose() 方法,对工作单元的状态进行校验,而不需要我们手动处理。

using(var uowa = _uowmgr.begion())
{
    uowa.complete();
}

2.3.3 保留工作单元

在 abp vnext 里面,工作单元有了一个新的动作/属性,叫做 是否保留(is reserved)。它的实现也比较简单,指定了一个 reservationname,然后设置 isreservedtrue 就完成了整个动作。

那么它的作用是什么呢?这块内容我会在工作单元管理器小节进行解释。

2.4 工作单元管理器

工作单元管理器在工作单元的原理/实现里面已经有过了解,工作单元管理器主要负责工作单元的创建。

这里我再挑选一个工作单元模块的单元测试,来说明什么叫做 保留工作单元

[fact]
public async task unitofworkmanager_reservation_test()
{
    _unitofworkmanager.current.shouldbenull();

    using (var uow1 = _unitofworkmanager.reserve("reservation1"))
    {
        _unitofworkmanager.current.shouldbenull();

        using (var uow2 = _unitofworkmanager.begin())
        {
            // 此时 current 值是 uow2 的值。
            _unitofworkmanager.current.shouldnotbenull();
            _unitofworkmanager.current.id.shouldnotbe(uow1.id);

            await uow2.completeasync();
        }

        // 这个时候,因为 uow1 是保留工作单元,所以不会被获取到,应该为 null。
        _unitofworkmanager.current.shouldbenull();

        // 调用了该方法,设置 uow1 的 isreserved 属性为 false。
        _unitofworkmanager.beginreserved("reservation1");

        // 获得到了值,并且诶它的 id 是 uow1 的值。
        _unitofworkmanager.current.shouldnotbenull();
        _unitofworkmanager.current.id.shouldbe(uow1.id);

        await uow1.completeasync();
    }

    _unitofworkmanager.current.shouldbenull();
}

通过对代码的注释和断点调试的结果,我们知道了通过 reserved 创建的工作单元它的 isreserved 属性是 true,所以我们调用 iunitofworkmanager.current 访问的时候,会忽略掉保留工作单元,所以得到的值就是 null

但是通过调用 beginreserved(string name) 方法,我们就可以将指定的工作单元置为 当前工作单元,这是因为调用了该方法之后,会重新调用工作单元的 initialize() 方法,在该方法内部,又会将 isreserved 设置为 false

public virtual void initialize(unitofworkoptions options)
{
    // ... 其他代码。
    // 注意这里。
    isreserved = false;
}

保留工作单元的用途主要是在某些特殊场合,在某些特定条件下不想暴露给 iunitofworkmanager.current 时使用。

2.5 工作单元拦截器

如果我们每个地方都通过工作单元管理器来手动创建工作单元,那还是比较麻烦的。abp vnext 通过拦截器,来为特定的类型(符合规则)自动创建工作单元。

关于拦截器的注册已经在文章最开始说明了,这里就不再赘述,我们直接来看拦截器的内部实现。其实在拦截器的内部,一样是使用工作单元拦截器我来为我们创建工作单元的。只不过通过拦截器的方式,就能够无感知/无侵入地为我们构造健壮的数据持久化机制。

public override void intercept(iabpmethodinvocation invocation)
{
    // 如果类型没有标注 unitofwork 特性,或者没有继承 iunitofworkenabled 接口,则不创建工作单元。
    if (!unitofworkhelper.isunitofworkmethod(invocation.method, out var unitofworkattribute))
    {
        invocation.proceed();
        return;
    }

    // 通过工作单元管理器构造工作单元。
    using (var uow = _unitofworkmanager.begin(createoptions(invocation, unitofworkattribute)))
    {
        invocation.proceed();
        uow.complete();
    }
}

关于在 asp.net core mvc 的工作单元过滤器,在实现上与拦截器大同小异,后续讲解 asp.net core mvc 时再着重说明。

三、总结

abp vnext 框架通过统一工作单元为我们提供了健壮的数据库访问与持久化机制,使得开发人员在进行软件开发时,只需要关注业务逻辑即可。不需要过多关注与数据库等基础设施的交互,这一切交由框架完成即可。

这里多说一句,abp vnext 本身就是面向 ddd 所设计的一套快速开发框架,包括值对象(valueobject)这些领域驱动开发的特殊概念也被加入到框架实现当中。

微服务作为 ddd 的一个典型实现,ddd 为微服务的划分提供理论支持。这里为大家推荐《领域驱动设计:软件核心复杂性应对之道》这本书,该书籍由领域驱动设计的提出者编写。

看了之后发现在大型系统当中(博主之前做 erp 的,吃过这个亏)很多时候都是凭感觉来写,没有一个具体的理论来支持软件开发。最近拜读了上述书籍之后,发现领域驱动设计(ddd)就是一套完整的方法论(当然 不是银弹)。大家在学习并理解了领域驱动设计之后,使用 abp vnext 框架进行大型系统开发就会更加得心应手。

四、后记

关于本系列文章的更新,因为最近自己在做 物联网(rust 语言学习、数字电路设计)相关的开发工作,所以 5 月到 6 月这段时间都没怎么去研究 abp vnext。

最近在学习领域驱动设计的过程中,发现 abp vnext 就是为 ddd 而生的,所以趁热打铁想将后续的 abp vnext 文章一并更新,预计在 7 月内会把剩余的文章补完(核心模块)。