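What the product team calls "a small project" rarely stays small. From kickoff through development and launch, as time passes and requirements pile up, it grows more and more complex and turns into a large project. If the architecture was not designed well up front, the code becomes bloated and hard to maintain, and every later release touches everything at once. Splitting the project into microservices and loosening the coupling between modules is a good option. It does raise the maintenance cost, but it is well worth it.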
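Besides stability, there are a few things I personally care about in a microservice project:
1. The efficiency and security of data transfer between services.
2. Dynamic scaling of services, that is, service registration and discovery, and service clustering.
3. Customizable functionality. Not every built-in feature will fit your needs, so you will inevitably have to extend some of them yourself.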
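go-micro is a very good RPC microservice framework for Go. It is feature-complete, and it handles the issues I care about well:
1. Services exchange data as protobuf, a compact binary format with schema-defined, strictly typed messages that is fast to encode and decode.
2. go-micro supports many kinds of service registration and discovery. I personally prefer etcdv3.
3. Every major feature sits behind an interface, so you can write your own plugin simply by implementing the corresponding interface.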
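In my spare time I read through the go-micro source systematically. The more I read, the better I thought the framework was written, and I learned a lot from it. So I decided to write a series of posts to share what I learned.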
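The communication flow in go-micro is roughly as follows:
Server listens for calls from clients and handles messages pushed by the Broker. The Server also registers itself with the Registry when it starts and deregisters when it stops, so that Clients know its state.
Registry handles service registration and discovery.
Client gets the Server information from the Registry, then for each call picks one Server according to a selection algorithm. The call itself goes through a series of steps such as encoding/decoding and choosing the transport protocol.
If every Server needs to be notified, the Broker can be used to push a message to all of them.
Broker is the message queue that receives and publishes messages.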
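go-micro is highly customizable because of how it is structured. It is built from 8 key interfaces, each of which can be reimplemented to fit your needs, and these 8 interfaces make up the framework's architecture.
go-micro ships its own default implementation of each interface, and go-plugins provides alternative implementations that can be swapped in. You can also write your own plugins as needed.
With go-plugins you can switch to other service discovery backends such as mdns, etcd, etcdv3, zookeeper, kubernetes, and so on.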
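    // Partial code
    import (
        micro "github.com/micro/go-micro"
        "github.com/micro/go-plugins/registry/etcdv3"
    )

    // etcd is used for service discovery here; if you use consul, this can be removed.
    reg := etcdv3.NewRegistry()
    // Other registries from go-plugins:
    // etcd.NewRegistry()
    // mdns.NewMDNSService()
    // zookeeper.NewRegistry()
    // kubernetes.NewRegistry()

    service := micro.NewService(
        micro.Name("greeter"),
        micro.Version("latest"),
        micro.Metadata(map[string]string{"type": "hello world"}),
        micro.Registry(reg), // pass the registry in, otherwise the default is used
    )
    service.Init()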
This post mainly introduces the overall structure of go-micro and what each of these interfaces does. The details will be covered in later posts.
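Transport is the interface for communication between services. How a service ultimately sends and receives data is determined by these interfaces.
Source: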
    // Package transport is an interface for synchronous communication
    package transport

    import (
        "time"
    )

    type Message struct {
        Header map[string]string
        Body   []byte
    }

    type Socket interface {
        Recv(*Message) error
        Send(*Message) error
        Close() error
        Local() string
        Remote() string
    }

    type Client interface {
        Socket
    }

    type Listener interface {
        Addr() string
        Close() error
        Accept(func(Socket)) error
    }

    // Transport is an interface which is used for communication between
    // services. It uses socket send/recv semantics and had various
    // implementations {HTTP, RabbitMQ, NATS, ...}
    type Transport interface {
        Init(...Option) error
        Options() Options
        Dial(addr string, opts ...DialOption) (Client, error)
        Listen(addr string, opts ...ListenOption) (Listener, error)
        String() string
    }

    type Option func(*Options)

    type DialOption func(*DialOptions)

    type ListenOption func(*ListenOptions)

    var (
        DefaultTransport Transport = newHTTPTransport()

        DefaultDialTimeout = time.Second * 5
    )

    func NewTransport(opts ...Option) Transport {
        return newHTTPTransport(opts...)
    }
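Transport's Listen method is usually called on the Server side. It listens on a port and waits for client calls.
Transport's Dial method is how a client connects to a service. It returns a Client interface, which embeds the Socket interface. The Socket methods are what actually send and receive the messages.
HTTP is go-micro's default synchronous transport. There are many other plugins as well: grpc, nats, tcp, udp, and rabbitmq are all implemented already, and you can find them in go-plugins.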
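To make the Listen / Dial / Socket relationship more concrete, here is a minimal in-memory sketch using only the standard library. It is not go-micro's HTTP transport: `memSocket`, `memListener`, `Listen`, `Dial`, and `echoOnce` are hypothetical names made up for illustration, and only `Message` mirrors the struct in the source above.

```go
package main

import (
	"errors"
	"fmt"
)

// Message mirrors transport.Message from the source above.
type Message struct {
	Header map[string]string
	Body   []byte
}

// memSocket is a hypothetical in-memory stand-in for transport.Socket.
type memSocket struct {
	in  <-chan *Message
	out chan<- *Message
}

// Send hands the message to the peer.
func (s *memSocket) Send(m *Message) error {
	s.out <- m
	return nil
}

// Recv blocks until the peer sends a message, then copies it into m.
func (s *memSocket) Recv(m *Message) error {
	got, ok := <-s.in
	if !ok {
		return errors.New("socket closed")
	}
	*m = *got
	return nil
}

// memListener is a hypothetical stand-in for transport.Listener.
type memListener struct {
	conns chan *memSocket
}

// Listen plays the role of Transport.Listen (no real port is opened).
func Listen() *memListener {
	return &memListener{conns: make(chan *memSocket)}
}

// Accept calls fn in a new goroutine for every incoming connection.
func (l *memListener) Accept(fn func(*memSocket)) {
	for sock := range l.conns {
		go fn(sock)
	}
}

// Dial plays the role of Transport.Dial: it builds a connected socket
// pair, hands one end to the listener and returns the client end.
func (l *memListener) Dial() *memSocket {
	c2s := make(chan *Message, 1)
	s2c := make(chan *Message, 1)
	l.conns <- &memSocket{in: c2s, out: s2c}
	return &memSocket{in: s2c, out: c2s}
}

// echoOnce runs one request/response round trip against an echo server.
func echoOnce() (string, error) {
	l := Listen()
	go l.Accept(func(s *memSocket) {
		var req Message
		if err := s.Recv(&req); err != nil {
			return
		}
		_ = s.Send(&Message{Header: req.Header, Body: append([]byte("echo: "), req.Body...)})
	})

	c := l.Dial()
	if err := c.Send(&Message{Body: []byte("hi")}); err != nil {
		return "", err
	}
	var rsp Message
	if err := c.Recv(&rsp); err != nil {
		return "", err
	}
	return string(rsp.Body), nil
}

func main() {
	body, err := echoOnce()
	fmt.Println(body, err)
}
```

The server side only ever sees a socket inside the Accept callback, and the client side only sees the socket returned by Dial, which is the same split the Transport interface makes.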
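Codec: with the transport in place, the next problem is encoding and decoding what gets sent. go-micro supports many codecs. The default is protobuf, and there are other implementations such as json, jsonrpc, mercury, and so on.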
    // Package codec is an interface for encoding messages
    package codec

    import (
        "io"
    )

    const (
        Error MessageType = iota
        Request
        Response
        Publication
    )

    type MessageType int

    // Takes in a connection/buffer and returns a new Codec
    type NewCodec func(io.ReadWriteCloser) Codec

    // Codec encodes/decodes various types of messages used within go-micro.
    // ReadHeader and ReadBody are called in pairs to read requests/responses
    // from the connection. Close is called when finished with the
    // connection. ReadBody may be called with a nil argument to force the
    // body to be read and discarded.
    type Codec interface {
        Reader
        Writer
        Close() error
        String() string
    }

    type Reader interface {
        ReadHeader(*Message, MessageType) error
        ReadBody(interface{}) error
    }

    type Writer interface {
        Write(*Message, interface{}) error
    }

    // Marshaler is a simple encoding interface used for the broker/transport
    // where headers are not supported by the underlying implementation.
    type Marshaler interface {
        Marshal(interface{}) ([]byte, error)
        Unmarshal([]byte, interface{}) error
        String() string
    }

    // Message represents detailed information about
    // the communication, likely followed by the body.
    // In the case of an error, body may be nil.
    type Message struct {
        Id       string
        Type     MessageType
        Target   string
        Method   string
        Endpoint string
        Error    string

        // The values read from the socket
        Header map[string]string
        Body   []byte
    }
The Codec interface's Write method is the encoding step, and the two Read methods (ReadHeader and ReadBody) are the decoding step.
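The smallest piece of the codec package to implement yourself is the Marshaler interface. The sketch below backs it with the standard library's encoding/json. The `Marshaler` interface is copied from the source above; `jsonMarshaler`, `HelloRequest`, and `roundTrip` are hypothetical names for illustration, not go-micro's own JSON codec.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Marshaler is copied from the codec package shown above.
type Marshaler interface {
	Marshal(interface{}) ([]byte, error)
	Unmarshal([]byte, interface{}) error
	String() string
}

// jsonMarshaler is a hypothetical Marshaler backed by encoding/json.
type jsonMarshaler struct{}

func (jsonMarshaler) Marshal(v interface{}) ([]byte, error) { return json.Marshal(v) }

func (jsonMarshaler) Unmarshal(d []byte, v interface{}) error { return json.Unmarshal(d, v) }

func (jsonMarshaler) String() string { return "json" }

// HelloRequest is an example payload, not a go-micro type.
type HelloRequest struct {
	Name string `json:"name"`
}

// roundTrip encodes a request and decodes it again with the given Marshaler.
func roundTrip(m Marshaler, in HelloRequest) (HelloRequest, error) {
	var out HelloRequest
	data, err := m.Marshal(in)
	if err != nil {
		return out, err
	}
	err = m.Unmarshal(data, &out)
	return out, err
}

func main() {
	m := jsonMarshaler{}
	out, err := roundTrip(m, HelloRequest{Name: "micro"})
	fmt.Println(m.String(), out.Name, err)
}
```

Because callers depend only on the interface, swapping the encoding means swapping the value passed as `m`.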
Registry handles service registration and discovery. Current implementations include consul, mdns, etcd, etcdv3, zookeeper, kubernetes, and so on.
    // Package registry is an interface for service discovery
    package registry

    import (
        "errors"
    )

    // The registry provides an interface for service discovery
    // and an abstraction over varying implementations
    // {consul, etcd, zookeeper, ...}
    type Registry interface {
        Init(...Option) error
        Options() Options
        Register(*Service, ...RegisterOption) error
        Deregister(*Service) error
        GetService(string) ([]*Service, error)
        ListServices() ([]*Service, error)
        Watch(...WatchOption) (Watcher, error)
        String() string
    }

    type Option func(*Options)

    type RegisterOption func(*RegisterOptions)

    type WatchOption func(*WatchOptions)

    var (
        DefaultRegistry = NewRegistry()

        // Not found error when GetService is called
        ErrNotFound = errors.New("not found")
        // Watcher stopped error when watcher is stopped
        ErrWatcherStopped = errors.New("watcher stopped")
    )

    // Register a service node. Additionally supply options such as TTL.
    func Register(s *Service, opts ...RegisterOption) error {
        return DefaultRegistry.Register(s, opts...)
    }

    // Deregister a service node
    func Deregister(s *Service) error {
        return DefaultRegistry.Deregister(s)
    }

    // Retrieve a service. A slice is returned since we separate Name/Version.
    func GetService(name string) ([]*Service, error) {
        return DefaultRegistry.GetService(name)
    }

    // List the services. Only returns service names
    func ListServices() ([]*Service, error) {
        return DefaultRegistry.ListServices()
    }

    // Watch returns a watcher which allows you to track updates to the registry.
    func Watch(opts ...WatchOption) (Watcher, error) {
        return DefaultRegistry.Watch(opts...)
    }

    func String() string {
        return DefaultRegistry.String()
    }
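Put simply, a Service registers itself by calling Register, and a Client monitors the registry with Watch. Whenever a service is added or removed, the watcher fires so that the client can update its Service information.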
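The register / watch interplay can be sketched with an in-memory map and a channel. This is a simplified illustration under my own assumptions, not how any go-micro registry is implemented: `memRegistry`, `Node`, `Event`, and the action strings "create" and "delete" are hypothetical, and the method signatures are deliberately simpler than the Registry interface above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrNotFound mirrors registry.ErrNotFound from the source above.
var ErrNotFound = errors.New("not found")

// Node and Event are simplified, hypothetical types for this sketch.
type Node struct {
	Id      string
	Address string
}

type Event struct {
	Action  string // "create" or "delete"
	Service string
	Node    Node
}

// memRegistry keeps service nodes in a map and fans events out to watchers.
type memRegistry struct {
	mu       sync.Mutex
	services map[string][]Node
	watchers []chan Event
}

func newMemRegistry() *memRegistry {
	return &memRegistry{services: make(map[string][]Node)}
}

// Register adds a node for the service and notifies watchers.
func (r *memRegistry) Register(service string, n Node) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.services[service] = append(r.services[service], n)
	r.notify(Event{Action: "create", Service: service, Node: n})
}

// Deregister removes the node with the given id and notifies watchers.
func (r *memRegistry) Deregister(service, id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	kept := r.services[service][:0]
	for _, n := range r.services[service] {
		if n.Id == id {
			r.notify(Event{Action: "delete", Service: service, Node: n})
			continue
		}
		kept = append(kept, n)
	}
	r.services[service] = kept
}

// GetService returns a copy of the nodes currently registered for a service.
func (r *memRegistry) GetService(service string) ([]Node, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	nodes := r.services[service]
	if len(nodes) == 0 {
		return nil, ErrNotFound
	}
	return append([]Node(nil), nodes...), nil
}

// Watch returns a channel that receives an Event on every change.
func (r *memRegistry) Watch() <-chan Event {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch := make(chan Event, 16)
	r.watchers = append(r.watchers, ch)
	return ch
}

// notify must be called with r.mu held; slow watchers miss events.
func (r *memRegistry) notify(e Event) {
	for _, w := range r.watchers {
		select {
		case w <- e:
		default:
		}
	}
}

func main() {
	r := newMemRegistry()
	events := r.Watch()
	r.Register("greeter", Node{Id: "greeter-1", Address: "10.0.0.1:8080"})
	fmt.Println(<-events)
	r.Deregister("greeter", "greeter-1")
	fmt.Println(<-events)
}
```

A client would typically keep a local copy of the node list and apply each event to it, instead of calling GetService before every request.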
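Selector builds on Registry and provides client-side load balancing. When a client sends a request to a service, the selector applies the configured algorithm to the host list in the Registry and picks an available Service node to talk to. Round-robin and random are implemented so far, and random is the default.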
    // Package selector is a way to pick a list of service nodes
    package selector

    import (
        "errors"

        "github.com/micro/go-micro/registry"
    )

    // Selector builds on the registry as a mechanism to pick nodes
    // and mark their status. This allows host pools and other things
    // to be built using various algorithms.
    type Selector interface {
        Init(opts ...Option) error
        Options() Options
        // Select returns a function which should return the next node
        Select(service string, opts ...SelectOption) (Next, error)
        // Mark sets the success/error against a node
        Mark(service string, node *registry.Node, err error)
        // Reset returns state back to zero for a service
        Reset(service string)
        // Close renders the selector unusable
        Close() error
        // Name of the selector
        String() string
    }

    // Next is a function that returns the next node
    // based on the selector's strategy
    type Next func() (*registry.Node, error)

    // Filter is used to filter a service during the selection process
    type Filter func([]*registry.Service) []*registry.Service

    // Strategy is a selection strategy e.g random, round robin
    type Strategy func([]*registry.Service) Next

    var (
        DefaultSelector = NewSelector()

        ErrNotFound      = errors.New("not found")
        ErrNoneAvailable = errors.New("none available")
    )
The default implementation is a local cache. Other implementations currently available include blacklist, label, named, and so on.
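The two strategies mentioned above, random and round-robin, both fit the shape of the `Strategy` and `Next` types in the source: take a list, return a function that yields the next node. The sketch below is my own simplified version over a flat `[]Node` slice, not go-micro's code (which works on `[]*registry.Service`). `Node`, `Random`, and `RoundRobin` here are local, hypothetical definitions.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"sync"
)

// ErrNoneAvailable mirrors selector.ErrNoneAvailable from the source above.
var ErrNoneAvailable = errors.New("none available")

// Node is a simplified stand-in for registry.Node.
type Node struct {
	Id      string
	Address string
}

// Next returns the next node according to a strategy.
type Next func() (Node, error)

// Random picks a uniformly random node on every call.
func Random(nodes []Node) Next {
	return func() (Node, error) {
		if len(nodes) == 0 {
			return Node{}, ErrNoneAvailable
		}
		return nodes[rand.Intn(len(nodes))], nil
	}
}

// RoundRobin cycles through the nodes in order, starting at the first one.
func RoundRobin(nodes []Node) Next {
	var mu sync.Mutex
	i := 0
	return func() (Node, error) {
		if len(nodes) == 0 {
			return Node{}, ErrNoneAvailable
		}
		mu.Lock()
		defer mu.Unlock()
		n := nodes[i%len(nodes)]
		i++
		return n, nil
	}
}

func main() {
	nodes := []Node{
		{Id: "greeter-1", Address: "10.0.0.1:8080"},
		{Id: "greeter-2", Address: "10.0.0.2:8080"},
	}
	next := RoundRobin(nodes)
	for i := 0; i < 3; i++ {
		n, _ := next()
		fmt.Println(n.Id)
	}
}
```

Returning a closure lets each strategy keep its own state (the round-robin counter) without the caller knowing about it.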
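Broker is the interface for publishing and subscribing to messages. A simple example: since the set of service nodes is not fixed, if you need to change the behavior of every service, you can have the services subscribe to a topic. When a message is published, every subscribed service receives it and can react as needed.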
    // Package broker is an interface used for asynchronous messaging
    package broker

    // Broker is an interface used for asynchronous messaging.
    type Broker interface {
        Options() Options
        Address() string
        Connect() error
        Disconnect() error
        Init(...Option) error
        Publish(string, *Message, ...PublishOption) error
        Subscribe(string, Handler, ...SubscribeOption) (Subscriber, error)
        String() string
    }

    // Handler is used to process messages via a subscription of a topic.
    // The handler is passed a publication interface which contains the
    // message and optional Ack method to acknowledge receipt of the message.
    type Handler func(Publication) error

    type Message struct {
        Header map[string]string
        Body   []byte
    }

    // Publication is given to a subscription handler for processing
    type Publication interface {
        Topic() string
        Message() *Message
        Ack() error
    }

    // Subscriber is a convenience return type for the Subscribe method
    type Subscriber interface {
        Options() SubscribeOptions
        Topic() string
        Unsubscribe() error
    }

    var (
        DefaultBroker Broker = newHttpBroker()
    )

    func NewBroker(opts ...Option) Broker {
        return newHttpBroker(opts...)
    }

    func Init(opts ...Option) error {
        return DefaultBroker.Init(opts...)
    }

    func Connect() error {
        return DefaultBroker.Connect()
    }

    func Disconnect() error {
        return DefaultBroker.Disconnect()
    }

    func Publish(topic string, msg *Message, opts ...PublishOption) error {
        return DefaultBroker.Publish(topic, msg, opts...)
    }

    func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
        return DefaultBroker.Subscribe(topic, handler, opts...)
    }

    func String() string {
        return DefaultBroker.String()
    }
The default Broker implementation uses HTTP, but do not use it in production. go-plugins has many mature message queue implementations, including kafka, nsq, rabbitmq, redis, and so on.
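The "every subscribed service receives the message" behavior can be illustrated with a tiny in-process publish/subscribe sketch. It is an assumption-laden simplification: `memBroker` is hypothetical, its `Handler` takes a `*Message` directly instead of the `Publication` interface shown above, and delivery is synchronous, which a real broker would not be.

```go
package main

import (
	"fmt"
	"sync"
)

// Message mirrors broker.Message from the source above.
type Message struct {
	Header map[string]string
	Body   []byte
}

// Handler is simplified: the real broker.Handler receives a Publication.
type Handler func(*Message) error

// memBroker is a hypothetical in-process broker keyed by topic.
type memBroker struct {
	mu   sync.RWMutex
	subs map[string][]Handler
}

func newMemBroker() *memBroker {
	return &memBroker{subs: make(map[string][]Handler)}
}

// Subscribe registers a handler for a topic.
func (b *memBroker) Subscribe(topic string, h Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[topic] = append(b.subs[topic], h)
}

// Publish delivers the message to every handler subscribed to the topic,
// stopping at the first handler error.
func (b *memBroker) Publish(topic string, m *Message) error {
	b.mu.RLock()
	handlers := append([]Handler(nil), b.subs[topic]...)
	b.mu.RUnlock()
	for _, h := range handlers {
		if err := h(m); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	b := newMemBroker()
	for _, name := range []string{"server-1", "server-2"} {
		name := name // capture a per-iteration copy for the closure
		b.Subscribe("config.reload", func(m *Message) error {
			fmt.Printf("%s received %q\n", name, m.Body)
			return nil
		})
	}
	_ = b.Publish("config.reload", &Message{Body: []byte("log-level=debug")})
}
```

The publisher never needs to know how many servers exist, which is exactly why this pattern suits a node set that changes over time.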
Client is the interface for making requests to services. It wraps Transport and Codec to make RPC calls, and it also wraps Broker to publish messages.
    // Package client is an interface for an RPC client
    package client

    import (
        "context"
        "time"

        "github.com/micro/go-micro/codec"
    )

    // Client is the interface used to make requests to services.
    // It supports Request/Response via Transport and Publishing via the Broker.
    // It also supports bidiectional streaming of requests.
    type Client interface {
        Init(...Option) error
        Options() Options
        NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
        NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
        Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
        Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
        Publish(ctx context.Context, msg Message, opts ...PublishOption) error
        String() string
    }

    // Router manages request routing
    type Router interface {
        SendRequest(context.Context, Request) (Response, error)
    }

    // Message is the interface for publishing asynchronously
    type Message interface {
        Topic() string
        Payload() interface{}
        ContentType() string
    }

    // Request is the interface for a synchronous request used by Call or Stream
    type Request interface {
        // The service to call
        Service() string
        // The action to take
        Method() string
        // The endpoint to invoke
        Endpoint() string
        // The content type
        ContentType() string
        // The unencoded request body
        Body() interface{}
        // Write to the encoded request writer. This is nil before a call is made
        Codec() codec.Writer
        // indicates whether the request will be a streaming one rather than unary
        Stream() bool
    }

    // Response is the response received from a service
    type Response interface {
        // Read the response
        Codec() codec.Reader
        // read the header
        Header() map[string]string
        // Read the undecoded response
        Read() ([]byte, error)
    }

    // Stream is the inteface for a bidirectional synchronous stream
    type Stream interface {
        // Context for the stream
        Context() context.Context
        // The request made
        Request() Request
        // The response read
        Response() Response
        // Send will encode and send a request
        Send(interface{}) error
        // Recv will decode and read a response
        Recv(interface{}) error
        // Error returns the stream error
        Error() error
        // Close closes the stream
        Close() error
    }

    // Option used by the Client
    type Option func(*Options)

    // CallOption used by Call or Stream
    type CallOption func(*CallOptions)

    // PublishOption used by Publish
    type PublishOption func(*PublishOptions)

    // MessageOption used by NewMessage
    type MessageOption func(*MessageOptions)

    // RequestOption used by NewRequest
    type RequestOption func(*RequestOptions)

    var (
        // DefaultClient is a default client to use out of the box
        DefaultClient Client = newRpcClient()
        // DefaultBackoff is the default backoff function for retries
        DefaultBackoff = exponentialBackoff
        // DefaultRetry is the default check-for-retry function for retries
        DefaultRetry = RetryOnError
        // DefaultRetries is the default number of times a request is tried
        DefaultRetries = 1
        // DefaultRequestTimeout is the default request timeout
        DefaultRequestTimeout = time.Second * 5
        // DefaultPoolSize sets the connection pool size
        DefaultPoolSize = 100
        // DefaultPoolTTL sets the connection pool ttl
        DefaultPoolTTL = time.Minute
    )

    // Makes a synchronous call to a service using the default client
    func Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
        return DefaultClient.Call(ctx, request, response, opts...)
    }

    // Publishes a publication using the default client. Using the underlying broker
    // set within the options.
    func Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
        return DefaultClient.Publish(ctx, msg, opts...)
    }

    // Creates a new message using the default client
    func NewMessage(topic string, payload interface{}, opts ...MessageOption) Message {
        return DefaultClient.NewMessage(topic, payload, opts...)
    }

    // Creates a new client with the options passed in
    func NewClient(opt ...Option) Client {
        return newRpcClient(opt...)
    }

    // Creates a new request using the default client. Content Type will
    // be set to the default within options and use the appropriate codec
    func NewRequest(service, endpoint string, request interface{}, reqOpts ...RequestOption) Request {
        return DefaultClient.NewRequest(service, endpoint, request, reqOpts...)
    }

    // Creates a streaming connection with a service and returns responses on the
    // channel passed in. It's up to the user to close the streamer.
    func NewStream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
        return DefaultClient.Stream(ctx, request, opts...)
    }

    func String() string {
        return DefaultClient.String()
    }
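It also supports bidirectional communication through Stream. The concrete implementations and how to use them will be explained in detail later.
The default is the rpc implementation. There are also grpc and http implementations, which can be found in go-plugins.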
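The client source above exposes DefaultRetries and DefaultBackoff, which suggests a call loop along the lines of "select a node, try the call, back off and retry on failure". The sketch below shows that general idea only. I have not reproduced go-micro's actual rpc client logic here: `callWithRetry`, `Node`, and `demoRetry` are hypothetical, and the backoff function is a simple doubling delay chosen for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Node is a simplified stand-in for a selected service node.
type Node struct {
	Address string
}

// callWithRetry picks a node with next, runs do against it, and retries
// up to `retries` more times, waiting backoff(attempt) between attempts.
func callWithRetry(
	ctx context.Context,
	next func() (Node, error),
	do func(context.Context, Node) error,
	retries int,
	backoff func(attempt int) time.Duration,
) error {
	var lastErr error
	for attempt := 0; attempt <= retries; attempt++ {
		if attempt > 0 {
			select {
			case <-time.After(backoff(attempt)):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		node, err := next()
		if err != nil {
			return err // no node available, retrying will not help here
		}
		if lastErr = do(ctx, node); lastErr == nil {
			return nil
		}
	}
	return lastErr
}

// demoRetry calls a two-node service whose first node always fails and
// reports how many attempts were made.
func demoRetry() (int, error) {
	nodes := []Node{{Address: "10.0.0.1:8080"}, {Address: "10.0.0.2:8080"}}
	i := 0
	next := func() (Node, error) {
		n := nodes[i%len(nodes)]
		i++
		return n, nil
	}
	attempts := 0
	do := func(ctx context.Context, n Node) error {
		attempts++
		if n.Address == "10.0.0.1:8080" {
			return errors.New("connection refused")
		}
		return nil
	}
	// Hypothetical backoff: 2ms, 4ms, 8ms, ...
	backoff := func(attempt int) time.Duration {
		return time.Duration(1<<uint(attempt)) * time.Millisecond
	}
	err := callWithRetry(context.Background(), next, do, 1, backoff)
	return attempts, err
}

func main() {
	attempts, err := demoRetry()
	fmt.Println("attempts:", attempts, "error:", err)
}
```

Selecting a fresh node on every attempt means a retry can land on a different server, which is what makes retries useful against a single bad node.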
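Server does what its name suggests. It listens and waits for RPC requests, and it listens for broker subscriptions, waiting for messages pushed from the message queue.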
    // Package server is an interface for a micro server
    package server

    import (
        "context"
        "os"
        "os/signal"
        "syscall"

        "github.com/google/uuid"
        "github.com/micro/go-log"
        "github.com/micro/go-micro/codec"
        "github.com/micro/go-micro/registry"
    )

    // Server is a simple micro server abstraction
    type Server interface {
        Options() Options
        Init(...Option) error
        Handle(Handler) error
        NewHandler(interface{}, ...HandlerOption) Handler
        NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
        Subscribe(Subscriber) error
        Start() error
        Stop() error
        String() string
    }

    // Router handle serving messages
    type Router interface {
        // ServeRequest processes a request to completion
        ServeRequest(context.Context, Request, Response) error
    }

    // Message is an async message interface
    type Message interface {
        Topic() string
        Payload() interface{}
        ContentType() string
    }

    // Request is a synchronous request interface
    type Request interface {
        // Service name requested
        Service() string
        // The action requested
        Method() string
        // Endpoint name requested
        Endpoint() string
        // Content type provided
        ContentType() string
        // Header of the request
        Header() map[string]string
        // Body is the initial decoded value
        Body() interface{}
        // Read the undecoded request body
        Read() ([]byte, error)
        // The encoded message stream
        Codec() codec.Reader
        // Indicates whether its a stream
        Stream() bool
    }

    // Response is the response writer for unencoded messages
    type Response interface {
        // Encoded writer
        Codec() codec.Writer
        // Write the header
        WriteHeader(map[string]string)
        // write a response directly to the client
        Write([]byte) error
    }

    // Stream represents a stream established with a client.
    // A stream can be bidirectional which is indicated by the request.
    // The last error will be left in Error().
    // EOF indicates end of the stream.
    type Stream interface {
        Context() context.Context
        Request() Request
        Send(interface{}) error
        Recv(interface{}) error
        Error() error
        Close() error
    }

    // Handler interface represents a request handler. It's generated
    // by passing any type of public concrete object with endpoints into server.NewHandler.
    // Most will pass in a struct.
    //
    // Example:
    //
    //      type Greeter struct {}
    //
    //      func (g *Greeter) Hello(context, request, response) error {
    //              return nil
    //      }
    //
    type Handler interface {
        Name() string
        Handler() interface{}
        Endpoints() []*registry.Endpoint
        Options() HandlerOptions
    }

    // Subscriber interface represents a subscription to a given topic using
    // a specific subscriber function or object with endpoints.
    type Subscriber interface {
        Topic() string
        Subscriber() interface{}
        Endpoints() []*registry.Endpoint
        Options() SubscriberOptions
    }

    type Option func(*Options)

    var (
        DefaultAddress        = ":0"
        DefaultName           = "server"
        DefaultVersion        = "latest"
        DefaultId             = uuid.New().String()
        DefaultServer  Server = newRpcServer()
        DefaultRouter         = newRpcRouter()
    )

    // DefaultOptions returns config options for the default service
    func DefaultOptions() Options {
        return DefaultServer.Options()
    }

    // Init initialises the default server with options passed in
    func Init(opt ...Option) {
        if DefaultServer == nil {
            DefaultServer = newRpcServer(opt...)
        }
        DefaultServer.Init(opt...)
    }

    // NewServer returns a new server with options passed in
    func NewServer(opt ...Option) Server {
        return newRpcServer(opt...)
    }

    // NewSubscriber creates a new subscriber interface with the given topic
    // and handler using the default server
    func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber {
        return DefaultServer.NewSubscriber(topic, h, opts...)
    }

    // NewHandler creates a new handler interface using the default server
    // Handlers are required to be a public object with public
    // endpoints. Call to a service endpoint such as Foo.Bar expects
    // the type:
    //
    //      type Foo struct {}
    //      func (f *Foo) Bar(ctx, req, rsp) error {
    //              return nil
    //      }
    //
    func NewHandler(h interface{}, opts ...HandlerOption) Handler {
        return DefaultServer.NewHandler(h, opts...)
    }

    // Handle registers a handler interface with the default server to
    // handle inbound requests
    func Handle(h Handler) error {
        return DefaultServer.Handle(h)
    }

    // Subscribe registers a subscriber interface with the default server
    // which subscribes to specified topic with the broker
    func Subscribe(s Subscriber) error {
        return DefaultServer.Subscribe(s)
    }

    // Run starts the default server and waits for a kill
    // signal before exiting. Also registers/deregisters the server
    func Run() error {
        if err := Start(); err != nil {
            return err
        }

        ch := make(chan os.Signal, 1)
        signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
        log.Logf("Received signal %s", <-ch)

        return Stop()
    }

    // Start starts the default server
    func Start() error {
        config := DefaultServer.Options()
        log.Logf("Starting server %s id %s", config.Name, config.Id)
        return DefaultServer.Start()
    }

    // Stop stops the default server
    func Stop() error {
        log.Logf("Stopping server")
        return DefaultServer.Stop()
    }

    // String returns name of Server implementation
    func String() string {
        return DefaultServer.String()
    }
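The comments in the server source say that a handler is a public struct with public methods, and that an endpoint such as Foo.Bar maps to method Bar on type Foo. A rough way to picture that mapping is a reflection-based lookup table. The sketch below is my own illustration, not go-micro's router: `router`, `newRouter`, `Serve`, and `callHello` are hypothetical, and it assumes every exported method has the shape `func(ctx, req, rsp) error`.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
)

// HelloRequest and HelloResponse are example message types.
type HelloRequest struct{ Name string }
type HelloResponse struct{ Msg string }

// Greeter follows the handler shape from the comments in the source above.
type Greeter struct{}

func (g *Greeter) Hello(ctx context.Context, req *HelloRequest, rsp *HelloResponse) error {
	rsp.Msg = "Hello " + req.Name
	return nil
}

// router is a hypothetical endpoint table keyed by "Type.Method".
type router struct {
	handlers map[string]reflect.Value
}

func newRouter() *router {
	return &router{handlers: make(map[string]reflect.Value)}
}

// Handle registers every exported method of h under "Type.Method".
// Assumption: each method has the signature func(ctx, req, rsp) error.
func (r *router) Handle(h interface{}) {
	v := reflect.ValueOf(h)
	name := reflect.Indirect(v).Type().Name()
	for i := 0; i < v.NumMethod(); i++ {
		r.handlers[name+"."+v.Type().Method(i).Name] = v.Method(i)
	}
}

// Serve looks up the endpoint and invokes the method through reflection.
func (r *router) Serve(ctx context.Context, endpoint string, req, rsp interface{}) error {
	m, ok := r.handlers[endpoint]
	if !ok {
		return fmt.Errorf("unknown endpoint %q", endpoint)
	}
	out := m.Call([]reflect.Value{
		reflect.ValueOf(ctx),
		reflect.ValueOf(req),
		reflect.ValueOf(rsp),
	})
	if err, ok := out[0].Interface().(error); ok {
		return err
	}
	return nil
}

// callHello registers a Greeter and dispatches one Greeter.Hello request.
func callHello(name string) (string, error) {
	r := newRouter()
	r.Handle(&Greeter{})
	rsp := &HelloResponse{}
	err := r.Serve(context.Background(), "Greeter.Hello", &HelloRequest{Name: name}, rsp)
	return rsp.Msg, err
}

func main() {
	msg, err := callHello("micro")
	fmt.Println(msg, err)
}
```

In a real server the request and response values would be produced by the codec from bytes read off the transport. Here they are passed in directly to keep the dispatch step visible.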