![]() |
![]() |
很重要--转载声明
- 本站文章无特别说明,皆为原创,版权全部,转载时请用连接的方式,给出原文出处。同时写上原做者:朝十晚八 or Twowords
- 如要转载,请原文转载,如在转载时修改本文,请事先告知,谢绝在转载时经过修改本文达到有利于转载者的目的。
回顾:web
之前一直是C++开发(客户端),最近听同事讲go语言不错,随后便决定先从go语法开始投向go的怀抱。因为历史缘由学习go语法时,用了半天的时间看完了菜鸟教程上相关资料,后来又看了易百教程上的一些实例代码,感受都比较简单,毕竟仍是有C++基础存在的。。。可是找工做大多都是须要工做经验的,那么怎么办才好呢!后来在知乎上看到有一位大神推荐看NSQ和skynet开源框架,权衡之下我决定从NSQ开始学习,进入个人go学习之路。sql
要学习NSQ,首先就是上www查找相关NSQ的资料,没想到百度一下相关资料仍是挺多,有好几个网友的博客都写的不错,思路比较清晰,可是基本都没有彻底的把NSQ分析下来,只能后边继续等待了。文章末尾我会把本身觉着能够帮助理解的文章连接贴上。windows
本篇文章我不打算直接开始分析NSQ框架代码,而是想从一些零散的地方着手,针对性的讲NSQ,这样有利于后期咱们逐模块分析NSQ源码。api
下边就开始咱们文本的中心内容,文章结构是按点划分,会比较零散但都是我的在看NSQ源码时的心得,也能够说是我的以为比较难理解的地方吧,看NSQ以前,只学习了go语法2天时间,所以文中可能会涉及到一些基础性错误,欢迎你们指正。浏览器
我我的电脑是win10 64位,所以在这儿我就给出一个网友写好的Windows下NSQ部署文章NSQ如何在windows上安装 ,安装网友的文章说明个人测试结果图1所示,当启动nsqd链接nsqlookupd时,会有相应的提示,启动nsq_to_file进程时,会往nsqd写入消息都有相应提示,如图1中用红色矩形框选中的是对于的关键提示信息。服务器
图1 NSQ部署测试app
打开浏览器直接输入http://127.0.0.1:4171/
就能够查看NSQ运行状况框架
经过第一小节,咱们简单的把NSQ部署起来,并看到了NSQ的运行状况,还记得咱们启动各个进程的步骤吗,不记得不要紧,看图2所示,该图是出自nsq源码分析之概述,我的觉着这幅图对NSQ总结的很是好,从图中咱们能够了解到下面几个点tcp
图2 NSQ拓扑图函数
了解了nsq的总体结构后,咱们就能够开始按模块分析nsq的源码
3、NSQ启动与退出
nsq优雅的启动与退出使用了SVC包,推荐阅读Nsq源码阅读(1) 启动和优雅退出,这篇文章讲解的很是详细。
NSQ中简单包装了sync.WaitGroup,包装后的待执行函数都会在轻量级的线程中执行,代码以下所示。主线程中启动了多个子线程后,只有等启动的多个子线程结束后主线程才能结束,方法Wrap中的Add和Done调用分别会维护一个引用计数,只有当该引用计数为0时,主线程才会结束等待
1 //若是结构体S,包含一个匿名字段T,那么这个结构体S 就有了T的方法。 2 type WaitGroupWrapper struct { 3 sync.WaitGroup 4 } 5 6 func (w *WaitGroupWrapper) Wrap(cb func()) { 7 w.Add(1) //sync.WaitGroup结构中方法 8 go func() { 9 cb() 10 w.Done() //sync.WaitGroup结构中方法 11 }() 12 }
Go语言式的接口,就是不用显示声明类型T实现了接口I,只要类型T的公开方法彻底知足接口I的要求,就能够把类型T的对象用在须要接口I的地方。好比nsqlookupd.go文件中Main函数最后启动http监听服务时代码
1 l.Lock() 2 l.httpListener = httpListener //把Listener存在NSQLookupd的struct里 3 l.Unlock() 4 //建立httpServer的实例,httpServer在nsqlookupd\http.go文件中定义 5 httpServer := newHTTPServer(ctx) 6 //调用http_api.Serve方法(在http_api\http_server.go中定义)开始在指定的httpListener上接收http链接。 7 l.waitGroup.Wrap(func() { 8 //由于httpServer结构重写了http.Handler接口类的ServeHTTP方法,所以能够当http.Handler使用 9 http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger) 10 })
代码第9行调用中的第二个参数httpServer,是一个自定义的struct,而http_apit.Serve须要的参数类型为http.Handler,由于httpServer实现了http.Handler接口类中的接口,所以能够在这个地方使用,对每个类型实现接口方法以下
1 func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { 2 s.router.ServeHTTP(w, req) 3 }
NSQD做为消息接收、转发者,他也是nsqlookupd的客户端,当nsqd启动时,除过本身启动tcp和http服务等待生产者和消费者链接外,还须要做为client链接到nsqlookupd。在nsqd.go文件的Main函数最后,经过waitGroup启动了3个线程,以下代码所示,第2行代码启动了一个lookupLoop循环,该循环是一个死循环,其中有一项功能就是发送心跳值,告诉全部的nsqlookupd,本身还活着。
1 n.waitGroup.Wrap(func() { n.queueScanLoop() }) //循环处理消息的分发 2 n.waitGroup.Wrap(func() { n.lookupLoop() }) //同步nsqd状态到nsqlookup好比:在线、Topic变化、Channel变化等 3 if n.getOpts().StatsdAddress != "" { 4 n.waitGroup.Wrap(func() { n.statsdLoop() }) 5 }
心跳发送代码在lookup.go文件中,处理方式以下
1 case <-ticker: //发送心跳 告诉nsqlookup本身在线 2 // send a heartbeat and read a response (read detects closed conns) 3 for _, lookupPeer := range lookupPeers { 4 n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer) 5 cmd := nsq.Ping() 6 _, err := lookupPeer.Command(cmd) 7 if err != nil { 8 n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err) 9 } 10 }
nsqlookupd做为服务器端,启动时就开启了tcp监听,每接受一个nsqd链接,就使用go开启一个handle处理函数,最终和客户端(nsqd)交互功能代码在LookupProtocolV1文件中完成。
此时咱们在看图2的NSQ拓扑图,说过了nsqd做为client连接nsqlookupd服务器端后,咱们在顺道说下nsqd开启tcp监控做为服务器时有哪些是客户端,此时的client包括:生产者和消费者,即图中的writer和reader。
先看下NSQ源码中对Decorator定义,其实就是一个函数的嵌套定义,源码位置在internal/http_api/api_response.go文件中,代码以下:
1 type Decorator func(APIHandler) APIHandler 2 3 type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error)
下边咱们来看两个关于Decorate的使用
1 func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle { 2 decorated := f 3 for _, decorate := range ds { 4 decorated = decorate(decorated) 5 } 6 return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { 7 decorated(w, req, ps) 8 } 9 } 10 11 func Log(l app.Logger) Decorator { //Logger是go对应的log4版本 12 return func(f APIHandler) APIHandler { 13 return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { 14 start := time.Now() //当前时间 15 response, err := f(w, req, ps) //执行http请求 err错误状态 response处理结果 16 elapsed := time.Since(start) //f(APIHandler)调用时长 17 status := 200 18 if e, ok := err.(Err); ok { 19 status = e.Code //重置错误码 20 } 21 l.Output(2, fmt.Sprintf("%d %s %s (%s) %s", 22 status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)) //输出http处理日志信息 23 return response, err //返回一个接口 和错误状态 24 } 25 } 26 }
首先先来看下代码第一行定义的Decorate函数,这个函数其实就是一个装饰函数,第一个参数为须要被装饰的视图函数,从第二参数开始,都是装饰函数,最后返回装饰好的视图函数。
第11行代码定义了一个Log函数,返回值为Decorator类型,也就是代码第12行return后边的表达式,该表达式也是一个函数定义,其返回值为APIHandler,第13行代码return后边的表达式为其返回值。