背景:html
读取一个500w行的大文件,将每一行的数据读取出来作数据整合归并以后,再按照必定的逻辑和算法进行处理后存入redis。golang
文件格式:redis
url地址 用户32位标识 点击次数算法
http://jingpin.pgc.panda.tv/hd/xiaopianpian.html aaaaaaaaaaaaaaaaaaaa 5并发
具体场景: 函数
本节先看一下大文件处理最简单的状况,即在读文件的过程当中针对文件每一行都开启一个协程来作数据合并,看看这种状况下的定位以及优化的思路。高并发
问题现象:oop
若是将整个文件串行执行来作数据整合的话,只须要4 or 5s就能够完成,可是每行并发处理却须要几十秒到几十分钟不等。优化
代码以下:网站
simple.go
package main import ( "bufio" "fmt" "io" "io/ioutil" "log" "net/http" _ "net/http/pprof" "os" "singleflight" "strconv" "strings" "sync" "time" "utils" ) var ( wg sync.WaitGroup mu sync.RWMutex //全局锁 single = &singleflight.Task{} ) func main() { defer func() { if err := recover(); err != nil { fmt.Println(err) } }() go func() { log.Println(http.ListenAndServe("localhost:8080", nil)) }() file := "/data/origin_data/part-r-00000" if fp, err := os.Open(file); err != nil { panic(err) } else { start := time.Now() defer fp.Close() defer func() { //时间消耗 fmt.Println("time cost:", time.Now().Sub(start)) }() //统计下每一个url的点击用户数 hostNums := hostsStat() buf := bufio.NewReader(fp) hostToFans := make(map[string]utils.MidList) //[url][]用户id for { line, err := buf.ReadString('\n') if err != nil { if err == io.EOF { //遭遇行尾 fmt.Println("meet the end") break //跳出死循环 } panic(err) } //每一行单独处理 wg.Add(1) go handleLine(line, hostToFans, hostNums) } wg.Wait() fmt.Println("*************************handle file data complete************************") } } //处理每一行的数据 func handleLine(line string, hostToFans map[string]utils.MidList, hostNums map[string]int) { defer wg.Done() line = strings.TrimSpace(line) components := strings.Split(line, "\t") //先判断是不是合法网站的url schemes := strings.Split(components[0], "/") if utils.In_array(utils.ValidPlatforms, schemes[2]) == false { fmt.Println("invalid url: ", components[0]) return } mu.RLock() if _, ok := hostToFans[components[0]]; ok { mu.RUnlock() click_times, _ := strconv.Atoi(components[2]) mu.Lock() hostToFans[components[0]] = hostToFans[components[0]].Append(components[1], click_times) mu.Unlock() } else { //下一个url mu.RUnlock() startElement := false //用以标识是不是某个url统计的初始元素 //singleflight代码 防止有多个相同url同时访问时,该url对应的[]string尚未初始化,致使屡次make代码的执行 single.Do(components[0], func() (interface{}, error) { mu.RLock() if _, ok := hostToFans[components[0]]; ok { //再判断一遍, 防止高并发的情形下,多个相同url的写map操做,都会进入从新分配空间的步骤 mu.RUnlock() return nil, nil } mu.RUnlock() mu.Lock() click_times, _ := strconv.Atoi(components[2]) hostToFans[components[0]] = utils.NewMidList(hostNums[components[0]]) hostToFans[components[0]] = hostToFans[components[0]].Append(components[1], click_times) mu.Unlock() startElement = true return nil, nil }) if !startElement { mu.Lock() click_times, _ := strconv.Atoi(components[2]) hostToFans[components[0]] = hostToFans[components[0]].Append(components[1], click_times) mu.Unlock() } } } //针对url:用户的统计文件 该文件列出每一个url对应的独立用户个数 func hostsStat() map[string]int { hostStats := "./scripts/data/stat.txt" bytes, _ := ioutil.ReadFile(hostStats) //....some code.... return hostNums }
执行一下这个代码以后,发现程序执行不久,内存占用就噌噌噌的涨到90%+,过了一会cpu占用降低到极低,可是load一直保持再过载的水平,看下图。因此大胆猜想由于gc致使进程夯住了。以后用pprof和gctrace也印证了这个想法,若是对pprof和gctrace不太清楚的同窗能够看笔者以前的文章 golang 如何排查和定位GC问题。
其实在笔者最开始的代码中,已经有一些地方在注意下降内存的消耗了,好比说在初始化每一个url对应的用户id集合时借鉴groupcache的singleflight,确保不会屡次重复申请空间;好比url对应的用户id切片,先算好具体大小再make。虽然代码很简单, 可是上文中的代码显然仍是有一些问题。
经过pprof,我发现程序执行的过程当中,大部分时间都消耗在gc上,以下图。划红线的都是和gc有关的函数。因此问题就变成排查为何gc会这么长时间。
大部分状况下gc被致使的缘由是分配的内存达到某个阈值,很显然,本例属于这种状况,前文提到内存占用稳定在90+。那么为何这个进程会占用这么多的内存呢?笔者一直试图用pprof的heap和profile来分析出这个问题,可是一直无果。直到有一次经过pprof查看goroutine的状态时,发现当前正在工做的协程高达几十万,甚至有时能到达接近150w的量级, 以下图。这样就可以解释一部分问题了,单个协程若是是3K大小,那么当协程数量到达百万时,就算协程里什么都没有也会占用4G的内存。而笔者在作实验的机器只有8g的内存,因此确定会出现内存被吃满频繁gc致使进程夯住。
因此第一步,确定是要控制一下当前的协程数,不能无限的增加。在读取文本内容的loop里,加上对行数的计算,这样每到一个阈值时,就能够休息一下,暂缓下协程增加的速率。加上限制以后,进程不会再卡死,整个的执行时间稳定在20~30s之间。
iterator := 0 for { line, err := buf.ReadString('\n') if err != nil { if err == io.EOF { //遭遇行尾 fmt.Println("meet the end") break //跳出死循环 } panic(err) } //每一行单独处理 这里须要加逻辑防止并发过大致使大量占用cpu和内存,使得整个进程由于gc夯住, //能够每读10w行左右就休息一会下降一下程序同时在线的协程 iterator++ if iterator <= 120000 { wg.Add(1) go handleLine(line, hostToFans, hostNums) } else { iterator = 1 <-time.After(130 * time.Millisecond) //暂停须要5s左右 wg.Add(1) go handleLine(line, hostToFans, hostNums) } } wg.Wait()
对比一下串行执行的结果,能够发现虽然如今并发执行已经稳定,可是就算刨去休眠时间,和串行执行相比仍是慢不少,因此确定还有优化的空间。这个时候pprof的profile以及heap分析就有了施展拳脚的地方了。看下面两张图,分别是内存消耗和cpu消耗图:
cpu耗时分析
内存占用分析
上面只截取了一部分的图,可是从中咱们已经可以找到须要优化的地方了。能够看到strings.Split函数耗时和耗内存都很严重,主要是它会生成slice。分析一下前文的代码能够发现至少判断url是不是合法网站这一块的strings.Split是能够不要的。这里不光会有额外的运行时间还会生成slice占用内存致使gc。因此对这块功能进行改造:
//先判断是不是合法平台的主播 if is_valid_platform(utils.ValidPlatforms, components[0]) == false { fmt.Println("invalid url: ", components[0]) return nil, errors.New("invalid url") } func is_valid_platform(platforms []string, hostUrl string) bool { for _, platform := range platforms { if strings.Index(hostUrl, platform) != -1 { return true } } return false }
这样的就能够减小没必要要的slice引发的分配空间。改完以后再执行,整个任务稳定在15s左右,减去休眠时间的话就是10s。到这里其实已经算是优化的差很少了,可是其实还有一个地方能够看一下。
上面的heap分析图能够看到其实singleflight.(*Task).Do函数占用更多的内存,而且也占用了不少的cpu时间,以下:
除了一些原生函数以外,就属它最高了,并且该函数也只会在每次新的url出现的时候才会执行。能够看下singleflight的主要结构体,会发现使用了指针变量,而指针变量在gc的时候会致使二次遍历,使得整个gc变慢。虽然笔者此处用的singleflight确定是不能修改, 可是若是有可能的话,尽可能仍是要少用指针。
这一节只能算是简单讲了下优化的思路和过程,但愿下一节能把完整版的优化方案写出来。