https://segmentfault.com/l/15...java
咱们利用消息队列实现了分布式事务的最终一致性解决方案,请你们围观。能够参考源码:https://github.com/vvsuperman…,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面git
为了阐述RPC咱们先创建一个客户端接口,它有一个方法,会发起一个RPC请求,并且会一直阻塞直到有结果返回github
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
留意RPC
虽然RPC很常见,但必定要很是当心的使用它,假设rpc调用的是一个很是慢的程序,将致使结果不可预料,并且很是难以调试。json
使用RPC时你能够参考下列一些规范segmentfault
用RabbitMQ实现RPC比较简单,客户端发起请求,服务端返回对这个请求的响应。为了实现这个功能咱们须要一个可以"回调"的队列,咱们直接用默认的队列便可服务器
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
AMQP 0-9-1 协议为每一个消息定义了14个属性,不少属性不多会被用到,但咱们要特别留意以下几个app
咱们须要引入相应的包负载均衡
import com.rabbitmq.client.AMQP.BasicProperties;
在前面的方法中咱们为每个RPC请求都生成了一个队列,这是彻底没有必要的,咱们为每个客户端创建一个队列就能够了。dom
这会引发一个新的问题,由于全部的RPC都是用一个队列,一旦有消息返回,你怎么知道返回的消息对应的是哪一个请求呢?因此咱们就用到了Correlation Id,做为每一个请求独一无二的标识,当咱们收到返回值后,会检查这个Id,匹配对应的响应。若是找不到Id所对应的请求,会直接抛弃它。异步
这里你可能会有疑问,为何要抛弃掉未知消息呢?而不是抛出异常啥的。这跟咱们服务端的竞态条件(possibility of a race condition )会有关系。好比假设咱们RabbitMQ服务挂掉了,它刚给咱们回复消息,还没等到回应,服务器就挂掉了,那么当RabbitMQ服务重启时,会重发消息,客户端会收到一条重复的消息,为了冥等性的考虑,咱们须要仔细的处理返回后的处理方式。
RPC工做过程以下
当客户端启动时,它会建立一个独立的匿名回调队列,而后发送RPC请求,这个RPC
请求会带两个属性:replyTo - RPC调用成功后须要返回的队列名称;correlationId - 每一个请求独一无二的标识。RPC服务提供者会等在队列上,一旦有请求到达,它会当即响应,把本身的活干完,而后返回一个结果,根据replyTo返回到对应的队列。而客户端也会等着队列中的信息返回,一旦有一个消息出现,会检查correlationId,将结果返回给响应的请求发起者
Fibonacci级数
private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); }
咱们定义个一个fibonacci级数,只能接受正整数,并且是效率不怎么高的那种。
rpc.java以下所示
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] argv) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; try { connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); String response = ""; try { String message = new String(body,"UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e){ System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized(this) { this.notify(); } } } }; channel.basicConsume(RPC_QUEUE_NAME, false, consumer); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized(consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (connection != null) try { connection.close(); } catch (IOException _ignore) {} } } }
服务端的代码比较直接,首先创建链接,创建channel以及声明队列。咱们以后可能会创建多个消费者,为了更好的负载均衡,须要在channel.basicQos中设置prefetchCount,而后设置一个basicConsume监听队列,提供一个回调函数来处理请求以及返回值
RPCClient.java
import com.rabbitmq.client.*; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); } public String call(String message) throws IOException, InterruptedException { String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) { response.offer(new String(body, "UTF-8")); } } }); return response.take(); } public void close() throws IOException { connection.close(); } //... }
客户端代码以下,咱们创建一个链接,声明一个'callback'队列,咱们将会往'callback'队列提交消息,并接收RPC的返回值,具体步骤以下:
咱们首先生成一个惟一的correlation Id,并保存,咱们将会使用它来区分以后所接受到的信息。而后发出这个消息,消息会包含两个属性: replyTo以及collelationId。由于消费消息是在另一个进程中,咱们须要阻塞咱们的进程直到结果返回,使用阻塞队列BlockingQueue是一种很是好的方式,这里咱们使用了长度为1的ArrayBlockQueue,handleDelivery的功能是检查消息的的correlationId是否是咱们以前所发送的,若是是,将返回值返回到BlockingQueue。此时主线程会等待返回并从ArrayBlockQueue取到返回值
从客户端发起请求
RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close();
源代码参考RPCClient.java 和 RPCServer.java
编译
javac -cp $CP RPCClient.java RPCServer.java
咱们的rpc服务端好了,启动服务
java -cp $CP RPCServer # => [x] Awaiting RPC requests
为了获取fibonacci级数咱们只须要运行客户端:
java -cp $CP RPCClient # => [x] Requesting fib(30)
以上的实现方式并不是创建RPC请求惟一的方式,可是它有不少优势:若是一个RPC服务过于缓慢,你能够很是方便的水平扩展,只须要增长消费者的个数便可,咱们的代码仍是比较简单的,有些负责的问题并未解决,好比
基础章节的内容到此就结束了,到这里,你就可以基本明白消息队列的基本用法,接下来咱们能够进入中级内容内容的学习了。