读写锁是一种同步机制,容许多个读操做同时读取数据,可是只容许一个写操做写数据。锁的状态有三种:读模式加锁、写模式加锁、无锁。
就拿文件行数这个变量来看,若是开启了日志文件按小时按行数切割的功能,要先读取当前文件行数变量值。当并发状况下,多个 goroutine 在打日志,读取文件行数和修改文件行数便成为一对“读写”操做,因此须要用读写锁,读写锁对于读操做不会致使锁竞争和 goroutine 阻塞。缓存
// WriteMsg write logger message into file. func (w *fileLogWriter) WriteMsg(when time.Time, msg string, level int) error { ··· if w.Rotate { w.RLock() if w.needRotateHourly(len(msg), h) { w.RUnlock() w.Lock() if w.needRotateHourly(len(msg), h) { if err := w.doRotate(when); err != nil { fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err) } } w.Unlock() } else if w.needRotateDaily(len(msg), d) { w.RUnlock() w.Lock() if w.needRotateDaily(len(msg), d) { if err := w.doRotate(when); err != nil { fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err) } } w.Unlock() } else { w.RUnlock() } } w.Lock() _, err := w.fileWriter.Write([]byte(msg)) if err == nil { w.maxLinesCurLines++ w.maxSizeCurSize += len(msg) } w.Unlock() ··· }
第一处是开启异步选项时,启动一个 goroutine 监听 msgChan 是否为空,发现不为空便取走日志信息进行输出。并发
// Async set the log to asynchronous and start the goroutine func (bl *BeeLogger) Async(msgLen ...int64) *BeeLogger { ··· go bl.startLogger() ··· } // start logger chan reading. // when chan is not empty, write logs. func (bl *BeeLogger) startLogger() { gameOver := false for { select { case bm := <-bl.msgChan: bl.writeToLoggers(bm.when, bm.msg, bm.level) logMsgPool.Put(bm) ··· } ··· } }
文件输出引擎 file.go 文件中,初始化 fileWriter *os.File 时启动一个 goroutine 执行 dailyRotate() :异步
func (w *fileLogWriter) initFd() error { fd := w.fileWriter fInfo, err := fd.Stat() if err != nil { return fmt.Errorf("get stat err: %s", err) } w.maxSizeCurSize = int(fInfo.Size()) w.dailyOpenTime = time.Now() w.dailyOpenDate = w.dailyOpenTime.Day() w.maxLinesCurLines = 0 if w.Daily { go w.dailyRotate(w.dailyOpenTime) // <------ } if fInfo.Size() > 0 && w.MaxLines > 0 { count, err := w.lines() if err != nil { return err } w.maxLinesCurLines = count } return nil }
dailyRotate() 方法中,tm 定时器时间一到,便会往 tm.C 通道发送当前时间,此时 a 语句便中止阻塞,能够继续往下执行。async
func (w *fileLogWriter) dailyRotate(openTime time.Time) { y, m, d := openTime.Add(24 * time.Hour).Date() nextDay := time.Date(y, m, d, 0, 0, 0, 0, openTime.Location()) tm := time.NewTimer(time.Duration(nextDay.UnixNano() - openTime.UnixNano() + 100)) <-tm.C // <--- a 语句 w.Lock() if w.needRotate(0, time.Now().Day()) { if err := w.doRotate(time.Now()); err != nil { fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err) } } w.Unlock() }
由于删除文件涉及文件 IO 处理,为了不阻塞主线程,便交由另外 goroutine 去作。,go w.deleteOldLog()
,超过 MaxDays 的日志文件即是失效的。url
// DoRotate means it need to write file in new file. // new file name like xx.2013-01-01.log (daily) or xx.001.log (by line or size) func (w *fileLogWriter) doRotate(logTime time.Time) error { ··· err = os.Rename(w.Filename, fName) ··· startLoggerErr := w.startLogger() go w.deleteOldLog() ··· } func (w *fileLogWriter) deleteOldLog() { dir := filepath.Dir(w.Filename) filepath.Walk(dir, func(path string, info os.FileInfo, err error) (returnErr error) { defer func() { if r := recover(); r != nil { fmt.Fprintf(os.Stderr, "Unable to delete old log '%s', error: %v\n", path, r) } }() if info == nil { return } if !info.IsDir() && info.ModTime().Add(24*time.Hour*time.Duration(w.MaxDays)).Before(time.Now()) { if strings.HasPrefix(filepath.Base(path), filepath.Base(w.fileNameOnly)) && strings.HasSuffix(filepath.Base(path), w.suffix) { os.Remove(path) } } return }) }
doRotate() 方法大致逻辑:线程
重命名以前写入的日志文件,err = os.Rename(w.Filename, fName)
3d
_,err:=os.Lstat(fName)
:若以 fName 为名的文件不存在则返回 err 不为空。os.Chmod(fName, os.FileMode(rotatePerm))
修改文件权限。从新启动 Logger :rest
w.startLogger()
;注意到下面代码段中的 a 语句和 b 语句,它们并非返回错误阻止代码继续执行,而是即便发生错误也会保证重启一个新的 Logger。若是是执行到 a 语句这种状况,有多是该日志文件已经被别的程序删除或者其余缘由致使文件不存在,但大可没必要由于一个日志文件的丢失而阻止了新 Logger 的启动,简而言之,这个错误是能够忽略的。日志
// DoRotate means it need to write file in new file. // new file name like xx.2013-01-01.log (daily) or xx.001.log (by line or size) func (w *fileLogWriter) doRotate(logTime time.Time) error { // file exists // Find the next available number num := 1 fName := "" rotatePerm, err := strconv.ParseInt(w.RotatePerm, 8, 64) if err != nil { return err } _, err = os.Lstat(w.Filename) if err != nil { //even if the file is not exist or other ,we should RESTART the logger goto RESTART_LOGGER // <------- a 语句 } if w.MaxLines > 0 || w.MaxSize > 0 { for ; err == nil && num <= 999; num++ { fName = w.fileNameOnly + fmt.Sprintf(".%s.%03d%s", logTime.Format("2006-01-02"), num, w.suffix) _, err = os.Lstat(fName) } } else { fName = fmt.Sprintf("%s.%s%s", w.fileNameOnly, w.dailyOpenTime.Format("2006-01-02"), w.suffix) _, err = os.Lstat(fName) for ; err == nil && num <= 999; num++ { fName = w.fileNameOnly + fmt.Sprintf(".%s.%03d%s", w.dailyOpenTime.Format("2006-01-02"), num, w.suffix) _, err = os.Lstat(fName) } } // return error if the last file checked still existed if err == nil { return fmt.Errorf( "Rotate: Cannot find free log number to rename %s", w.Filename) } // close fileWriter before rename w.fileWriter.Close() // Rename the file to its new found name // even if occurs error,we MUST guarantee to restart new logger err = os.Rename(w.Filename, fName) if err != nil { goto RESTART_LOGGER // <------- b 语句 } err = os.Chmod(fName, os.FileMode(rotatePerm)) RESTART_LOGGER: // <------- startLoggerErr := w.startLogger() go w.deleteOldLog() if startLoggerErr != nil { return fmt.Errorf("Rotate StartLogger: %s", startLoggerErr) } if err != nil { return fmt.Errorf("Rotate: %s", err) } return nil }
a 语句处,开启 goroutine 前计数器加一,执行完该 goroutine 后计数器减一,即 b 语句。code
// Async set the log to asynchronous and start the goroutine func (bl *BeeLogger) Async(msgLen ...int64) *BeeLogger { ··· bl.wg.Add(1) // <----- a 语句 go bl.startLogger() return bl } // start logger chan reading. // when chan is not empty, write logs. func (bl *BeeLogger) startLogger() { gameOver := false for { select { case bm := <-bl.msgChan: bl.writeToLoggers(bm.when, bm.msg, bm.level) logMsgPool.Put(bm) case sg := <-bl.signalChan: // Now should only send "flush" or "close" to bl.signalChan bl.flush() if sg == "close" { for _, l := range bl.outputs { l.Destroy() } bl.outputs = nil gameOver = true } bl.wg.Done() // <------ b 语句 } if gameOver { break } } }
分析并发执行下面 Flush() 方法的状况。假设有 A , B , C 三个 goroutine,而且假设 A 先执行到 e 语句,从
a 语句知道初始计数器为 1 ,因此 e 语句必须等到上述 startLogger-goroutine 执行 b 语句完毕后才中止阻塞。然后 A 再让计数器加一。由于 bl.signalChan
的缓存大小为1,因此 B,C 阻塞在 d 语句,等到 B,C 其中之一能执行 e 语句的时候计数器必然大于0,才不会致使永久阻塞。因此 f 语句要放在 e 语句以后。
// Flush flush all chan data. func (bl *BeeLogger) Flush() { if bl.asynchronous { bl.signalChan <- "flush" // <------ d 语句 bl.wg.Wait() // <------ e 语句 bl.wg.Add(1) // <------ f 语句 return } bl.flush() }
所以再看下面的 Close() 方法,它是不能并发执行的,会致使 "panic: close of closed channel"
错误。不过笔者暂时没懂为何 beego logs 不把这里作一下改进,让 Close() 也支持并发调用很差吗?
// Close close logger, flush all chan data and destroy all adapters in BeeLogger. func (bl *BeeLogger) Close() { if bl.asynchronous { bl.signalChan <- "close" bl.wg.Wait() // <------ g 语句 close(bl.msgChan) } else { bl.flush() for _, l := range bl.outputs { l.Destroy() } bl.outputs = nil } close(bl.signalChan) }