上篇文章简单地介绍了 SOFA-Boot 的功能特性,对 Readiness 健康检查的配置举例说明。重点介绍了如何在 SOFA-Boot 中引入 SOFA-RPC 中间件,给出了基于 bolt、rest 和 dubbo 等不一样协议通道的服务发布与消费的全流程。java
本文将进一步介绍 SOFA-RPC 中间件提供的丰富而强大的功能,包括单向调用、同步调用、Future调用、回调,泛化调用,过滤器配置等。spring
SOFA-RPC 提供单向调用、同步调用、异步调用和回调四种调用机制。为了区分四者的不一样之处,这里给出 SOFA 官方提供的原理图。缓存
下面给出详细阐述和配置说明:bash
当前线程发起调用后,不关心调用结果,不作超时控制,只要请求已经发出,就完成本次调用。目前支持 bolt 协议。网络
使用单向方式须要在服务引用的时候经过 sofa:global-attrs
元素的 type
属性声明调用方式为 oneway
,这样使用该服务引用发起调用时就是使用的单向方式了。多线程
<sofa:reference id="helloOneWayServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloOneWayService">
<sofa:binding.bolt>
<sofa:global-attrs type="oneway"/>
</sofa:binding.bolt>
</sofa:reference>
复制代码
单向调用不保证成功,并且发起方没法知道调用结果。所以一般用于能够重试,或者定时通知类的场景,调用过程是有可能由于网络问题,机器故障等缘由,致使请求失败。业务场景须要能接受这样的异常场景,才可使用。
当前线程发起调用后,须要在指定的超时时间内,等到响应结果,才能完成本次调用。若是超时时间内没有获得结果,那么会抛出超时异常。
服务接口与实现类
SOFA-RPC 缺省采用的就是同步调用,能够省略 sofa:global-attrs
配置项。
服务端发布配置
<bean id="helloSyncServiceImpl" class="com.ostenant.sofa.rpc.example.invoke.HelloSyncServiceImpl"/>
<sofa:service ref="helloSyncServiceImpl" interface="com.ostenant.sofa.rpc.example.invoke.HelloSyncService">
<sofa:binding.bolt/>
</sofa:service>
复制代码
客户端引用配置
<sofa:reference id="helloSyncServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloSyncService">
<sofa:binding.bolt/>
</sofa:reference>
复制代码
服务端启动入口
SpringApplication springApplication = new SpringApplication(SyncServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
复制代码
客户端启动入口
SpringApplication springApplication = new SpringApplication(SyncClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
复制代码
客户端调用
HelloSyncService helloSyncServiceReference = (HelloSyncService) applicationContext.getBean("helloSyncServiceReference");
System.out.println(helloSyncServiceReference.saySync("sync"));
复制代码
同步调用是最经常使用的方式。注意要根据对端的处理能力,合理设置超时时间。
Future 方式下,客户端发起调用后不会等待服务端的结果,继续执行后面的业务逻辑。服务端返回的结果会被 SOFA-RPC 缓存,当客户端须要结果的时候,须要主动获取。目前支持 bolt 协议。
服务接口和实现类
HelloFutureService.java
public interface HelloFutureService {
String sayFuture(String future);
}
复制代码
HelloFutureServiceImpl.java
public class HelloFutureServiceImpl implements HelloFutureService {
@Override
public String sayFuture(String future) {
return future;
}
}
复制代码
服务端发布配置
<bean id="helloFutureServiceImpl" class="com.ostenant.sofa.rpc.example.invoke.HelloFutureServiceImpl"/>
<sofa:service ref="helloFutureServiceImpl" interface="com.ostenant.sofa.rpc.example.invoke.HelloFutureService">
<sofa:binding.bolt/>
</sofa:service>
复制代码
客户端引用配置
使用 Future 方式须要在服务引用的时候经过 sofa:global-attrs
元素的 type
属性声明调用方式为 future
。
<sofa:reference id="helloFutureServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloFutureService">
<sofa:binding.bolt>
<sofa:global-attrs type="future"/>
</sofa:binding.bolt>
</sofa:reference>
复制代码
这样使用该服务引用发起调用时就是使用的 Future
方式了。
服务端启动入口
SpringApplication springApplication = new SpringApplication(FutureServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
复制代码
客户端启动入口
SpringApplication springApplication = new SpringApplication(FutureClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
复制代码
客户端获取返回结果有两种方式:
SofaResponseFuture
直接获取结果。第一个参数是获取结果的超时时间,第二个参数表示是否清除线程上下文中的结果。HelloFutureService helloFutureServiceReference = (HelloFutureService) applicationContext
.getBean("helloFutureServiceReference");
helloFutureServiceReference.sayFuture("future");
try {
String result = (String)SofaResponseFuture.getResponse(1000, true);
System.out.println("Future result: " + result)
} catch (InterruptedException e) {
e.printStackTrace();
}
复制代码
HelloFutureService helloFutureServiceReference = (HelloFutureService) applicationContext
.getBean("helloFutureServiceReference");
helloFutureServiceReference.sayFuture("future");
try {
Future future = SofaResponseFuture.getFuture(true);
String result = (String)future.get(1000, TimeUnit.MILLISECONDS);
System.out.println("Future result: " + result)
} catch (InterruptedException e) {
e.printStackTrace();
}
复制代码
Future 方式适用于非阻塞编程模式。对于客户端程序处理后,不须要当即获取返回结果,能够先完成后续程序代码执行,在后续业务中,主动从当前线程上下文获取调用返回结果。减小了网络 IO 等待形成的代码运行阻塞和延迟。
当前线程发起调用,则本次调用立刻结束,能够立刻执行下一次调用。发起调用时须要注册一个回调,该回调须要分配一个异步线程池。待响应返回后,会在回调的异步线程池,来执行回调逻辑。
服务接口和实现类
HelloCallbackService.java
public interface HelloCallbackService {
String sayCallback(String callback);
}
复制代码
HelloCallbackServiceImpl.java
public class HelloCallbackServiceImpl implements HelloCallbackService {
@Override
public String sayCallback(String string) {
return string;
}
}
复制代码
业务回调类
客户端回调类须要实现 com.alipay.sofa.rpc.core.invoke.SofaResponseCallback
接口。
CallbackImpl.java
public class CallbackImpl implements SofaResponseCallback {
@Override
public void onAppResponse(Object appResponse, String methodName, RequestBase request) {
System.out.println("callback client process:" + appResponse);
}
@Override
public void onAppException(Throwable throwable, String methodName, RequestBase request) {
}
@Override
public void onSofaException(SofaRpcException sofaException, String methodName, RequestBase request) {
}
}
复制代码
SofaResponseCallback 接口提供了 3 个方法:
服务端发布配置
<bean id="helloCallbackServiceImpl" class="helloFutureServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloCallbackServiceImpl"/>
<sofa:service ref="helloCallbackServiceImpl" interface="helloFutureServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloCallbackService">
<sofa:binding.bolt/>
</sofa:service>
复制代码
客户端引用配置
在服务引用的时候经过 sofa:global-attrs
元素的 type
属性声明调用方式为 callback
,再经过 callback-ref
声明回调的实现类。
<bean id="callbackImpl" class="com.ostenant.sofa.rpc.example.invoke.CallbackImpl"/>
<sofa:reference id="helloCallbackServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloCallbackService">
<sofa:binding.bolt>
<sofa:global-attrs type="callback" callback-ref="callbackImpl"/>
</sofa:binding.bolt>
</sofa:reference>
复制代码
这样使用该服务引用发起调用时,就是使用的回调方式了。在结果返回时,由 SOFA-RPC 自动调用该回调类的相应方法。
服务端启动入口
SpringApplication springApplication = new SpringApplication(CallbackServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
复制代码
客户端启动入口
SpringApplication springApplication = new SpringApplication(CallbackClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
复制代码
客户端发起调用
HelloCallbackService helloCallbackServiceReference = (HelloCallbackService) applicationContext
.getBean("helloCallbackServiceReference");
helloCallbackServiceReference.sayCallback("callback");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
复制代码
sayCallback() 的返回值不该该直接获取。在客户端注册的回调类中,返回值会以参数的形式传入正确的方法,以回调的形式完成后续逻辑处理。
Callback 方式适用于异步非阻塞编程模式。客户端程序所在线程发起调用后,继续执行后续操做,不须要主动去获取返回值。服务端程序处理完成,将返回值传回一个异步线程池,由子线程经过回调函数进行返回值处理。很大状况的减小了网络 IO 阻塞,解决了单线程的瓶颈,实现了异步编程。
泛化调用方式可以在客户端不依赖服务端的接口状况下发起调用,目前支持 bolt 协议。因为不知道服务端的接口,所以须要经过字符串的方式将服务端的接口,调用的方法,参数及结果类进行描述。
泛化参数类
SampleGenericParamModel.java
public class SampleGenericParamModel {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
复制代码
泛化返回类
SampleGenericResultModel.java
public class SampleGenericResultModel {
private String name;
private String value;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
复制代码
服务接口和实现类
SampleGenericService.java
public interface SampleGenericService {
SampleGenericResultModel sayGeneric(SampleGenericParamModel sampleGenericParamModel);
}
复制代码
SampleGenericParamModel:做为 sayGeneric() 的输入参数类型,有一个 name
成员变量,做为真正的方法入参。
SampleGenericResultModel:做为 sayGeneric() 的返回结果类型,声明了 name
和 value
两个成员变量,做为真实的返回值。
SampleGenericServiceImpl.java
public class SampleGenericServiceImpl implements SampleGenericService {
@Override
public SampleGenericResultModel sayGeneric(SampleGenericParamModel sampleGenericParamModel) {
String name = sampleGenericParamModel.getName();
SampleGenericResultModel resultModel = new SampleGenericResultModel();
resultModel.setName(name);
resultModel.setValue("sample generic value");
return resultModel;
}
}
复制代码
服务端发布配置
<bean id="sampleGenericServiceImpl" class="com.ostenant.sofa.rpc.example.generic.SampleGenericServiceImpl"/>
<sofa:service ref="sampleGenericServiceImpl" interface="com.ostenant.sofa.rpc.example.generic.SampleGenericService">
<sofa:binding.bolt/>
</sofa:service>
复制代码
客户端引用配置
<sofa:reference id="sampleGenericServiceReference" interface="com.alipay.sofa.rpc.api.GenericService">
<sofa:binding.bolt>
<sofa:global-attrs generic-interface="com.ostenant.sofa.rpc.example.generic.SampleGenericService"/>
</sofa:binding.bolt>
</sofa:reference>
复制代码
在泛化调用过程当中,客户端配置有两点须要注意:
sofa:reference
指向的服务接口须要声明为 SOFA-RPC 提供的泛化接口 com.alipay.sofa.rpc.api.GenericService
。sofa:global-attrs
须要声明属性 generic-interface
,value 为真实的服务接口名称。服务端启动入口
SpringApplication springApplication = new SpringApplication(SampleGenericServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
复制代码
客户端启动入口
SpringApplication springApplication = new SpringApplication(SampleGenericClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
复制代码
客户端发起调用
GenericService sampleGenericServiceReference = (GenericService) applicationContext
.getBean("sampleGenericServiceReference");
复制代码
因为客户端没有调用服务的参数类,所以经过 com.alipay.hessian.generic.model.GenericObjectGenericObject
进行描述。
// 准备方法参数
GenericObject genericParam = new GenericObject(
"com.ostenant.sofa.rpc.example.generic.SampleGenericParamModel");
genericParam.putField("name", "Harrison");
复制代码
GenericObject
持有一个 Map<String, Object>
类型的变量,你可以经过 GenericObject
提供的 putField()
方法,将参数类的属性和值放到这个 Map
中,以此来描述参数类。
经过 GenericService
的 $genericInvoke(arg1, agr2, arg3)
方法能够发起服务的泛化调用,各个参数含义以下:
参数 | 含义 | 参数可选 |
---|---|---|
arg1 | 目标方法名称 | 必填 |
arg2 | 参数类型的数组,要求严格遵循前后次序 | 必填 |
arg3 | 参数值的数组,要求与参数类型数组保持一致 | 必填 |
arg4 | 返回值的Class类型 | 可选 |
方式一:
GenericObject genericResult = (GenericObject) sampleGenericServiceReference.$genericInvoke(
// 目标方法名称
"sayGeneric",
// 参数类型名称
new String[] { "com.ostenant.sofa.rpc.example.generic.SampleGenericParamModel" },
// 参数的值
new Object[] { genericParam });
// 验证返回结果
System.out.println("Type: " + genericResult.getType());
System.out.println("Name: " + genericResult.getField("name"));
System.out.println("Value: " + genericResult.getField("value"));
复制代码
方式二:
SampleGenericResultModel sampleGenericResult = sampleGenericServiceReference.$genericInvoke(
// 目标方法名称
"sayGeneric",
// 参数类型名称
new String[] { "com.ostenant.sofa.rpc.example.generic.SampleGenericParamModel" },
// 参数的值
new Object[] { genericParam },
// 返回值的Class类型
SampleGenericResultModel.class);
// 验证返回结果
System.out.println("Type: " + sampleGenericResult.getClass().getName());
System.out.println("Name: " + sampleGenericResult.getName());
System.out.println("Value: " + sampleGenericResult.getValue());
复制代码
查看控制台输出
两种方式输出以下:
Type: com.ostenant.sofa.rpc.example.generic.SampleGenericResultModel
Name: Harrison
Value: sample generic value
复制代码
SOFA-RPC 经过过滤器 Filter 来实现对请求和响应的拦截处理。用户能够自定义 Filter 实现拦截扩展,目前支持 bolt 协议。开发人员经过继承 com.alipay.sofa.rpc.filter.Filter
实现过滤器的自定义。
服务接口与实现类
FilterService.java
public interface FilterService {
String sayFilter(String filter);
}
复制代码
FilterServiceImpl.java
public class FilterServiceImpl implements FilterService {
@Override
public String sayFilter(String filter) {
return filters;
}
}
复制代码
服务端过滤器
在 Filter 实现类中,invoke()
方法实现具体的拦截逻辑,经过 FilterInvoker.invoke(SofaRequest)
触发服务的调用,在该方法先后能够实现具体的拦截处理。
public class SampleServerFilter extends Filter {
@Override
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
System.out.println("SampleFilter before server process");
try {
return invoker.invoke(request);
} finally {
System.out.println("SampleFilter after server process");
}
}
}
复制代码
服务端发布配置
服务端须要配置服务实现类、过滤器,而后在 sofa:service
的 sofa:global-attrs
标签配置 filter
属性,实现二者的绑定。
<bean id="sampleFilter" class="com.ostenant.sofa.rpc.example.filter.SampleServerFilter"/>
<bean id="filterService" class="com.ostenant.sofa.rpc.example.filter.FilterServiceImpl"/>
<sofa:service ref="filterService" interface="com.ostenant.sofa.rpc.example.filter.FilterService">
<sofa:binding.bolt>
<sofa:global-attrs filter="sampleFilter"/>
</sofa:binding.bolt>
</sofa:service>
复制代码
客户端过滤器
public class SampleClientFilter extends Filter {
@Override
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
System.out.println("SampleFilter before client invoke");
try {
return invoker.invoke(request);
} finally {
System.out.println("SampleFilter after client invoke");
}
}
}
复制代码
客户端引用配置
一样的,客户端过滤器须要在 sofa:reference
的 sofa:global-attrs
标签中配置 filter
属性,实现客户端引用类的调用拦截。
<bean id="sampleFilter" class="com.alipay.sofa.rpc.samples.filter.SampleClientFilter"/>
<sofa:reference id="filterServiceReference" interface="com.ostenant.sofa.rpc.example.filter.FilterService">
<sofa:binding.bolt>
<sofa:global-attrs filter="sampleFilter"/>
</sofa:binding.bolt>
</sofa:reference>
复制代码
服务端启动类
SpringApplication springApplication = new SpringApplication(FilterServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
复制代码
客户端启动类
SpringApplication springApplication = new SpringApplication(FilterClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
复制代码
客户端调用
FilterService filterServiceReference = (FilterService) applicationContext.getBean("filterServiceReference");
try {
// sleep 5s, 便于观察过滤器效果
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String result = filterServiceReference.sayFilter("filter");
System.out.println("Invoke result: " + result);
复制代码
查看拦截输出
SampleFilter before server process
SampleFilter after server process
复制代码
SampleFilter before client invoke
SampleFilter after client invoke
Invoke result: filter
复制代码
过滤器配置生效,总结过滤器拦截前后次序以下:
本文介绍了 SOFA-RPC 的集中调用方式,包括单向调用、同步调用、Future调用、回调,引入了 SOFA-RPC 独有的泛化调用机制,同时对过滤器的配置进行了简单介绍。
欢迎关注技术公众号: 零壹技术栈
本账号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。