windows7下使用MongoDB实现仓储设计
简单的介绍一下,我使用mongodb的场景。
我们现在的物联网环境下,有部分数据,采样频率为2000条记录/分钟,这样下来一天24*60*2000=2880000约等于300万条数据,以后必然还会增加。之前数据库使用的是mssql,对于数据库的压力很大,同时又需要保证历史查询的响应速度,这种情况下,在单表中数据量大,同时存在读写操作。不得已采用mongodb来存储数据。如果使用mongodb,则至少需要三台机器,两台实现读写分离,一台作为仲裁(当然条件不允许也可以不用),每台机器的内存暂时配置在16g,公司小,没办法,据说,使用这个mongodb需要机器内存最少92g,我没有验证过,但是吃内存是公认的,所以内存绝对要保证,就算保证了,也不一定完全就没有意外发生。我们上面的这些特殊的数据是允许少量的丢失的,这些只是做分析使用的,几个月了,暂时还没出现数据丢失的情况,可能最新版本早就修复了吧,新手使用建议多看下官网上的说明。下面直接奔入主题:
一、安装部署和配置环境
1.安装部署mongo-server(v3.4)
参考
这个时候不要启动,接着配置config文件
2.配置config文件
dbpath=c:/program files/mongodb/server/3.4/bin/data/db logpath=c:/program files/mongodb/server/3.4/bin/data/log/master.log pidfilepath=c:/program files/mongodb/server/3.4/bin/master.pid directoryperdb=true logappend=true replset=testrs bind_ip=10.1.5.25 port=27016 oplogsize=10000 noauth = true storageengine = wiredtiger wiredtigercachesizegb = 2 syncdelay = 30 wiredtigercollectionblockcompressor = snappy
以上是详细的配置参数,其中路径部分根据需要更改, 这里设置的oplogsize大小为10g,根据业务场景进行调整,另外auth权限为null,因为设置权限会增加服务开销,影响效率,最下面几行是内存引擎,可以控制副本集同步及内存限制,防止内存泄露。
3.启动mongo-server
4.添加副本集配置
conf= { "_id" : "testrs", "members" : [ { "_id" : 0, "host" : "10.1.5.25:27016" }, { "_id" : 1, "host" : "10.1.5.26:27016" }, { "_id" : 2, "host" : "10.1.5.27:27016" } ] } rs.initiate(conf)
此时副本集集群配置已经完成,然后在命令行中输入:rs.status(),查看副本集状态,需要查看同步情况,可以输入命令:db.serverstatus().
5.设置副本集可读写
rs.slaveok()
6..net操作mongo
连接设置,请参考个人封装unitoon.mongo代码所示。
7.性能对比
读写速度:redis>mongo>mssqlserver
可容纳数据量:mssqlserver~mongo>redis
存储数据类型:mongo>mssqlserver>redis
note:内存持续上升,内部没有内存回收机制,若限制内存 ,则可能出现查询速度变慢,数据丢失等问题,建议优化查询效率,建立索引
db.test.ensureindex({"username":1, "age":-1})
强制释放内存命令:db.runcommand({closealldatabases:1})
二、仓储设计
1.基类baseentity
namespace unitooniot.mongo { /// <summary> /// 实体基类,方便生成objid /// </summary> [serializable] [protocontract(implicitfields = implicitfields.allpublic)] //[protoinclude(10, typeof(normalhistory))] public class baseentity { //[bsonrepresentation(bsontype.objectid)] public objectid id { get; set; } /// <summary> /// 数据库名称 /// </summary> public string dbname { get; set; } /// <summary> /// 给对象初值 /// </summary> public baseentity() { // this.objid = objectid.generatenewid().tostring(); //this.id = objectid.newobjectid().tostring(); } } }
这里需要注意时间格式,mongodb默认时间格式为国际时间,所以在写入数据时和读取数据时,时间格式要一致,此例中没有对时间进行特殊处理,由传入的时间格式确定。
2.repository继承接口imongorepository
namespace unitooniot.mongo { public interface imongorepository<tentity> where tentity : class { } }
3.mongorepository
using mongodb.driver; using mongodb.bson; using system; using system.collections.generic; using system.linq; using system.linq.expressions; using system.text; using system.threading.tasks; using mongodb.bson.serialization.attributes; using mongodb.driver.linq; using system.configuration; using system.io; using unitooniot.appsetting; namespace unitooniot.mongo { public class mongodb { private static string connectionstringhost ; private static string username ; private static string password; private static imongodatabase _db = null; private static readonly object lockhelper = new object(); /// <summary> /// mongodb初始化 /// </summary> public static void init() { connectionstringhost = "10.1.5.24:27016,10.1.5.24:27016,10.1.5.26:27017"; //appsettings.getconfigvalue("mongohost");//"10.1.5.24:27016"; username = appsettings.getconfigvalue("mongousername"); password = appsettings.getconfigvalue("mongopwd"); } static mongodb() { } public static imongodatabase getdb(string dbname,string options=null) { if (_db != null) return _db; lock (lockhelper) { if (_db != null) return _db; var database = dbname; var username = username; var password = password; var authentication = string.empty; var host = string.empty; if (!string.isnullorwhitespace(username)) { authentication = string.concat(username, ':', password, '@'); } if (!string.isnullorempty(options) && !options.startswith("?")) { options = string.concat('?', options); } host = string.isnullorempty(connectionstringhost) ? "localhost" : connectionstringhost; database = database ?? "testdb"; //mongodb://[username:password@]host1[:port1][,host2[:port2],…[,hostn[:portn]]][/[database][?options]] var constring = options!=null? $"mongodb://{authentication}{host}/{database}{options}" : $"mongodb://{authentication}{host}/{database}"; var url = new mongourl(constring); var mcs = mongoclientsettings.fromurl(url); mcs.maxconnectionlifetime = timespan.frommilliseconds(1000); var client = new mongoclient(mcs); _db = client.getdatabase(url.databasename); } return _db; } } /// <summary> /// mongodb 数据库操作类 /// </summary> public class mongorepository<t>: imongorepository<t> where t : baseentity { #region readonly field /// <summary> /// 表名 /// </summary> private readonly imongocollection<t> _collection = null; /// <summary> /// 数据库对象 /// </summary> private readonly imongodatabase _database; #endregion /// <summary> /// 构造函数 /// </summary> public mongorepository() { this._database = mongodb.getdb(activator.createinstance<t>().dbname, "readpreference =secondarypreferred ");//primarypreferred/secondarypreferred/nearest _collection = _database.getcollection<t>(typeof(t).name); } #region 增加 /// <summary> /// 插入对象 /// </summary> /// <param name="t">插入的对象</param> public virtual t insert(t t) { // var flag = objectid.generatenewid(); // t.gettype().getproperty("id").setvalue(t, flag); //t.time = datetime.now; _collection.insertone(t); return t; } /// <summary> /// 批量插入 /// </summary> /// <param name="ts">要插入的对象集合</param> public virtual ienumerable<t> insertbatch(ienumerable<t> ts) { _collection.insertmany(ts); return ts; } /// <summary> /// 插入对象 /// </summary> /// <param name="t">插入的对象</param> public virtual void insertasync(t t) { //var flag = objectid.generatenewid(); // t.gettype().getproperty("id").setvalue(t, flag); // t.time = datetime.now; _collection.insertoneasync(t); } /// <summary> /// 批量插入 /// </summary> /// <param name="ts">要插入的对象集合</param> public virtual void insertbatchasync(ienumerable<t> ts) { _collection.insertmanyasync(ts); } #endregion #region 删除 /// <summary> /// 删除 /// </summary> /// <returns></returns> public virtual long delete(t t) { var filter = builders<t>.filter.eq("id", t.id); var result = _collection.deleteone(filter); return result.deletedcount; } /// <summary> /// 删除 /// </summary> /// <returns></returns> public virtual void deleteasync(t t) { var filter = builders<t>.filter.eq("id", t.id); _collection.deleteoneasync(filter); } /// <summary> /// 按条件表达式删除 /// </summary> /// <param name="predicate">条件表达式</param> /// <returns></returns> public virtual long delete(expression<func<t, bool>> predicate) { var result = _collection.deleteone(predicate); return result.deletedcount; } /// <summary> /// 按条件表达式删除 /// </summary> /// <param name="predicate">条件表达式</param> /// <returns></returns> public virtual void deleteasync(expression<func<t, bool>> predicate) { _collection.deleteoneasync(predicate); } /// <summary> /// 按条件表达式批量删除 /// </summary> /// <param name="predicate">条件表达式</param> /// <returns></returns> public virtual long deletebatch(expression<func<t, bool>> predicate) { var result = _collection.deletemany(predicate); return result.deletedcount; } /// <summary> /// 按条件表达式批量删除 /// </summary> /// <param name="predicate">条件表达式</param> /// <returns></returns> public virtual void deletebatchasync(expression<func<t, bool>> predicate) { _collection.deletemanyasync(predicate); } /// <summary> /// 按检索条件删除 /// 建议用builders<t>构建复杂的查询条件 /// </summary> /// <param name="filter">条件</param> /// <returns></returns> public virtual long delete(filterdefinition<t> filter) { var result = _collection.deleteone(filter); return result.deletedcount; } /// <summary> /// 按检索条件删除 /// 建议用builders<t>构建复杂的查询条件 /// </summary> /// <param name="filter">条件</param> /// <returns></returns> public virtual void deleteasync(filterdefinition<t> filter) { _collection.deleteoneasync(filter); } #endregion #region 修改 /// <summary> /// 修改(id不变) /// </summary> /// <returns></returns> public virtual long update(t t) { var filterbuilder = builders<t>.filter; var filter = filterbuilder.eq("id",t.id); var update = _collection.replaceone(filter, t, new updateoptions() { isupsert = true }); return update.modifiedcount; } /// <summary> /// 修改(id不变) /// </summary> /// <returns></returns> public virtual void updateasync(t t) { var filterbuilder = builders<t>.filter; var filter = filterbuilder.eq("id", t.id); _collection.replaceoneasync(filter, t, new updateoptions() { isupsert = true }); } /// <summary> /// 用新对象替换新文档 /// </summary> /// <param name="filter">查询条件</param> /// <param name="t">对象</param> /// <returns>修改影响文档数</returns> public virtual long update(expression<func<t, bool>> filter, t t) { var update = _collection.replaceone(filter, t, new updateoptions() { isupsert = true }); return update.modifiedcount; } /// <summary> /// 用新对象替换新文档 /// </summary> /// <param name="filter">查询条件</param> /// <param name="t">对象</param> /// <returns>修改影响文档数</returns> public virtual long update(filterdefinition<t> filter, t t) { var update = _collection.replaceone(filter, t, new updateoptions() { isupsert = true }); return update.modifiedcount; } /// <summary> /// 用新对象替换新文档 /// </summary> /// <param name="filter">查询条件</param> /// <param name="t">对象</param> /// <returns>修改影响文档数</returns> public virtual void updateasync(expression<func<t, bool>> filter, t t) { _collection.replaceoneasync(filter, t, new updateoptions() { isupsert = true }); } /// <summary> /// 用新对象替换新文档 /// </summary> /// <param name="filter">查询条件</param> /// <param name="t">对象</param> /// <returns>修改影响文档数</returns> public virtual void updateasync(filterdefinition<t> filter, t t) { _collection.replaceoneasync(filter, t, new updateoptions() { isupsert = true }); } /// <summary> /// 根据id和条件文档 /// </summary> /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param> /// <param name="id">对象id</param> /// <returns>修改影响文档数</returns> public virtual long update(string id, updatedefinition<t> update) { var filterbuilder = builders<t>.filter; var filter = filterbuilder.eq("id", new objectid(id)); var result = _collection.updateone(filter, update, new updateoptions() { isupsert = true }); return result.modifiedcount; } /// <summary> /// 根据id和条件文档 /// </summary> /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param> /// <param name="id">对象id</param> /// <returns>修改影响文档数</returns> public virtual void updateasync(string id, updatedefinition<t> update) { var filterbuilder = builders<t>.filter; var filter = filterbuilder.eq("id", new objectid(id)); _collection.updateoneasync(filter, update, new updateoptions() { isupsert = true }); } /// <summary> /// 根据条件修改文档 /// </summary> /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param> /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param> /// <returns>修改影响文档数</returns> public virtual void update(updatedefinition<t> update,expression<func<t, bool>> filter) { _collection.updateone(filter, update, new updateoptions() { isupsert = true }); } /// <summary> /// 根据条件修改文档 /// </summary> /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param> /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param> /// <returns>修改影响文档数</returns> public virtual long update(updatedefinition<t> update, filterdefinition<t> filter) { var result = _collection.updateone(filter, update, new updateoptions() { isupsert = true }); return result.modifiedcount; } /// <summary> /// 根据条件修改文档 /// </summary> /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param> /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param> /// <returns>修改影响文档数</returns> public virtual void updateasync(updatedefinition<t> update, expression<func<t, bool>> filter) { _collection.updateoneasync(filter, update, new updateoptions() { isupsert = true }); } /// <summary> /// 根据条件修改文档 /// </summary> /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param> /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param> /// <returns>修改影响文档数</returns> public virtual void updateasync(updatedefinition<t> update, filterdefinition<t> filter) { _collection.updateoneasync(filter, update, new updateoptions() { isupsert = true }); } /// <summary> /// 根据条件批量修改文档 /// </summary> /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param> /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param> /// <returns>修改影响文档数</returns> public virtual long updatebatch(updatedefinition<t> update, expression<func<t, bool>> filter) { var result = _collection.updatemany(filter, update, new updateoptions() { isupsert = true }); return result.modifiedcount; } /// <summary> /// 根据条件批量修改文档 /// </summary> /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param> /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param> /// <returns>修改影响文档数</returns> public virtual long updatebatch(updatedefinition<t> update, filterdefinition<t> filter) { var result = _collection.updatemany(filter, update, new updateoptions() { isupsert = true }); return result.modifiedcount; } /// <summary> /// 根据条件批量修改文档 /// </summary> /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param> /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param> /// <returns>修改影响文档数</returns> public virtual void updatebatchasync(updatedefinition<t> update, expression<func<t, bool>> filter) { _collection.updatemanyasync(filter, update, new updateoptions() { isupsert = true }); } /// <summary> /// 根据条件批量修改文档 /// </summary> /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param> /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param> /// <returns>修改影响文档数</returns> public virtual void updatebatchasync(updatedefinition<t> update, filterdefinition<t> filter) { _collection.updatemanyasync(filter, update, new updateoptions() { isupsert = true }); } #endregion #region 查询 #region getcollection /// <summary> /// 获取操作对象的imongocollection集合,强类型对象集合 /// </summary> /// <returns></returns> public virtual imongocollection<t> getcollection() { return _database.getcollection<t>(typeof(t).name); } #endregion #region getsingle /// <summary> /// 查询数据库,检查是否存在指定id的对象 /// </summary> /// <param name="id">对象的id值</param> /// <returns>存在则返回指定的对象,否则返回null</returns> public virtual t getbyid(string id) { var filterbuilder = builders<t>.filter; var filter = filterbuilder.eq("id", new objectid(id)); var data = _collection.find(filter).firstordefault(); return data; } /// <summary> /// 查询数据库,检查是否存在指定id的对象 /// </summary> /// <param name="id">对象的id值</param> /// <returns>存在则返回指定的对象,否则返回null</returns> public virtual async task<t> getasyncbyid(string id) { var filterbuilder = builders<t>.filter; var filter = filterbuilder.eq("id", new objectid(id)); var data = await _collection.findasync(filter); return await data.singleordefaultasync(); } /// <summary> /// 查询数据 /// </summary> /// <param name="filter">过滤条件</param> /// <returns></returns> public virtual t get(filterdefinition<t> filter) { return _collection.find(filter).firstordefault(); } /// <summary> /// 查询数据 /// </summary> /// <param name="filter">条件表达式</param> /// <returns></returns> public virtual t get(expression<func<t,bool>> filter) { return _collection.find(filter).firstordefault(); } /// <summary> /// 查询数据 /// </summary> /// <param name="filter">过滤条件</param> /// <returns></returns> public virtual async task<t> getasync(filterdefinition<t> filter) { var data = await _collection.findasync(filter); return await data.singleordefaultasync(); } /// <summary> /// 查询数据 /// </summary> /// <param name="filter">条件表达式</param> /// <returns></returns> public virtual async task<t> getasync(expression<func<t, bool>> filter) { var data = await _collection.findasync(filter); return await data.singleordefaultasync(); } #endregion #region getmany /// <summary> /// 查询部分数据 /// </summary> /// <param name="filter">过滤条件</param> /// <returns></returns> public virtual ienumerable<t> getmany(filterdefinition<t> filter) { return _collection.find(filter).toenumerable(); } /// <summary> /// 查询部分数据 /// </summary> /// <param name="filter">条件表达式</param> /// <returns></returns> public virtual ienumerable<t> getmany(expression<func<t,bool>> filter) { //return _collection.asqueryable().where(filter).tolist(); //return _collection.asqueryable().where(filter); return _collection.find(filter).toenumerable(); //.toenumerable(); } /// <summary> /// 查询部分数据 /// </summary> /// <param name="filter">过滤条件</param> /// <returns></returns> public virtual async task<ienumerable<t>> getmanyasync(filterdefinition<t> filter) { var data = await _collection.findasync(filter); return await data.tolistasync(); } /// <summary> /// 查询部分数据 /// </summary> /// <param name="filter">过滤条件</param> /// <returns></returns> public virtual async task<ienumerable<t>> getmanyasync(expression<func<t, bool>> filter) { var data = await _collection.findasync(filter); return await data.tolistasync(); } #endregion #region getall /// <summary> /// 查询所有记录,复杂查询直接用linq处理(避免全表扫描) /// </summary> /// <returns>要查询的对象</returns> public virtual ienumerable<t> getall() { var data = _collection.asqueryable(); return data.toenumerable(); } /// <summary> /// 查询所有记录,复杂查询直接用linq处理(避免全表扫描) /// </summary> /// <returns>要查询的对象</returns> public virtual async task<ienumerable<t>> getallasync() { var data = _collection.asqueryable(); return await data.tolistasync(); } /// <summary> /// 查询所有记录,复杂查询直接用linq处理(避免全表扫描) /// </summary> /// <returns>要查询的对象</returns> public virtual iqueryable<t> getallqueryable() { return _collection.asqueryable(); } #endregion #region mapreduce /// <summary> /// mapreduce /// </summary> /// <returns>返回一个list列表数据</returns> public ienumerable<t> getmap(bsonjavascript map,bsonjavascript reduce) { return _collection.mapreduce<t>(map,reduce).tolist(); } #endregion #endregion } }
好了,就介绍到这里。