欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

windows7下使用MongoDB实现仓储设计

程序员文章站 2022-06-17 22:41:25
简单的介绍一下,我使用mongodb的场景。 我们现在的物联网环境下,有部分数据,采样频率为2000条记录/分钟,这样下来一天24*60*2000=2880000约等于3...

简单的介绍一下,我使用mongodb的场景。

我们现在的物联网环境下,有部分数据,采样频率为2000条记录/分钟,这样下来一天24*60*2000=2880000约等于300万条数据,以后必然还会增加。之前数据库使用的是mssql,对于数据库的压力很大,同时又需要保证历史查询的响应速度,这种情况下,在单表中数据量大,同时存在读写操作。不得已采用mongodb来存储数据。如果使用mongodb,则至少需要三台机器,两台实现读写分离,一台作为仲裁(当然条件不允许也可以不用),每台机器的内存暂时配置在16g,公司小,没办法,据说,使用这个mongodb需要机器内存最少92g,我没有验证过,但是吃内存是公认的,所以内存绝对要保证,就算保证了,也不一定完全就没有意外发生。我们上面的这些特殊的数据是允许少量的丢失的,这些只是做分析使用的,几个月了,暂时还没出现数据丢失的情况,可能最新版本早就修复了吧,新手使用建议多看下官网上的说明。下面直接奔入主题:

一、安装部署和配置环境

1.安装部署mongo-server(v3.4)

参考

这个时候不要启动,接着配置config文件

2.配置config文件

dbpath=c:/program files/mongodb/server/3.4/bin/data/db 
logpath=c:/program files/mongodb/server/3.4/bin/data/log/master.log 
pidfilepath=c:/program files/mongodb/server/3.4/bin/master.pid 
directoryperdb=true 
logappend=true 
replset=testrs 
bind_ip=10.1.5.25
port=27016 
oplogsize=10000
noauth = true
 
storageengine = wiredtiger
wiredtigercachesizegb = 2
syncdelay = 30
wiredtigercollectionblockcompressor = snappy

以上是详细的配置参数,其中路径部分根据需要更改, 这里设置的oplogsize大小为10g,根据业务场景进行调整,另外auth权限为null,因为设置权限会增加服务开销,影响效率,最下面几行是内存引擎,可以控制副本集同步及内存限制,防止内存泄露。

3.启动mongo-server

4.添加副本集配置

conf=
{
  "_id" : "testrs",
  "members" : [
    { "_id" : 0, "host" : "10.1.5.25:27016" },
    { "_id" : 1, "host" : "10.1.5.26:27016" },
    { "_id" : 2, "host" : "10.1.5.27:27016" }
  ]
}

rs.initiate(conf)

此时副本集集群配置已经完成,然后在命令行中输入:rs.status(),查看副本集状态,需要查看同步情况,可以输入命令:db.serverstatus().

5.设置副本集可读写

rs.slaveok()

6..net操作mongo

连接设置,请参考个人封装unitoon.mongo代码所示。

7.性能对比

读写速度:redis>mongo>mssqlserver

可容纳数据量:mssqlserver~mongo>redis

存储数据类型:mongo>mssqlserver>redis

note:内存持续上升,内部没有内存回收机制,若限制内存 ,则可能出现查询速度变慢,数据丢失等问题,建议优化查询效率,建立索引

db.test.ensureindex({"username":1, "age":-1})

强制释放内存命令:db.runcommand({closealldatabases:1})

二、仓储设计

1.基类baseentity

namespace unitooniot.mongo
{
  /// <summary>
  /// 实体基类,方便生成objid
  /// </summary>
  [serializable]
  [protocontract(implicitfields = implicitfields.allpublic)]
  //[protoinclude(10, typeof(normalhistory))]
  public class baseentity
  {
    //[bsonrepresentation(bsontype.objectid)]
    public objectid id { get; set; }
 
    /// <summary>
    /// 数据库名称
    /// </summary>
    public string dbname { get; set; }
 
    /// <summary>
    /// 给对象初值
    /// </summary>
    public baseentity()
    {
      // this.objid = objectid.generatenewid().tostring();
      //this.id = objectid.newobjectid().tostring();
    }
  }
}


这里需要注意时间格式,mongodb默认时间格式为国际时间,所以在写入数据时和读取数据时,时间格式要一致,此例中没有对时间进行特殊处理,由传入的时间格式确定。

2.repository继承接口imongorepository

namespace unitooniot.mongo
{
  public interface imongorepository<tentity> where tentity : class
  {
  }
}


3.mongorepository

using mongodb.driver;
using mongodb.bson;
using system;
using system.collections.generic;
using system.linq;
using system.linq.expressions;
using system.text;
using system.threading.tasks;
using mongodb.bson.serialization.attributes;
using mongodb.driver.linq;
using system.configuration;
using system.io;
using unitooniot.appsetting;
 
namespace unitooniot.mongo
{
 
  public class mongodb
  {
    private static string connectionstringhost ;
    private static string username ;
    private static string password;
    private static imongodatabase _db = null;
    private static readonly object lockhelper = new object();
    /// <summary>
    /// mongodb初始化
    /// </summary>
    public static void init()
    {
      connectionstringhost = "10.1.5.24:27016,10.1.5.24:27016,10.1.5.26:27017";
      //appsettings.getconfigvalue("mongohost");//"10.1.5.24:27016";
      username = appsettings.getconfigvalue("mongousername");
      password = appsettings.getconfigvalue("mongopwd");
    }
    static mongodb()
    {
     
    }
    public static imongodatabase getdb(string dbname,string options=null)
    {
     
      if (_db != null) return _db;
      lock (lockhelper)
      {
 
        if (_db != null) return _db;
        var database = dbname;
        var username = username;
        var password = password;
        var authentication = string.empty;
        var host = string.empty;
        if (!string.isnullorwhitespace(username))
        {
          authentication = string.concat(username, ':', password, '@');
        }
        if (!string.isnullorempty(options) && !options.startswith("?"))
        {
          options = string.concat('?', options);
        }
 
 
 
        host = string.isnullorempty(connectionstringhost) ? "localhost" : connectionstringhost;
        database = database ?? "testdb";
        //mongodb://[username:password@]host1[:port1][,host2[:port2],…[,hostn[:portn]]][/[database][?options]]
 
        var constring = options!=null? $"mongodb://{authentication}{host}/{database}{options}"
          : $"mongodb://{authentication}{host}/{database}";
 
        var url = new mongourl(constring);
        var mcs = mongoclientsettings.fromurl(url);
        mcs.maxconnectionlifetime = timespan.frommilliseconds(1000);
        var client = new mongoclient(mcs);
               
        _db = client.getdatabase(url.databasename);
      }
      return _db;
    }
  }
  /// <summary>
  /// mongodb 数据库操作类
  /// </summary>
  public class mongorepository<t>: imongorepository<t> where t : baseentity
  {
    #region readonly field
    /// <summary>
    /// 表名
    /// </summary>
    private readonly imongocollection<t> _collection = null;
    /// <summary>
    /// 数据库对象
    /// </summary>
    private readonly imongodatabase _database;
    #endregion
 
    /// <summary>
    /// 构造函数
    /// </summary>
    public mongorepository()
    {
      this._database = mongodb.getdb(activator.createinstance<t>().dbname, "readpreference =secondarypreferred ");//primarypreferred/secondarypreferred/nearest
      _collection = _database.getcollection<t>(typeof(t).name);
    }
   
 
    #region 增加
    /// <summary>
    /// 插入对象
    /// </summary>
    /// <param name="t">插入的对象</param>
    public virtual t insert(t t)
    {
      // var flag = objectid.generatenewid();
      // t.gettype().getproperty("id").setvalue(t, flag);  
      //t.time = datetime.now;
 
      _collection.insertone(t);
      return t;
    }
    /// <summary>
    /// 批量插入
    /// </summary>
    /// <param name="ts">要插入的对象集合</param>
    public virtual ienumerable<t> insertbatch(ienumerable<t> ts)
    {
      _collection.insertmany(ts);
      return ts;
    }
 
    /// <summary>
    /// 插入对象
    /// </summary>
    /// <param name="t">插入的对象</param>
    public virtual void insertasync(t t)
    {
      //var flag = objectid.generatenewid();
      // t.gettype().getproperty("id").setvalue(t, flag);
      // t.time = datetime.now;
       _collection.insertoneasync(t);
    }
    /// <summary>
    /// 批量插入
    /// </summary>
    /// <param name="ts">要插入的对象集合</param>
    public virtual void insertbatchasync(ienumerable<t> ts)
    {
       _collection.insertmanyasync(ts);
    }
    #endregion
 
    #region 删除
    /// <summary>
    /// 删除
    /// </summary>
    /// <returns></returns>
    public virtual long delete(t t)
    {
      var filter = builders<t>.filter.eq("id", t.id);
      var result = _collection.deleteone(filter);
      return result.deletedcount;     
    }
    /// <summary>
    /// 删除
    /// </summary>
    /// <returns></returns>
    public virtual void deleteasync(t t)
    {
      var filter = builders<t>.filter.eq("id", t.id);
      _collection.deleteoneasync(filter);
    }
 
    /// <summary>
    /// 按条件表达式删除
    /// </summary>
    /// <param name="predicate">条件表达式</param>
    /// <returns></returns>
    public virtual long delete(expression<func<t, bool>> predicate)
    {
      var result = _collection.deleteone(predicate);
      return result.deletedcount;
    }
    /// <summary>
    /// 按条件表达式删除
    /// </summary>
    /// <param name="predicate">条件表达式</param>
    /// <returns></returns>
    public virtual void deleteasync(expression<func<t, bool>> predicate)
    {
      _collection.deleteoneasync(predicate);
    }
 
 
    /// <summary>
    /// 按条件表达式批量删除
    /// </summary>
    /// <param name="predicate">条件表达式</param>
    /// <returns></returns>
    public virtual long deletebatch(expression<func<t, bool>> predicate)
    {
      var result = _collection.deletemany(predicate);
      return result.deletedcount;
    }
    /// <summary>
    /// 按条件表达式批量删除
    /// </summary>
    /// <param name="predicate">条件表达式</param>
    /// <returns></returns>
    public virtual void deletebatchasync(expression<func<t, bool>> predicate)
    {
       _collection.deletemanyasync(predicate);
    }
 
    /// <summary>
    /// 按检索条件删除
    /// 建议用builders<t>构建复杂的查询条件
    /// </summary>
    /// <param name="filter">条件</param>
    /// <returns></returns>
    public virtual long delete(filterdefinition<t> filter)
    {
      var result = _collection.deleteone(filter);
      return result.deletedcount;
    }
 
    /// <summary>
    /// 按检索条件删除
    /// 建议用builders<t>构建复杂的查询条件
    /// </summary>
    /// <param name="filter">条件</param>
    /// <returns></returns>
    public virtual void deleteasync(filterdefinition<t> filter)
    {
       _collection.deleteoneasync(filter);
    }
    #endregion
 
    #region 修改
    /// <summary>
    /// 修改(id不变)
    /// </summary>  
    /// <returns></returns>
    public virtual long update(t t)
    {     
      var filterbuilder = builders<t>.filter;
      var filter = filterbuilder.eq("id",t.id);
      var update = _collection.replaceone(filter, t, new updateoptions() { isupsert = true });
      return update.modifiedcount;
    }
    /// <summary>
    /// 修改(id不变)
    /// </summary>  
    /// <returns></returns>
    public virtual void updateasync(t t)
    {
      var filterbuilder = builders<t>.filter;
      var filter = filterbuilder.eq("id", t.id);
       _collection.replaceoneasync(filter, t, new updateoptions() { isupsert = true });
       
    }
 
 
    /// <summary>
    /// 用新对象替换新文档
    /// </summary>
    /// <param name="filter">查询条件</param>
    /// <param name="t">对象</param>
    /// <returns>修改影响文档数</returns>
    public virtual long update(expression<func<t, bool>> filter, t t)
    {
      var update = _collection.replaceone(filter, t, new updateoptions() { isupsert = true });
      return update.modifiedcount;
    }
 
 
    /// <summary>
    /// 用新对象替换新文档
    /// </summary>
    /// <param name="filter">查询条件</param>
    /// <param name="t">对象</param>
    /// <returns>修改影响文档数</returns>
    public virtual long update(filterdefinition<t> filter, t t)
    {
      var update = _collection.replaceone(filter, t, new updateoptions() { isupsert = true });
      return update.modifiedcount;
    }
    /// <summary>
    /// 用新对象替换新文档
    /// </summary>
    /// <param name="filter">查询条件</param>
    /// <param name="t">对象</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updateasync(expression<func<t, bool>> filter, t t)
    {
      _collection.replaceoneasync(filter, t, new updateoptions() { isupsert = true });
      
    }
    /// <summary>
    /// 用新对象替换新文档
    /// </summary>
    /// <param name="filter">查询条件</param>
    /// <param name="t">对象</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updateasync(filterdefinition<t> filter, t t)
    {
       _collection.replaceoneasync(filter, t, new updateoptions() { isupsert = true });
       
    }
    /// <summary>
    /// 根据id和条件文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="id">对象id</param>
    /// <returns>修改影响文档数</returns>
    public virtual long update(string id, updatedefinition<t> update)
    {
      var filterbuilder = builders<t>.filter;
      var filter = filterbuilder.eq("id", new objectid(id));
      var result = _collection.updateone(filter, update, new updateoptions() { isupsert = true });
      return result.modifiedcount;
    }
    /// <summary>
    /// 根据id和条件文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="id">对象id</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updateasync(string id, updatedefinition<t> update)
    {
      var filterbuilder = builders<t>.filter;
      var filter = filterbuilder.eq("id", new objectid(id));
      _collection.updateoneasync(filter, update, new updateoptions() { isupsert = true });    
    }
    /// <summary>
    /// 根据条件修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual void update(updatedefinition<t> update,expression<func<t, bool>> filter)
    {
      _collection.updateone(filter, update, new updateoptions() { isupsert = true });
    }
    /// <summary>
    /// 根据条件修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual long update(updatedefinition<t> update, filterdefinition<t> filter)
    {
      var result = _collection.updateone(filter, update, new updateoptions() { isupsert = true });
      return result.modifiedcount;
    }
    /// <summary>
    /// 根据条件修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updateasync(updatedefinition<t> update, expression<func<t, bool>> filter)
    {
      _collection.updateoneasync(filter, update, new updateoptions() { isupsert = true });
    }
    /// <summary>
    /// 根据条件修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updateasync(updatedefinition<t> update, filterdefinition<t> filter)
    {
       _collection.updateoneasync(filter, update, new updateoptions() { isupsert = true });
    }
 
    /// <summary>
    /// 根据条件批量修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual long updatebatch(updatedefinition<t> update, expression<func<t, bool>> filter)
    {
      var result = _collection.updatemany(filter, update, new updateoptions() { isupsert = true });
      return result.modifiedcount;
    }
 
    /// <summary>
    /// 根据条件批量修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual long updatebatch(updatedefinition<t> update, filterdefinition<t> filter)
    {
      var result = _collection.updatemany(filter, update, new updateoptions() { isupsert = true });
      return result.modifiedcount;
    }
    /// <summary>
    /// 根据条件批量修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updatebatchasync(updatedefinition<t> update, expression<func<t, bool>> filter)
    {
       _collection.updatemanyasync(filter, update, new updateoptions() { isupsert = true });
    }
 
    /// <summary>
    /// 根据条件批量修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updatebatchasync(updatedefinition<t> update, filterdefinition<t> filter)
    {
      _collection.updatemanyasync(filter, update, new updateoptions() { isupsert = true });
    }
    #endregion
 
    #region 查询 
 
    #region getcollection
 
    /// <summary>
    /// 获取操作对象的imongocollection集合,强类型对象集合
    /// </summary>
    /// <returns></returns>
    public virtual imongocollection<t> getcollection()
    {
      return _database.getcollection<t>(typeof(t).name);
    }
 
    #endregion
 
    #region getsingle
    /// <summary>
    /// 查询数据库,检查是否存在指定id的对象
    /// </summary>
    /// <param name="id">对象的id值</param>
    /// <returns>存在则返回指定的对象,否则返回null</returns>
    public virtual t getbyid(string id)
    {
      var filterbuilder = builders<t>.filter;
      var filter = filterbuilder.eq("id", new objectid(id));
      var data = _collection.find(filter).firstordefault();
      return data;
    }
    /// <summary>
    /// 查询数据库,检查是否存在指定id的对象
    /// </summary>
    /// <param name="id">对象的id值</param>
    /// <returns>存在则返回指定的对象,否则返回null</returns>
    public virtual async task<t> getasyncbyid(string id)
    {
      var filterbuilder = builders<t>.filter;
      var filter = filterbuilder.eq("id", new objectid(id));
      var data = await _collection.findasync(filter);
      return await data.singleordefaultasync();
    }
    /// <summary>
    /// 查询数据
    /// </summary>
    /// <param name="filter">过滤条件</param>
    /// <returns></returns>
    public virtual t get(filterdefinition<t> filter)
    {
      return _collection.find(filter).firstordefault();
    }
    /// <summary>
    /// 查询数据
    /// </summary>
    /// <param name="filter">条件表达式</param>
    /// <returns></returns>
    public virtual t get(expression<func<t,bool>> filter)
    {
      return _collection.find(filter).firstordefault();
    }
    /// <summary>
    /// 查询数据
    /// </summary>
    /// <param name="filter">过滤条件</param>
    /// <returns></returns>
    public virtual async task<t> getasync(filterdefinition<t> filter)
    {
      var data = await _collection.findasync(filter);
      return await data.singleordefaultasync();
    }
    /// <summary>
    /// 查询数据
    /// </summary>
    /// <param name="filter">条件表达式</param>
    /// <returns></returns>
    public virtual async task<t> getasync(expression<func<t, bool>> filter)
    {
      var data = await _collection.findasync(filter);
      return await data.singleordefaultasync();
    }
 
 
    #endregion
 
    #region getmany
    /// <summary>
    /// 查询部分数据
    /// </summary>
    /// <param name="filter">过滤条件</param>
    /// <returns></returns>
    public virtual ienumerable<t> getmany(filterdefinition<t> filter)
    {
      return _collection.find(filter).toenumerable();
    }
 
    /// <summary>
    /// 查询部分数据
    /// </summary>
    /// <param name="filter">条件表达式</param>
    /// <returns></returns>
    public virtual ienumerable<t> getmany(expression<func<t,bool>> filter)
    {
      //return _collection.asqueryable().where(filter).tolist();
      //return _collection.asqueryable().where(filter);
      return _collection.find(filter).toenumerable(); //.toenumerable(); 
    }
 
    /// <summary>
    /// 查询部分数据
    /// </summary>
    /// <param name="filter">过滤条件</param>
    /// <returns></returns>
    public virtual async task<ienumerable<t>> getmanyasync(filterdefinition<t> filter)
    {
      var data = await _collection.findasync(filter);
      return await data.tolistasync();
    }
 
    /// <summary>
    /// 查询部分数据
    /// </summary>
    /// <param name="filter">过滤条件</param>
    /// <returns></returns>
    public virtual async task<ienumerable<t>> getmanyasync(expression<func<t, bool>> filter)
    {
      var data = await _collection.findasync(filter);
      return await data.tolistasync();
    }
     
    #endregion
 
    #region getall
 
    /// <summary>
    /// 查询所有记录,复杂查询直接用linq处理(避免全表扫描)
    /// </summary>
    /// <returns>要查询的对象</returns>
    public virtual ienumerable<t> getall()
    {
      var data = _collection.asqueryable();
      return data.toenumerable();
    }
    /// <summary>
    /// 查询所有记录,复杂查询直接用linq处理(避免全表扫描)
    /// </summary>
    /// <returns>要查询的对象</returns>
    public virtual async task<ienumerable<t>> getallasync()
    {
      var data = _collection.asqueryable();
      return await data.tolistasync();
    }
 
    /// <summary>
    /// 查询所有记录,复杂查询直接用linq处理(避免全表扫描)
    /// </summary>
    /// <returns>要查询的对象</returns>
    public virtual iqueryable<t> getallqueryable()
    {
      return _collection.asqueryable();
    }
 
    #endregion
 
    #region mapreduce
    /// <summary> 
    /// mapreduce
    /// </summary>   
    /// <returns>返回一个list列表数据</returns> 
    public ienumerable<t> getmap(bsonjavascript map,bsonjavascript reduce)
    {
      return _collection.mapreduce<t>(map,reduce).tolist();
    }
 
 
    #endregion
 
    #endregion
  }
 
}


好了,就介绍到这里。