golang操做mongo使用的包是"gopkg.in/mgo.v2",coding过程当中须要并发读写mongo数据库,简单观摩了下源码,记录下本身的一些理解,若有错误,敬请斧正。 golang
通常来讲,咱们直接这样建立一个session:mongodb
Session, err = mgo.Dial(URL)数据库
if err != nil {数组
log.Println(err)缓存
}安全
来看看Dial这个函数作了什么:服务器
func Dial(url string) (*Session, error) {网络
session, err := DialWithTimeout(url, 10*time.Second)session
if err == nil {数据结构
session.SetSyncTimeout(1 * time.Minute)
session.SetSocketTimeout(1 * time.Minute)
}
return session, err
}
调用DialWithTimeout函数设置默认的超时时间是10秒。该函数中调用了DialWithInfo这个函数,而DialWithInfo函数中比较重要是是调用了newSession,看看这个函数作了什么操做:
func newSession(consistency Mode, cluster *mongoCluster, timeout time.Duration) (session *Session) {
cluster.Acquire()
session = &Session{
cluster_: cluster,
syncTimeout: timeout,
sockTimeout: timeout,
poolLimit: 4096,
}
debugf("New session %p on cluster %p", session, cluster)
session.SetMode(consistency, true)
session.SetSafe(&Safe{})
session.queryConfig.prefetch = defaultPrefetch
return session
}
返回的session设置了一些默认的参数,暂时先忽略,直接看看Session的数据结构:
type Session struct {
m sync.RWMutex
cluster_ *mongoCluster
slaveSocket *mongoSocket
masterSocket *mongoSocket
slaveOk bool
consistency Mode
queryConfig query
safeOp *queryOp
syncTimeout time.Duration
sockTimeout time.Duration
defaultdb string
sourcedb string
dialCred *Credential
creds []Credential
poolLimit int
bypassValidation bool
}
m是mgo.Session的并发锁,所以全部的Session实例都是线程安全的。slaveSocket,masterSocket表明了该Session到mongodb主节点和从节点的一个物理链接的缓存。而Session的策略老是优先使用缓存的链接。是否缓存链接,由consistency也就是该Session的模式决定。假设在并发程序中,使用同一个Session实例,不使用Copy,而该Session实例的模式又刚好会缓存链接,那么,全部的经过该Session实例的操做,都会经过同一条链接到达mongodb。虽然mongodb自己的网络模型是非阻塞通讯,请求能够经过一条链路,非阻塞地处理,可是会影响效率。
其次mgo.Session缓存的一主一从链接,实例自己不负责维护。也就是说,当slaveSocket,masterSocket任意其一,链接断开,Session本身不会重置缓存,该Session的使用者若是不主动重置缓存,调用者获得的将永远是EOF。这种状况在主从切换时就会发生,在网络抖动时也会发生。
mgo的DB句柄须要你作一个copy操做:
// Copy works just like New, but preserves the exact authentication
// information from the original session.
func (s *Session) Copy() *Session {
s.m.Lock()
scopy := copySession(s, true)
s.m.Unlock()
scopy.Refresh()
return scopy
}
copySession将源Session浅拷贝到临时Session中,这样源Session的配置就拷贝到了临时Session中。关键的Refresh,将源Session浅拷贝到临时Session的链接缓存指针,也就是slaveSocket,masterSocket置为空,这样临时Session就不存在缓存链接,而转为去尝试获取一个空闲的链接。
mgo自身维护了一套到mongodb集群的链接池。这套链接池机制以mongodb数据库服务器为最小单位,每一个mongodb都会在mgo内部,对应一个mongoServer结构体的实例,一个实例表明着mgo持有的到该数据库的链接。看看这个链接池的定义:
type mongoServer struct {
sync.RWMutex
Addr string
ResolvedAddr string
tcpaddr *net.TCPAddr
unusedSockets []*mongoSocket
liveSockets []*mongoSocket
closed bool
abended bool
sync chan bool
dial dialer
pingValue time.Duration
pingIndex int
pingCount uint32
pingWindow [6]time.Duration
info *mongoServerInfo
}
info表明了该实例对应的数据库服务器在集群中的信息——是否master,ReplicaSetName等。而两个Slice,就是传说中的链接池。unusedSockets存储当前空闲的链接,liveSockets存储当前活跃中的链接,Session缓存的链接就同时存放在liveSockets切片中,而临时Session获取到的链接就位于unusedSockets切片中。
每一个mongoServer都会隶属于一个mongoCluster结构,至关于mgo在内部,模拟出了mongo数据库集群的模型。
type mongoCluster struct {
sync.RWMutex
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
direct bool
failFast bool
syncCount uint
setName string
cachedIndex map[string]bool
sync chan bool
dial dialer
}
mongoCluster持有一系列mongoServer的实例,以主从结构分散到两个数组中。 每一个Session都会存储本身对应的,要操做的mongoCluster的引用。
前面的描述能够总结成下面这张图:
那么咱们在使用的时候就能够建立一个Session,而后clone操做,用clone获得的copysession完成操做,结束后关闭这个copysession就能够了。