对于go这样一门新生语言来讲,生态链还处于发展阶段,微服务框架也是如此,下面将基于grpc-go版本搭建一个微服务通信框架.git
服务注册与发布主要解决的服务依赖问题,一般意义上,若是A服务调用B服务时,最直接的作法是配置IP地址和端口.但随着服务依赖变多时,配置将会十分庞杂,且当服务发生迁移时,那么全部相关服务的配置均须要修改,这将十分难以维护以及容易出现问题. 所以为了解决这种服务依赖关系,服务注册与发布应运而生.github
所以,服务注册与发布能够概况为,服务将信息上报,客户端拉取服务信息,经过服务名进行调用,当服务宕机时客户端踢掉故障服务,服务新上线时客户端自动添加到调用列表.json
grpc-go的整个实现大量使用go的接口特性,所以经过扩展接口,能够很容易的实现服务的注册与发现,这里服务注册中心考虑到可用性以及一致性,通常采用etcd或zookeeper来实现,这里实现etcd的版本.
完整代码以及使用示例见:grpc-wrapperapp
具体须要实现几个接口,针对客户端,最简单的实现方式只须要实现两个接口方法Resolve(),以及Next(),而后使用轮询的负载均衡方式.
主要经过etcd的Get接口以及Watch接口实现.负载均衡
//用于生成Watcher,监听注册中心中的服务信息变化
func (er *etcdRegistry) Resolve(target string) (naming.Watcher, error) {
ctx, cancel := context.WithTimeout(context.TODO(), resolverTimeOut)
w := &etcdWatcher{
cli: er.cli,
target: target + "/",
ctx: ctx,
cancel: cancel,
}
return w, nil
}
复制代码
//Next接口主要用于获取注册的服务信息,经过channel以及watch,当服务信息发生
//变化时,Next接口会将变化返回给grpc框架从而实现服务信息变动.
func (ew *etcdWatcher) Next() ([]*naming.Update, error) {
var updates []*naming.Update
//初次获取时,建立监听channel,并返回获取到的服务信息
if ew.watchChan == nil {
//create new chan
resp, err := ew.cli.Get(ew.ctx, ew.target, etcd.WithPrefix(), etcd.WithSerializable())
if err != nil {
return nil, err
}
for _, kv := range resp.Kvs {
var upt naming.Update
if err := json.Unmarshal(kv.Value, &upt); err != nil {
continue
}
updates = append(updates, &upt)
}
//建立etcd的watcher监听target(服务名)的信息.
opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
ew.watchChan = ew.cli.Watch(context.TODO(), ew.target, opts...)
return updates, nil
}
//阻塞监听,服务发生变化时才返回给上层
wrsp, ok := <-ew.watchChan
if !ok {
err := status.Error(codes.Unavailable, "etcd watch closed")
return nil, err
}
if wrsp.Err() != nil {
return nil, wrsp.Err()
}
for _, e := range wrsp.Events {
var upt naming.Update
var err error
switch e.Type {
case etcd.EventTypePut:
err = json.Unmarshal(e.Kv.Value, &upt)
upt.Op = naming.Add
case etcd.EventTypeDelete:
err = json.Unmarshal(e.PrevKv.Value, &upt)
upt.Op = naming.Delete
}
if err != nil {
continue
}
updates = append(updates, &upt)
}
return updates, nil
}
复制代码
服务端只须要上报服务信息,并定时保持心跳,这里经过etcd的Put接口以及KeepAlive接口实现. 具体以下:框架
func (er *etcdRegistry) Register(ctx context.Context, target string, update naming.Update, opts ...wrapper.RegistryOptions) (err error) {
//将服务信息序列化成json格式
var upBytes []byte
if upBytes, err = json.Marshal(update); err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
ctx, cancel := context.WithTimeout(context.TODO(), resolverTimeOut)
er.cancal = cancel
rgOpt := wrapper.RegistryOption{TTL: wrapper.DefaultRegInfTTL}
for _, opt := range opts {
opt(&rgOpt)
}
switch update.Op {
case naming.Add:
lsRsp, err := er.lsCli.Grant(ctx, int64(rgOpt.TTL/time.Second))
if err != nil {
return err
}
//Put服务信息到etcd,并设置key的值TTL,经过后面的KeepAlive接口
//对TTL进行续期,超过TTL的时间未收到续期请求,则说明服务可能挂了,从而清除服务信息
etcdOpts := []etcd.OpOption{etcd.WithLease(lsRsp.ID)}
key := target + "/" + update.Addr
_, err = er.cli.KV.Put(ctx, key, string(upBytes), etcdOpts...)
if err != nil {
return err
}
//保持心跳
lsRspChan, err := er.lsCli.KeepAlive(context.TODO(), lsRsp.ID)
if err != nil {
return err
}
go func() {
for {
_, ok := <-lsRspChan
if !ok {
grpclog.Fatalf("%v keepalive channel is closing", key)
break
}
}
}()
case naming.Delete:
_, err = er.cli.Delete(ctx, target+"/"+update.Addr)
default:
return status.Error(codes.InvalidArgument, "unsupported op")
}
return nil
}
复制代码
grpc
etcd
grpc-wrapper微服务