今天来介绍 go-zero
生态的另外一个组件 go-stash
。这是一个 logstash
的 Go 语言替代版,咱们用 go-stash
相比原先的 logstash
节省了2/3的服务器资源。若是你在用 logstash
,不妨试试,也能够看看基于 go-zero
实现这样的工具是多么的容易,这个工具做者仅用了两天时间。git
先从它的配置中,咱们来看看设计架构。github
Clusters: - Input: Kafka: # Kafka 配置 --> 联动 go-queue Filters: # filter action - Action: drop - Action: remove_field - Action: transfer Output: ElasticSearch: # es 配置 {host, index}
看配置名:kafka
是数据输出端,es
是数据输入端,filter
抽象了数据处理过程。json
对,整个 go-stash
就是如 config 配置中显示的,所见即所得。服务器
从 stash.go
的启动流程大体分为几个部分。由于能够配置多个 cluster
,那从一个 cluster
分析:微信
es
的链接【传入 es
配置】filter processors
【es
前置处理器,作数据过滤以及处理,能够设置多个】es
中 索引配置,启动 handle
,同时将 filter
加入handle【处理输入输出】kafka
,将上面建立的 handle
传入,完成 kafka
和 es
之间的数据消费和数据写入在上面架构图中,中间的 filter
只是从 config 中看到,其实更详细是 MessageHandler
的一部分,作数据过滤和转换,下面来讲说这块。架构
如下代码: https://github.com/tal-tech/g...
type MessageHandler struct { writer *es.Writer indexer *es.Index filters []filter.FilterFunc }
这个就对应上面说的,filter
只是其中一部分,在结构上 MessageHandler
是对接下游 es
,可是没有看到对 kafka
的操做。微服务
别急,从接口设计上 MessageHandler
实现了 go-queue
中 ConsumeHandler
接口。工具
这里,上下游就串联了:性能
MessageHandler
接管了 es
的操做,负责数据处理到数据写入kafka
的 Consume
操做。这样在消费过程当中执行 handler
的操做,从而写入 es
实际上,Consume()
也是这么处理的:url
func (mh *MessageHandler) Consume(_, val string) error { var m map[string]interface{} // 反序列化从 kafka 中的消息 if err := jsoniter.Unmarshal([]byte(val), &m); err != nil { return err } // es 写入index配置 index := mh.indexer.GetIndex(m) // filter 链式处理【由于没有泛型,整个处理都是 `map进map出`】 for _, proc := range mh.filters { if m = proc(m); m == nil { return nil } } bs, err := jsoniter.Marshal(m) if err != nil { return err } // es 写入 return mh.writer.Write(index, string(bs)) }
说完了数据处理,以及上下游的链接点。可是数据要从 kafka -> es
,数据流出这个动做从 kafka
角度看,应该是由开发者主动 pull data from kafka
。
那么数据流是怎么动起来?咱们回到主程序 https://github.com/tal-tech/g...
其实 启动 整个流程中,其实就是一个组合模式:
func main() { // 解析命令行参数,启动优雅退出 ... // service 组合模式 group := service.NewServiceGroup() defer group.Stop() for _, processor := range c.Clusters { // 链接es ... // filter processors 构建 ... // 准备es的写入操做 {写入的index, 写入器writer} handle := handler.NewHandler(writer, indexer) handle.AddFilters(filters...) handle.AddFilters(filter.AddUriFieldFilter("url", "uri")) // 按照配置启动kafka,并将消费操做传入,同时加入组合器 for _, k := range toKqConf(processor.Input.Kafka) { group.Add(kq.MustNewQueue(k, handle)) } } // 启动这个组合器 group.Start() }
整个数据流,就和这个 group
组合器有关了。
group.Start() |- group.doStart() |- [service.Start() for service in group.services]
那么说明加入 group
的 service
都是实现 Start()
。也就是说 kafka
端的启动逻辑在 Start()
:
func (q *kafkaQueue) Start() { q.startConsumers() q.startProducers() q.producerRoutines.Wait() close(q.channel) q.consumerRoutines.Wait() }
kafka
消费程序kafka
消费拉取端【可能会被名字迷惑,其实是从 kafka
拉取消息到 q.channel
】而咱们传入 kafka
中的 handler
,上文说过实际上是 Consume
,而这个方法就是在 q.startConsumers()
中执行的:
q.startConsumers() |- [q.consumeOne(key, value) for msg in q.channel] |- q.handler.Consume(key, value)
这样整个数据流就完全串起来了:
做为 go-stash
第一篇文章,本篇从架构和设计上总体介绍 go-stash
,有关性能和为何咱们要开发一个这样的组件,咱们下篇文章逐渐揭晓。
https://github.com/tal-tech/g...
关于 go-zero
更多的设计和实现文章,能够持续关注咱们。
https://github.com/tal-tech/g...
欢迎使用 go-zero 并 star 支持咱们!
关注『微服务实践』公众号并回复 进群 获取社区群二维码。
go-zero 系列文章见『微服务实践』公众号