项目中一些服务须要监听其余微服务的启动信息,须要监听到启动后主动向其发请求拉取一些配置等。 但是eureka-client
并未提供监听其余服务启动的事件,eureka-server
却是提供了事件, 能够在本身的eureka-server
中监听服务启动,监听后发送服务启动信息到kafka这些消息队列,服务监听kafka消息, 这种方式要依赖消息队列: 源码; 或者改造eureka-client
, 因为须要改的代码很少,修改源码从新打成依赖本身维护不方便,这里经过javassist直接修改jar里的字节码实现。java
public void init() {
try {
ClassPool classPool = new ClassPool(true);
//添加com.netflix.discovery包的扫描路径
ClassClassPath classPath = new ClassClassPath(Applications.class);
classPool.insertClassPath(classPath);
//获取要修改Application类
CtClass ctClass = classPool.get(APPLICATION_PATH);
//获取addInstance方法
CtMethod addInstanceMethod = ctClass.getDeclaredMethod("addInstance");
//修改addInstance方法
addInstanceMethod.setBody("{instancesMap.put($1.getId(), $1);"
+ "synchronized (instances) {me.flyleft.eureka.client.event.EurekaEventHandler.getInstance().eurekaAddInstance($1);" +
"instances.remove($1);instances.add($1);isDirty = true;}}");
//获取removeInstance方法
CtMethod removeInstanceMethod = ctClass.getDeclaredMethod("removeInstance");
//修改removeInstance方法
removeInstanceMethod.setBody("{me.flyleft.eureka.client.event.EurekaEventHandler.getInstance().eurekaRemoveInstance($1);this.removeInstance($1, true);}");
//覆盖原有的Application类
ctClass.toClass();
//使用类加载器从新加载Application类
classPool.getClassLoader().loadClass(APPLICATION_PATH);
Class.forName(APPLICATION_PATH);
} catch (Exception e) {
throw new EurekaEventException(e);
}
}
复制代码
@SpringBootApplication
@EnableEurekaClient
public class EurekaClientApplication {
public static void main(String[] args) {
//先执行修改字节码代码
EurekaEventHandler.getInstance().init();
new SpringApplicationBuilder(EurekaClientApplication.class).web(true).run(args);
}
}
复制代码
public class EurekaEventObservable extends Observable {
public void sendEvent(EurekaEventPayload payload) {
setChanged();
notifyObservers(payload);
}
}
复制代码
public abstract class AbstractEurekaEventObserver implements Observer, EurekaEventService {
@Override
public void update(Observable o, Object arg) {
if (arg instanceof EurekaEventPayload) {
EurekaEventPayload payload = (EurekaEventPayload) arg;
if (InstanceInfo.InstanceStatus.UP.name().equals(payload.getStatus())) {
LOGGER.info("Receive UP event, payload: {}", payload);
} else {
LOGGER.info("Receive DOWN event, payload: {}", payload);
}
putPayloadInCache(payload);
consumerEventWithAutoRetry(payload);
}
}
}
复制代码
接收到服务启动去执行一些操做,若是执行失败有异常则自动重试指定次数,每一个一段事件重试一次,执行成功则再也不执行git
private void consumerEventWithAutoRetry(final EurekaEventPayload payload) {
rx.Observable.just(payload)
.map(t -> {
// 此处为接收到服务启动去执行的一些操做
consumerEvent(payload);
return payload;
}).retryWhen(x -> x.zipWith(rx.Observable.range(1, retryTime),
(t, retryCount) -> {
//异常处理
if (retryCount >= retryTime) {
if (t instanceof RemoteAccessException || t instanceof RestClientException) {
LOGGER.warn("error.eurekaEventObserver.fetchError, payload {}", payload, t);
} else {
LOGGER.warn("error.eurekaEventObserver.consumerError, payload {}", payload, t);
}
}
return retryCount;
}).flatMap(y -> rx.Observable.timer(retryInterval, TimeUnit.SECONDS)))
.subscribeOn(Schedulers.io())
.subscribe((EurekaEventPayload payload1) -> {
});
}
复制代码
自动重试失败,能够手动重试,添加手动重试接口github
@RestController
@RequestMapping(value = "/v1/eureka/events")
public class EurekaEventEndpoint {
private EurekaEventService eurekaEventService;
public EurekaEventEndpoint(EurekaEventService eurekaEventService) {
this.eurekaEventService = eurekaEventService;
}
@Permission(permissionLogin = true)
@ApiOperation(value = "获取未消费的事件列表")
@GetMapping
public List<EurekaEventPayload> list(@RequestParam(value = "service", required = false) String service) {
return eurekaEventService.unfinishedEvents(service);
}
@Permission(permissionLogin = true)
@ApiOperation(value = "手动重试未消费成功的事件")
@PostMapping("retry")
public List<EurekaEventPayload> retry(@RequestParam(value = "id", required = false) String id, @RequestParam(value = "service", required = false) String service) {
return eurekaEventService.retryEvents(id, service);
}
}
复制代码