本篇咱们着重分析Eureka服务端的逻辑实现,主要涉及到服务的注册流程分析。java
在Eureka的服务治理中,会涉及到下面一些概念:算法
服务注册:Eureka Client会经过发送REST请求的方式向Eureka Server注册本身的服务,提供自身的元数据,好比 IP 地址、端口、运行情况指标的URL、主页地址等信息。Eureka Server接收到注册请求后,就会把这些元数据信息存储在一个ConcurrentHashMap中。spring
服务续约:在服务注册后,Eureka Client会维护一个心跳来持续通知Eureka Server,说明服务一直处于可用状态,防止被剔除。Eureka Client在默认的状况下会每隔30秒发送一次心跳来进行服务续约。json
服务同步:Eureka Server之间会互相进行注册,构建Eureka Server集群,不一样Eureka Server之间会进行服务同步,用来保证服务信息的一致性。缓存
获取服务:服务消费者(Eureka Client)在启动的时候,会发送一个REST请求给Eureka Server,获取上面注册的服务清单,而且缓存在Eureka Client本地,默认缓存30秒。同时,为了性能考虑,Eureka Server也会维护一份只读的服务清单缓存,该缓存每隔30秒更新一次。服务器
服务调用:服务消费者在获取到服务清单后,就能够根据清单中的服务列表信息,查找到其余服务的地址,从而进行远程调用。Eureka有Region和Zone的概念,一个Region能够包含多个Zone,在进行服务调用时,优先访问处于同一个Zone中的服务提供者。网络
服务下线:当Eureka Client须要关闭或重启时,就不但愿在这个时间段内再有请求进来,因此,就须要提早先发送REST请求给Eureka Server,告诉Eureka Server本身要下线了,Eureka Server在收到请求后,就会把该服务状态置为下线(DOWN),并把该下线事件传播出去。app
服务剔除:有时候,服务实例可能会由于网络故障等缘由致使不能提供服务,而此时该实例也没有发送请求给Eureka Server来进行服务下线,因此,还须要有服务剔除的机制。Eureka Server在启动的时候会建立一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒)的服务剔除。负载均衡
自我保护:既然Eureka Server会定时剔除超时没有续约的服务,那就有可能出现一种场景,网络一段时间内发生了异常,全部的服务都没可以进行续约,Eureka Server就把全部的服务都剔除了,这样显然不太合理。因此,就有了自我保护机制,当短期内,统计续约失败的比例,若是达到必定阈值,则会触发自我保护的机制,在该机制下,Eureka Server不会剔除任何的微服务,等到正常后,再退出自我保护机制。ide
上一篇中咱们搭建了一个简单的Eureka客户端和服务端。若是你有启动过观看启动日志不难发现:
这里有个EurekaServerBootstrap
类,启动日志中给出:Setting the eureka configuration..,Initialized server context
。看起来这个应该是个启动类,跟进去看一下,有个很显眼的方法:
这个方法的调用先按住不表,咱们先从启动类上添加的 EnableEurekaServer
注解着手,看看为何添加了一个注解就能激活 Rureka。
从server启动类上的EnableEurekaServer
注解进入:
接下来引用了EurekaServerMarkerConfiguration
,看到在这个注解上有个注释:启用这个注解的目的是为了激活:EurekaServerAutoConfiguration类;
进入EurekaServerAutoConfiguration看到在类头部有一个注解:
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
即EurekaServerAutoConfiguration
启动的条件是EurekaServerMarkerConfiguration
注解先加载。
上面这一张图标识出了从启动注解到预启动类的流程,可是你会发现实际上 EurekaServerAutoConfiguration 也没有作什么事情:配置初始化,启动一些基本的过滤器。一样在类头部的引用上有一个Import注解:
@Import(EurekaServerInitializerConfiguration.class)
因此在 EurekaServerAutoConfiguration 初始化的时候,会引用到 EurekaServerInitializerConfiguration,激活它的初始化。EurekaServerInitializerConfiguration 实现了SmartLifecycle.start方法,在spring 初始化的时候会被启动,激活 run 方法。能够看到在 run 方法中调用的就是:
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
即咱们上面截图中的EurekaServerBootstrap.contextInitialized()
方法。
总体的调用流程以下:
具体的初始化信息见下图:
在上面讲到Eureka server启动过程当中,启动一个Eureka Client的时候,initEurekaServerContext()
里面会进行服务同步和服务剔除,syncUp()方法所属的类是:PeerAwareInstanceRegistry,即server端的服务注册逻辑都在这里面。由于没有使用AWS的服务器,因此默认实例化的实现类为:PeerAwareInstanceRegistryImpl。
PeerAwareInstanceRegistry registry; if (isAws(applicationInfoManager.getInfo())) { registry = new AwsInstanceRegistry( eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, eurekaClient ); awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager); awsBinder.start(); } else { registry = new PeerAwareInstanceRegistryImpl( eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, eurekaClient ); }
PeerAwareInstanceRegistryImpl 继承了一个抽象类 AbstractInstanceRegistry:
@Singleton public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { }
AbstractInstanceRegistry中的实现逻辑是真正的服务注册存储所在地:
public abstract class AbstractInstanceRegistry implements InstanceRegistry { private static final Logger logger = LoggerFactory.getLogger(AbstractInstanceRegistry.class); private static final String[] EMPTY_STR_ARRAY = new String[0]; private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>(); protected Map<String, RemoteRegionRegistry> regionNameVSRemoteRegistry = new HashMap<String, RemoteRegionRegistry>(); protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap = CacheBuilder .newBuilder().initialCapacity(500) .expireAfterAccess(1, TimeUnit.HOURS) .<String, InstanceStatus>build().asMap(); .... .... .... }
全部的服务实例信息都保存在 server 本地的map当中。因此在server端启动的时候会去拉别的server上存储的client实例,而后存储到本地缓存。
若是是某个client主动发出了注册请求,那么是如何注册到服务端呢?
仍是查看日志:启动服务端,而后再启动客户端,查看服务端日志:
这里能看到刚才启动的客户端已经在服务端注册了,注册逻辑走的类是:AbstractInstanceRegistry。
上面也提到 是服务注册的逻辑实现类,完成保存客户端信息的方法是:
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { ...... }
代码就不贴了,主要实现的逻辑是保存当前注册的客户端信息。咱们知道客户端是发送了一次http请求给服务端,那么真正的注册逻辑应该是从一个http请求的接收处进来的。跟着使用了register方法的地方去找,PeerAwareInstanceRegistryImpl里面有调用:
@Override public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } super.register(info, leaseDuration, isReplication); //将新节点信息告诉别的服务端 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
这里没有改写父类的register逻辑,下面还多了一句:replicateToPeers,这里主要作的逻辑是:给兄弟 server节点发送register 请求,告诉他们有客户端来注册。
继续看谁调用了这里,能够找到:ApplicationResource 的addInstance方法调用了:
@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { ...... registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }
而这里很显然是一个接口,逻辑就很清晰了:
另外,咱们查看addInstance方法被谁调用的过程当中发现:PeerReplicationResource--->batchReplication 方法也调用了注册的逻辑。
这个方法一看居然解答了以前个人疑惑:服务端之间是如何发送心跳的。原来实现是在这里。经过dispatch方法来区分当前的调用是何种请求,
能够看到,服务注册,心跳检测,服务取消,服务下线,服务剔除的入口都在这里:
@Path("batch") @POST public Response batchReplication(ReplicationList replicationList) { try { ReplicationListResponse batchResponse = new ReplicationListResponse(); for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) { try { batchResponse.addResponse(dispatch(instanceInfo)); } catch (Exception e) { batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null)); logger.error("{} request processing failed for batch item {}/{}", instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e); } } return Response.ok(batchResponse).build(); } catch (Throwable e) { logger.error("Cannot execute batch Request", e); return Response.status(Status.INTERNAL_SERVER_ERROR).build(); } } private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) { ApplicationResource applicationResource = createApplicationResource(instanceInfo); InstanceResource resource = createInstanceResource(instanceInfo, applicationResource); String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp()); String overriddenStatus = toString(instanceInfo.getOverriddenStatus()); String instanceStatus = toString(instanceInfo.getStatus()); Builder singleResponseBuilder = new Builder(); switch (instanceInfo.getAction()) { case Register: singleResponseBuilder = handleRegister(instanceInfo, applicationResource); break; case Heartbeat: singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus); break; case Cancel: singleResponseBuilder = handleCancel(resource); break; case StatusUpdate: singleResponseBuilder = handleStatusUpdate(instanceInfo, resource); break; case DeleteStatusOverride: singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource); break; } return singleResponseBuilder.build(); }
从这个入口进去,你们能够跟踪一下感兴趣的逻辑。