go微服务框架go-micro架构学习(源码分析)

产品嘴里的一个小项目,从立项到开发上线,随着时间和需求的不断激增,会愈来愈复杂,变成一个大项目,若是前期项目架构没设计的很差,代码会愈来愈臃肿,难以维护,后期的每次产品迭代上线都会牵一发而动全身。项目微服务化,松耦合模块间的关系,是一个很好的选择,随然增长了维护成本,可是仍是很值得的。 node

微服务化项目除了稳定性我我的还比较关心的几个问题:git

     一: 服务间数据传输的效率和安全性。github

     二: 服务的动态扩充,也就是服务的注册和发现,服务集群化。redis

     三: 微服务功能的可订制化,由于并非全部的功能都会很符合你的需求,不免须要根据本身的须要二次开发一些功能。算法

go-micro是go语言下的一个很好的rpc微服务框架,功能很完善,并且我关心的几个问题也解决的很好:json

     一:服务间传输格式为protobuf,效率上没的说,很是的快,也很安全。缓存

     二:go-micro的服务注册和发现是多种多样的。我我的比较喜欢etcdv3的服务服务发现和注册。安全

     三:主要的功能都有相应的接口,只要实现相应的接口,就能够根据本身的须要订制插件。架构

     业余时间把go-micro的源码系统地读了一遍,越读越感受这个框架写的好,从中也学到了不少东西。就想整理一系列的帖子,把学习go-micro的心得和你们分享。app

通讯流程

go-micro的通讯流程大至以下:

  Server监听客户端的调用,和Brocker推送过来的信息进行处理。而且Server端须要向Register注册本身的存在或消亡,这样Client才能知道本身的状态。

    Register服务的注册的发现。

    Client端从Register中获得Server的信息,而后每次调用都根据算法选择一个的Server进行通讯,固然通讯是要通过编码/解码,选择传输协议等一系列过程的。

    若是有须要通知全部的Server端可使用Brocker进行信息的推送。

    Brocker 信息队列进行信息的接收和发布。

 

     go-micro之因此能够高度订制和他的框架结构是分不开的,go-micro由8个关键的interface组成,每个interface均可以根据本身的需求从新实现,这8个主要的inteface也构成了go-micro的框架结构。

这些接口go-micir都有他本身默认的实现方式,还有一个go-plugins是对这些接口实现的可替换项。你也能够根据需求实现本身的插件。

    

经过 go-plugins 能够设置其余服务发现,如mdns, etcd,etcdv3,zookeeper,kubernetes.等等。

#部分代码
import "github.com/micro/go-plugins/registry/etcdv3"

// 我这里用的etcd 作为服务发现,若是使用consul能够去掉
    etcdv3.NewRegistry()
	//etcd.NewRegistry()
	//mdns.NewMDNSService()
	//zookeeper.NewRegistry()
	//kubernetes.NewRegistry()
	service := micro.NewService(micro.Name("greeter"),
		micro.Version("latest"),
		micro.Metadata(map[string]string{"type": "hello world"}))
	service.Init()

这篇帖子主要是给你们介绍go-micro的主体结构和这些接口的功能,具体细节之后的文章咱们再慢慢说。

Transort

服务之间通讯的接口。也就是服务发送和接收的最终实现方式,是由这些接口定制的。

源码:

// Package transport is an interface for synchronous communication
package transport

import (
	"time"
)

type Message struct {
	Header map[string]string
	Body   []byte
}

type Socket interface {
	Recv(*Message) error
	Send(*Message) error
	Close() error
	Local() string
	Remote() string
}

type Client interface {
	Socket
}

type Listener interface {
	Addr() string
	Close() error
	Accept(func(Socket)) error
}

// Transport is an interface which is used for communication between
// services. It uses socket send/recv semantics and had various
// implementations {HTTP, RabbitMQ, NATS, ...}
type Transport interface {
	Init(...Option) error
	Options() Options
	Dial(addr string, opts ...DialOption) (Client, error)
	Listen(addr string, opts ...ListenOption) (Listener, error)
	String() string
}

type Option func(*Options)

type DialOption func(*DialOptions)

type ListenOption func(*ListenOptions)

var (
	DefaultTransport Transport = newHTTPTransport()

	DefaultDialTimeout = time.Second * 5
)

func NewTransport(opts ...Option) Transport {
	return newHTTPTransport(opts...)
}

 Transport 的Listen方法是通常是Server端进行调用的,他监听一个端口,等待客户端调用。

    Transport 的Dial就是客户端进行链接服务的方法。他返回一个Client接口,这个接口返回一个Client接口,这个Client嵌入了Socket接口,这个接口的方法就是具体发送和接收通讯的信息。

    http传输是go-micro默认的同步通讯机制。固然还有不少其余的插件:grpc,nats,tcp,udp,rabbitmq,nats,都是目前已经实现了的方式。在go-plugins里你均可以找到。

Codec

 有了传输方式,下面要解决的就是传输编码和解码问题,go-micro有不少种编码解码方式,默认的实现方式是protobuf,固然也有其余的实现方式,json、protobuf、jsonrpc、mercury等等。

// Package codec is an interface for encoding messages
package codec

import (
	"io"
)

const (
	Error MessageType = iota
	Request
	Response
	Publication
)

type MessageType int

// Takes in a connection/buffer and returns a new Codec
type NewCodec func(io.ReadWriteCloser) Codec

// Codec encodes/decodes various types of messages used within go-micro.
// ReadHeader and ReadBody are called in pairs to read requests/responses
// from the connection. Close is called when finished with the
// connection. ReadBody may be called with a nil argument to force the
// body to be read and discarded.
type Codec interface {
	Reader
	Writer
	Close() error
	String() string
}

type Reader interface {
	ReadHeader(*Message, MessageType) error
	ReadBody(interface{}) error
}

type Writer interface {
	Write(*Message, interface{}) error
}

// Marshaler is a simple encoding interface used for the broker/transport
// where headers are not supported by the underlying implementation.
type Marshaler interface {
	Marshal(interface{}) ([]byte, error)
	Unmarshal([]byte, interface{}) error
	String() string
}

// Message represents detailed information about
// the communication, likely followed by the body.
// In the case of an error, body may be nil.
type Message struct {
	Id       string
	Type     MessageType
	Target   string
	Method   string
	Endpoint string
	Error    string

	// The values read from the socket
	Header map[string]string
	Body   []byte
}

 Codec接口的Write方法就是编码过程,两个Read是解码过程。

Registry

 服务的注册和发现,目前实现的consul,mdns, etcd,etcdv3,zookeeper,kubernetes.等等,

// Package registry is an interface for service discovery
package registry

import (
	"errors"
)

// The registry provides an interface for service discovery
// and an abstraction over varying implementations
// {consul, etcd, zookeeper, ...}
type Registry interface {
	Init(...Option) error
	Options() Options
	Register(*Service, ...RegisterOption) error
	Deregister(*Service) error
	GetService(string) ([]*Service, error)
	ListServices() ([]*Service, error)
	Watch(...WatchOption) (Watcher, error)
	String() string
}

type Option func(*Options)

type RegisterOption func(*RegisterOptions)

type WatchOption func(*WatchOptions)

var (
	DefaultRegistry = NewRegistry()

	// Not found error when GetService is called
	ErrNotFound = errors.New("not found")
	// Watcher stopped error when watcher is stopped
	ErrWatcherStopped = errors.New("watcher stopped")
)

// Register a service node. Additionally supply options such as TTL.
func Register(s *Service, opts ...RegisterOption) error {
	return DefaultRegistry.Register(s, opts...)
}

// Deregister a service node
func Deregister(s *Service) error {
	return DefaultRegistry.Deregister(s)
}

// Retrieve a service. A slice is returned since we separate Name/Version.
func GetService(name string) ([]*Service, error) {
	return DefaultRegistry.GetService(name)
}

// List the services. Only returns service names
func ListServices() ([]*Service, error) {
	return DefaultRegistry.ListServices()
}

// Watch returns a watcher which allows you to track updates to the registry.
func Watch(opts ...WatchOption) (Watcher, error) {
	return DefaultRegistry.Watch(opts...)
}

func String() string {
	return DefaultRegistry.String()
}

 简单来讲,就是Service 进行Register,来进行注册,Client 使用watch方法进行监控,当有服务加入或者删除时这个方法会被触发,以提醒客户端更新Service信息。

Selector

  以Registry为基础,Selector 是客户端级别的负载均衡,当有客户端向服务发送请求时, selector根据不一样的算法从Registery中的主机列表,获得可用的Service节点,进行通讯。目前实现的有循环算法和随机算法,默认的是随机算法。

// Package selector is a way to pick a list of service nodes
package selector

import (
	"errors"
	"github.com/micro/go-micro/registry"
)

// Selector builds on the registry as a mechanism to pick nodes
// and mark their status. This allows host pools and other things
// to be built using various algorithms.
type Selector interface {
	Init(opts ...Option) error
	Options() Options
	// Select returns a function which should return the next node
	Select(service string, opts ...SelectOption) (Next, error)
	// Mark sets the success/error against a node
	Mark(service string, node *registry.Node, err error)
	// Reset returns state back to zero for a service
	Reset(service string)
	// Close renders the selector unusable
	Close() error
	// Name of the selector
	String() string
}

// Next is a function that returns the next node
// based on the selector's strategy
type Next func() (*registry.Node, error)

// Filter is used to filter a service during the selection process
type Filter func([]*registry.Service) []*registry.Service

// Strategy is a selection strategy e.g random, round robin
type Strategy func([]*registry.Service) Next

var (
	DefaultSelector = NewSelector()

	ErrNotFound      = errors.New("not found")
	ErrNoneAvailable = errors.New("none available")
)

 默认的是实现是本地缓存,当前实现的有blacklist,label,named等方式。

 Broker

Broker是消息发布和订阅的接口。很简单的一个例子,由于服务的节点是不固定的,若是有须要修改全部服务行为的需求,可使服务订阅某个主题,当有信息发布时,全部的监听服务都会收到信息,根据你的须要作相应的行为。

// Package broker is an interface used for asynchronous messaging
package broker

// Broker is an interface used for asynchronous messaging.
type Broker interface {
	Options() Options
	Address() string
	Connect() error
	Disconnect() error
	Init(...Option) error
	Publish(string, *Message, ...PublishOption) error
	Subscribe(string, Handler, ...SubscribeOption) (Subscriber, error)
	String() string
}

// Handler is used to process messages via a subscription of a topic.
// The handler is passed a publication interface which contains the
// message and optional Ack method to acknowledge receipt of the message.
type Handler func(Publication) error

type Message struct {
	Header map[string]string
	Body   []byte
}

// Publication is given to a subscription handler for processing
type Publication interface {
	Topic() string
	Message() *Message
	Ack() error
}

// Subscriber is a convenience return type for the Subscribe method
type Subscriber interface {
	Options() SubscribeOptions
	Topic() string
	Unsubscribe() error
}

var (
	DefaultBroker Broker = newHttpBroker()
)

func NewBroker(opts ...Option) Broker {
	return newHttpBroker(opts...)
}

func Init(opts ...Option) error {
	return DefaultBroker.Init(opts...)
}

func Connect() error {
	return DefaultBroker.Connect()
}

func Disconnect() error {
	return DefaultBroker.Disconnect()
}

func Publish(topic string, msg *Message, opts ...PublishOption) error {
	return DefaultBroker.Publish(topic, msg, opts...)
}

func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
	return DefaultBroker.Subscribe(topic, handler, opts...)
}

func String() string {
	return DefaultBroker.String()
}

 Broker默认的实现方式是http方式,可是这种方式不要在生产环境用。go-plugins里有不少成熟的消息队列实现方式,有kafka、nsq、rabbitmq、redis,等等。

 Client

Client是请求服务的接口,他封装Transport和Codec进行rpc调用,也封装了Brocker进行信息的发布。

// Package client is an interface for an RPC client
package client

import (
	"context"
	"time"

	"github.com/micro/go-micro/codec"
)

// Client is the interface used to make requests to services.
// It supports Request/Response via Transport and Publishing via the Broker.
// It also supports bidiectional streaming of requests.
type Client interface {
	Init(...Option) error
	Options() Options
	NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
	NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
	Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
	Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
	Publish(ctx context.Context, msg Message, opts ...PublishOption) error
	String() string
}

// Router manages request routing
type Router interface {
	SendRequest(context.Context, Request) (Response, error)
}

// Message is the interface for publishing asynchronously
type Message interface {
	Topic() string
	Payload() interface{}
	ContentType() string
}

// Request is the interface for a synchronous request used by Call or Stream
type Request interface {
	// The service to call
	Service() string
	// The action to take
	Method() string
	// The endpoint to invoke
	Endpoint() string
	// The content type
	ContentType() string
	// The unencoded request body
	Body() interface{}
	// Write to the encoded request writer. This is nil before a call is made
	Codec() codec.Writer
	// indicates whether the request will be a streaming one rather than unary
	Stream() bool
}

// Response is the response received from a service
type Response interface {
	// Read the response
	Codec() codec.Reader
	// read the header
	Header() map[string]string
	// Read the undecoded response
	Read() ([]byte, error)
}

// Stream is the inteface for a bidirectional synchronous stream
type Stream interface {
	// Context for the stream
	Context() context.Context
	// The request made
	Request() Request
	// The response read
	Response() Response
	// Send will encode and send a request
	Send(interface{}) error
	// Recv will decode and read a response
	Recv(interface{}) error
	// Error returns the stream error
	Error() error
	// Close closes the stream
	Close() error
}

// Option used by the Client
type Option func(*Options)

// CallOption used by Call or Stream
type CallOption func(*CallOptions)

// PublishOption used by Publish
type PublishOption func(*PublishOptions)

// MessageOption used by NewMessage
type MessageOption func(*MessageOptions)

// RequestOption used by NewRequest
type RequestOption func(*RequestOptions)

var (
	// DefaultClient is a default client to use out of the box
	DefaultClient Client = newRpcClient()
	// DefaultBackoff is the default backoff function for retries
	DefaultBackoff = exponentialBackoff
	// DefaultRetry is the default check-for-retry function for retries
	DefaultRetry = RetryOnError
	// DefaultRetries is the default number of times a request is tried
	DefaultRetries = 1
	// DefaultRequestTimeout is the default request timeout
	DefaultRequestTimeout = time.Second * 5
	// DefaultPoolSize sets the connection pool size
	DefaultPoolSize = 100
	// DefaultPoolTTL sets the connection pool ttl
	DefaultPoolTTL = time.Minute
)

// Makes a synchronous call to a service using the default client
func Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
	return DefaultClient.Call(ctx, request, response, opts...)
}

// Publishes a publication using the default client. Using the underlying broker
// set within the options.
func Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
	return DefaultClient.Publish(ctx, msg, opts...)
}

// Creates a new message using the default client
func NewMessage(topic string, payload interface{}, opts ...MessageOption) Message {
	return DefaultClient.NewMessage(topic, payload, opts...)
}

// Creates a new client with the options passed in
func NewClient(opt ...Option) Client {
	return newRpcClient(opt...)
}

// Creates a new request using the default client. Content Type will
// be set to the default within options and use the appropriate codec
func NewRequest(service, endpoint string, request interface{}, reqOpts ...RequestOption) Request {
	return DefaultClient.NewRequest(service, endpoint, request, reqOpts...)
}

// Creates a streaming connection with a service and returns responses on the
// channel passed in. It's up to the user to close the streamer.
func NewStream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
	return DefaultClient.Stream(ctx, request, opts...)
}

func String() string {
	return DefaultClient.String()
}

 固然他也支持双工通讯 Stream 这些具体的实现方式和使用方式,之后会详细解说。

     默认的是rpc实现方式,他还有grpc和http方式,在go-plugins里能够找到

Server

Server看名字你们也知道是作什么的了。监听等待rpc请求。监听broker的订阅信息,等待信息队列的推送等。

// Package server is an interface for a micro server
package server

import (
	"context"
	"os"
	"os/signal"
	"syscall"

	"github.com/google/uuid"
	"github.com/micro/go-log"
	"github.com/micro/go-micro/codec"
	"github.com/micro/go-micro/registry"
)

// Server is a simple micro server abstraction
type Server interface {
	Options() Options
	Init(...Option) error
	Handle(Handler) error
	NewHandler(interface{}, ...HandlerOption) Handler
	NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
	Subscribe(Subscriber) error
	Start() error
	Stop() error
	String() string
}

// Router handle serving messages
type Router interface {
	// ServeRequest processes a request to completion
	ServeRequest(context.Context, Request, Response) error
}

// Message is an async message interface
type Message interface {
	Topic() string
	Payload() interface{}
	ContentType() string
}

// Request is a synchronous request interface
type Request interface {
	// Service name requested
	Service() string
	// The action requested
	Method() string
	// Endpoint name requested
	Endpoint() string
	// Content type provided
	ContentType() string
	// Header of the request
	Header() map[string]string
	// Body is the initial decoded value
	Body() interface{}
	// Read the undecoded request body
	Read() ([]byte, error)
	// The encoded message stream
	Codec() codec.Reader
	// Indicates whether its a stream
	Stream() bool
}

// Response is the response writer for unencoded messages
type Response interface {
	// Encoded writer
	Codec() codec.Writer
	// Write the header
	WriteHeader(map[string]string)
	// write a response directly to the client
	Write([]byte) error
}

// Stream represents a stream established with a client.
// A stream can be bidirectional which is indicated by the request.
// The last error will be left in Error().
// EOF indicates end of the stream.
type Stream interface {
	Context() context.Context
	Request() Request
	Send(interface{}) error
	Recv(interface{}) error
	Error() error
	Close() error
}

// Handler interface represents a request handler. It's generated
// by passing any type of public concrete object with endpoints into server.NewHandler.
// Most will pass in a struct.
//
// Example:
//
//      type Greeter struct {}
//
//      func (g *Greeter) Hello(context, request, response) error {
//              return nil
//      }
//
type Handler interface {
	Name() string
	Handler() interface{}
	Endpoints() []*registry.Endpoint
	Options() HandlerOptions
}

// Subscriber interface represents a subscription to a given topic using
// a specific subscriber function or object with endpoints.
type Subscriber interface {
	Topic() string
	Subscriber() interface{}
	Endpoints() []*registry.Endpoint
	Options() SubscriberOptions
}

type Option func(*Options)

var (
	DefaultAddress        = ":0"
	DefaultName           = "server"
	DefaultVersion        = "latest"
	DefaultId             = uuid.New().String()
	DefaultServer  Server = newRpcServer()
	DefaultRouter         = newRpcRouter()
)

// DefaultOptions returns config options for the default service
func DefaultOptions() Options {
	return DefaultServer.Options()
}

// Init initialises the default server with options passed in
func Init(opt ...Option) {
	if DefaultServer == nil {
		DefaultServer = newRpcServer(opt...)
	}
	DefaultServer.Init(opt...)
}

// NewServer returns a new server with options passed in
func NewServer(opt ...Option) Server {
	return newRpcServer(opt...)
}

// NewSubscriber creates a new subscriber interface with the given topic
// and handler using the default server
func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber {
	return DefaultServer.NewSubscriber(topic, h, opts...)
}

// NewHandler creates a new handler interface using the default server
// Handlers are required to be a public object with public
// endpoints. Call to a service endpoint such as Foo.Bar expects
// the type:
//
//	type Foo struct {}
//	func (f *Foo) Bar(ctx, req, rsp) error {
//		return nil
//	}
//
func NewHandler(h interface{}, opts ...HandlerOption) Handler {
	return DefaultServer.NewHandler(h, opts...)
}

// Handle registers a handler interface with the default server to
// handle inbound requests
func Handle(h Handler) error {
	return DefaultServer.Handle(h)
}

// Subscribe registers a subscriber interface with the default server
// which subscribes to specified topic with the broker
func Subscribe(s Subscriber) error {
	return DefaultServer.Subscribe(s)
}

// Run starts the default server and waits for a kill
// signal before exiting. Also registers/deregisters the server
func Run() error {
	if err := Start(); err != nil {
		return err
	}

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
	log.Logf("Received signal %s", <-ch)

	return Stop()
}

// Start starts the default server
func Start() error {
	config := DefaultServer.Options()
	log.Logf("Starting server %s id %s", config.Name, config.Id)
	return DefaultServer.Start()
}

// Stop stops the default server
func Stop() error {
	log.Logf("Stopping server")
	return DefaultServer.Stop()
}

// String returns name of Server implementation
func String() string {
	return DefaultServer.String()
}

参考连接:https://yq.aliyun.com/articles/633797