官网英文版学习——RabbitMQ学习笔记(八)Remote procedure call (RPC)

在第四篇学习笔记中,咱们学习了如何使用工做队列在多个工做者之间分配耗时的任务。java

 

可是,若是咱们须要在远程计算机上运行一个函数并等待结果呢?这是另外一回事。这种模式一般称为远程过程调用或RPC。安全

 

在本篇学习笔记中,咱们将使用RabbitMQ构建一个RPC系统:客户机和可伸缩的RPC服务器。因为咱们没有任何值得分发的耗时任务,因此咱们将建立一个返回斐波那契数的虚拟RPC服务。服务器

为了说明如何使用RPC服务,咱们将建立一个简单的客户端类。它将公开一个名为call的方法,该方法发送一个RPC请求并阻塞,直到收到答案:app

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

 

一般,在RabbitMQ上执行RPC是很容易的。客户端发送请求消息,服务器用响应消息进行响应。为了接收响应,咱们须要向请求发送一个“回调”队列地址。咱们可使用默认队列(在Java客户机中是独占的)。让咱们试一试:dom

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

 

消息属性ide

 

AMQP 0-9-1协议预先定义了一组包含消息的14个属性。大多数属性不多被使用,除了如下状况:函数

 

deliveryMode:将消息标记为持久化(值为2)或瞬变(任何其余值)。您可能还记得第二个教程中的这个属性。学习

 

contentType:用于描述编码的mime类型。例如,对于一般使用的JSON编码,最好将这个属性设置为:application/ JSON。fetch

 

replyTo:一般用于命名回调队列。ui

correlationid:有助于将RPC响应与请求关联起来。

correlationId做用

咱们为每一个请求设置一个惟一值correctionid。用于当队列接收到响应后区分是哪一个请求的响应。稍后,当咱们在回调队列中接收到消息时,咱们将查看此属性,并基于此,咱们将可以将响应与请求匹配。若是咱们看到一个未知的correlationId值,咱们能够安全地丢弃消息—它不属于咱们的请求。

 

咱们的RPC将这样工做:

当客户端启动时,它将建立一个匿名独占回调队列(官方教程建立的是匿名的)。

 

对于RPC请求,客户端发送一条消息,该消息具备两个属性:replyTo,它被设置为回调队列和correlationId,它被设置为每一个请求的惟一值。

 

请求被发送到rpc_queue队列。

 

RPC工做程序(即:server)正在等待该队列上的请求。当出现请求时,它会执行该任务并使用replyTo字段中的队列将结果发送回客户机。

客户端等待回调队列上的数据。当消息出现时,它会检查相关属性。若是它匹配来自请求的值,它将向应用程序返回响应。

服务端代码:

package com.rabbitmq.cn;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RPCServer {
//	定义一个远程队列名称
	private static final String RPCQUEUENAME = "RPCqueue"; 
//	斐波那契数函数
	private static int fib(int n) {
	    if (n ==0) return 0;
	    if (n == 1) return 1;
	    return fib(n-1) + fib(n-2);
	  }
	public static void main(String[] args) throws IOException, TimeoutException {
		// TODO Auto-generated method stub
//		建立工厂获取链接
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.10.185");
		factory.setPort(5672);
		factory.setPassword("123456");
		factory.setUsername("admin");
		Connection connection = null;
		try{
//		得到链接
		connection = factory.newConnection();
//		建立队列
		Channel channel = connection.createChannel();
//		声明一个远程的消息队列
		channel.queueDeclare(RPCQUEUENAME, false, false, false, null);
//		为了减轻服务器负担,当多个服务一同工做时,能够设置以下参数
		channel.basicQos(1);
		System.out.println(" [x] Awaiting RPC requests");
//		执行客户端发送的请求任务
		Consumer consumer = new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
//				设置返回的消息属性
				AMQP.BasicProperties replyPros = new BasicProperties().builder()
						.correlationId(properties.getCorrelationId()).build();
				String response = "";
				try {
//				对传递过来的文字版数字解析,解析后调用函数处理,处理之后的结果做为返回值准备返回给客户端
					String message = new String(body, "utf-8");
					int n = Integer.parseInt(message);
					System.out.println(" [.] fib(" + message + ")");
		            response += fib(n);
				} catch (Exception e) {
					// TODO: handle exception
				}finally{
//				返回处理后的结果给客户端
					channel.basicPublish("", properties.getReplyTo(), replyPros, response.getBytes());
					channel.basicAck(envelope.getDeliveryTag(), false);
//			    RabbitMq consumer worker thread notifies the RPC server owner thread 
		            synchronized(this) {
		            	this.notify();
				}
			}
		}
	};
	/*The RPC worker (aka: server) is waiting for requests on that queue. 
	When a request appears, it does the job and sends a message with the result back to the Client, 
	using the queue from the replyTo field.*/
// 		Wait and be prepared to consume the message from RPC client.
	  channel.basicConsume(RPCQUEUENAME, false, consumer);
     
      while (true) {
      	synchronized(consumer) {
      		try {
      			consumer.wait();
      	    } catch (InterruptedException e) {
      	    	e.printStackTrace();	    	
      	    }
      	}
      }}catch (IOException | TimeoutException e) {
          e.printStackTrace();
      }
      finally {
        if (connection != null)
          try {
            connection.close();
          } catch (IOException _ignore) {}
    	  
      }
}}

服务端代码过程显示,一般咱们从创建链接、通道和声明队列开始。

 

咱们可能但愿运行多个服务器进程。为了将负载均匀地分布到多个服务器上,咱们须要在channel.basicQos中设置prefetchCount设置。

 

咱们使用basicconsumption访问队列,在队列中咱们以对象(DefaultConsumer)的形式提供回调,该对象将执行该工做并将响应发送回。

客户端代码:

package com.rabbitmq.cn;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {

  private Connection connection;
  private Channel channel;
  private String requestQueueName = "RPCqueue";
  private String replyQueueName;

  public RPCClient() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.10.185");
    factory.setPort(5672);
    factory.setUsername("admin");
    factory.setPassword("123456");
    
    connection = factory.newConnection();
    channel = connection.createChannel();
//  建立临时队列
    replyQueueName = channel.queueDeclare().getQueue();
  }

  public String call(String message) throws IOException, InterruptedException {
//	 经过uuid生成请求段的correctionId
    final String corrId = UUID.randomUUID().toString();
//	设置correctionId和replyTo属性
    AMQP.BasicProperties props = new AMQP.BasicProperties
            .Builder()
            .correlationId(corrId)
            .replyTo(replyQueueName)
            .build();
//	发送请求到请求队列中
    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
//	定义一个阻塞的消息队列,来挂起线程等待相应
    final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
//	消费消息
    channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        if (properties.getCorrelationId().equals(corrId)) {
          response.offer(new String(body, "UTF-8"));
        }
      }
    });

    return response.take();
  }

  public void close() throws IOException {
    connection.close();
  }

  public static void main(String[] argv) {
    RPCClient fibonacciRpc = null;
    String response = null;
    try {
//    经过构造函数获取链接,并建立一个临时匿名的队列
      fibonacciRpc = new RPCClient();

      System.out.println(" [x] Requesting fib(30)");
      response = fibonacciRpc.call("30");
      System.out.println(" [.] Got '" + response + "'");
    }
    catch  (IOException | TimeoutException | InterruptedException e) {
      e.printStackTrace();
    }
    finally {
      if (fibonacciRpc!= null) {
        try {
          fibonacciRpc.close();
        }
        catch (IOException _ignore) {}
      }
    }
  }
}

 

客户端代码稍微复杂一些:

 

咱们创建一个链接和通道,并为回复声明一个独占的“回调”队列。

 

咱们订阅“回调”队列,以便接收RPC响应。

 

咱们的调用方法生成实际的RPC请求。

 

在这里,咱们首先生成一个惟一的correlationId号并保存它——咱们在DefaultConsumer中实现的handleDelivery将使用这个值来捕获适当的响应。

 

接下来,咱们发布请求消息,有两个属性:replyTo和correlationId。

 

此时,咱们能够坐下来等待合适的答复。

因为咱们的消费者交付处理是在一个单独的线程中进行的,因此在响应到达以前,咱们须要一些东西来挂起主线程。使用BlockingQueue是一种可能的解决方案。这里咱们建立了ArrayBlockingQueue,它的容量设置为1,由于咱们须要等待一个响应。

 

handleDelivery方法作的是一项很是简单的工做,对于每一个消耗的响应消息,它检查correlationId是不是咱们要查找的那个。若是是,它将响应放置到BlockingQueue。

 

与此同时,主线程正在等待响应从BlockingQueue接收。

最后,咱们将响应返回给用户。

运行后,咱们获得结果

相关文章
相关标签/搜索