欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

ABP 框架审计日志改造支持mongodb和redis(本篇不讨论,下一篇详细介绍), 数据库底层迁移 Mysql 集群

程序员文章站 2022-06-09 16:05:37
背景 笔者 目前使用的IOT 项目是使用abp 框架作为后台,虽然abp的框架适用于中小型项目框架,但由于架构优美,笔者认为还是可以经过改造,作为大型项目中使用。但IOT 的这个项目目前刚上线不久,十几天数据库已经有了上百GB,而且由于实施检查设备状态,调用设备状态维护表,审计日志压力很大,单单审计 ......

背景

 

笔者 目前使用的iot 项目是使用abp 框架作为后台,虽然abp的框架适用于中小型项目框架,但由于架构优美,笔者认为还是可以经过改造,作为大型项目中使用。但iot 的这个项目目前刚上线不久,十几天数据库已经有了上百gb,而且由于实施检查设备状态,调用设备状态维护表,审计日志压力很大,单单审计日志一天的数据量就有几十万,目前在架构上,笔者做了几个优化处理;

1、针对审计日志,笔者重写了abp 原有的 iauditingstore,实现mongodb和redis 两种转移,并且针对审计日志内容做了过滤,disableauditing特性标记指定的类或方法不进行记录。

ps:abp 虽然有mongodb 的封装,但它的出发点是和ef 同一个模式,左右系统唯一的orm,如果要使用abp 的mongo 封装,必须要替代ef,或者重写abp unitofworkoptions,否则直接用会出现工作单元转换失败的问题。

2、站点层面使用nginx 做了反向代理,进行多站点服务,通信模式由原来的队列、改为服务化,eventbus等方式

3、数据库底层 做了percona xtradb cluster—mysql 集群处理迁移。

 

思考评估:1、审计日志这样处理,从源头做了缩减,并且进行nosql拆分,有助于缓解数据库压力。

                 2、中间层的处理是一般iot 中间件各种脚手架的组合,成熟,也有经过多年生产环境的检验。

                 3、数据库底层 使用percona xtradb cluster,是因为它支持集群,可以缓解数据库请求压力,又支持abp的事务;

               但从真正大系统考虑,其实最理性的模式应该是分片,结合soa、或者微服务才能真正解决底层压力,目前考量了tidb(张善友 张队推荐的)、oceanbase(淘宝 自有数据库,生产环境十年)、mycat中间件(听说这个坑多)等,

               为了暂时不做大改造,只能先使用 percona xtradb cluster,后续可能使用orleans(azure 云框架)、akka.net(大型的框架) 或者 service fabric(微服务框架)

     

二、percona xtradb cluster 评估

 

 

优点如下:

1.当执行一个查询时,在本地节点上执行。因为所有数据都在本地,无需远程访问。

2.无需集中管理。可以在任何时间点失去任何节点,但是集群将照常工作。

3.良好的读负载扩展,任意节点都可以查询。

缺点如下:

1.加入新节点,开销大。需要复制完整的数据。

2.不能有效的解决写缩放问题,所有的写操作都将发生在所有节点上。

3.有多少个节点就有多少重复的数据。

 

percona xtradb cluster是mysql高可用性和可扩展性的解决方案.

percona xtradb cluster提供的特性有:

1.同步复制,事务要么在所有节点提交或不提交。

2.多主复制,可以在任意节点进行写操作。

3.在从服务器上并行应用事件,真正意义上的并行复制。

4.节点自动配置。

5.数据一致性,不再是异步复制。

percona xtradb cluster完全兼容mysql和percona server,表现在:

1.数据的兼容性

2.应用程序的兼容性:无需更改应用程序

 

1.集群是有节点组成的,推荐配置至少3个节点,但是也可以运行在2个节点上。

2.每个节点都是普通的mysql/percona服务器,可以将现有的数据库服务器组成集群,反之,也可以将集群拆分成单独的服务器。

3.每个节点都包含完整的数据副本。

 

三、部署流程

 

1、环境准备

 

  在腾讯云上开设三个测试服务器,系统 镜像 centos 7.5 64

 

ABP 框架审计日志改造支持mongodb和redis(本篇不讨论,下一篇详细介绍), 数据库底层迁移 Mysql 集群

 

用远程工具连接三台测试服务器,完成如下操作

 

(1)  关闭firewalld防火墙

# systemctl disable firewalld --now

关闭防火墙或者允许3306, 4444, 4567和4568四个端口的连接

 

(2)关闭selinux

# setenforce 0
# sed -i 's,^selinux=enforcing,selinux=disabled,g' /etc/selinux/config

 

2、主节点部署

 

(1)安装pxc yum源

yum install http://www.percona.com/downloads/percona-release/redhat/0.1-3/percona-release-0.1-3.noarch.rpm

(2) 安装pxc

# yum install percona-xtradb-cluster-56

 

最终下载下来的版本是percona-xtradb-cluster-56-5.6.30

(3) 修改    /etc/my.cnf

     vim  /etc/my.cnf

 

[mysqld]

datadir=/var/lib/mysql
user=mysql

wsrep_provider=/usr/lib64/galera3/libgalera_smm.so
#集群的ip
wsrep_cluster_address=gcomm://节点ip1,节点ip2,节点ip3

binlog_format=row

default_storage_engine=innodb

innodb_autoinc_lock_mode=2

#当前主节点的ip
wsrep_node_address=当前节点ip


wsrep_sst_method=xtrabackup-v2

wsrep_cluster_name=my_centos_cluster
#初始化一个mysql的用户和密码
wsrep_sst_auth="admin:123456"

 

(4)启动主节点

 systemctl start mysql@bootstrap.service

(5)进入mysql

登录 (初始化状态,无密码,遇到要输密码直接回车)

mysql -uroot -p

 

(6) 登录客户端查看数据库的状态,在进行权限配置允许ip访问,默认无法远程访问,但是我们需要远程通过图形化等界面查看,所以要做如下配置

mysql> show status like 'wsrep%';

create user 'admin'@'localhost' identified by '123456';//如果这里报错,看一下是否有 用户存在了

grant reload, lock tables, replication client on *.* to 'admin'@'localhost'; 

flush privileges;

 

完成后可以用navicat for mysql 连接看一下是否可以成功访问

 

3、其他两个节点的配置

(1)安装pxc yum源

yum install http://www.percona.com/downloads/percona-release/redhat/0.1-3/percona-release-0.1-3.noarch.rpm

(2) 安装pxc

# yum install percona-xtradb-cluster-56

(3) 修改    /etc/my.cnf

     vim  /etc/my.cnf

 

[mysqld]

datadir=/var/lib/mysql
user=mysql

wsrep_provider=/usr/lib64/galera3/libgalera_smm.so
#集群的ip
wsrep_cluster_address=gcomm://节点ip1,节点ip2,节点ip3

binlog_format=row

default_storage_engine=innodb

innodb_autoinc_lock_mode=2

#当前主节点的ip
wsrep_node_address=当前节点ip


wsrep_sst_method=xtrabackup-v2

wsrep_cluster_name=my_centos_cluster
#初始化一个mysql的用户和密码
wsrep_sst_auth="admin:123456"

 (4)启动当前节点(这一步和主节点不一样)

systemctl start mysql

(5)进入mysql

登录 (初始化状态,无密码,遇到要输密码直接回车)

mysql -uroot -p

 

(6) 登录客户端查看数据库的状态,在进行权限配置允许ip访问,默认无法远程访问,但是我们需要远程通过图形化等界面查看,所以要做如下配置

mysql> show status like 'wsrep%';

create user 'admin'@'localhost' identified by '123456';//如果这里报错,看一下是否有 用户存在了

grant reload, lock tables, replication client on *.* to 'admin'@'localhost'; 

flush privileges;

 

完成后可以用navicat for mysql 连接看一下是否可以成功访问

 (7)可以在mysql中执行如下命令查看

 show status like 'wsrep%';

 如果正常,可以出现如下界面,标识当前三个集群节点

ABP 框架审计日志改造支持mongodb和redis(本篇不讨论,下一篇详细介绍), 数据库底层迁移 Mysql 集群

 

(8)如果出现启动节点时候出现异常,可以查看提示的操作,看看日志,百度一下看看是什么错误,怎么解决,因为各种错误都有,就不好一一解释了。

比如笔者在操作过程中就出现如下错误

ecstop=/usr/bin/mysql-systemd stop (code=exited, status=2)

后面查找原因应该是 防火墙等问题,进行关闭拦截等操作,就是一开始 环境准备的后面那一步,关闭防火墙、selinux,

 

 主节点重启

systemctl stop mysql@bootstrap.service
systemctl start mysql@bootstrap.service

其他节点也再次启动

systemctl start mysql

 

4、abp 进行数据库迁移

 

(1)abp 想要进行mysql 支持,网上的教程有,我就不重复造*自己参考

 

(2) 将数据库连接字符串改为 主节点

 <add name="default" connectionstring="server=主节点ip;port=3306;database=abpzero4_6db;uid=admin;password=123456;" providername="mysql.data.mysqlclient" />

(3) 执行迁移 

ABP 框架审计日志改造支持mongodb和redis(本篇不讨论,下一篇详细介绍), 数据库底层迁移 Mysql 集群

 

 (4)查看对应的三台服务器集群都自动同步该数据库

 

 ABP 框架审计日志改造支持mongodb和redis(本篇不讨论,下一篇详细介绍), 数据库底层迁移 Mysql 集群

(5)在appservice 中建立测试服务进行增删改查、事务等测试

using abp.application.services;
using abp.application.services.dto;
using abp.automapper;
using abp.domain.repositories;
using abp.domain.uow;
using automapper;
using system;
using system.collections.generic;
using system.data.entity;
using system.linq;
using system.linq.expressions;
using system.text;
using system.threading.tasks;
using system.linq.dynamic;
using abp.linq.extensions;

using mycompanyname.abpzerotemplate;
using mycompanyname.abpzerotemplate.zldb_domain;
using mycompanyname.abpzerotemplate.authorization.consignee.exporting;
using mycompanyname.abpzerotemplate.zldb_domain.dtos;
using mycompanyname.abpzerotemplate.dto;

namespace mycompanyname.abpzerotemplate
{



    /// <summary>
    /// 收货地址 业务实现接口
    /// </summary>
    public class consigneeappservice : abpzerotemplateappservicebase, iconsigneeappservice
    {
        private readonly irepository<consignee, guid> _consigneerepository;
        private readonly iconsigneelistexcelexporter _iconsigneelistexcelexporter;


        /// <summary>
        /// 构造函数自动注入我们所需要的类或接口
        /// </summary>
        public consigneeappservice(irepository<consignee, guid> consigneerepository,
        iconsigneelistexcelexporter iconsigneelistexcelexporter)
        {
            _consigneerepository = consigneerepository;
            _iconsigneelistexcelexporter = iconsigneelistexcelexporter;
            //_consigneemongodbrepository = consigneemongodbrepository;
        }

        /// <summary>
        /// 获取所有数据列表
        /// </summary>
        /// <returns>返回数据集合</returns>
        public async task<list<consigneedto>> getalllist(string guid = "")
        {
            //try
            //{
            //    var model = new consignee() { dealerid = system.guid.newguid() };
            //    var mr = _consigneemongodbrepository.insert(model);
            //}
            //catch (exception ex)
            //{

            //    throw;
            //}


          

            //调用task仓储的特定方法getallwithpeople
            var resultlist = await _consigneerepository.getalllistasync();

        

            return mapper.map<list<consigneedto>>(resultlist).tolist();
        }

        /// <summary>
        /// 获取分页数据列表 分页具体代码需要适当修改,如orderby 需要匹配 创建时间 或者其他数据id(int)
        /// </summary>
        /// <returns>返回数据集合</returns>
        public async task<pagedresultdto<consigneedto>> getpagedlistasync(pagedandfilteredinputdto input)
        {
            var query = _consigneerepository.getall();
            //todo:根据传入的参数添加过滤条件

            var resultcount = await query.countasync();
            var resultconsignee = await query
            .orderby(x => x.id)
            .pageby(input)
            .tolistasync();

            var resultlistdtos = resultconsignee.mapto<list<consigneedto>>();


            return new pagedresultdto<consigneedto>(
            resultcount,
            resultlistdtos
            );
        }

        /// <summary>
        /// 获取指定条件的数据列表  webapi 无法使用
        /// </summary>
        /// <returns>返回数据集合</returns>
        public async task<list<consigneedto>> getlistbycodition(expression<func<consignee, bool>> predicate)
        {

            var resultlist = await _consigneerepository.getalllistasync(predicate);
            return mapper.map<list<consigneedto>>(resultlist).tolist();
        }


        /// <summary>
        /// 导出excel 具体方法
        /// </summary>
        /// <returns>excel文件</returns>
        /// public async task<filedto> getconsigneetoexcel()
        ///{
        ///    var resultlist = await _consigneerepository.getalllistasync();
        ///   var consigneedtos= mapper.map<list<consigneedto>>(resultlist).tolist();
        ///   return _iconsigneelistexcelexporter.exporttofile(consigneedtos);
        /// }

        /// <summary>
        /// 根据指定id 获取数据实体
        /// </summary>
        /// <param name="input">当前id</param>
        /// <returns></returns>
        public async task<consigneedto> getconsigneeforeditasync(nullableiddto<system.guid> input)
        {
            var output = new consigneedto();

            consigneedto consigneeeditdto;

            if (input.id.hasvalue)
            {
                var entity = await _consigneerepository.getasync(input.id.value);
                consigneeeditdto = entity.mapto<consigneedto>();
            }
            else
            {
                consigneeeditdto = new consigneedto();
            }

            output = consigneeeditdto;
            return output;
        }

        /// <summary>
        /// 根据id创建或编辑操作
        /// </summary>
        /// <param name="input">实体</param>
        /// <returns></returns>
        public async task createorupdateconsigneeasync(consigneedto input)
        {
            if (!string.isnullorwhitespace(input.id))
            {
                await update(input);
            }
            else
            {
                await create(input);
            }
        }

        /// <summary>
        /// 新增 
        /// </summary>
        /// <param name="input">新增参数</param>
        /// <returns>新增实体</returns>
        public async task<guid> create(consigneedto input)
        {
            input.id = new consignee().id.tostring();
            var resultobj = input.mapto<consignee>();
            var result = await _consigneerepository.insertasync(resultobj);

            return result.id;
        }

        /// <summary>
        /// 新增 
        /// </summary>
        /// <param name="input">新增参数</param>
        /// <returns>新增实体</returns>
        public async task<int> createlist(list<consigneedto> list)
        {
            foreach (var input in list) {
                if (input.contact.contains("ex")) {
                    throw new exception("测试分布式异常!");
                }
                input.id = new consignee().id.tostring();

                var resultobj = input.mapto<consignee>();
                var result = await _consigneerepository.insertasync(resultobj);
            }
            

            return list.count();
        }

        /// <summary>
        /// 修改
        /// </summary>
        /// <param name="input">修改参数</param>
        /// <returns>修改实体</returns>
        public async task<consigneedto> update(consigneedto input)
        {
            consignee obj = await _consigneerepository.getasync(new guid(input.id));
            input.mapto(obj);
            var result = await _consigneerepository.updateasync(obj);
            return obj.mapto<consigneedto>();
        }

        /// <summary>
        /// 删除
        /// </summary>
        /// <param name="input">删除dto</param>
        /// <returns>无返回值</returns>
        public async system.threading.tasks.task delete(entitydto<string> input)
        {
            await _consigneerepository.deleteasync(new guid(input.id));
        }

        /// <summary>
        /// 删除 webapi 无法使用
        /// </summary>
        /// <param name="predicate">删除条件</param>
        /// <returns>无返回值</returns>
        public async system.threading.tasks.task deletebycondition(expression<func<consignee, bool>> predicate)
        {
            await _consigneerepository.deleteasync(predicate);

        }
    }




}

在swagger ui中增删改查都已经正常,而且数据在三个数据库中正常同步

 

ABP 框架审计日志改造支持mongodb和redis(本篇不讨论,下一篇详细介绍), 数据库底层迁移 Mysql 集群

 

针对事务,做了人为异常处理,确认会实现回滚

 

ABP 框架审计日志改造支持mongodb和redis(本篇不讨论,下一篇详细介绍), 数据库底层迁移 Mysql 集群

 

 五、后记

 这一次只是做了简单的实验性测试,后续需要在加强深入检测,才可以用生产环境中。