Our log collection component is currently filebeat 6.6.1. After one particular service went live, we started seeing log collection lag; in the worst case the delay exceeded two days, which severely affected the downstream data analysis projects.
After analyzing that service's logs, we found that the log volume was very large, but each individual log entry carried very few fields.
During earlier load tests we had already enabled batched sending on the output, and the Kafka cluster's performance monitoring looked healthy, so we could essentially rule out the downstream cluster as the cause.
Which brings us to today's protagonist: pprof.
pprof is Go's built-in profiling tool and an excellent way to diagnose the performance of Go applications. The two official packages, runtime/pprof and net/http/pprof, make it easy to capture .prof profiles of a running program: stack traces, goroutines, memory allocation and usage, I/O, and so on, which can then be analyzed with go tool pprof. The two packages produce the same data and differ only in how they are wired into a program.
net/http/pprof is essentially a thin wrapper around runtime/pprof that serves the profiles from an HTTP server. Since filebeat exposes its profiles over HTTP, that is the flavor used in the rest of this post.
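As context for how the HTTP flavor works in an ordinary Go program (filebeat wires this up for us behind a flag, so this is only a minimal sketch, not filebeat code): importing net/http/pprof for its side effects registers the /debug/pprof/* handlers on the default mux.

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers on the default mux
)

func main() {
	// Profiles are then available at http://localhost:6060/debug/pprof/
	log.Fatal(http.ListenAndServe("localhost:6060", nil))
}

With something like this running, the same go tool pprof commands shown below work against http://localhost:6060.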
1 Enabling pprof in filebeat
filebeat's pprof endpoint is disabled by default. Starting filebeat with the --httpprof flag turns it on and serves the standard net/http/pprof handlers under /debug/pprof/:
./filebeat --c /etc/filebeat.yml --path.data /usr/share/filebeat/data --path.logs /var/log/filebeat --httpprof 0.0.0.0:6060
2 Inspecting a 30-second CPU profile
go tool pprof http://0.0.0.0:6060/debug/pprof/profile
After the 30-second capture completes, typing top10 at the pprof prompt prints the following (flat is time spent in the function itself; cum includes its callees):
Showing top 10 nodes out of 197
      flat  flat%   sum%        cum   cum%
    21.45s 13.42% 13.42%     70.09s 43.85%  runtime.gcDrain
    15.49s  9.69% 23.11%     39.83s 24.92%  runtime.scanobject
    11.38s  7.12% 30.23%     11.38s  7.12%  runtime.futex
     7.86s  4.92% 35.15%     16.30s 10.20%  runtime.greyobject
     7.82s  4.89% 40.04%      7.82s  4.89%  runtime.markBits.isMarked (inline)
     5.59s  3.50% 43.53%      5.59s  3.50%  runtime.(*lfstack).pop
     5.51s  3.45% 46.98%      6.05s  3.78%  runtime.heapBitsForObject
     5.26s  3.29% 50.27%     13.92s  8.71%  runtime.sweepone
     4.04s  2.53% 52.80%      4.04s  2.53%  runtime.memclrNoHeapPointers
     3.37s  2.11% 54.91%      4.40s  2.75%  runtime.runqgrab
Far too much CPU time is going to the garbage collector: runtime.gcDrain alone accounts for 43.85% of cumulative samples. Under this workload of many tiny logs, filebeat is clearly allocating a huge number of short-lived objects. At this point most Go developers will immediately think of sync.Pool.
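As a quick reminder of the pattern (a minimal, self-contained sketch, not filebeat code): sync.Pool hands out reusable objects so a hot path stops allocating on every call.

package main

import (
	"bytes"
	"sync"
)

// bufPool reuses bytes.Buffer values instead of allocating a new one per call.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

func render(msg []byte) []byte {
	buf := bufPool.Get().(*bytes.Buffer)
	defer bufPool.Put(buf) // return the buffer for the next caller
	buf.Reset()            // a pooled object may hold stale data; always reset it
	buf.Write(msg)
	return append([]byte(nil), buf.Bytes()...) // copy out before the buffer is reused
}

func main() {
	_ = render([]byte("hello"))
}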
We needed more detail, namely the concrete call graph, to see where all of these objects were being allocated.
Typing the web command renders the profile as a call graph, which visualizes where the GC time comes from:
Following the call graph, we found a huge number of calls to runtime.newobject:
and, walking further up, the root cause:
The root cause sits in the sarama library, which filebeat uses to write messages to Kafka. The hot spot is the encode method, and in particular flate.NewWriter, the constructor of the compressor; our filebeat output was using the default gzip compression.
So the next step was to confirm this guess against the code. The heap profile below also backs it up.
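For reference, the heap profile came from the same --httpprof endpoint; assuming the -alloc_space option of go tool pprof (which reports cumulative allocations rather than live heap), the command is roughly:

go tool pprof -alloc_space http://0.0.0.0:6060/debug/pprof/heap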
3 The old code
func (m *Message) encode(pe packetEncoder) error {
	pe.push(newCRC32Field(crcIEEE))

	pe.putInt8(m.Version)

	attributes := int8(m.Codec) & compressionCodecMask
	pe.putInt8(attributes)

	if m.Version >= 1 {
		if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil {
			return err
		}
	}

	err := pe.putBytes(m.Key)
	if err != nil {
		return err
	}

	var payload []byte

	if m.compressedCache != nil {
		payload = m.compressedCache
		m.compressedCache = nil
	} else if m.Value != nil {
		switch m.Codec {
		case CompressionNone:
			payload = m.Value
		case CompressionGZIP:
			var buf bytes.Buffer
			var writer *gzip.Writer
			if m.CompressionLevel != CompressionLevelDefault {
				writer, err = gzip.NewWriterLevel(&buf, m.CompressionLevel)
				if err != nil {
					return err
				}
			} else {
				writer = gzip.NewWriter(&buf)
			}
			if _, err = writer.Write(m.Value); err != nil {
				return err
			}
			if err = writer.Close(); err != nil {
				return err
			}
			m.compressedCache = buf.Bytes()
			payload = m.compressedCache
		case CompressionSnappy:
			tmp := snappy.Encode(m.Value)
			m.compressedCache = tmp
			payload = m.compressedCache
		case CompressionLZ4:
			var buf bytes.Buffer
			writer := lz4.NewWriter(&buf)
			if _, err = writer.Write(m.Value); err != nil {
				return err
			}
			if err = writer.Close(); err != nil {
				return err
			}
			m.compressedCache = buf.Bytes()
			payload = m.compressedCache
		default:
			return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
		}
		// Keep in mind the compressed payload size for metric gathering
		m.compressedSize = len(payload)
	}
	if err = pe.putBytes(payload); err != nil {
		return err
	}
	return pe.pop()
}
The code shows that every gzip-compressed message gets a brand-new writer via gzip.NewWriter (or gzip.NewWriterLevel). At this point the picture is clear.
Because we ship an enormous number of tiny log entries, every single one of them allocates its own gzip writer (and the writer's internal compression buffers) before being written to Kafka, and the resulting garbage is what burns all that CPU time in GC.
4 How do we fix it?
Anyone familiar with Go will reach for sync.Pool here: reuse the writers instead of allocating a new one per message, and the GC pressure disappears.
The latest upstream sarama code looks like this:
import ( "bytes" "compress/gzip" "fmt" "sync" "github.com/eapache/go-xerial-snappy" "github.com/pierrec/lz4" ) var ( lz4WriterPool = sync.Pool{ New: func() interface{} { return lz4.NewWriter(nil) }, } gzipWriterPool = sync.Pool{ New: func() interface{} { return gzip.NewWriter(nil) }, } ) func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) { switch cc { case CompressionNone: return data, nil case CompressionGZIP: var ( err error buf bytes.Buffer writer *gzip.Writer ) if level != CompressionLevelDefault { writer, err = gzip.NewWriterLevel(&buf, level) if err != nil { return nil, err } } else { writer = gzipWriterPool.Get().(*gzip.Writer) defer gzipWriterPool.Put(writer) writer.Reset(&buf) } if _, err := writer.Write(data); err != nil { return nil, err } if err := writer.Close(); err != nil { return nil, err } return buf.Bytes(), nil case CompressionSnappy: return snappy.Encode(data), nil case CompressionLZ4: writer := lz4WriterPool.Get().(*lz4.Writer) defer lz4WriterPool.Put(writer) var buf bytes.Buffer writer.Reset(&buf) if _, err := writer.Write(data); err != nil { return nil, err } if err := writer.Close(); err != nil { return nil, err } return buf.Bytes(), nil case CompressionZSTD: return zstdCompress(nil, data) default: return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)} } }
The upstream code shows that the pooled gzip writer is only reused when the compression level is left at its default (CompressionLevelDefault, i.e. -1000); any explicitly configured level still allocates a fresh writer for every single message.
That does not cover our case.
So we changed the code as follows:
package sarama

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"sync"

	snappy "github.com/eapache/go-xerial-snappy"
	"github.com/pierrec/lz4"
)

var (
	lz4WriterPool = sync.Pool{
		New: func() interface{} {
			return lz4.NewWriter(nil)
		},
	}

	gzipWriterPool = sync.Pool{
		New: func() interface{} {
			return gzip.NewWriter(nil)
		},
	}

	gzipWriterPoolForCompressionLevel1 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 1)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}

	gzipWriterPoolForCompressionLevel2 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 2)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}

	gzipWriterPoolForCompressionLevel3 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 3)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}

	gzipWriterPoolForCompressionLevel4 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 4)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}

	gzipWriterPoolForCompressionLevel5 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 5)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}

	gzipWriterPoolForCompressionLevel6 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 6)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}

	gzipWriterPoolForCompressionLevel7 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 7)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}

	gzipWriterPoolForCompressionLevel8 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 8)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}

	gzipWriterPoolForCompressionLevel9 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 9)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}
)

func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
	switch cc {
	case CompressionNone:
		return data, nil
	case CompressionGZIP:
		var (
			err    error
			buf    bytes.Buffer
			writer *gzip.Writer
		)
		switch level {
		case CompressionLevelDefault:
			writer = gzipWriterPool.Get().(*gzip.Writer)
			defer gzipWriterPool.Put(writer)
			writer.Reset(&buf)
		case 1:
			writer = gzipWriterPoolForCompressionLevel1.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel1.Put(writer)
			writer.Reset(&buf)
		case 2:
			writer = gzipWriterPoolForCompressionLevel2.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel2.Put(writer)
			writer.Reset(&buf)
		case 3:
			writer = gzipWriterPoolForCompressionLevel3.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel3.Put(writer)
			writer.Reset(&buf)
		case 4:
			writer = gzipWriterPoolForCompressionLevel4.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel4.Put(writer)
			writer.Reset(&buf)
		case 5:
			writer = gzipWriterPoolForCompressionLevel5.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel5.Put(writer)
			writer.Reset(&buf)
		case 6:
			writer = gzipWriterPoolForCompressionLevel6.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel6.Put(writer)
			writer.Reset(&buf)
		case 7:
			writer = gzipWriterPoolForCompressionLevel7.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel7.Put(writer)
			writer.Reset(&buf)
		case 8:
			writer = gzipWriterPoolForCompressionLevel8.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel8.Put(writer)
			writer.Reset(&buf)
		case 9:
			writer = gzipWriterPoolForCompressionLevel9.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel9.Put(writer)
			writer.Reset(&buf)
		default:
			writer, err = gzip.NewWriterLevel(&buf, level)
			if err != nil {
				return nil, err
			}
		}
		if _, err := writer.Write(data); err != nil {
			return nil, err
		}
		if err := writer.Close(); err != nil {
			return nil, err
		}
		return buf.Bytes(), nil
	case CompressionSnappy:
		return snappy.Encode(data), nil
	case CompressionLZ4:
		writer := lz4WriterPool.Get().(*lz4.Writer)
		defer lz4WriterPool.Put(writer)
		var buf bytes.Buffer
		writer.Reset(&buf)
		if _, err := writer.Write(data); err != nil {
			return nil, err
		}
		if err := writer.Close(); err != nil {
			return nil, err
		}
		return buf.Bytes(), nil
	case CompressionZSTD:
		return zstdCompress(nil, data)
	default:
		return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
	}
}
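To sanity-check the change outside of filebeat, a standalone Go benchmark along the following lines can be used. This is only an illustrative sketch, not part of sarama or filebeat; the package name, pool, and the fixed level 6 are stand-ins for the real configuration.

package gzipbench

import (
	"bytes"
	"compress/gzip"
	"sync"
	"testing"
)

var payload = bytes.Repeat([]byte("small log line "), 8) // ~120 bytes, mimics a tiny log entry

// BenchmarkNewWriterPerMessage allocates a fresh writer for every message,
// mirroring the old encode path for an explicitly configured level.
func BenchmarkNewWriterPerMessage(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		var buf bytes.Buffer
		w, _ := gzip.NewWriterLevel(&buf, 6)
		w.Write(payload)
		w.Close()
	}
}

// writerPool reuses one writer per level and resets it per message,
// mirroring the patched compress() path.
var writerPool = sync.Pool{
	New: func() interface{} {
		w, _ := gzip.NewWriterLevel(nil, 6)
		return w
	},
}

func BenchmarkPooledWriter(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		var buf bytes.Buffer
		w := writerPool.Get().(*gzip.Writer)
		w.Reset(&buf)
		w.Write(payload)
		w.Close()
		writerPool.Put(w)
	}
}

Running it with go test -bench=. -benchmem makes the per-message allocation difference between the two paths visible.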
Let the graphs speak for themselves: CPU utilization before and after the upgrade.
5 Takeaways
1: pprof is a killer tool for performance tuning.
2: filebeat still has plenty of other optimization opportunities, for example JSON serialization.
3: In practice, CPU usage dropped by half while collection throughput went up by about 20%.