欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

简单的学习,实现,领域事件,事件存储,事件溯源

程序员文章站 2022-06-19 10:34:59
为什么写这篇文章 自己以前都走了弯路,以为学习战术设计就会DDD了,其实DDD的精华在战略设计,但是对于我们菜鸟来说,学习一些技术概念也是挺好的 经常看到这些术语,概念太多,也想简单学习一下,记忆力比较差记录一下实现的细节 领域事件 1.领域事件是过去发生的与业务有关的事实,一但发生就不可更改,所以 ......

为什么写这篇文章

自己以前都走了弯路,以为学习战术设计就会ddd了,其实ddd的精华在战略设计,但是对于我们菜鸟来说,学习一些技术概念也是挺好的
经常看到这些术语,概念太多,也想简单学习一下,记忆力比较差记录一下实现的细节

领域事件


1.领域事件是过去发生的与业务有关的事实,一但发生就不可更改,所以存储事件时只能追加
3.领域事件具有时间点的特征,所有事件连接起来会形成明显的时间轴
4.领域事件会导致目标对象状态的变化,聚合根的行为会产生领域事件,所以会改变聚合的状态
在聚合根里面维护一个领域事件的聚合,每一个事件对应一个handle,通过反射维护一个数据字典,通过事件查找到指定的handle
领域事件实现的方式:目前看到有3种方式,mediatr,消息队列 ,发布订阅模式
eshoponcontainers 中使用的是mediatr
enode 中使用的是equeue,equeue是一个纯c#写的消息队列
使用已经写好的消息队列rabbitmq ,kafka

事件存储,事件溯源,事件快照


事件存储:存储所有聚合根里面发生过的事件
1.事件存储中可以做并发的处理,比如command 重复,领域事件的重复
2.领域事件的重复通过聚合根id+版本号判断,可以在数据库中建立联合唯一索引,在存储事件时检测重复,记录重复的事件,根据业务做处理
3.这里要保证存储事件与发布领域事件的一致性
如何保证存储事件与发布领域事件的一致性
先存储事件然后在发布领域事件,如果发生异常,就一直重试,一直到成功为止,也可以做一定的处理,比如重试到一定的次数,就通知,进行人工处理
我选择了cap + policy + dapper
事件溯源:在事件存储中记录导致状态变化的一系列领域事件。通过持久化记录改变状态的事件,通过重新播放获得状态改变的历史。 事件回放可以返回系统到任何状态
聚合快照:聚合的生命周期各有长短,有的聚合里面有大量的事件,,事件越多加载事件以及重建聚合的执行效率就会越来越低,快照里面存储的是聚合
1.定时存储整个聚合根:使用定时器每隔一段时间就存储聚合到快照表中
2.定量存储整个聚合根:根据事件存储中的数量来存储聚合到快照表中
事件溯源的实现方式
1.首先我们需要实现聚合in memory,
2.在commandhandler中订阅 command命令,
创建聚合时 ,在内存中维护一个数据字典,key为:聚合根的id,value为:聚合
修改,删除,聚合时,根据聚合根的id,查询出聚合
如果内存中聚合不存在时:根据聚合根的id 从聚合快照表中查询出聚合,然后根据聚合快照存储的时间,聚合根id,查询事件存储中的所有事件,然后回放事件,得到聚合最终的状态

记录遇到的问题


由于基础非常的差,所以实现的方式都是以最简单的方式来写的,存在许多的问题,代码中有问题的地方希望大家提出来,让我学习一下
代码的实现目前还没有写快照的部分,也没有处理eventstorage中的命令重复与聚合根+版本号重复,具体的请看汤总的enode,里面有全部的实现
1.怎样保证存储事件,发布事件的最终一致性
2.怎么解析eventstorage中的事件,回放事件
先存储事件,当事件存储成功之后,在发布事件
存储事件失败:就一直重试,发布事件失败,使用的是cap,cap内部使用的是本地消息表的方式,如果发布事件失败,也一直重试,如果服务器重启了,rabbitmq里面消息为ack,消息没有丢,重连后会继续执行
存储事件,发布事件

    /// <summary>
    /// 存储聚合根中的事件到eventstorage 发布事件
    /// </summary>
    /// <typeparam name="taggregationroot"></typeparam>
    /// <param name="event"></param>
    /// <returns></returns>
    public async task appendeventstoragepublisheventasync<taggregationroot>(taggregationroot @event)
        where taggregationroot : iaggregationroot
    {
        var domaineventlist = @event.uncommittedevents.tolist();
        if (domaineventlist.count == 0)
        {
            throw new exception("请添加事件!");
        }

        await tryappendeventstorageasync(domaineventlist).continuewith(async e =>
        {
            if (e.result == (int)eventstoragestatus.success)
            {
                await trypublishdomaineventasync(domaineventlist).configureawait(false);
                @event.clearevents();
            }
        });
    }

    /// <summary>
    /// 发布领域事件
    /// </summary>
    /// <returns></returns>
    public async task publishdomaineventasync(list<idomainevent> domaineventlist)
    {
        using (var connection =
            new sqlconnection(connectionstr))
        {
            if (connection.state == connectionstate.closed)
            {
                await connection.openasync().configureawait(false);
            }
            using (var transaction = await connection.begintransactionasync().configureawait(false))
            {
                try
                {
                    if (domaineventlist.count > 0)
                    {
                        foreach (var domainevent in domaineventlist)
                        {
                            await _cappublisher.publishasync(domainevent.getroutingkey(), domainevent).configureawait(false);
                        }
                    }
                    await transaction.commitasync().configureawait(false);
                }
                catch (exception e)
                {
                    await transaction.rollbackasync().configureawait(false);
                    throw;
                }
            }
        }
    }

    /// <summary>
    /// 发布领域事件重试
    /// </summary>
    /// <param name="domaineventlist"></param>
    /// <returns></returns>
    public async task trypublishdomaineventasync(list<idomainevent> domaineventlist)
    {
        var policy = policy.handle<socketexception>().or<ioexception>().or<exception>()
            .retryforeverasync(onretry: exception =>
            {
                task.factory.startnew(() =>
                {
                    //记录重试的信息
                    _loggerhelper.loginfo("发布领域事件异常", exception.message);
                });
            });
        await policy.executeasync(async () =>
        {
            await publishdomaineventasync(domaineventlist).configureawait(false);
        });

    }

    /// <summary>
    /// 存储聚合根中的事件到eventstorage中
    /// </summary>
    /// <returns></returns>
    public async task<int> appendeventstorageasync(list<idomainevent> domaineventlist)
    {
        if (domaineventlist.count == 0)
        {
            throw new exception("请添加事件!");
        }
        var status = (int)eventstoragestatus.failure;
        using (var connection = new sqlconnection(connectionstr))
        {
            try
            {
                if (connection.state == connectionstate.closed)
                {
                    await connection.openasync().configureawait(false);
                }
                using (var transaction = await connection.begintransactionasync().configureawait(false))
                {
                    try
                    {
                        if (domaineventlist.count > 0)
                        {
                            foreach (var domainevent in domaineventlist)
                            {
                                eventstorage eventstorage = new eventstorage
                                {
                                    id = guid.newguid(),
                                    aggregaterootid = domainevent.aggregaterootid,
                                    aggregateroottype = domainevent.aggregateroottype,
                                    createdatetime = domainevent.createdatetime,
                                    version = domainevent.version,
                                    eventdata = events(domainevent)
                                };
                                var eventstoragesql =
                                    $"insert into eventstorageinfo(id,aggregaterootid,aggregateroottype,createdatetime,version,eventdata) values (@id,@aggregaterootid,@aggregateroottype,@createdatetime,@version,@eventdata)";
                                await connection.executeasync(eventstoragesql, eventstorage, transaction).configureawait(false);
                            }
                        }
                        await transaction.commitasync().configureawait(false);
                        status = (int)eventstoragestatus.success;
                    }
                    catch (exception e)
                    {
                        await transaction.rollbackasync().configureawait(false);
                        throw;
                    }
                }

            }
            catch (exception e)
            {
                connection.close();
                throw;
            }
        }
        return status;
    }

    /// <summary>
    /// appendeventstorageasync异常重试
    /// </summary>
    public async task<int> tryappendeventstorageasync(list<idomainevent> domaineventlist)
    {
        var policy = policy.handle<socketexception>().or<ioexception>().or<exception>()
            .retryforeverasync(onretry: exception =>
            {
                task.factory.startnew(() =>
                {
                    //记录重试的信息
                    _loggerhelper.loginfo("存储事件异常", exception.message);
                });
            });
        var result = await policy.executeasync(async () =>
          {
              var resulted = await appendeventstorageasync(domaineventlist).configureawait(false);
              return resulted;
          });
        return result;
    }

    /// <summary>
    /// 根据domainevent序列化事件json
    /// </summary>
    /// <param name="domainevent"></param>
    /// <returns></returns>
    public string events(idomainevent domainevent)
    {
        concurrentdictionary<string, string> dictionary = new concurrentdictionary<string, string>();
        //获取领域事件的类型(方便解析json)
        var domaineventtypename = domainevent.gettype().name;
        var domaineventstr = jsonconvert.serializeobject(domainevent);
        dictionary.getoradd(domaineventtypename, domaineventstr);
        var eventdata = jsonconvert.serializeobject(dictionary);
        return eventdata;
    }

解析eventstorage中存储的事件

    public async task<list<idomainevent>> getaggregaterooteventstoragebyid(guid aggregaterootid)
    {
        try
        {
            using (var connection = new sqlconnection(connectionstr))
            {
                var eventstoragelist = await connection.queryasync<eventstorage>($"select * from dbo.eventstorageinfo where aggregaterootid='{aggregaterootid}'");
                list<idomainevent> domaineventlist = new list<idomainevent>();
                foreach (var item in eventstoragelist)
                {
                    var dictionarydomainevent = jsonconvert.deserializeobject<dictionary<string, string>>(item.eventdata);
                    foreach (var entry in dictionarydomainevent)
                    {
                        var domaineventtype = typenameprovider.gettype(entry.key);
                        if (domaineventtype != null)
                        {
                            var domainevent = jsonconvert.deserializeobject(entry.value, domaineventtype) as idomainevent;
                            domaineventlist.add(domainevent);
                        }
                    }
                }
                return domaineventlist;
            }
        }
        catch (exception ex)
        {
            throw;
        }

注意事项

1.事件没持久化就代表事件还没发生成功,事件存储可能失败,必须先存储事件,在发布事件,保证存储事件与发布事件一致性
1.使用事件驱动,必须要做好冥等的处理
2.如果业务场景中有状态时:通过状态来控制
3.新建一张表,用来记录消费的信息,消费端的代码里面,根据唯一的标识,判断是否处理过该事件
4.q端的任何更新都应该把聚合根id和事件版本号作为条件,q端的更新不用遵循聚合的原则,可以使用最简单的方式处理
5.仓储是用来重建聚合的,它的行为和集合一样只有get ,add ,delete
6.ddd不是技术,是思想,核心在战略模块,战术设计是实现的一种选择,战略设计,需要面向对象的分析能力,职责分配,深层次的分析业务

感谢


虽然学习ddd的时间不短了,感觉还是在入门阶段,在学习的过程中有许多的不解,经常问enode群里面的大佬,也经常@汤总,谢谢大家的帮助与解惑。