51Reboot 将在 2020.1.16 日 21:00 为您带来分享主题《大佬教你如何从 ES 初学者到 ES 专家》
直播链接(提前报名):https://ke.qq.com/course/482014?taid=4309905192737502&tuin=31589b0e
再次整理了一下这个日志收集系统的框架,如下图:
这次要实现的代码的整体逻辑为:
完整代码地址为: https://github.com/pythonsite/logagent
etcd: 高可用的分布式 key-value 存储,可以用于配置共享和服务发现
类似的项目:zookeeper 和 consul
开发语言:go
接口:提供 restful 的接口,使用简单
实现算法:基于 raft 算法的强一致性、高可用的服务存储目录
etcd的应用场景:
官网对 etcd 有一个非常简明的介绍:
etcd搭建:
下载地址:https://github.com/coreos/etcd/releases/
根据自己的环境下载对应的版本,然后启动起来就可以了
启动之后可以通过如下命令验证一下:
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]#
其实 context 翻译过来就是上下文管理。那么 context 的作用是什么呢?主要有如下两个作用:(1) 控制 goroutine 的超时和取消;(2) 在调用链中保存上下文数据。
通过下面一个简单的例子进行理解:
package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)

// Result carries the outcome of an HTTP request performed in a background
// goroutine. When err is non-nil, r is nil and must not be dereferenced.
type Result struct {
	r   *http.Response
	err error
}

// fetch performs a GET on url with client. It returns the response body, or
// an error if the request fails or ctx is done first.
//
// The request is bound to ctx, so when ctx expires the transport aborts the
// in-flight request itself. This replaces the deprecated
// Transport.CancelRequest, which also does not work for HTTP/2. If ctx caused
// the failure, ctx.Err() is returned so callers can recognise a timeout.
func fetch(ctx context.Context, client *http.Client, url string) ([]byte, error) {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	req = req.WithContext(ctx)

	// Buffered so the request goroutine can always deliver its result and
	// exit, even after we have stopped waiting for it.
	c := make(chan Result, 1)
	go func() {
		resp, err := client.Do(req)
		c <- Result{resp, err}
	}()

	select {
	case <-ctx.Done():
		// The request goroutine still delivers to c. Close any body it
		// obtained so the underlying connection is not leaked.
		go func() {
			if res := <-c; res.err == nil {
				res.r.Body.Close()
			}
		}()
		return nil, ctx.Err()
	case res := <-c:
		// client.Do returns a nil response on error; using it would panic.
		if res.err != nil {
			if ctxErr := ctx.Err(); ctxErr != nil {
				return nil, ctxErr
			}
			return nil, res.err
		}
		defer res.r.Body.Close()
		return ioutil.ReadAll(res.r.Body)
	}
}

// process fetches http://www.google.com with a 2-second overall deadline. It
// prints the body on success, "timeout!" if the deadline expired, and the
// error otherwise.
func process() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	client := &http.Client{Transport: &http.Transport{}}
	out, err := fetch(ctx, client, "http://www.google.com")
	switch {
	case err == nil:
		fmt.Printf("server response:%s", out)
	case ctx.Err() != nil:
		fmt.Println("timeout!")
	default:
		fmt.Println("http request failed,err:", err)
	}
}

func main() {
	process()
}
写一个通过 context 保存上下文的例子,代码如下:
package main

import (
	"context"
	"fmt"
)

// traceIDKey is the context key under which the trace id is stored. An
// unexported struct type cannot collide with keys set by other packages,
// unlike a bare string such as "trace_id" (flagged by staticcheck SA1029).
type traceIDKey struct{}

// withTraceID returns a copy of ctx that carries the given trace id.
func withTraceID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, traceIDKey{}, id)
}

// traceID returns the trace id stored in ctx, or "" if none was set. The
// comma-ok assertion avoids the panic a bare .(string) causes when the value
// is missing.
func traceID(ctx context.Context) string {
	id, _ := ctx.Value(traceIDKey{}).(string)
	return id
}

// add prints the trace id carried by ctx and returns a+b.
func add(ctx context.Context, a, b int) int {
	fmt.Printf("trace_id:%v\n", traceID(ctx))
	return a + b
}

// calc prints the trace id carried by ctx, then passes the same ctx on to add
// so the trace id propagates down the call chain.
func calc(ctx context.Context, a, b int) int {
	fmt.Printf("trace_id:%v\n", traceID(ctx))
	return add(ctx, a, b)
}

func main() {
	// Attach the trace id once; every function that receives ctx can read it.
	ctx := withTraceID(context.Background(), "123456")
	calc(ctx, 20, 30)
}
关于通过 go 连接 etcd 的简单例子:(这里有个小问题需要注意,就是 etcd 的启动方式:默认启动可能会连接不上,尤其是在虚拟机中安装时,所以需要通过如下命令启动:
./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381
)
package main import ( etcd_client "github.com/coreos/etcd/clientv3" "time" "fmt" ) func main() { cli, err := etcd_client.New(etcd_client.Config{ Endpoints:[]string{"192.168.0.118:2371"}, DialTimeout:5*time.Second, }) if err != nil{ fmt.Println("connect failed,err:",err) return } fmt.Println("connect success") defer cli.Close() }
下面一个例子是通过连接 etcd 存值并取值:
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main() { cli,err := clientv3.New(clientv3.Config{ Endpoints:[]string{"192.168.0.118:2371"}, DialTimeout:5*time.Second, }) if err != nil{ fmt.Println("connect failed,err:",err) return } fmt.Println("connect succ") defer cli.Close() ctx,cancel := context.WithTimeout(context.Background(),time.Second) _,err = cli.Put(ctx,"logagent/conf/","sample_value") cancel() if err != nil{ fmt.Println("put failed,err",err) return } ctx, cancel = context.WithTimeout(context.Background(),time.Second) resp,err := cli.Get(ctx,"logagent/conf/") cancel() if err != nil{ fmt.Println("get failed,err:",err) return } for _,ev := range resp.Kvs{ fmt.Printf("%s:%s\n",ev.Key,ev.Value) } }
关于 context,官网也有一个非常有用的例子,用于控制开启的 goroutine 的退出,代码如下:
package main

import (
	"context"
	"fmt"
)

// gen sends 1, 2, 3, ... on the returned channel from a background goroutine
// until ctx is cancelled. Callers must cancel ctx once they stop reading;
// otherwise the goroutine started here blocks on its send forever and leaks.
func gen(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		// i only advances after a successful send: the select either
		// delivers i or returns.
		for i := 1; ; i++ {
			select {
			case <-ctx.Done():
				return // exit so the goroutine does not leak
			case out <- i:
			}
		}
	}()
	return out
}

// main prints the first five integers produced by gen, then cancels the
// context so gen's goroutine exits.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancel when we are finished consuming integers

	for v := range gen(ctx) {
		fmt.Println(v)
		if v == 5 {
			break
		}
	}
}
官网文档中演示 WithDeadline 的代码例子:
package main

import (
	"context"
	"fmt"
	"time"
)

// main races a 1-second timer against a context whose deadline is 50ms from
// now. The deadline is reached first, so it prints ctx.Err().
func main() {
	deadline := time.Now().Add(50 * time.Millisecond)
	ctx, cancel := context.WithDeadline(context.Background(), deadline)
	// Even though ctx will expire on its own, calling cancel anyway releases
	// the context and its parent as soon as we are done with them.
	defer cancel()

	timer := time.After(1 * time.Second)
	select {
	case <-ctx.Done():
		fmt.Println(ctx.Err())
	case <-timer:
		fmt.Println("overslept")
	}
}
通过上面的代码我们有了基本的使用,那么如果我们通过 etcd 来做配置管理,配置更改之后,我们如何通知对应的服务器配置已更改?通过下面的例子演示:
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main() { cli,err := clientv3.New(clientv3.Config{ Endpoints:[]string{"192.168.0.118:2371"}, DialTimeout:5*time.Second, }) if err != nil { fmt.Println("connect failed,err:",err) return } defer cli.Close() // 这里会阻塞 rch := cli.Watch(context.Background(),"logagent/conf/") for wresp := range rch{ for _,ev := range wresp.Events{ fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) } } }
实现一个 kafka 消费者的简单例子:
package main import ( "github.com/Shopify/sarama" "strings" "fmt" "time" ) func main() { consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil) if err != nil{ fmt.Println("failed to start consumer:",err) return } partitionList,err := consumer.Partitions("nginx_log") if err != nil { fmt.Println("Failed to get the list of partitions:",err) return } fmt.Println(partitionList) for partition := range partitionList{ pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest) if err != nil { fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err) return } defer pc.AsyncClose() go func(partitionConsumer sarama.PartitionConsumer){ for msg := range pc.Messages(){ fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value)) } }(pc) } time.Sleep(time.Hour) consumer.Close() }
但是上面的代码并不是最佳代码,因为我们最后是通过 time.Sleep 等待 goroutine 的执行,我们可以改为通过 sync.WaitGroup 的方式实现:
package main import ( "github.com/Shopify/sarama" "strings" "fmt" "sync" ) var ( wg sync.WaitGroup ) func main() { consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil) if err != nil{ fmt.Println("failed to start consumer:",err) return } partitionList,err := consumer.Partitions("nginx_log") if err != nil { fmt.Println("Failed to get the list of partitions:",err) return } fmt.Println(partitionList) for partition := range partitionList{ pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest) if err != nil { fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err) return } defer pc.AsyncClose() go func(partitionConsumer sarama.PartitionConsumer){ wg.Add(1) for msg := range partitionConsumer.Messages(){ fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value)) } wg.Done() }(pc) } //time.Sleep(time.Hour) wg.Wait() consumer.Close() }
关于 etcd 处理的代码为:
package main import ( "github.com/coreos/etcd/clientv3" "time" "github.com/astaxie/beego/logs" "context" "fmt" ) var Client *clientv3.Client var logConfChan chan string // 初始化etcd func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){ var keys []string for _,ip := range ipArrays{ //keyfmt = /logagent/%s/log_config keys = append(keys,fmt.Sprintf(keyfmt,ip)) } logConfChan = make(chan string,10) logs.Debug("etcd watch key:%v timeout:%v", keys, timeout) Client,err = clientv3.New(clientv3.Config{ Endpoints:addr, DialTimeout: timeout, }) if err != nil{ logs.Error("connect failed,err:%v",err) return } logs.Debug("init etcd success") waitGroup.Add(1) for _, key := range keys{ ctx,cancel := context.WithTimeout(context.Background(),2*time.Second) // 从etcd中获取要收集日志的信息 resp,err := Client.Get(ctx,key) cancel() if err != nil { logs.Warn("get key %s failed,err:%v",key,err) continue } for _, ev := range resp.Kvs{ logs.Debug("%q : %q\n", ev.Key, ev.Value) logConfChan <- string(ev.Value) } } go WatchEtcd(keys) return } func WatchEtcd(keys []string){ // 这里用于检测当须要收集的日志信息更改时及时更新 var watchChans []clientv3.WatchChan for _,key := range keys{ rch := Client.Watch(context.Background(),key) watchChans = append(watchChans,rch) } for { for _,watchC := range watchChans{ select{ case wresp := <-watchC: for _,ev:= range wresp.Events{ logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) logConfChan <- string(ev.Kv.Value) } default: } } time.Sleep(time.Second) } waitGroup.Done() } func GetLogConf()chan string{ return logConfChan }
同样的,这里增加了对限速的处理,毕竟日志收集程序不能影响当前业务的性能,所以增加了 limit.go 用于限制速度:
package main import ( "time" "sync/atomic" "github.com/astaxie/beego/logs" ) type SecondLimit struct { unixSecond int64 curCount int32 limit int32 } func NewSecondLimit(limit int32) *SecondLimit { secLimit := &SecondLimit{ unixSecond:time.Now().Unix(), curCount:0, limit:limit, } return secLimit } func (s *SecondLimit) Add(count int) { sec := time.Now().Unix() if sec == s.unixSecond { atomic.AddInt32(&s.curCount,int32(count)) return } atomic.StoreInt64(&s.unixSecond,sec) atomic.StoreInt32(&s.curCount, int32(count)) } func (s *SecondLimit) Wait()bool { for { sec := time.Now().Unix() if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit { time.Sleep(time.Microsecond) logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount) continue } if sec != atomic.LoadInt64(&s.unixSecond) { atomic.StoreInt64(&s.unixSecond,sec) atomic.StoreInt32(&s.curCount,0) } logs.Debug("limit is exited") return false } }
这次基本实现了日志收集前半段的处理,后面将把日志扔到 ES 中,并最终在页面上呈现。
作者:coder
原文链接:https://www.cnblogs.com/zhaof/p/8910761.html