Spring Cloud Netflix 做为springcloud 咱们经常使用的一个项目,其子项目Eureka,zuul,Rebbion是我熟悉的。可是Spring Cloud Netflix 被宣布进入了维护模式, 意思再也不添加新特性了,这对于咱们来讲很不友好了。 你们纷纷寻找相应的替代工具。(具体能够网上搜索)java
但这不影响咱们学习一些组件的框架思想。我对注册发现,负载均衡这块比较感兴趣。因此在此记录下本身的阅读心得。node
版本说明:Finchley.SR1git
当咱们在springboot的启动类上加上@EnableEurekaServer
,一个基本的注册中心就能够生效了。github
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaServerApplication.class, args);
}
}
复制代码
@EnableEurekaServer
仅仅是引入EurekaServerMarkerConfiguration
类。 Marker的英文意思是标记的意思,spring相关框架中有不少相似xxxMarkerxxx
这样的注解.其实他们的意思就是一个开关。会在其余地方进行开关的判断,有对应xxxMarkerxxx
类就表示打开,没有表示关闭。web
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
复制代码
EurekaServerMarkerConfiguration
开关打开的是哪一个类呢??spring
org.springframework.cloud.netflix.eureka.server
项目spring.factories资源文件中自动注入类EurekaServerAutoConfiguration
,此类在自动注入的过程当中,会判断开关是否打开来决定是否自动注入相关类json
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
复制代码
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
.....
}
复制代码
由此看出EurekaServerMarkerConfiguration
开关打开的EurekaServerAutoConfiguration
。缓存
下面咱们看看EurekaServerAutoConfiguration
配置了什么东西。 (1.先看注解上相关配置springboot
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
...
}
复制代码
EurekaServerInitializerConfiguration
类,此类继承了SmartLifecycle
接口,因此会在spring启动完毕时回调此类的start()方法/eureka/server.properties
的配置属性。(2.再看类内部相关配置(代码比较长,这里只讲内容,建议打开源码看) 寻找类中的Bean服务器
@ConfigurationProperties(“eureka.server”)
映射咱们的配置文件中的eureka.server.xxxx
格式的配置信息(此类很重要啊,咱们想修改EurekaServer的配置信息,能够配置eureka.server.xxxx
覆盖此类中的默认配置)(ServerCodecs)CloudServerCodecs
InstanceRegistry
(注意PeerAwareInstanceRegistry实现了AbstractInstanceRegistry,这里准确的说是 对等节点+当前节点同步器)DefaultEurekaServerContext
/eureka
开头的请求都交给Jersey 框架去解析。容器是com.sun.jersey.spi.container.servlet.ServletContainer
com.netflix.discovery","com.netflix.eureka"
包路径下的接口。一般咱们再springmvc中经过Controller概念来表示接口,Jersey框架下用ApplicationResource的概念来表示接口。暴露的接口其实就是eureka各个应用通讯的接口。(下面再说这些接口)EurekaServerAutoConfiguration
基本上就作了这些工做。咱们来归类总结下
针对当前Eureka实例的相关组件:
/eureka
路径的相关接口,注册拦截/eureka
的拦截器,注册com.sun.jersey.spi.container.servlet.ServletContainer
容器来处理对应的请求两个针对集群下相关组件:
两个针对启动相关类:
至此:咱们也能够大体了解了一个EurekaServer大体长什么样子了。
EurekaServerContext做为上下文,应该是核心所在。上文讲过注册DefaultEurekaServerContext
。此类中有@Inject,@PostConstruct, @PreDestroy
注解的方法,重点来看看。
@Inject
public DefaultEurekaServerContext(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes, ApplicationInfoManager applicationInfoManager) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.registry = registry;
this.peerEurekaNodes = peerEurekaNodes;
this.applicationInfoManager = applicationInfoManager;
}
复制代码
@Inject
注解的方法,参数由IOC容器注入。serverConfig ,serverCodecs ,registry ,peerEurekaNodes
咱们已经认识了。ApplicationInfoManager 是用来管理应用信息的,也就是实例注册信息,由ApplicationInfoManager统一管理。
@PostConstruct
修饰的方法会在服务器加载Servle的时候运行,而且只会被服务器执行一次,被@PostConstruct
修饰的方法会在构造函数以后,init()方法以前运行.
@PostConstruct
@Override
public void initialize() {
logger.info("Initializing ...");
peerEurekaNodes.start();
try {
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}
复制代码
这个方法很简明,主要有两个重要的的点:
PeerEurekaNodes: 用于管理PeerEurekaNode节点集合。 peerEurekaNodes.start();
public void start() {
//建立一个单线程定时任务线程池:线程的名称叫作Eureka-PeerNodesUpdater
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 解析Eureka Server URL,并更新PeerEurekaNodes列表
updatePeerEurekaNodes(resolvePeerUrls());
//建立任务
//任务内容为:解析Eureka Server URL,并更新PeerEurekaNodes列表
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
//交给线程池执行,执行间隔10min
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
复制代码
解析配置的对等体URL。就是在配置文件中配置的多个Eureka注册中心的URL.
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
//计算须要移除的url= 原来-新配置。
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
//计算须要增长的url= 新配置-原来的。
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
//没有变化就不更新
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
// 删除须要移除url对应的节点。
if (!toShutdown.isEmpty()) {
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// 添加须要增长的url对应的节点
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
//更新节点列表
this.peerEurekaNodes = newNodeList;
//更新节点url列表
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
复制代码
总结:start()方法,其实就是完成新配置的eureka集群信息的初始化更新工做。
对等节点同步器的初始化。
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
//统计最近X秒内的来自对等节点复制的续约数量(默认1秒)
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
//初始化返回结果缓存
initializedResponseCache();
//更新续约阀值
scheduleRenewalThresholdUpdateTask();
//初始化远程区域注册 相关信息
initRemoteRegionRegistry();
...
}
复制代码
启动一个定时任务,任务名称为Eureka-MeasureRateTimer
,每1秒统计从对等节点复制的续约数,将当前的桶的统计数据放到lastBucket,当前桶置为0
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
--
this.timer = new Timer("Eureka-MeasureRateTimer", true);
---
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
// Zero out the current bucket.
lastBucket.set(currentBucket.getAndSet(0));
} catch (Throwable e) {
logger.error("Cannot reset the Measured Rate", e);
}
}
}, sampleInterval, sampleInterval);
复制代码
注意:此统计器用于节点之间复制的统计。
精辟,缓存来实现返回结果的缓存,优秀设计啊。
使用goole cache初始化一个缓存类ResponseCacheImpl
,缓存(all applications, delta changes and for individual applications
)请求的结果, 此类中有两个缓存:
readWriteCacheMap
: 读写缓存。初始化容量1000,失效时间3分钟。readOnlyCacheMap
:只读缓存,shouldUseReadOnlyResponseCache
属性控制是否启用,默认是启用的。此缓存会使用,名为Eureka-CacheFillTimer
的timer,每30s更新从 readWriteCacheMap
中更新readOnlyCacheMap
中的缓存值。取值逻辑: 先从readOnlyCacheMap
取值,没有去readWriteCacheMap
,没有去经过CacheLoader加载,而CacheLoader会到维护应用实例注册信息的Map中获取。
这里就产生了一个疑问,为啥有搞个二级缓存来缓存结果呢?不是很理解。
使用名为ReplicaAwareInstanceRegistry - RenewalThresholdUpdater
的timer,每15(900s)分钟执行updateRenewalThreshold()
任务,更新续约阀值。
private void updateRenewalThreshold() {
try {
Applications apps = eurekaClient.getApplications();
int count = 0;
//统计全部注册instance个数
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
synchronized (lock) {
当总数》预期值时 或者 关闭了自我保护模式,更新
if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfRenewsPerMin)
|| (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
}
}
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold", e);
}
}
复制代码
expectedNumberOfRenewsPerMin
每分钟最大的续约数量 (30s/次,2次/s): =客户端数量count*2numberOfRenewsPerMinThreshold
每分钟续约阈值。serverConfig.getRenewalPercentThreshold()*expectedNumberOfRenewsPerMin
serverConfig.getRenewalPercentThreshold()默认是0.85当每分钟续约数小于numberOfRenewsPerMinThreshold
阈值时,而且自我保护没有关闭的状况下,开启自我保护,此期间不剔除任何一个客户端。(下面的EvictionTask()
驱逐任务会讲到如何利用)
此四个地方都会更新两个值
初始化 远程区域注册 相关信息
@PreDestroy
修饰的方法会在服务器卸载Servlet的时候执行,而且只会被服务器执行一次,被@PreDestroy
修饰的方法会Destroy
方法以后执行,在Servlet被完全卸载以前.
public void shutdown() {
registry.shutdown();
peerEurekaNodes.shutdown();
}
复制代码
停掉init()时启动的定时任务
清空集群url缓存,集群节点缓存。
总结:EurekaServerContext
的初始化作了不少事情,很精辟,建议多阅读,多学习
EurekaServerInitializerConfiguration
实现了SmartLifecycle
接口,在spring启动后,执行start()方法
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
//发布注册中心能够注册事件
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
//状态为运行状态
EurekaServerInitializerConfiguration.this.running = true;
//发布注册中心启动完成事件
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
复制代码
这里重点看先EurekaServerBootstrap.contextInitialized
EurekaServerBootstrap
的contextInitialized主要干了两件事
initEurekaEnvironment();初始化环境
initEurekaServerContext();初始化上下文
复制代码
主要是数据中心等环境变量的初始化
此方法中最重要的是
从相邻eureka节点拷贝注册列表信息
int registryCount = this.registry.syncUp();
容许开始与客户端的数据传输,即开始做为Server服务
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
复制代码
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
//重试次数
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
//从eurekaClient获取服务列表
Applications apps = eurekaClient.getApplications();
//遍历注册
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
复制代码
容许开始与客户端的数据传输,即开始做为Server服务
InstanceRegistry.openForTraffic public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
super.openForTraffic(applicationInfoManager,
count == 0 ? this.defaultOpenForTrafficCount : count);
}
PeerAwareInstanceRegistryImpl.openForTraffic public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
//每分钟期待的续约数(默认30s续约,60s就是2次)
this.expectedNumberOfRenewsPerMin = count * 2;
// 每分钟续约的阀值:0.85 * expectedNumberOfRenewsPerMin
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
....
logger.info("Changing status to UP");
//applicationInfoManager设置状态为UP
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
复制代码
protected void postInit() {
//又启动了一个续约数统计器,此统计器用于配合驱逐任务
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
复制代码
建立一个名为Eureka-EvictionTimer
的定时器来执行EvictionTask()任务。 EvictionTask()任务:
@Override
public void run() {
// 获取延迟秒数,就是延迟几秒下线时间。
long compensationTimeMs = getCompensationTimeMs();
//驱逐操做
evict(compensationTimeMs);
}
复制代码
evict()驱逐操做:清理过时租约
public void evict(long additionalLeaseMs) {
// 判断是否开启自我保护,自我保护期间不剔除任何任务
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
//循环得到 全部过时的租约
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
// 判断是否过时
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// 计算 最大容许清理租约数量
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
// 计算 清理租约数量
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
Random random = new Random(System.currentTimeMillis());
// 遍历清理。
for (int i = 0; i < toEvict; i++) {
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
internalCancel(appName, id, false);
}
}
}
复制代码
isLeaseExpirationEnabled():判断是否开启自我保护的两个条件
Lease.isExpire():是否过时的判断:
public boolean isExpired(long additionalLeaseMs) {
return (
//或者明确实例下线时间。
evictionTimestamp > 0
//或者距离最后更新时间已通过去至少3分钟
|| System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
复制代码
//续约时更新lastUpdateTimestamp,加上了过时间隔?
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
复制代码
过时时间判断: System.currentTimeMillis()> lastUpdateTimestamp + duration + additionalLeaseMs 这里加了两次duration, 也就是180秒,加上延迟下线时间。也就是最少须要3分钟才判断下线。
至此Eureka server的初始化就完成了。 这里经过debug模式来看看初始化过程当中的定时任务。
Eureka Server 启动后,就是对外提供服务了。等待客户端来注册。
Eureka是一个基于REST(Representational State Transfer)服务,咱们从官方文档中能够看到其对外提供的接口: 官方文档
上文说过,Eureka 使用jersey框架来作MVC框架,暴露接口。ApplicationResource
相似springmvc中的Controller。
在com.netflix.eureka.resources
包下咱们能够看到这些ApplicationResource
ApplicationResource.addInstance
对应的就是服务注册接口
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
....
//使用PeerAwareInstanceRegistryImpl#register() 注册实例信息。
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
复制代码
InstanceRegistry
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
//发布注册事件,
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);
}
PeerAwareInstanceRegistryImpl
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
//租期90s
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//注册实例
super.register(info, leaseDuration, isReplication);
//复制到其余节点。
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
复制代码
AbstractInstanceRegistry public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
read.lock()读锁
1.从缓存中获取实例名称对应的租约信息
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
2.统计数+1
REGISTER.increment(isReplication);
//gmap为null.则建立一个Map。
3.租约的处理分两种状况:
租约已经存在:
比较新租约与旧租约的LastDirtyTimestamp,使用LastDirtyTimestamp最晚的租约
租约不存在,即新注册:
synchronized (lock) {
更新期待每分钟续约数
更新续约阈值
}
将租约放入appname对应的map中。
4.在最近注册队(recentRegisteredQueue)里添加一个当前注册信息
5.状态的处理:
将当前实例的OverriddenStatus状态,放到Eureka Server的overriddenInstanceStatusMap;
根据OverriddenStatus状态,设置状态
7.实例actionType=ADDED
registrant.setActionType(ActionType.ADDED);
8. 维护recentlyChangedQueue,保存最近操做
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
9.更新最后更新时间
registrant.setLastUpdatedTimestamp();
10.使当前实例的结果缓存ResponseCache失效()
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
}
复制代码
此处能够看源码阅读,在此不讲了
咱们获取的实例信息,其实都是从缓存中获取的String payLoad = responseCache.get(cacheKey);
@GET
public Response getApplication(@PathParam("version") String version, @HeaderParam("Accept") final String acceptHeader, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
if (!registry.shouldAllowAccess(false)) {
return Response.status(Status.FORBIDDEN).build();
}
EurekaMonitors.GET_APPLICATION.increment();
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
if (acceptHeader == null || !acceptHeader.contains("json")) {
keyType = Key.KeyType.XML;
}
Key cacheKey = new Key(
Key.EntityType.Application,
appName,
keyType,
CurrentRequestVersion.get(),
EurekaAccept.fromString(eurekaAccept)
);
String payLoad = responseCache.get(cacheKey);
if (payLoad != null) {
logger.debug("Found: {}", appName);
return Response.ok(payLoad).build();
} else {
logger.debug("Not Found: {}", appName);
return Response.status(Status.NOT_FOUND).build();
}
}
复制代码
因为篇幅限制:
至此:Eureka服务端内容大致讲完,只讲了些大概,具体建议跟源码。
若有错误,敬请指出