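The RPC frameworks we commonly see usually consist of the following three parts:
I won't go into more detail here; I have not studied Raft yet.
After starting Consul, open its management page.
Extract two interfaces, service registration and service discovery, and implement them with Consul. The implementation here mainly uses consul-client (consul-api would also work), which needs to be added to the pom: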
<dependency>
    <groupId>com.orbitz.consul</groupId>
    <artifactId>consul-client</artifactId>
    <version>0.14.1</version>
</dependency>
public interface RegistryService {
    void register(RpcURL url);
    void unregister(RpcURL url);
}
public class AbstractConsulService {
    private static final Logger logger = LoggerFactory.getLogger(AbstractConsulService.class);

    protected final static String CONSUL_NAME="consul_node_jim";
    protected final static String CONSUL_ID="consul_node_id";
    protected final static String CONSUL_TAGS="v3";
    protected final static String CONSUL_HEALTH_INTERVAL="1s";

    protected Consul buildConsul(String registryHost, int registryPort){
        return Consul.builder().withHostAndPort(HostAndPort.fromString(registryHost+":"+registryPort)).build();
    }
}
Service deregistration is not implemented yet.
public class ConsulRegistryService extends AbstractConsulService implements RegistryService {
    private final static int CONSUL_CONNECT_PERIOD=1*1000;

    @Override
    public void register(RpcURL url) {
        Consul consul = this.buildConsul(url.getRegistryHost(),url.getRegistryPort());
        AgentClient agent = consul.agentClient();
        ImmutableRegCheck check = ImmutableRegCheck.builder()
                .tcp(url.getHost()+":"+url.getPort())
                .interval(CONSUL_HEALTH_INTERVAL)
                .build();
        ImmutableRegistration.Builder builder = ImmutableRegistration.builder();
        builder.id(CONSUL_ID)
                .name(CONSUL_NAME)
                .addTags(CONSUL_TAGS)
                .address(url.getHost())
                .port(url.getPort())
                .addChecks(check);
        agent.register(builder.build());
    }

    @Override
    public void unregister(RpcURL url) {
    }
}
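Because the RPC I implemented is based on TCP, the health check for the registered service is also specified as TCP: Consul opens a connection to the given IP and port to determine whether the service is healthy. For an HTTP service, call the http method instead and specify the health check URL.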
ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();
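To make the idea of a TCP health check concrete, here is a minimal sketch of what such a probe amounts to: try to open a connection to the host and port, and treat success as healthy. This only illustrates the concept; it is not how Consul implements its check, and `TcpProbe`, `isReachable`, and the timeout parameter are hypothetical names introduced for this example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Consul or consul-client.
final class TcpProbe {
    private TcpProbe() {}

    // Returns true if a TCP connection to host:port can be opened within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, host unreachable, or timeout: report as unhealthy.
            return false;
        }
    }
}
```

A check like this only proves that the port accepts connections; it says nothing about whether the RPC handler behind it is working, which is the usual trade-off of a TCP check compared with an HTTP one.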
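The monitoring output in the backend is as follows:
Although only a TCP check was specified, the backend still issues an HTTP health check request, possibly because of some internal mechanism; see the first request log entry in the figure above.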
public interface DiscoveryService {
    List<RpcURL> getUrls(String registryHost, int registryPort);
}
List<RpcURL> urls= Lists.newArrayList();
Consul consul = this.buildConsul(registryHost,registryPort);
HealthClient client = consul.healthClient();
String name = CONSUL_NAME;
ConsulResponse object= client.getAllServiceInstances(name);
List<ImmutableServiceHealth> serviceHealths=(List<ImmutableServiceHealth>)object.getResponse();
for(ImmutableServiceHealth serviceHealth:serviceHealths){
    RpcURL url=new RpcURL();
    url.setHost(serviceHealth.getService().getAddress());
    url.setPort(serviceHealth.getService().getPort());
    urls.add(url);
}
Listening for service updates: when the list of available services changes, the calling side needs to be notified.
try {
    ServiceHealthCache serviceHealthCache = ServiceHealthCache.newCache(client, name);
    serviceHealthCache.addListener(new ConsulCache.Listener<ServiceHealthKey, ServiceHealth>() {
        @Override
        public void notify(Map<ServiceHealthKey, ServiceHealth> map) {
            logger.info("serviceHealthCache.addListener notify");
            // The service list changed: drop the cached invokers so they are rebuilt.
            RpcClientInvokerCache.clear();
        }
    });
    serviceHealthCache.start();
} catch (Exception e) {
    // Log failures at error level so they are not lost among info messages.
    logger.error("serviceHealthCache.start error:",e);
}
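Because the client-side Invokers were cached earlier, the cached entries need to be updated whenever the service list changes.
Here the cache is simply cleared; a better approach would be to handle only the entries that actually changed.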
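As a rough sketch of the "handle only what changed" idea, the old and new service lists can be compared as sets of addresses, so that only new addresses get connected and only vanished ones get closed. `ServiceListDiff` and the "host:port" string keys are assumptions made for this example; they are not part of the code in this post.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: computes which "host:port" addresses appeared or disappeared.
final class ServiceListDiff {
    final Set<String> added;   // present in the new list only: create invokers for these
    final Set<String> removed; // present in the old list only: close and evict these

    private ServiceListDiff(Set<String> added, Set<String> removed) {
        this.added = added;
        this.removed = removed;
    }

    static ServiceListDiff between(Set<String> oldAddrs, Set<String> newAddrs) {
        Set<String> added = new HashSet<>(newAddrs);
        added.removeAll(oldAddrs);
        Set<String> removed = new HashSet<>(oldAddrs);
        removed.removeAll(newAddrs);
        return new ServiceListDiff(added, removed);
    }
}
```

With a diff like this, a listener could leave the invokers for unchanged addresses connected instead of rebuilding every connection on each notification.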
public class RpcClientInvokerCache {
    // volatile so that a replaced list reference is visible to other threads
    private static volatile CopyOnWriteArrayList<RpcClientInvoker> connectedHandlers = new CopyOnWriteArrayList<>();

    public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlersClone(){
        return (CopyOnWriteArrayList<RpcClientInvoker>) RpcClientInvokerCache.getConnectedHandlers().clone();
    }

    public static void addHandler(RpcClientInvoker handler) {
        CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
        newHandlers.add(handler);
        connectedHandlers=newHandlers;
    }

    public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlers(){
        return connectedHandlers;
    }

    public static RpcClientInvoker get(int i){
        return connectedHandlers.get(i);
    }

    public static int size(){
        return connectedHandlers.size();
    }

    public static void clear(){
        CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
        newHandlers.clear();
        connectedHandlers=newHandlers;
    }
}
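The method that fetches the service list in this code has a small problem: it does not look services up by interface information. This will be completed later.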
public class RoundRobinLoadbalanceService implements LoadbalanceService {
    private AtomicInteger roundRobin = new AtomicInteger(0);
    private static final int MAX_VALUE=1000;
    private static final int MIN_VALUE=1;

    private AtomicInteger getRoundRobinValue(){
        if(this.roundRobin.getAndAdd(1)>MAX_VALUE){
            this.roundRobin.set(MIN_VALUE);
        }
        return this.roundRobin;
    }

    @Override
    public int index(int size) {
        return (this.getRoundRobinValue().get() + size) % size;
    }
}
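The class above increments the counter, resets it, and reads it in separate steps, so under concurrent calls two threads may observe the same value or skip one. A possible alternative, sketched here under the assumption that only an index into the handler list is needed, is to advance and read the counter in a single atomic operation. `AtomicRoundRobin` is a hypothetical name and deliberately does not implement `LoadbalanceService`, whose full definition is not shown in this post.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical alternative for illustration; standalone so it compiles on its own.
final class AtomicRoundRobin {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Returns the next index in [0, size), cycling through the positions in order.
    int index(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        // getAndUpdate reads the old value and stores the next one atomically;
        // wrapping back to 0 keeps the counter from ever going negative.
        int current = counter.getAndUpdate(v -> v == Integer.MAX_VALUE ? 0 : v + 1);
        return current % size;
    }
}
```

Because each caller gets a distinct counter value, consecutive calls with the same size walk through the handlers one by one even when several threads call `index` at once.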