Golang被证实很是适合并发编程,goroutine比异步编程更易读、优雅、高效。本文提出一个适合由Golang实现的Pipeline执行模型,适合批量处理大量数据(ETL)的情景。程序员
想象这样的应用情景:数据库
因为应用中遇到的各类问题,概括出这些需求:
需求一:应分批处理数据,例如规定每批100条。出现问题时(例如任意一个数据库故障)则中断,下次程序启动时使用checkpoint从中断处恢复。
需求二:每一个流程设置合理的并发数、让数据库和NLP服务有合理的负载(不影响其它业务的基础上,尽量占用更多资源以提升ETL性能)。例如,步骤(1)-(4)分别设置并发数一、四、八、2。编程
这就是一个典型的Pipeline(流水线)执行模型。把每一批数据(例如100条)看做流水线上的产品,4个步骤对应流水线上4个处理工序,每一个工序处理完毕后就把半成品交给下一个工序。每一个工序能够同时处理的产品数各不相同。数组
你可能首先想到启用1+4+8+2个goroutine,使用channel来传递数据。我也曾经这么干,结论就是这么干会让程序员疯掉:流程并发控制代码很是复杂,特别是你得处理异常、执行时间超出预期、可控中断等问题,你不得不加入一堆channel,直到你本身都不记得有什么用。并发
为了更高效完成ETL工做,我将Pipeline抽象成模块。我先把代码粘贴出来,再解析含义。模块能够直接使用,主要使用的接口是:NewPipeline、Async、Wait。异步
package main import "sync" func HasClosed(c <-chan struct{}) bool { select { case <-c: return true default: return false } } type SyncFlag interface{ Wait() Chan() <-chan struct{} Done() bool } func NewSyncFlag() (done func(), flag SyncFlag) { f := &syncFlag{ c : make(chan struct{}), } return f.done, f } type syncFlag struct { once sync.Once c chan struct{} } func (f *syncFlag) done() { f.once.Do(func(){ close(f.c) }) } func (f *syncFlag) Wait() { <-f.c } func (f *syncFlag) Chan() <-chan struct{} { return f.c } func (f *syncFlag) Done() bool { return HasClosed(f.c) } type pipelineThread struct { sigs []chan struct{} chanExit chan struct{} interrupt SyncFlag setInterrupt func() err error } func newPipelineThread(l int) *pipelineThread { p := &pipelineThread{ sigs : make([]chan struct{}, l), chanExit : make(chan struct{}), } p.setInterrupt, p.interrupt = NewSyncFlag() for i := range p.sigs { p.sigs[i] = make(chan struct{}) } return p } type Pipeline struct { mtx sync.Mutex workerChans []chan struct{} prevThd *pipelineThread } //建立流水线,参数个数是每一个任务的子过程数,每一个参数对应子过程的并发度。 func NewPipeline(workers ...int) *Pipeline { if len(workers) < 1 { panic("NewPipeline need aleast one argument") } workersChan := make([]chan struct{}, len(workers)) for i := range workersChan { workersChan[i] = make(chan struct{}, workers[i]) } prevThd := newPipelineThread(len(workers)) for _,sig := range prevThd.sigs { close(sig) } close(prevThd.chanExit) return &Pipeline{ workerChans : workersChan, prevThd : prevThd, } } //往流水线推入一个任务。若是第一个步骤的并发数达到设定上限,这个函数会堵塞等待。 //若是流水线中有其它任务失败(返回非nil),任务不被执行,函数返回false。 func (p *Pipeline) Async(works ...func()error) bool { if len(works) != len(p.workerChans) { panic("Async: arguments number not matched to NewPipeline(...)") } p.mtx.Lock() if p.prevThd.interrupt.Done() { p.mtx.Unlock() return false } prevThd := p.prevThd thisThd := newPipelineThread(len(p.workerChans)) p.prevThd = thisThd p.mtx.Unlock() lock := func(idx int) bool { select { case <-prevThd.interrupt.Chan(): return false case <-prevThd.sigs[idx]: //wait for signal } select { case <-prevThd.interrupt.Chan(): return false case p.workerChans[idx]<-struct{}{}: //get lock } return true } if !lock(0) { thisThd.setInterrupt() <-prevThd.chanExit thisThd.err = prevThd.err close(thisThd.chanExit) return false } go func() { //watch interrupt of previous thread select { case <-prevThd.interrupt.Chan(): thisThd.setInterrupt() case <-thisThd.chanExit: } }() go func() { var err error for i,work := range works { close(thisThd.sigs[i]) //signal next thread if work != nil { err = work() } if err != nil || (i+1 < len(works) && !lock(i+1)) { thisThd.setInterrupt() break } <-p.workerChans[i] //release lock } <-prevThd.chanExit if prevThd.interrupt.Done() { thisThd.setInterrupt() } if prevThd.err != nil { thisThd.err = prevThd.err } else { thisThd.err = err } close(thisThd.chanExit) }() return true } //等待流水线中全部任务执行完毕或失败,返回第一个错误,若是无错误则返回nil。 func (p *Pipeline) Wait() error { p.mtx.Lock() lastThd := p.prevThd p.mtx.Unlock() <-lastThd.chanExit return lastThd.err }
使用这个Pipeline组件,咱们的ETL程序将会简单、高效、可靠,让程序员从繁琐的并发流程控制中解放出来:异步编程
package main import "log" func main() { //恢复上次执行的checkpoint,若是是第一次执行就获取一个初始值。 checkpoint := loadCheckpoint() //工序(1)在pipeline外执行,最后一个工序是保存checkpoint pipeline := NewPipeline(4, 8, 2, 1) for { //(1) //加载100条数据,并修改变量checkpoint //data是数组,每一个元素是一条评论,以后的联表、NLP都直接修改data里的每条记录。 data, err := extractReviewsFromA(&checkpoint, 100) if err != nil { log.Print(err) break } //这里有个Golang著名的坑。 //“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,因此不能在异步中使用它。 //这里建立一个副本curCheckpoint,储存本次循环的checkpoint。 curCheckpoint := checkpoint ok := pipeline.Async(func() error { //(2) return joinUserFromB(data) }, func() error { //(3) return nlp(data) }, func() error { //(4) return loadDataToC(data) }, func() error { //(5)保存checkpoint log.Print("done:", curCheckpoint) return saveCheckpoint(curCheckpoint) }) if !ok { break } if len(data) < 100 { break } //处理完毕 } err := pipeline.Wait() if err != nil { log.Print(err) } }
示意图:函数
每一个方格表示一批数据,黄色表示正在执行所属工序,白色表示已经完成工序但堵塞等待中。性能
Pipeline的工做方式:学习
Pipeline分别控制每个工序的并发数。
若是第一个工序的并发数已满,Async会堵塞,直到有线程第一个工序完成。
每一个线程的每一个工序的调度,不早于上一个线程同一个工序的调度。
若是某个线程的某个工序处理失败(例如数据库故障),那以后的线程都会停止执行,下一次调用Async返回false,pipeline.Wait()返回第一个错误,整个流水线做业可控中断。
没法避免中断过程当中有checkpoint后的数据写入。下次重启程序将从新写入、覆盖这些数据。
Pipeline解决了这些问题:
若是你刚开始学习Golang,你必定以为channel这东西好棒。但当你理所固然地用一堆channel来串联一条流水线,就是把本身逼疯的开始。实际上Golang有更棒的东西,我不知道那叫什么,反正你能够在func开启一个goroutine的时候,里面调用外面的变量。
package main import ( "fmt" "time" "sync" ) func main() { var wg sync.WaitGroup for i := 0 ; i < 10 ; i++ { my_var := i * 10 wg.Add(1) go func() { defer wg.Done() time.Sleep(time.Second) fmt.Println(my_var) }() } wg.Wait() }
程序会在启动1秒后不按顺序输出0、十、20、…… 90。Runtime建立了10个my_var,每一个goroutine各有一个,因此每一个goroutine输出不同的值。
看起来很简单的东西,其实是Golang的独有特性,涉及到Go runtime的机制,其余语言不得不定义一个对象来解决相似的问题。当我从C++转Go开发时就惊讶:还有这种操做?
上面的Pipeline模块利用了这个特性,它根本不须要任何channel来传递数据,使用者在一个在循环体内定义一个变量来储存一整批的数据,在异步的goroutine中读取、修改这些数据。在goroutine间用channel传递数据的思路转变为:每一批数据由一个goroutine处理,多个gouroutine竞争各个工序的并发数。