欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

你一定看得懂的 DDD+CQRS+EDA+ES 核心思想与极简可运行代码示例

程序员文章站 2022-03-10 16:15:04
前言 随着分布式架构微服务的兴起,DDD(领域驱动设计)、CQRS(命令查询职责分离)、EDA(事件驱动架构)、ES(事件溯源)等概念也一并成为时下的火热概念,我也在早些时候阅读了一些大佬的分析文,学习相关概念,不过一直有种雾里看花、似懂非懂的感觉。经过一段时间的学习和研究大佬的代码后,自己设计实现 ......

前言

       随着分布式架构微服务的兴起,ddd(领域驱动设计)、cqrs(命令查询职责分离)、eda(事件驱动架构)、es(事件溯源)等概念也一并成为时下的火热概念,我也在早些时候阅读了一些大佬的分析文,学习相关概念,不过一直有种雾里看花、似懂非懂的感觉。经过一段时间的学习和研究大佬的代码后,自己设计实现了一套我消化理解后的代码。为了突出重点,避免受到大量实现细节的干扰,当然也是懒(这才是主要原因),其中的所有基础设施都使用了现成的库。所实现的研究成果也做成了傻瓜式一键体验(我对对着黑框框敲命令没什么兴趣,能点两下鼠标搞定的事我绝不在键盘上敲又臭又长的命令,敲命令能敲出优越感的人我觉得应该是抖m)。

正文

ddd(领域驱动设计)

       这一定是最群魔乱舞的一个概念,每个大佬都能讲出一大篇演讲稿,但都或多或少存在差异或分歧,在我初看 ddd 时,我就被整懵了,这到底是咋回事?

       现在回过头来看,ddd 其实是一个高阶思想概念,并不能指导开发者如何敲键盘,是指导人如何思考领域问题,而不是指导人思考出具体的领域的。正是因为中间隔了一层虚幻飘渺的概念,导致不同的人得出了不同的结论。还好 ddd 存在一些比较具体容易落实的概念,现在就来讲下我对这些常见基础概念的理解和我编码时的基本原则,希望大家能在看大佬的文章时不用一脸懵逼,也进行下心得交流。

entity(实体)

       实体是一个存储数据的类,如果类中包含自身的合法性验证规则之类的方法,一般称之为充血模型,相对的单纯保存数据的则称为贫血模型(有时也叫做 poco 类)。实体有一个重要性质,相等性是由标识属性决定的,这个标识可以是一个简单的 int 型的 id,也可以是多个内部数据的某种组合(类似数据库表的复合字段主键)。除标识外的其他东西均不对两个实体对象的相等性产生影响。并且实体的数据属性是可更改的。

       有很多大佬认为实体应该是充血的,但在我看来,贫血的似乎更好,因为需求的不稳定性可能导致这些规则并不稳定,或规则本身并不唯一,在不同场合可能需要不同规则。这时候充血模型无论怎么办都很别扭,如果把规则定义和校验交给外部组件,这些需求就很容易满足,比如使用 fluentvalidate 为一种实体定义多套规则或对内部的规则条目按情况重新组合。

valueobject(值对象)

       值对象也是用来存储数据的类。与实体相对,值对象没有标识属性,其相等性由所有内部属性决定,当且仅当两个值对象实例的所有属性一一相等时,这两个值对象相等。并且值对象的所有属性为只读,仅能在构造函数中进行唯一一次设置,如果希望修改某个值对象的某一属性,唯一的办法是使用新的值对象替换旧的值对象。并且值对象经常作为实体的属性存在。

      这个概念看起来和实体特别相似,都是用来存储数据的,但也有些性质上的根本不同。网上的大佬通常会为值对象编写基类,但我认为,值对象和实体在代码实现上并没有这么大的区别。可以看作整数和小数在计算机中表现为不同的数据类型,但在数学概念上他们没有区别,仅仅只是因为离散的计算机系统无法完美表示连续的数学数字而产生的缝合怪。我倾向于根据类的代码定义所表现出来的性质与谁相符就将其视为谁,而不是看实现的接口或继承的基类。因为需求的不确定性会导致他们可能会发生转换,根据代码进行自我描述来判断可以避免很多潜在的麻烦。

aggregate,aggregate root(聚合及聚合根)

       聚合根表示一个领域所操作的*实体类型,其他附属数据都是聚合根的内部属性,聚合根和其所属的其他实体的组合称为聚合。这是一个纯概念性的东西。对领域实体的操作必须从聚合根开始,也就是说确保数据完整性的基本单位是聚合。大佬的代码中经常会用一个空接口来表示聚合根,如果某个实体实现了这个接口,就表示这个实体可以是一个聚合根。请注意,聚合根不一定必须是*类型,也可以是其他实体的一个属性。这表示一个实体在,某些情况下是聚合根,而其他情况下是另一个聚合根的内部属性。也就是说实体之间并非严格的树状关系,而是一般有向图状关系。

       我认为定义这样的空接口实际意不大,反而可能造成一些误会。如果某个实体由于需求变动导致不再会成为聚合根,那这个实体事实上将不再是聚合根,但人是会犯错的,很可能忘记去掉聚合根接口,这时代码与事实将产生矛盾。所以我认为聚合根应该基于事实而不是代码。当一个实体不再会作为聚合根使用时,将相关代码删除,就同时表示它不再是聚合根,阅读代码的人也因为看不到相关代码而自动认为它不是聚合根。在代码中的体现方式与下一个的概念有关。

repository(仓储)

       仓储表示对聚合根的持久化的抽象,在代码上可表现为声明了增删查改的相关方法的接口,而仓储的实现类负责具体解决如何对聚合根实体进行增删查改。例如在仓储内部使用数据库完成具体工作。

       如果一个仓储负责管理一个聚合根实体的持久化或者说存取,那这个实体就是一个事实上的聚合根。那么在这里,就可以在代码操作上将看到某个实体被仓储管理等价为这个实体是聚合根,反之就不是。也就是说,如果将某个实体的仓储的最后一个实际使用代码删除,这个实体就在事实上不再是聚合根,此时代码表现与事实将完美同步,不再会产生矛盾。至于由于没看到某个实体的仓储而将实体误认为不是聚合根,这其实并没有任何问题。这说明在你所关注的领域中这个实体确实不是聚合根,而这个实体可能作为聚合根使用的领域你根本不关心,所以看不到,那这个实体是否在其他领域作为聚合根使用对你而言其实是无所谓的。

domain service(领域服务)

       这就涉及到业务代码的编写了。如果一个业务需要由多个聚合根配合完成,也就是需要多个仓储,那么就应该将这些对仓储的调用封装进一个服务,统一对外暴露提供服务。

       如果这些仓储操作需要具有事务性,也可以在这里进行协调管理。如果某个业务只需要一个仓储参与,要不要专门封装一个服务就看你高兴了。

cqrs(命令查询职责分离)

       cqrs 本质上是一种指导思想,指导开发者如何设计一个低耦合高可扩展架构的思想。传统的 curd 将对数据的操作分为 读、写、改、删,将他们封装在一起导致他们将紧密耦合在相同的数据源中,不利于扩展。cqrs 则将对数据的操作分为会改变数据源的和不会改变数据源的,前者称为命令,后者称为查询。将他们分别封装能让他们各自使用不同的数据源,提高可扩展性。

       其中命令是一个会改变数据源,但不返回任何值的方法;查询是会返回值,但绝不会改变数据源的方法。但是在我的编码中,命令是可以返回值的,至于要返回什么,根据实际情况调整。比如最简单的返回一个 bool 表示操作是否成功以决定接下来的业务流程该走向何方,这是很常见的情况。所以在我的概念里,一个方法是命令还是查询实际上只看这个方法是否会改变数据源,要封装在一起还是分别封装都无所谓。建议分开封装到不同的仓储中,通过仓储关联到具体的数据源,命令和查询的仓储关联到不同的数据源的时候,自然就完成了读写分离。通过起名来明示方法的目的应该可以轻松分辨一个方法属于命令还是查询。只要脑子里有这个概念,要实现扩展办法多的是。

事件驱动架构(eda)

       可以说所有图形界面(gui)编程都是清一色的事件驱动架构,这东西一点也不稀奇。说白了,eda 就是一种被动架构,通过某些事情的发生来触发某些操作的执行,否则系统就随时待命,按兵不动。

       eda 的实现需要一个中介才能实现,在 windows 中,这个东西叫做 windows 消息队列(消息循环)和事件处理器。同样的,在非 gui 编程中也需要这俩东西,但通常被称为消息总线和消息消费者。在分布式系统中,这个中介将不与系统在同一进程甚至不在同一设备中,称为分布式消息总线。这样在开发时可以分成两拨,一拨负责写生产并发送事件的代码,一拨负责写接收事件信息并进行处理的代码。他们之间的沟通仅限于交流关心的事件叫什么以及事件携带了什么信息。至于产生的消息是如何送到正确的消费端并触发消费处理器的,那是消息总线的事。如果一个消息总线需要这两拨人了解中间的过程甚至需要自己去实现,那这个消息总线是个废品,也起不到什么解耦的效果,甚至是个拖后腿的东西。

eda + cqrs

       当他们结合在一起,就产生了命令或查询的发起和实际处理实现可以分离的效果。命令的发起方向命令总线发送一条命令消息并带上必要参数,消费方收到消息后获取参数完成任务并返回结果。命令可以看作一种特殊的事件,命令只由一个命令处理器处理,并可向发送方返回一个处理结果;事件由所有对同种事件感兴趣的事件处理器处理,不向事件发送方返回任何结果。

       事件处理器的执行顺序是不确定的,所以任何事件处理器都必须独立完成事件处理。如果两个事件处理之间存在因果依赖,应该在前置事件处理后由事件处理器发布新事件,并由后置事件处理器去处理前置事件产生的新事件,而不是让它们处理同一事件。

es(事件溯源)

       事件溯源表示能追查一个事件的源头,甚至与之相关的其他事件的概念,说句大白话就是刨祖坟。es 对历史状态回溯的需求有着天然的支持,最常见的如撤销重做。而 es 一般会配合 eda 使用,es 保存 eda 产生的事件信息,并且这些信息有只读性和因果连贯性。这顺便能让我们对系统中的实体究竟是如何一步一步变成现在这个样子有一个清晰的了解。毕竟实体具有可变性,实体信息一旦改变,旧的信息就会丢失,es 刚好弥补了这个缺陷。

代码展示说明

       此处的事件消息中介使用 mediatr 实现。

接口

       ddd 相关

实体

       定义一个实体的基本要素,实现接口的类就是实体,值对象没有接口或基类,只看代码所展现的性质是否符合值对象的定义,聚合根没有接口或基类,只看实体是否被仓储使用,领域服务说白了就是个打包封装,根据情况来决定,例如重构时提取方法即可视为封装服务。在此处可简单认为没有实现实体接口的数据类是值对象:

 1 /// <summary>
 2 /// 实体接口
 3 /// </summary>
 4 public interface ientity {}
 5 
 6 /// <summary>
 7 /// 泛型实体接口,约束id属性
 8 /// </summary>
 9 public interface ientity<tkey> : ientity
10     where tkey : iequatable<tkey>
11 {
12     tkey id { get; set; }
13 }

仓储接口

       仓储接口细分为可读仓储和可写仓储,可写仓储有一个分支为可批量提交仓储,表示修改操作会在调用提交保存方法后批量保存,也就是事务(就是用来替代操作单元的,这东西就有一个提交操作,名字也莫名其妙,我曾经一直无法理解这东西是干嘛的),接口声明参考 ef core,示例实现也基于 ef core。由于已经公开了查询接口类型的 set 属性,使用者可以任意自定义查询。

 1     public interface ibulkoperablevariablerepository<tresult, tvariablerepository, tentity>
 2         where tentity : ientity
 3         where tvariablerepository : ivariablerepository<tentity>
 4     {
 5         tresult savechanges();
 6         task<tresult> savechangesasync(cancellationtoken cancellationtoken);
 7     }
 8 
 9     public interface ibulkoperablevariablerepository<tvariablerepository, tentity>
10         where tentity : ientity
11         where tvariablerepository : ivariablerepository<tentity>
12     {
13         void savechanges();
14         task savechangesasync(cancellationtoken cancellationtoken);
15     }
16 
17     public interface ireadonlyrepository<tentity>
18         where tentity : ientity
19     {
20         iqueryable<tentity> set { get; }
21         tentity find(tentity entity, bool ignorenullvalue);
22         task<tentity> findasync(tentity entity, bool ignorenullvalue);
23 
24     }
25     public interface ireadonlyrepository<tentity, tkey> : ireadonlyrepository<tentity>
26         where tentity : ientity<tkey>
27         where tkey : iequatable<tkey>
28     {
29         tentity find(tkey key);
30         task<tentity> findasync(tkey key);
31         iqueryable<tentity> find(ienumerable<tkey> keys);
32     }
33 
34     public interface ivariablerepository<tentity>
35         where tentity : ientity
36     {
37         void add(tentity entity);
38         task addasync(tentity entity, cancellationtoken cancellationtoken);
39         void update(tentity entity);
40         task updateasync(tentity entity, cancellationtoken cancellationtoken);
41         void delete(tentity entity, bool issoftdelete);
42         task deleteasync(tentity entity, bool issoftdelete, cancellationtoken cancellationtoken);
43         void addrange(ienumerable<tentity> entities);
44         task addrangeasync(ienumerable<tentity> entities, cancellationtoken cancellationtoken);
45         void updaterange(ienumerable<tentity> entities);
46         task updaterangeasync(ienumerable<tentity> entities, cancellationtoken cancellationtoken);
47         void deleterange(ienumerable<tentity> entities, bool issoftdelete);
48         task deleterangeasync(ienumerable<tentity> entities, bool issoftdelete, cancellationtoken cancellationtoken);
49     }
50     public interface ivariablerepository<tentity, tkey> : ivariablerepository<tentity>
51         where tentity : ientity<tkey>
52         where tkey : iequatable<tkey>
53     {
54         void delete(tkey key, bool issoftdelete);
55         task deleteasync(tkey key, bool issoftdelete, cancellationtoken cancellationtoken);
56         void deleterange(ienumerable<tkey> keys, bool issoftdelete);
57         task deleterangeasync(ienumerable<tkey> keys, bool issoftdelete, cancellationtoken cancellationtoken);
58     }
59 
60     public interface irepository<tentity> : ivariablerepository<tentity>, ireadonlyrepository<tentity>
61         where tentity : ientity
62     {
63     }
64 
65     public interface irepository<tentity, tkey> : irepository<tentity>, ivariablerepository<tentity, tkey>, ireadonlyrepository<tentity, tkey>
66         where tentity : ientity<tkey>
67         where tkey : iequatable<tkey>
68     {
69     }

ef core 专用特化版仓储接口

 1     public interface iefcorerepository<tentity, tdbcontext> : ireadonlyrepository<tentity>, ivariablerepository<tentity>, ibulkoperablevariablerepository<int, iefcorerepository<tentity, tdbcontext>, tentity>
 2         where tentity : class, ientity
 3         where tdbcontext : dbcontext
 4     { }
 5 
 6     public interface iefcorerepository<tentity, tkey, tdbcontext> : iefcorerepository<tentity, tdbcontext>, ireadonlyrepository<tentity, tkey>, ivariablerepository<tentity, tkey>
 7         where tentity : class, ientity<tkey>
 8         where tkey : iequatable<tkey>
 9         where tdbcontext : dbcontext
10     { }

 

       cqrs+eda 相关:

命令接口

       分为带返回值命令和无返回值命令

1 public interface icommand<out tresult> : icommand
2 {
3 }
4 
5 public interface icommand : imessage
6 {
7 }

命令总线接口

       同样分为带返回值和无返回值

 1 public interface icommandbus<in tcommand>
 2     where tcommand : icommand
 3 {
 4     task sendcommandasync(tcommand command, cancellationtoken cancellationtoken);
 5 }
 6 
 7 public interface icommandbus<in tcommand, tresult> : icommandbus<tcommand>
 8     where tcommand : icommand<tresult>
 9 {
10     new task<tresult> sendcommandasync(tcommand command, cancellationtoken cancellationtoken);
11 }

命令处理器接口

       同上

 1 public interface icommandhandler<in tcommand>
 2     where tcommand : icommand
 3 {
 4     task handle(tcommand command, cancellationtoken cancellationtoken);
 5 }
 6 
 7 public interface icommandhandler<in tcommand, tresult> : icommandhandler<tcommand>
 8     where tcommand : icommand<tresult>
 9 {
10     new task<tresult> handle(tcommand command, cancellationtoken cancellationtoken);
11 }

命令存储接口

       可用于历史命令追溯,返回值可用于返回存储是否成功或其他必要信息

 1 public interface icommandstore
 2 {
 3     void save(icommand command);
 4 
 5     task saveasync(icommand command, cancellationtoken cancellationtoken);
 6 }
 7 
 8 public interface icommandstore<tresult> : icommandstore
 9 {
10     new tresult save(icommand command);
11 
12     new task<tresult> saveasync(icommand command, cancellationtoken cancellationtoken);
13 }

事件接口

       没有返回值

1 public interface ievent : imessage
2 {
3 }

事件总线接口

       同上

 1 public interface ieventbus
 2 {
 3     void publishevent(ievent @event);
 4 
 5     task publisheventasync(ievent @event, cancellationtoken cancellationtoken);
 6 }
 7 
 8 public interface ieventbus<tresult> : ieventbus
 9 {
10     new tresult publishevent(ievent @event);
11 
12     new task<tresult> publisheventasync(ievent @event, cancellationtoken cancellationtoken);
13 }

事件处理器接口

       同上

1 public interface ieventhandler<in tevent>
2     where tevent : ievent
3 {
4     task handle(tevent @event, cancellationtoken cancellationtoken);
5 }

事件存储接口

       同命令存储接口

 1 public interface ieventstore
 2 {
 3     void save(ievent @event);
 4 
 5     task saveasync(ievent @event, cancellationtoken cancellationtoken = default);
 6 }
 7 
 8 public interface ieventstore<tresult> : ieventstore
 9 {
10     new tresult save(ievent @event);
11 
12     new task<tresult> saveasync(ievent @event, cancellationtoken cancellationtoken = default);
13 }

(命令、事件)消息基础接口

1 public interface imessage
2 {
3     guid id { get; }
4 
5     datetimeoffset timestamp { get; }
6 }

       相关接口定义完毕。

实现

ef core 泛型仓储

       未知主键的实体使用实体对象为条件查找时,使用动态生成表达式的方法

  1     public class efcorerepository<tentity, tkey, tdbcontext> : efcorerepository<tentity, tdbcontext>, iefcorerepository<tentity, tkey, tdbcontext>
  2         where tentity : class, ientity<tkey>
  3         where tkey : iequatable<tkey>
  4         where tdbcontext : dbcontext
  5     {
  6         public efcorerepository(tdbcontext dbcontext) : base(dbcontext)
  7         {
  8         }
  9 
 10         public virtual void delete(tkey key, bool issoftdelete)
 11         {
 12             var entity = find(key);
 13             delete(entity, issoftdelete);
 14         }
 15 
 16         public virtual task deleteasync(tkey key, bool issoftdelete, cancellationtoken cancellationtoken = default)
 17         {
 18             delete(key, issoftdelete);
 19             return task.completedtask;
 20         }
 21 
 22         public virtual void deleterange(ienumerable<tkey> keys, bool issoftdelete)
 23         {
 24             var entities = find(keys).toarray();
 25             dbset.attachrange(entities);
 26             deleterange(entities, issoftdelete);
 27         }
 28 
 29         public virtual task deleterangeasync(ienumerable<tkey> keys, bool issoftdelete, cancellationtoken cancellationtoken = default)
 30         {
 31             deleterange(keys, issoftdelete);
 32             return task.completedtask;
 33         }
 34 
 35         public virtual tentity find(tkey key)
 36         {
 37             return set.singleordefault(x => x.id.equals(key));
 38         }
 39 
 40         public virtual iqueryable<tentity> find(ienumerable<tkey> keys)
 41         {
 42             return set.where(x => keys.contains(x.id));
 43         }
 44 
 45         public override tentity find(tentity entity, bool ignorenullvalue)
 46         {
 47             return base.find(entity, ignorenullvalue);
 48         }
 49 
 50         public virtual task<tentity> findasync(tkey key)
 51         {
 52             return set.singleordefaultasync(x => x.id.equals(key));
 53         }
 54 
 55         public override task<tentity> findasync(tentity entity, bool ignorenullvalue)
 56         {
 57             return base.findasync(entity, ignorenullvalue);
 58         }
 59     }
 60 
 61     public class efcorerepository<tentity, tdbcontext> : iefcorerepository<tentity, tdbcontext>
 62         where tentity : class, ientity
 63         where tdbcontext : dbcontext
 64     {
 65         protected readonly tdbcontext dbcontext;
 66         protected readonly dbset<tentity> dbset;
 67 
 68         protected virtual void processchangedentity()
 69         {
 70             var changedentities = dbcontext.changetracker.entries()
 71                 .where(x => x.state == entitystate.added || x.state == entitystate.modified);
 72             foreach (var entity in changedentities)
 73             {
 74                 (entity as ioptimisticconcurrencysupported)?.generatenewconcurrencystamp();
 75             }
 76 
 77             var changedentitiesgroups = changedentities.groupby(x => x.state);
 78             foreach (var group in changedentitiesgroups)
 79             {
 80                 switch (group)
 81                 {
 82                     case var entities when entities.key == entitystate.added:
 83                         foreach (var entity in entities)
 84                         {
 85                             if (entity is iactivecontrollable)
 86                             {
 87                                 (entity as iactivecontrollable).active ??= true;
 88                             }
 89                         }
 90                         break;
 91                     case var entities when entities.key == entitystate.modified:
 92                         foreach (var entity in entities)
 93                         {
 94                             (entity as ientity)?.processcreationinfowhenmodified(dbcontext);
 95 
 96                             if (entity is iactivecontrollable && (entity as iactivecontrollable).active == null)
 97                             {
 98                                 entity.property(nameof(iactivecontrollable.active)).ismodified = false;
 99                             }
100                         }
101                         break;
102                     default:
103                         break;
104                 }
105             }
106         }
107 
108         protected virtual void resetdeletedmark(params tentity[] entities)
109         {
110             foreach (var entity in entities)
111             {
112                 if (entity is ilogicallydeletable)
113                 {
114                     (entity as ilogicallydeletable).isdeleted = false;
115                 }
116             }
117         }
118 
119         public efcorerepository(tdbcontext dbcontext)
120         {
121             this.dbcontext = dbcontext;
122             dbset = this.dbcontext.set<tentity>();
123         }
124 
125         public virtual void add(tentity entity)
126         {
127             dbset.add(entity);
128         }
129 
130         public virtual task addasync(tentity entity, cancellationtoken cancellationtoken = default)
131         {
132             return dbset.addasync(entity, cancellationtoken).astask();
133         }
134 
135         public virtual void addrange(ienumerable<tentity> entities)
136         {
137             dbset.addrange(entities);
138         }
139 
140         public virtual task addrangeasync(ienumerable<tentity> entities, cancellationtoken cancellationtoken = default)
141         {
142             return dbset.addrangeasync(entities, cancellationtoken);
143         }
144 
145         public virtual void delete(tentity entity, bool issoftdelete)
146         {
147             dbset.attach(entity);
148             if (issoftdelete)
149             {
150                 if (entity is ilogicallydeletable)
151                 {
152                     (entity as ilogicallydeletable).isdeleted = true;
153                 }
154                 else
155                 {
156                     throw new invalidoperationexception($"要求软删除的实体不实现{nameof(ilogicallydeletable)}接口。");
157                 }
158             }
159             else
160             {
161                 dbset.remove(entity);
162             }
163         }
164 
165         public virtual task deleteasync(tentity entity, bool issoftdelete, cancellationtoken cancellationtoken = default)
166         {
167             delete(entity, issoftdelete);
168             return task.completedtask;
169         }
170 
171         public virtual void deleterange(ienumerable<tentity> entities, bool issoftdelete)
172         {
173             dbset.attachrange(entities);
174             foreach (var entity in entities)
175             {
176                 delete(entity, issoftdelete);
177             }
178         }
179 
180         public virtual task deleterangeasync(ienumerable<tentity> entities, bool issoftdelete, cancellationtoken cancellationtoken = default)
181         {
182             deleterange(entities, issoftdelete);
183             return task.completedtask;
184         }
185 
186         public virtual tentity find(tentity entity, bool ignorenullvalue)
187         {
188             var exp = generatewhere(dbcontext, entity, ignorenullvalue);
189 
190             return set.singleordefault(exp);
191         }
192 
193         public virtual task<tentity> findasync(tentity entity, bool ignorenullvalue)
194         {
195             var exp = generatewhere(dbcontext, entity, ignorenullvalue);
196 
197             return set.singleordefaultasync(exp);
198         }
199 
200         public virtual int savechanges()
201         {
202             processchangedentity();
203             return dbcontext.savechanges();
204         }
205 
206         public virtual task<int> savechangesasync(cancellationtoken cancellationtoken = default)
207         {
208             processchangedentity();
209             return dbcontext.savechangesasync(cancellationtoken);
210         }
211 
212         public virtual iqueryable<tentity> set => dbset.asnotracking();
213 
214         public virtual void update(tentity entity)
215         {
216             resetdeletedmark(entity);
217             dbset.update(entity);
218         }
219 
220         public virtual task updateasync(tentity entity, cancellationtoken cancellationtoken = default)
221         {
222             update(entity);
223             return task.completedtask;
224         }
225 
226         public virtual void updaterange(ienumerable<tentity> entities)
227         {
228             resetdeletedmark(entities.toarray());
229             dbset.updaterange(entities);
230         }
231 
232         public virtual task updaterangeasync(ienumerable<tentity> entities, cancellationtoken cancellationtoken = default)
233         {
234             updaterange(entities);
235             return task.completedtask;
236         }
237 
238         static private expression<func<tentity, bool>> generatewhere(tdbcontext dbcontext, tentity entity, bool ignorenullvalue)
239         {
240             //查找实体类型主键
241             var model = dbcontext.model.findentitytype(typeof(tentity));
242             var key = model.findprimarykey();
243 
244             //查找所有主键属性,如果没有主键就使用所有实体属性
245             ienumerable<propertyinfo> props;
246             if (key != null)
247             {
248                 props = key.properties.select(x => x.propertyinfo);
249             }
250             else
251             {
252                 props = model.getproperties().select(x => x.propertyinfo);
253             }
254 
255             //生成表达式参数
256             parameterexpression parameter = expression.parameter(typeof(tentity), "x");
257 
258             //初始化提取实体类型所有属性信息生成属性访问表达式并包装备用
259             var keyvalues = props.select(x => new { key = x, value = x.getvalue(entity), propexp = expression.property(parameter, x) });
260             //初始化存储由基础类型组成的属性信息(只要个空集合,实际数据在后面的循环中填充)
261             var primitivekeyvalues = keyvalues.take(0).where(x => isprimitivetype(x.key.propertytype));
262             //初始化基础类型属性的相等比较表达式存储集合(只要个空集合,实际数据在后面的循环中填充)
263             var equals = primitivekeyvalues.take(0).select(x => expression.equal(x.propexp, expression.constant(x.value)));
264             //初始化复杂类型属性存储集合
265             var notprimitivekeyvalues = primitivekeyvalues;
266 
267             //如果还有元素,说明上次用于提取信息的复杂属性内部还存在复杂属性,接下来用提取到的基础类型属性信息生成相等比较表达式并合并到存储集合然后继续提取剩下的复杂类型属性的内部属性
268             while (keyvalues.count() > 0)
269             {
270                 if (ignorenullvalue)
271                 {
272                     keyvalues = keyvalues.where(x => x.value != null);
273                 }
274                 //提取由基础类型组成的属性信息
275                 primitivekeyvalues = keyvalues.where(x => isprimitivetype(x.key.propertytype));
276                 //生成基础类型属性的相等比较表达式
277                 equals = equals.concat(primitivekeyvalues.select(x => expression.equal(x.propexp, expression.constant(x.value))));
278                 //提取复杂类型属性
279                 notprimitivekeyvalues = keyvalues.except(primitivekeyvalues);
280                 //分别提取各个复杂类型属性内部的属性信息继续生成内部属性访问表达式
281                 keyvalues =
282                     from kv in notprimitivekeyvalues
283                     from propinfo in kv.value.gettype().getproperties()
284                     select new { key = propinfo, value = propinfo.getvalue(kv.value), propexp = expression.property(kv.propexp, propinfo) };
285             }
286 
287             //如果相等比较表达式有多个,将所有相等比较表达式用 && 运算连接起来
288             var and = equals.first();
289             foreach (var eq in equals.skip(1))
290             {
291                 and = expression.andalso(and, eq);
292             }
293 
294             //生成完整的过滤条件表达式,形如:  (tentity x) => { return x.a == ? && x.b == ? && x.obj1.m == ? && x.obj1.n == ? && x.obj2.u.v == ?; }
295             var exp = expression.lambda<func<tentity, bool>>(and, parameter);
296 
297             //判断某个类型是否是基础数据类型
298             static bool isprimitivetype(type type)
299             {
300                 var primitivetypes = new[] {
301                     typeof(sbyte)
302                     ,typeof(byte)
303                     ,typeof(short)
304                     ,typeof(ushort)
305                     ,typeof(int)
306                     ,typeof(uint)
307                     ,typeof(long)
308                     ,typeof(ulong)
309                     ,typeof(float)
310                     ,typeof(double)
311                     ,typeof(decimal)
312                     ,typeof(char)
313                     ,typeof(string)
314                     ,typeof(bool)
315                     ,typeof(datetime)
316                     ,typeof(datetimeoffset)
317                     //,typeof(enum)
318                     ,typeof(guid)};
319 
320                 var tmp =
321                     type.isderivedfrom(typeof(nullable<>))
322                     ? nullable.getunderlyingtype(type)
323                     : type;
324 
325                 return tmp.isenum || primitivetypes.contains(tmp);
326             }
327 
328             return exp;
329         }
330     }

命令

       命令基类

 1     public abstract class mediatrcommand : mediatrcommand<unit>, icommand, irequest
 2     {
 3     }
 4 
 5     public abstract class mediatrcommand<tresult> : icommand<tresult>, irequest<tresult>
 6     {
 7         public guid id { get; }
 8 
 9         public datetimeoffset timestamp { get; }
10 
11         public mediatrcommand()
12         {
13             id = guid.newguid();
14             timestamp = datetimeoffset.now;
15         }
16     }

       示例具体命令,命令只包含参数信息,如何使用参数信息完成任务是命令处理器的事

 1     public class listusercommand : mediatrcommand<ipagedlist<applicationuser>>
 2     {
 3         public pageinfo pageinfo { get; }
 4         public queryfilter queryfilter { get; }
 5         public listusercommand(pageinfo pageinfo, queryfilter queryfilter)
 6         {
 7             pageinfo = pageinfo;
 8             queryfilter = queryfilter;
 9         }
10     }

命令总线

 1     public class mediatrcommandbus<tcommand, tresult> : icommandbus<tcommand, tresult>
 2         where tcommand : mediatrcommand<tresult>
 3     {
 4         private readonly imediator mediator;
 5         private readonly icommandstore commandstore;
 6 
 7         public mediatrcommandbus(imediator mediator, icommandstore commandstore)
 8         {
 9             this.mediator = mediator;
10             this.commandstore = commandstore;
11         }
12 
13         public virtual task<tresult> sendcommandasync(tcommand command, cancellationtoken cancellationtoken = default)
14         {
15             commandstore?.saveasync(command, cancellationtoken);
16             return mediator.send(command, cancellationtoken);
17         }
18 
19         task icommandbus<tcommand>.sendcommandasync(tcommand command, cancellationtoken cancellationtoken)
20         {
21             return sendcommandasync(command, cancellationtoken);
22         }
23     }
24 
25     public class mediatrcommandbus<tcommand> : mediatrcommandbus<mediatrcommand<unit>, unit>
26         where tcommand : mediatrcommand<unit>
27     {
28         public mediatrcommandbus(imediator mediator, icommandstore commandstore) : base(mediator, commandstore)
29         {
30         }
31     }

命令处理器

       命令处理器基类

 1     public abstract class mediatrcommandhandler<tcommand, tresult> : icommandhandler<tcommand, tresult>, irequesthandler<tcommand, tresult>
 2     where tcommand : mediatrcommand<tresult>
 3     {
 4         public abstract task<tresult> handle(tcommand command, cancellationtoken cancellationtoken = default);
 5 
 6         task icommandhandler<tcommand>.handle(tcommand command, cancellationtoken cancellationtoken)
 7         {
 8             return handle(command, cancellationtoken);
 9         }
10     }
11 
12     public abstract class mediatrcommandhandler<tcommand> : mediatrcommandhandler<tcommand, unit>
13         where tcommand : mediatrcommand
14     {
15     }

       具体命令处理器示例,使用注入的仓储查询数据,applicationuser 在这里就是事实上的聚合根实体

 1     public class listusercommandhandler : mediatrcommandhandler<listusercommand, ipagedlist<applicationuser>>
 2     {
 3         private iefcorerepository<applicationuser, int, applicationidentitydbcontext> repository;
 4 
 5         public listusercommandhandler(iefcorerepository<applicationuser, int, applicationidentitydbcontext> repository)
 6         {
 7             this.repository = repository;
 8         }
 9 
10         public override task<ipagedlist<applicationuser>> handle(listusercommand command, cancellationtoken cancellationtoken = default)
11         {
12             return repository.set
13                 .orderby(x => x.id)
14                 .topagedlistasync(command.pageinfo.pagenumber, command.pageinfo.pagesize);
15         }
16     }

命令存储

       什么都没干,实际使用时可以使用数据库保存相关信息

 1     public class inprocesscommandstore : icommandstore<bool>
 2     {
 3         public bool save(icommand command)
 4         {
 5             return saveasync(command).result;
 6         }
 7 
 8         public task<bool> saveasync(icommand command, cancellationtoken cancellationtoken = default)
 9         {
10             return task.fromresult(true);
11         }
12 
13         void icommandstore.save(icommand command)
14         {
15             save(command);
16         }
17 
18         task icommandstore.saveasync(icommand command, cancellationtoken cancellationtoken)
19         {
20             return saveasync(command, cancellationtoken);
21         }
22     }

       事件部分和命令基本相同,具体代码可以到文章末尾下载项目代码查看。

使用

       在 startup.configureservices 方法中注册相关服务,事件总线和命令总线都使用 mediatr 实现。.net core 内置 di 支持注册泛型服务,所以某个实体在实际使用时注入泛型仓储就表示这个实体是聚合根,不用提前定义具体的聚合根实体仓储,所以删除使用代码相当于删除了仓储定义。

1  services.addscoped(typeof(icommandbus<>), typeof(mediatrcommandbus<>));
2  services.addscoped(typeof(icommandbus<,>), typeof(mediatrcommandbus<,>));
3  services.addscoped(typeof(icommandstore), typeof(inprocesscommandstore));
4  services.addscoped(typeof(ieventbus), typeof(mediatreventbus));
5  services.addscoped(typeof(ieventbus<>), typeof(mediatreventbus<>));
6  services.addscoped(typeof(ieventstore), typeof(inprocesseventstore));
7  services.addscoped(typeof(iefcorerepository<,>), typeof(efcorerepository<,>));
8  services.addscoped(typeof(iefcorerepository<,,>), typeof(efcorerepository<,,>));
9  services.addmediatr(typeof(listusercommandhandler).gettypeinfo().assembly);

       示例使用比较简单,就不定义服务了,如果需要定义服务,那么使用服务的一般是命令处理器,仓储由服务使用。这里命令处理器直接使用仓储。在控制器中注入命令总线,向命令总线发送命令就可以获取结果。mediatr 会自动根据发送的命令类型查找匹配的命令处理器去调用。

 1     [apicontroller]
 2     [route("api/[controller]")]
 3     public class userscontroller : controllerbase
 4     {
 5         private readonly icommandbus<listusercommand, ipagedlist<applicationuser>> _commandbus;
 6         private readonly imapper _mapper;
 7 
 8         public userscontroller(icommandbus<listusercommand, ipagedlist<applicationuser>> commandbus, imapper mapper)
 9         {
10             _commandbus = commandbus;
11             _mapper = mapper;
12         }
13 
14         /// <summary>
15         /// 获取用户列表
16         /// </summary>
17         /// <param name="page">页码</param>
18         /// <param name="size">每页条目数</param>
19         /// <returns>用户列表</returns>
20         [httpget]
21         [produces("application/json")] //声明接口响应 json 数据
22         public async task<iactionresult> getasync(int? page, int? size)
23         {
24             var cmd = new listusercommand(new pageinfo(page ?? 1, size ?? 10), new queryfilter());
25             var users = await _commandbus.sendcommandasync(cmd, default);
26 
27             return new jsonresult(
28                 new
29                 {
30                     rows = users.select(u => _mapper.map<applicationuserdto>(u)),
31                     total = users.pagecount, //总页数
32                     page = users.pagenumber, //当前页码
33                     records = users.totalitemcount //总记录数
34                 }
35             );
36         }
37     }

       使用就是这么简单。使用者根本不需要知道命令处理器的存在,把命令发送到总线,等着接收结果就可以了。

       事件一般由命令处理器引发,可以改造命令处理器用 di 注入事件总线,然后在命令处理器中向事件总线发送事件,事件总线就会自动触发相应的事件处理器。

结语

       完整的流程大概就是:控制器使用注入的服务执行业务流程,业务服务向命令总线发送命令,命令总线触发处理器处理命令,命令处理器向事件总线发送事件,事件总线触发事件处理器处理事件,事件处理器在处理事件后向事件总线发送新的事件触发后续事件处理器继续处理新的事件(如果需要),直到最后不发送事件的事件处理器完成处理。整个流程完结。在此过程中总线会自动调用注入的总线消息存储来持久化命令和事件,至此,一个环环相扣的极简 ddd+cqrs+eda+es 架构搭建完成!

       想要实际体验的朋友可以到文章末尾下载项目并运行体验。启动调试后访问 /swagger 然后尝试体验调用 api/users 接口。

 

       转载请完整保留以下内容并在显眼位置标注,未经授权删除以下内容进行转载盗用的,保留追究法律责任的权利!

  本文地址:https://www.cnblogs.com/coredx/p/12364960.html

  完整源代码:github

  里面有各种小东西,这只是其中之一,不嫌弃的话可以star一下。