欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MongoDB

程序员文章站 2024-01-23 08:21:28
...

复制集简介

Mongodb复制集由一组Mongod实例(进程)组成,包含一个Primary节点和多个Secondary节点,Mongodb Driver(客户端)的所有数据都写入Primary,Secondary从Primary同步写入的数据,以保持复制集内所有成员存储相同的数据集,提供数据的高可用。

下图(图片源于Mongodb官方文档)是一个典型的Mongdb复制集,包含一个Primary节点和2个Secondary节点。

MongoDB

Primary选举

复制集通过replSetInitiate命令(或mongo shell的rs.initiate())进行初始化,初始化后各个成员间开始发送心跳消息,并发起Priamry选举操作,获得『大多数』成员投票支持的节点,会成为Primary,其余节点成为Secondary。

初始化复制集

config = {
    _id : "my_replica_set",
    members : [
         {_id : 0, host : "rs1.example.net:27017"},
         {_id : 1, host : "rs2.example.net:27017"},
         {_id : 2, host : "rs3.example.net:27017"},
   ]
}

rs.initiate(config)

『大多数』的定义

假设复制集内投票成员(后续介绍)数量为N,则大多数为 N/2 + 1,当复制集内存活成员数量不足大多数时,整个复制集将无法选举出Primary,复制集将无法提供写服务,处于只读状态。

投票成员数 大多数 容忍失效数
1 1 0
2 2 0
3 2 1
4 3 1
5 3 2
6 4 2
7 4 3

通常建议将复制集成员数量设置为奇数,从上表可以看出3个节点和4个节点的复制集都只能容忍1个节点失效,从『服务可用性』的角度看,其效果是一样的。(但无疑4个节点能提供更可靠的数据存储)

特殊的Secondary

正常情况下,复制集的Seconary会参与Primary选举(自身也可能会被选为Primary),并从Primary同步最新写入的数据,以保证与Primary存储相同的数据。

Secondary可以提供读服务,增加Secondary节点可以提供复制集的读服务能力,同时提升复制集的可用性。另外,Mongodb支持对复制集的Secondary节点进行灵活的配置,以适应多种场景的需求。

Arbiter

Arbiter节点只参与投票,不能被选为Primary,并且不从Primary同步数据。

比如你部署了一个2个节点的复制集,1个Primary,1个Secondary,任意节点宕机,复制集将不能提供服务了(无法选出Primary),这时可以给复制集添加一个Arbiter节点,即使有节点宕机,仍能选出Primary。

Arbiter本身不存储数据,是非常轻量级的服务,当复制集成员为偶数时,最好加入一个Arbiter节点,以提升复制集可用性。

Priority0

Priority0节点的选举优先级为0,不会被选举为Primary

比如你跨机房A、B部署了一个复制集,并且想指定Primary必须在A机房,这时可以将B机房的复制集成员Priority设置为0,这样Primary就一定会是A机房的成员。(注意:如果这样部署,最好将『大多数』节点部署在A机房,否则网络分区时可能无法选出Primary)

Vote0

Mongodb 3.0里,复制集成员最多50个,参与Primary选举投票的成员最多7个,其他成员(Vote0)的vote属性必须设置为0,即不参与投票。

Hidden

Hidden节点不能被选为主(Priority为0),并且对Driver不可见。

因Hidden节点不会接受Driver的请求,可使用Hidden节点做一些数据备份、离线计算的任务,不会影响复制集的服务。

Delayed

Delayed节点必须是Hidden节点,并且其数据落后与Primary一段时间(可配置,比如1个小时)。

因Delayed节点的数据比Primary落后一段时间,当错误或者无效的数据写入Primary时,可通过Delayed节点的数据来恢复到之前的时间点。

数据同步

Primary与Secondary之间通过oplog来同步数据,Primary上的写操作完成后,会向特殊的local.oplog.rs特殊集合写入一条oplog,Secondary不断的从Primary取新的oplog并应用。

因oplog的数据会不断增加,local.oplog.rs被设置成为一个capped集合,当容量达到配置上限时,会将最旧的数据删除掉。另外考虑到oplog在Secondary上可能重复应用,oplog必须具有幂等性,即重复应用也会得到相同的结果。

如下oplog的格式,包含ts、h、op、ns、o等字段

{
  "ts" : Timestamp(1446011584, 2),
  "h" : NumberLong("1687359108795812092"), 
  "v" : 2, 
  "op" : "i", 
  "ns" : "test.nosql", 
  "o" : { "_id" : ObjectId("563062c0b085733f34ab4129"), "name" : "mongodb", "score" : "100" } 
}
ts: 操作时间,当前timestamp + 计数器,计数器每秒都被重置
h:操作的全局唯一标识
v:oplog版本信息
op:操作类型
i:插入操作
u:更新操作
d:删除操作
c:执行命令(如createDatabase,dropDatabase)
n:空操作,特殊用途
ns:操作针对的集合
o:操作内容,如果是更新操作
o2:操作查询条件,仅update操作包含该字段

Secondary初次同步数据时,会先进行init sync,从Primary(或其他数据更新的Secondary)同步全量数据,然后不断通过tailable cursor从Primary的local.oplog.rs集合里查询最新的oplog并应用到自身。

init sync过程包含如下步骤

T1时间,从Primary同步所有数据库的数据(local除外),通过listDatabases + listCollections + cloneCollection敏命令组合完成,假设T2时间完成所有操作。
从Primary应用[T1-T2]时间段内的所有oplog,可能部分操作已经包含在步骤1,但由于oplog的幂等性,可重复应用。
根据Primary各集合的index设置,在Secondary上为相应集合创建index。(每个集合_id的index已在步骤1中完成)。
oplog集合的大小应根据DB规模及应用写入需求合理配置,配置得太大,会造成存储空间的浪费;配置得太小,可能造成Secondary的init sync一直无法成功。比如在步骤1里由于DB数据太多、并且oplog配置太小,导致oplog不足以存储[T1, T2]时间内的所有oplog,这就Secondary无法从Primary上同步完整的数据集。

修改复制集配置

当需要修改复制集时,比如增加成员、删除成员、或者修改成员配置(如priorty、vote、hidden、delayed等属性),可通过replSetReconfig命令(rs.reconfig())对复制集进行重新配置。

比如将复制集的第2个成员Priority设置为2,可执行如下命令

cfg = rs.conf();
cfg.members[1].priority = 2;
rs.reconfig(cfg);

细说Primary选举

Primary选举除了在复制集初始化时发生,还有如下场景

复制集被reconfig
Secondary节点检测到Primary宕机时,会触发新Primary的选举
当有Primary节点主动stepDown(主动降级为Secondary)时,也会触发新的Primary选举
Primary的选举受节点间心跳、优先级、最新的oplog时间等多种因素影响。

节点间心跳

复制集成员间默认每2s会发送一次心跳信息,如果10s未收到某个节点的心跳,则认为该节点已宕机;如果宕机的节点为Primary,Secondary(前提是可被选为Primary)会发起新的Primary选举。

节点优先级

每个节点都倾向于投票给优先级最高的节点
优先级为0的节点不会主动发起Primary选举
当Primary发现有优先级更高Secondary,并且该Secondary的数据落后在10s内,则Primary会主动降级,让优先级更高的Secondary有成为Primary的机会。

Optime

拥有最新optime(最近一条oplog的时间戳)的节点才能被选为主。

网络分区

只有更大多数投票节点间保持网络连通,才有机会被选Primary;如果Primary与大多数的节点断开连接,Primary会主动降级为Secondary。当发生网络分区时,可能在短时间内出现多个Primary,故Driver在写入时,最好设置『大多数成功』的策略,这样即使出现多个Primary,也只有一个Primary能成功写入大多数。

复制集的读写设置

Read Preference

默认情况下,复制集的所有读请求都发到Primary,Driver可通过设置Read Preference来将读请求路由到其他的节点。

primary: 默认规则,所有读请求发到Primary
primaryPreferred: Primary优先,如果Primary不可达,请求Secondary
secondary: 所有的读请求都发到secondary
secondaryPreferred:Secondary优先,当所有Secondary不可达时,请求Primary
nearest:读请求发送到最近的可达节点上(通过ping探测得出最近的节点)

Write Concern

默认情况下,Primary完成写操作即返回,Driver可通过设置[Write Concern(https://docs.mongodb.org/manual/core/write-concern/)来设置写成功的规则

如下的write concern规则设置写必须在大多数节点上成功,超时时间为5s。

db.products.insert(
  { item: "envelopes", qty : 100, type: "Clasp" },
  { writeConcern: { w: majority, wtimeout: 5000 } }
)

上面的设置方式是针对单个请求的,也可以修改副本集默认的write concern,这样就不用每个请求单独设置。

cfg = rs.conf()
cfg.settings = {}
cfg.settings.getLastErrorDefaults = { w: "majority", wtimeout: 5000 }
rs.reconfig(cfg)

异常处理(rollback)

当Primary宕机时,如果有数据未同步到Secondary,当Primary重新加入时,如果新的Primary上已经发生了写操作,则旧Primary需要回滚部分操作,以保证数据集与新的Primary一致。

旧Primary将回滚的数据写到单独的rollback目录下,数据库管理员可根据需要使用mongorestore进行恢复。

集群搭建之分片

MongoDB是最常用的nodql数据库,在数据库排名中已经上升到了前六。这篇文章介绍如何搭建高可用的MongoDB(分片+副本)集群。

在搭建集群之前,需要首先了解几个概念:路由,分片、副本集、配置服务器等。

相关概念

先来看一张图:

MongoDB

从图中可以看到有四个组件:mongos、config server、shard、replica set。

mongos,数据库集群请求的入口,所有的请求都通过mongos进行协调,不需要在应用程序添加一个路由选择器,mongos自己就是一个请求分发中心,它负责把对应的数据请求请求转发到对应的shard服务器上。在生产环境通常有多mongos作为请求的入口,防止其中一个挂掉所有的mongodb请求都没有办法操作。

config server,顾名思义为配置服务器,存储所有数据库元信息(路由、分片)的配置。mongos本身没有物理存储分片服务器和数据路由信息,只是缓存在内存里,配置服务器则实际存储这些数据。mongos第一次启动或者关掉重启就会从 config server 加载配置信息,以后如果配置服务器信息变化会通知到所有的 mongos 更新自己的状态,这样 mongos 就能继续准确路由。在生产环境通常有多个 config server 配置服务器,因为它存储了分片路由的元数据,防止数据丢失!

shard,分片(sharding)是指将数据库拆分,将其分散在不同的机器上的过程。将数据分散到不同的机器上,不需要功能强大的服务器就可以存储更多的数据和处理更大的负载。基本思想就是将集合切成小块,这些块分散到若干片里,每个片只负责总数据的一部分,最后通过一个均衡器来对各个分片进行均衡(数据迁移)。

replica set,中文翻译副本集,其实就是shard的备份,防止shard挂掉之后数据丢失。复制提供了数据的冗余备份,并在多个服务器上存储数据副本,提高了数据的可用性, 并可以保证数据的安全性。

仲裁者(Arbiter),是复制集中的一个MongoDB实例,它并不保存数据。仲裁节点使用最小的资源并且不要求硬件设备,不能将Arbiter部署在同一个数据集节点中,可以部署在其他应用服务器或者监视服务器中,也可部署在单独的虚拟机中。为了确保复制集中有奇数的投票成员(包括primary),需要添加仲裁节点做为投票,否则primary不能运行时不会自动切换primary。

简单了解之后,我们可以这样总结一下,应用请求mongos来操作mongodb的增删改查,配置服务器存储数据库元信息,并且和mongos做同步,数据最终存入在shard(分片)上,为了防止数据丢失同步在副本集中存储了一份,仲裁在数据存储到分片的时候决定存储到哪个节点。

环境准备

系统系统 CentOS6.5
三台服务器:192.168.0.75/84/86
安装包: mongodb-linux-x86_64-3.4.6.tgz

服务器规划

服务器75 服务器84 服务器86
mongos mongos mongos
config server config server config server
shard server1 主节点 shard server1 副节点 shard server1 仲裁
shard server2 仲裁 shard server2 主节点 shard server2 副节点
shard server3 副节点 shard server3 仲裁 shard server3 主节点

sharding集群搭建及JavaAPI的简易使用

第一部分

在搭建mongoDB之前,我们要考虑几个小问题:

1、我们搭建集群的目的是什么?是多备份提高容错和系统可用性还是横向拓展存储大规模数据还是两者兼有?

  如果是为了多备份那么选择replication集群搭建即可,如果是为了处理大数据则需要搭建sharding集群,如果两者兼有需要对每个shardsvr创建replica。

2、什么是sharding?和replication有什么不同?

  简单而言,replica是mongo提供服务的一个基本单位,单机系统和replication集群对用户来讲没有区别,都只相当于一个服务节点,只不过replication集群有多备份,还有服务端选举,安全性更加有保证。而sharding集群包含3个角色:mongos,configsvr,shardsvr,对于一个集群来说mongos相当于master,负责对外提供服务,shardsvr相当于slave,负责分片存储数据,而configsvr相当于router,负责记录分片元信息。这3种角色中的任何一个角色中的子节点都是一个replica。具体说明参考官方网站对sharding和replication的描述:

  replication:https://docs.mongodb.com/manual/replication/

  sharding:https://docs.mongodb.com/manual/sharding/

3、我们集群的架构又该是什么样子?

  如果对问题1和2有足够的认知的话,那么根据本地硬件环境构建一个什么样的集群大致也就清楚了,每个shardsvr的replication相当于一个slave,我们需要几个子节点就需要创建多少个shardsvr,configsvr是router信息,我们可以将所有机器都组成一个configsvr的replication用以提供router服务,至于mongos,内部使用一个节点也可以,如果需要稳定运行的话也需要组一个小的mongos的replication。

 第二部分

下面是实战环节:

  我这可以有5台服务器用来跑mongodb还有一批数据,当然,这5台机器上也跑着其他框架如spark,Hadoop等等,由于spark和hadoop都是单点故障的(什么?多master?secondary?不存在的,老夫部署集群从来都是单点故障)所以mongos也是一台节点,数据端存放在5台机器上,又由于数据量较大,硬盘较小(别人组的raid5,加一起一台服务器也就1T多空间),所以肯定不考虑备份和稳定性了(2备份硬盘就没多大地方了,hdfs还有其他数据要放),那么架构可以构建如下:

  MongoDB

 

下面shardsvr每一个都是一个单独的replica,开始部署:

1、创建配置文件:

  a) configsvr

systemLog:
   destination: file
   path: "/home/cloud/platform/logs/mongodb/configsvr.log"
   logAppend: true
storage:
   dbPath: "/home/cloud/platform/data/configData"
   journal:
      enabled: true
setParameter:
   enableLocalhostAuthBypass: false
processManagement:
   fork: true
replication:
   replSetName: "configsvr0"
sharding:
   clusterRole: "configsvr"

  b) shardsvr

systemLog:
   destination: file
   path: "/home/cloud/platform/logs/mongodb/shardsvr.log"
   logAppend: true
storage:
   dbPath: "/home/cloud/platform/data/shardData"
   journal:
      enabled: true
setParameter:
   enableLocalhostAuthBypass: false
processManagement:
   fork: true
replication:
   replSetName: "shardsvr1"
sharding:
   clusterRole: "shardsvr"

  c) mongos

systemLog:
   destination: file
   path: "/home/cloud/platform/logs/mongodb/mongos.log"
   logAppend: true
net:
   bindIp: 192.168.12.161
   port: 27017
setParameter:
   enableLocalhostAuthBypass: false
processManagement:
   fork: true
sharding:
   configDB: "configsvr0/192.168.12.161:27019,192.168.12.162:27019,192.168.12.163:27019,192.168.12.164:27019,192.168.12.169:27019"

  注意:每台机器上的配置都略有不同,简易参考官方文档进行修改,replSetName这个是replication的设置,每个角色的子replication应该有相同的值,不同的replication应该有不同的值

  接下来是启动脚本

  a)shardsvr

#!/bin/bash

# use this to initiate: rs.initiate({_id:"shardsvr1",members:[{_id:0,host:"192.168.12.161:27018"}]})
/home/cloud/platform/mongodb-3.4.5/bin/mongod --config /home/cloud/platform/mongodb-3.4.5/shardserver.conf

  b)configsvr

#!/bin/bash

#use this to initiate: rs.initiate({_id:"configsvr0",configsvr:true,members:[{_id:0,host:"192.168.12.161:27019"},{_id:1,host:"192.168.12.162:27019"},{_id:2,host:"192.168.12.163:27019"},{_id:3,host:"192.168.12.164:27019"},{_id:4,host:"192.168.12.169:27019"}]})

MONGO_HOME=/home/cloud/platform/mongodb-3.4.5/
${MONGO_HOME}/bin/mongod --config ${MONGO_HOME}/configserver.conf

  c)mongos

#!/bin/bash

#mogos dont need to initiate,
#sh.enableSharding("dbname") to create database
#sh.shardCollection("dbname.tablename", {id: "hashed"}) to create a shard table split by id


/home/cloud/platform/mongodb-3.4.5/bin/mongos --config /home/cloud/platform/mongodb-3.4.5/mongosserver.conf

2、启动过程

  a、将脚本和配置文件复制到每台机器上

  b、启动每个shardsvr,然后登录到shardsvr上,执行初始化过程:

1、执行start-shardsvr.sh
2、执行bin/mongo --host ${hostIP} --port ${hostport}
    shardsvr的默认端口是27018
    configsvr的默认端口是27019
    mongos的默认端口是27017
    在上面配置文件中未指定端口,一切都以默认为主
3、执行rs.initiate({_id:"shardsvr1",members:[{_id:0,host:"192.168.12.161:27018"}]})
    进行初始化工作
4、执行rs.status()查看shardsvr状态,一个成功的例子如下:

MongoDB

  c、启动所有configsvr,并使用mongo --host --port命令登录到任意一台configsvr的configsvr端口上(default:27019)。并执行初始化工作:

   

rs.initiate({_id:"configsvr0",configsvr:true,members:[{_id:0,host:"192.168.12.161:27019"},{_id:1,host:"192.168.12.162:27019"},{_id:2,host:"192.168.12.163:27019"},{_id:3,host:"192.168.12.164:27019"},{_id:4,host:"192.168.12.169:27019"}]})

  d、启动mongos,这时已经可以在mongos上执行我们的操作了。

//先添加shard分片,如果shardsvr1有多个节点也只用写一个,毕竟一个replica相当于一个节点,使用时会自动找到primary的
sh.addShard("shardsvr1/192.168.12.161:27018"))
//把所有的都加进去之后
printShardingStatus()

 MongoDB

然后就是正常的mongo shell操作了,可以把mongos当成一个普通的单机mongodb来使用,操作基本相同,除了创建sharding表

创建表如下:

sh.enableSharding("dbname") to create database
sh.shardCollection("dbname.tablename", {"_id": "hashed"}) to create a shard table hashed by _id

需要注意的是,"_id"是mongo分片依据,不能重复,如果想以其他字段来进行hash,将命令中的"_id"改为字段名称就可以了,但是mongo还是会自动创建一个_id列用来索引

添加索引:

db.collectionname.ensureIndex({"indexColumn":1})

第三部分

JavaAPI小tips

获取连接:

lazy val mongo = new MongoClient("192.168.12.161", 27017)
lazy val db = mongo.getDatabase("testdb")
lazy val dbColl = db.getCollection("origin2")

插入数据:

var resList = new ArrayList[Document]
var d = new Document
d.append("path", x.getPath)
d.append("name", x.getName)
d.append("content", filterHtml(Source.fromFile(x, detector(x)).getLines().toArray).mkString("\n"))
resList.add(d)
dbColl.insertMany(resList, new InsertManyOptions().ordered(false))

在插入过程中,如果"_id"出现重复值,那么默认情况下会中止当前插入操作并throw一个exception,即之前的数据已经插入进去,后面的数据没插入进表,在后面加入new InsertManyOptions().ordered(false)参数就可以将所有不重复的数据插入完成后再throw一个exception

shading cluster集群的权限管理

mongodb集群的权限管理分为两部分,一部分是最常用的Role-Based Access Control,也就是用户名密码方式,这种验证方式一般出现在单机系统,或者集群中client端连接Mongos端;另一种是Internal Authentication.有英文基础想获取security完整信息的请点击访问官方文档

1、Role-Based Access Control

  官方文档上介绍如下:MongoDB采用基于角色的访问控制(RBAC)来管理对MongoDB系统的访问。 授予用户一个或多个角色,以确定用户对数据库资源和操作的访问。 在角色分配之外,用户无法访问系统。(不得不说。。google翻译做的不错,这一段翻出来竟然毫无违和感)。

  RBAC通过角色来赋予用户权限,在开启权限管理之前,个人建议先创建几个用户,至少要有一个拥有用户管理权限的角色存在。在mongodb中,系统自带了若干角色:

  库使用权限:read, readWrite

  库管理权限:dbAdmin, dbOwner, userAdmin

  集群管理权限:clusterAdmin, clusterManager, clusterMonitor, hostManager

  备份恢复权限:backup, restore

  全局权限:readAnyDatabase, readWriteAnyDatabase, userAdminAnyDatabase, dbAdminAnyDatabase

  超级用户:root,__system(包含所有权限官方不建议使用)

  对于使用来讲,建议首先创建一个包含全局权限的用户(dbAdmin并不能readwrite),这样在启用权限管理之后便可以通过这个用户来创建其他用户。Mongodb权限管理默认是精确到DB的,如果需要区分一个DB下的不同collection的权限需要自定义role。对用户赋予库使用和库管理权限的时候需要指定库,这样用户会获得此库下的对应权限。对mongodb来讲,在哪个库下创建的用户,就需要每次在创建的库进行认证。所以建议在创建用户的时候,库使用用户和库管理用户在指定库下创建用户。

2、Internal Authentication

  这个东西是用来在集群中进行互相认证的,每个Mongo实例在互相访问的过程中会验证彼此的权限,只有满足条件才可以进行数据读写等操作。分为两种方式,一种是Keyfiles,一种是x.509,在普通集群中Keyfiles已经足够了,而且相对比较简单,只需要在每个节点启动的时候指定相同的Keyfile就行。x.509提供了一个SSL/TLS连接,并不常用。

3、开启验证

  首先登录单机Mongod或者集群的mongos创建一个有权限的用户:

use admin
db.createUser({
... user: "gaoze",
... pwd: "gaolaoban",
... roles: [
... {role: "clusterAdmin", db: "admin"},
... {role: "readWriteAnyDatabase", db: "admin"},
... {role: "userAdminAnyDatabase", db: "admin"},
... {role: "dbAdminAnyDatabase", db: "admin"}]
... })

  我们这个帐号创建在admin库下,用户拥有所有库的读写权限和admin权限,还有用户管理权限及集群管理权限,如果是单机系统可以把clusterAdmin那个删掉。

  然后开启认证:

  RBAC的认证开启比较简单,如果是单机系统启动的时候加上--auth参数即可,如果是集群的话,在Mongos的配置里加上keyfile文件,Mongos会自动启动认证。

  Internal Authentication认证需要创建一个keyfile文件,keyfile文件只能包含base64字符集的字符,位数可以为6到1024位,官方文档给出的创建方式如下:

openssl rand -base64 756 > <path-to-keyfile>
chmod 400 <path-to-keyfile>

  如果不好用可以自己随便找一段文字,转成base64字符集,然后用下面操作生成keyfile文件,记得赋予400或者600权限,否则在启动的时候会报错keyfile too open permissions

touch keyfile
echo "keyfile内容" > keyfile

  创建完keyfile文件然后把这个文件分发到每台mongodb的机器上,然后修改配置文件(具体参见上篇构建集群的配置说明):

  shard节点:

systemLog:
   destination: file
   path: "/home/gaoze/platform/logs/mongodb/shardsvr.log"
   logAppend: true
storage:
   dbPath: "/home/gaoze/platform/data/shardData"
   journal:
      enabled: true
setParameter:
   enableLocalhostAuthBypass: 0
processManagement:
   fork: true
replication:
   replSetName: "shardsvr1"
sharding:
   clusterRole: "shardsvr"
security:
   keyFile: "/home/gaoze/platform/mongodb-3.4.5/keyfile0"
   authorization: enabled

  config节点:

systemLog:
   destination: file
   path: "/home/gaoze/platform/logs/mongodb/configsvr.log"
   logAppend: true
storage:
   dbPath: "/home/gaoze/platform/data/configData"
   journal:
      enabled: true
setParameter:
   enableLocalhostAuthBypass: 0
processManagement:
   fork: true
replication:
   replSetName: "configsvr0"
sharding:
   clusterRole: "configsvr"
security:
   keyFile: "/home/gaoze/platform/mongodb-3.4.5/keyfile0"
   authorization: enabled

  mongos(可以看到,mongos不必指定security.authorization):

systemLog:
   destination: file
   path: "/home/gaoze/platform/logs/mongodb/mongos.log"
   logAppend: true
net:
   bindIp: 192.168.2.48
   port: 27017
setParameter:
   enableLocalhostAuthBypass: 0
processManagement:
   fork: true
sharding:
   configDB: "configsvr0/192.168.2.48:27019,192.168.2.49:27019"
security:
   keyFile: "/home/cloud/platform/mongodb-3.4.5/keyfile0"

4、验证下的集群使用

  shell登录进去需要进行验证才可以使用集群,简而言之就是:

use admin
db.auth("gaoze", "gaolaoban")

  我们在其他库上创建个使用者(在目标库上创建用户!):

use test
db.createUser({user: "rw",    pwd: "1",    roles: [{role: "readWrite", db: "test"}]})

  然后我们在test库上使用新用户来进行操作:

use test
db.auth("rw", "1")

  可以试验我们的新用户是可以读写的。

Java单例MongoDB工具类

我经常对MongoDB进行一些基础操作,将这些常用操作合并到一个工具类中,方便自己开发使用。

没用Spring Data、Morphia等框架是为了减少学习、维护成本,另外自己直接JDBC方式的话可以更灵活,为自己以后的积累留一个脚印。

JAVA驱动版本:

        <!-- MongoDB驱动 -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.0.2</version>
        </dependency>

工具类代码如下:

package utils;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;

import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientOptions.Builder;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
import com.mongodb.client.model.Filters;
import com.mongodb.client.result.DeleteResult;

/**
 * MongoDB工具类 Mongo实例代表了一个数据库连接池,即使在多线程的环境中,一个Mongo实例对我们来说已经足够了<br>
 * 注意Mongo已经实现了连接池,并且是线程安全的。 <br>
 * 设计为单例模式, 因 MongoDB的Java驱动是线程安全的,对于一般的应用,只要一个Mongo实例即可,<br>
 * Mongo有个内置的连接池(默认为10个) 对于有大量写和读的环境中,为了确保在一个Session中使用同一个DB时,<br>
 * DB和DBCollection是绝对线程安全的<br>
 * 
 * @author zhoulingfei
 * @date 2015-5-29 上午11:49:49
 * @version 0.0.0
 * @Copyright (c)1997-2015 NavInfo Co.Ltd. All Rights Reserved.
 */
public enum MongoDBUtil {

    /**
    * 定义一个枚举的元素,它代表此类的一个实例
    */
    instance;

    private MongoClient mongoClient;

    static {
        System.out.println("===============MongoDBUtil初始化========================");
        CompositeConfiguration config = new CompositeConfiguration();
        try {
            config.addConfiguration(new PropertiesConfiguration("mongodb.properties"));
        } catch (ConfigurationException e) {
            e.printStackTrace();
        }
        // 从配置文件中获取属性值
        String ip = config.getString("host");
        int port = config.getInt("port");
        instance.mongoClient = new MongoClient(ip, port);

        // or, to connect to a replica set, with auto-discovery of the primary, supply a seed list of members
        // List<ServerAddress> listHost = Arrays.asList(new ServerAddress("localhost", 27017),new ServerAddress("localhost", 27018));
        // instance.mongoClient = new MongoClient(listHost);

        // 大部分用户使用mongodb都在安全内网下,但如果将mongodb设为安全验证模式,就需要在客户端提供用户名和密码:
        // boolean auth = db.authenticate(myUserName, myPassword);
        Builder options = new MongoClientOptions.Builder();
        // options.autoConnectRetry(true);// 自动重连true
        // options.maxAutoConnectRetryTime(10); // the maximum auto connect retry time
        options.connectionsPerHost(300);// 连接池设置为300个连接,默认为100
        options.connectTimeout(15000);// 连接超时,推荐>3000毫秒
        options.maxWaitTime(5000); //
        options.socketTimeout(0);// 套接字超时时间,0无限制
        options.threadsAllowedToBlockForConnectionMultiplier(5000);// 线程队列数,如果连接线程排满了队列就会抛出“Out of semaphores to get db”错误。
        options.writeConcern(WriteConcern.SAFE);//
        options.build();
    }

    // ------------------------------------共用方法---------------------------------------------------
    /**
    * 获取DB实例 - 指定DB
    * 
    * @param dbName
    * @return
    */
    public MongoDatabase getDB(String dbName) {
        if (dbName != null && !"".equals(dbName)) {
            MongoDatabase database = mongoClient.getDatabase(dbName);
            return database;
        }
        return null;
    }

    /**
    * 获取collection对象 - 指定Collection
    * 
    * @param collName
    * @return
    */
    public MongoCollection<Document> getCollection(String dbName, String collName) {
        if (null == collName || "".equals(collName)) {
            return null;
        }
        if (null == dbName || "".equals(dbName)) {
            return null;
        }
        MongoCollection<Document> collection = mongoClient.getDatabase(dbName).getCollection(collName);
        return collection;
    }

    /**
    * 查询DB下的所有表名
    */
    public List<String> getAllCollections(String dbName) {
        MongoIterable<String> colls = getDB(dbName).listCollectionNames();
        List<String> _list = new ArrayList<String>();
        for (String s : colls) {
            _list.add(s);
        }
        return _list;
    }

    /**
    * 获取所有数据库名称列表
    * 
    * @return
    */
    public MongoIterable<String> getAllDBNames() {
        MongoIterable<String> s = mongoClient.listDatabaseNames();
        return s;
    }

    /**
    * 删除一个数据库
    */
    public void dropDB(String dbName) {
        getDB(dbName).drop();
    }

    /**
    * 查找对象 - 根据主键_id
    * 
    * @param collection
    * @param id
    * @return
    */
    public Document findById(MongoCollection<Document> coll, String id) {
        ObjectId _idobj = null;
        try {
            _idobj = new ObjectId(id);
        } catch (Exception e) {
            return null;
        }
        Document myDoc = coll.find(Filters.eq("_id", _idobj)).first();
        return myDoc;
    }

    /** 统计数 */
    public int getCount(MongoCollection<Document> coll) {
        int count = (int) coll.count();
        return count;
    }

    /** 条件查询 */
    public MongoCursor<Document> find(MongoCollection<Document> coll, Bson filter) {
        return coll.find(filter).iterator();
    }

    /** 分页查询 */
    public MongoCursor<Document> findByPage(MongoCollection<Document> coll, Bson filter, int pageNo, int pageSize) {
        Bson orderBy = new BasicDBObject("_id", 1);
        return coll.find(filter).sort(orderBy).skip((pageNo - 1) * pageSize).limit(pageSize).iterator();
    }

    /**
    * 通过ID删除
    * 
    * @param coll
    * @param id
    * @return
    */
    public int deleteById(MongoCollection<Document> coll, String id) {
        int count = 0;
        ObjectId _id = null;
        try {
            _id = new ObjectId(id);
        } catch (Exception e) {
            return 0;
        }
        Bson filter = Filters.eq("_id", _id);
        DeleteResult deleteResult = coll.deleteOne(filter);
        count = (int) deleteResult.getDeletedCount();
        return count;
    }

    /**
    * FIXME
    * 
    * @param coll
    * @param id
    * @param newdoc
    * @return
    */
    public Document updateById(MongoCollection<Document> coll, String id, Document newdoc) {
        ObjectId _idobj = null;
        try {
            _idobj = new ObjectId(id);
        } catch (Exception e) {
            return null;
        }
        Bson filter = Filters.eq("_id", _idobj);
        // coll.replaceOne(filter, newdoc); // ���全替代
        coll.updateOne(filter, new Document("$set", newdoc));
        return newdoc;
    }

    public void dropCollection(String dbName, String collName) {
        getDB(dbName).getCollection(collName).drop();
    }

    /**
    * 关闭Mongodb
    */
    public void close() {
        if (mongoClient != null) {
            mongoClient.close();
            mongoClient = null;
        }
    }

    /**
    * 测试入口
    * 
    * @param args
    */
    public static void main(String[] args) {

        String dbName = "GC_MAP_DISPLAY_DB";
        String collName = "COMMUNITY_BJ";
        MongoCollection<Document> coll = MongoDBUtil.instance.getCollection(dbName, collName);

        // 插入多条
        // for (int i = 1; i <= 4; i++) {
        // Document doc = new Document();
        // doc.put("name", "zhoulf");
        // doc.put("school", "NEFU" + i);
        // Document interests = new Document();
        // interests.put("game", "game" + i);
        // interests.put("ball", "ball" + i);
        // doc.put("interests", interests);
        // coll.insertOne(doc);
        // }

        // // 根据ID查询
        // String id = "556925f34711371df0ddfd4b";
        // Document doc = MongoDBUtil2.instance.findById(coll, id);
        // System.out.println(doc);

        // 查询多个
        // MongoCursor<Document> cursor1 = coll.find(Filters.eq("name", "zhoulf")).iterator();
        // while (cursor1.hasNext()) {
        // org.bson.Document _doc = (Document) cursor1.next();
        // System.out.println(_doc.toString());
        // }
        // cursor1.close();

        // 查询多个
        // MongoCursor<Person> cursor2 = coll.find(Person.class).iterator();

        // 删除数据库
        // MongoDBUtil2.instance.dropDB("testdb");

        // 删除表
        // MongoDBUtil2.instance.dropCollection(dbName, collName);

        // 修改数据
        // String id = "556949504711371c60601b5a";
        // Document newdoc = new Document();
        // newdoc.put("name", "时候");
        // MongoDBUtil.instance.updateById(coll, id, newdoc);

        // 统计表
        // System.out.println(MongoDBUtil.instance.getCount(coll));

        // 查询所有
        Bson filter = Filters.eq("count", 0);
        MongoDBUtil.instance.find(coll, filter);

    }

}

原址:

1.官网

https://docs.mongodb.com/manual/introduction/

2.MongDB教程

http://www.runoob.com/mongodb/mongodb-tutorial.html

3.MongoDB文档、集合、数据库简介

https://www.linuxidc.com/Linux/2016-12/138529.htm

4.MongoDB复制集原理

https://www.linuxidc.com/Linux/2017-09/146670.htm

5.集群搭建之分片

https://www.linuxidc.com/Linux/2017-12/149919.htm

5.MongoDB 搭建分片集群

https://www.linuxidc.com/Linux/2016-09/135648.htm

6.sharding集群搭建及JavaAPI的简易使用

https://www.linuxidc.com/Linux/2017-12/149538.htm

7.shading cluster集群的权限管理

https://www.linuxidc.com/Linux/2017-12/149537.htm

8.Java单例MongoDB工具类

https://www.linuxidc.com/Linux/2018-02/150712.htm

9.Sprint Boot 集成MongoDB

https://www.linuxidc.com/Linux/2017-11/148919.htm

10.MongoDB集群单mongos的问题解决实例

https://www.linuxidc.com/Linux/2017-12/149534.htm

相关标签: MongDB 集群搭建