如何将golang的并发编程运用到实际开发

前言:这几天在写一个工具脚本分析线上的大量的日志文件,原本应该是索然无味的一个工做,可是本着作到极致的原则,激发了我不断思考如何优化。本文将从开发过程当中的最开始版本,一点点讲解优化的过程,最终用golang实现了一个相似java的worker线程池,收获满满。java

一,无脑开goroutine阶段

1,任务背景 这个工具的做用简单介绍以下:首先是线上的日志量是很是庞大的,而后要去读取日志文件的内容,而后一条条日志项分析,匹配出想要的日志项写入文件,再提取该文件中的数据计算。日志项格式模拟数据以下:golang

{
 "id":xx,"time":"2017-11-19","key1":"value1","key2":"value2".......  
}
复制代码

2,无脑开gorountine 先给下代码(只放核心代码,省略文件操做和异常错误处理),再来讲说这个阶段的思路bash

var wirteChan = make(chan []byte) //用于写入文件
var waitgroup sync.WaitGroup //用于控制同步
func main(){
    //省略写入文件的打开操做,毕竟咱们主要讲并发这块
    
    //初始化写入的channel
	InitWriter(outLogFileWriter)
	//接下来去遍历每一个日志文件,每读出一个日志项就开一个gorountine去处理
	for _,file := range logDir {
	    if file.IsDir() {
			continue
		} else {
			HandlerFile(arg.dir + "/" + file.Name()) //处理每一个文件
		}
	}
}
/**
 * 初始化Writer的channel
 */
func InitWriter(outLogFileWriter *bufio.Writer) {
	go func() {
		for data := range wirteChan {
			nn, err := outLogFileWriter.Write(data)
		}
	}()
}
//处理每一个文件,而后开G去处理每一个日志项
func HandlerFile(fileName string) {
	file, err := os.Open(fileName)
	defer file.Close()
	br := bufio.NewReader(file)
	for {
		data, err := br.ReadBytes('\n')
		if err == io.EOF {
			break
		} else {
			go Handler(data) //每次开一个G去处理,处理完写入writeChannel
		}
	}
}
复制代码

解析:写博客很不喜欢放大篇幅代码,因此上面给的只是重要的代码,上面代码注释有说到的也不重复说了。好了,咱们来想一想上面的代码有什么问题?咱们如今就假设咱们就只有一个很是大的文件,文件中每一个记录项都无脑开一个G去执行。那么,运行一下,咱们会发现,好慢呀~。问题出在哪里呢?首先咱们没法控制G的数量,其第二天志文件很是大,这样运行下来,G的数量是很是庞大的,多个G要往一个channel中写数据,那么也会发生严重的阻塞。种种缘由,致使了这个方法是不适用的并发

二,加入带缓冲的任务队列

1,任务队列 在上面咱们说到,咱们没法控制任务的数量,那么,我在这里就加入了一个任务队列,来对任务进行排队,同时能够控制任务的数量。上代码:函数

/**
 * Job结构体,包含要处理的数据和处理函数(这个可根据须要修改)
 */
type Job struct {
	Data []byte
	Proc func([]byte)
}
//Job队列,存储要作的Job,将每一个任务打包成Job发送到这里
var JobQueue chan Job = make(chan Job, arg.maxqueue)
//启动处理函数处理
func Handler(Data []byte) {
    for range job := <-Queue {
        job.Proc(Data)
    }
}
复制代码

解析:在这个时候,抽象出来了任务模型Job,因为函数调用其实就是函数地址加函数参数,因此咱们能够将处理函数也放进Job中。而后让处理函数去处理就好了。想到这里,稍微有点佩服本身了,接着兴致勃勃的运行一下。嗯,好像没快多少(其实这个取决了你的处理函数,就是Job中的Proc)。What?冷静下来分析一下,真以为本身真可爱。我仅仅是对任务进行了包装,而后用了一个带缓冲的任务队列,因为建立的Job远远大于单个M的处理能力,带缓冲只是稍微把问题拖后了一点。工具

三,Job/Worker模型

其实写到这里,内心对如何优化已经有点B数了。我想起了java中的线程池的概念,我能够创建一个线程池,而后池中包含多个worker(数量能够指定),每一个worker去队列中取任务处理,处理完则继续取任务。同时为了提升通用性,参数类型都改成了interface{}。好了,接下来看看代码,这里的代码都很关键,因此就所有放上来了学习

type Job struct {
	Data interface{}
	Proc func(interface{})
}
//Job队列,存储要作的Job
var JobQueue chan Job = make(chan Job, arg.maxqueue)
//Woker,用来从Job队列中取出Job执行
type Worker struct {
	WokerPool  chan chan Job //表示属于哪一个Worker池,同时接收JobChannel注册
	JobChannel chan Job      //任务管道,经过这个管道获取任务执行
	Quit       chan bool     //用来中止Worker
}
//新建一个Worker,须要传入Worker池参数
func NewWorker(wokerPool chan chan Job) Worker {
	return Worker{
		WokerPool:  wokerPool,
		JobChannel: make(chan Job),
		Quit:       make(chan bool),
	}
}
//Worker的启动:包含:(1) 把该worker的JobChannel注册到WorkerPool中去  (2) 监听JobChannel上有没有新的任务到来 (3) 监听是否受到关闭的请求
func (worker Worker) Start() {
	go func() {
		for {
			worker.WokerPool <- worker.JobChannel //每次作完任务后就从新注册上去通知本worker又处于可用状态了
			select {
			case job := <-worker.JobChannel:
				job.Proc(job.Data)
			case quit := <-worker.Quit: //接收到关闭信息,直接退出便可
				if quit {
					return
				}
			}
		}
	}()
}
//Worker的关闭:只要发送一个关闭信号便可
func (worker Worker) Stop() {
	go func() {
		worker.Quit <- true
	}()
}
//管理Worker的调度器,包含最大worker数量和workerpool
type Dispatcher struct {
	MaxWorker  int
	WorkerPool chan chan Job
}

//启动一个调度器
func (dispatcher *Dispatcher) Run() {
	//启动maxworker个worker
	for i := 0; i < dispatcher.MaxWorker; i++ {
		worker := NewWorker(dispatcher.WorkerPool)
		worker.Start()
	}
	//接下来启动调度服务
	go dispatcher.dispatch()
}

func (dispatcher *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				jobChannel := <-dispatcher.WorkerPool //获取一个可用的worker
				jobChannel <- job                     //将该job发送给该worker
			}(job)
		}
	}
}

//新建一个调度器
func NewDispatcher(maxWorker int) *Dispatcher {
	workerPool := make(chan chan Job, maxWorker)
	return &Dispatcher{
		WorkerPool: workerPool,
		MaxWorker:  maxWorker,
	}
}
复制代码

解析:代码中每句都注释得很是清楚了,就不重复了。咱们能够经过这样来开启这个模型:dispatcher := NewDispatcher(MaxWorker) dispatcher.Run()。有一点须要强调的是,处理函数这块须要根据本身的业务去写,而后和数据打包成Job再发给JobQueue就好了。接着我运行了个人脚本,几十G的文件通过三轮的处理函数(就是说我须要三轮处理,每轮处理都根据上轮的结果)耗时在三分钟到四分钟之间,并且CPU占用率等也不高。对于耗时高的,可使用pprof工具分析一下到底慢在了哪里优化

四,总结

由于以前刚学了golang的并发原理,而后恰好有这个任务,因而本身就开始了从零一点点的摸索和优化,整个工具写完,本身对golang的并发的理解又更加的深刻了,并且对锁,文件操做等也熟悉了起来。收获不少东西,因此我鼓励学习一个新东西,不能只懂原理,还要本身多动手一下,这样才牢固。其实这个模型仍是存在一些不足之处,后续会继续优化。 期间也参考了一些很不错的博客,在这里也表示感谢。ui

相关文章
相关标签/搜索