{"error":400,"message":"over quota"} RabbitMQ 之 RPC 初探(Java)——云诺说
云诺说 - 小程序开发 - 软件定制
当前位置: RabbitMQ > RabbitMQ 之 RPC 初探(Java)

RabbitMQ 之 RPC 初探(Java)

2019-08-26 03:49 分类:RabbitMQ 作者:云诺 阅读(1303)

版权声明:本文为博主原创文章,如果转载请给出原文链接:http://doofuu.com/article/4156166.html

什么是RPC?

      RPC是远程过程调用(Remote Procedure Call),大致流程是这样的,

  1. server进入等待client消息状态

  2. client发起远程过程调用,并进入等待server回复状态

  3. server接收到client消息,处理完把结果返回给client

  4. client接收到server返回的消息

  5. rpc过程结束

RabbitMQ RPC是这样的,看图:

QQ截图20190826154657.png

从图可以看出,Client、Server都是双重角色,既是消费者也是生产者,具体过程如下:

  1. Client创建一个RPC队列,并向队列发送消息(注意:在消息属性里要包含请求标识ID和Server处理完成后响应队列名,也就是上图中的correlation_id和reply_to),随之进入等待应答状态

  2. Server接受到消息,并根据请求标识ID,做相应的处理,最后把处理的结果发送到reply_to指定的队列中(注意:要包含correlation_id属性)

  3. Client接收到Server的响应结果,根据请求标识correlation_id区分消息内容再做相应的处理

下面是相关实例代码:

consumer:

package rpc;
 
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
public class RpcConsumer {
 
	private static final String RPC_QUEUE_NAME = "rpc_queue";
 
	public static void main(String[] argv) throws IOException{
		ConnectionFactory factory = new ConnectionFactory();
		//rabbitmq监听IP
		factory.setHost("192.168.249.128");
		//rabbitmq监听默认端口
		factory.setPort(5672);
		//设置访问的用户
		factory.setUsername("test");
		factory.setPassword("test");
 
		Connection connection = null;
		try {
			connection = factory.newConnection();
			final Channel channel = connection.createChannel();
 
			channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
 
 
			System.out.println("等待接受producer消息....");
 
			Consumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
					AMQP.BasicProperties replyProps = new AMQP.BasicProperties
							.Builder()
							.correlationId(properties.getCorrelationId())
							.build();
 
					
					String response = "";
					try {
						String message = new String(body,"UTF-8");
						System.out.println("consumer 接收的消息是:" + message);
						
						response = handleMsg(message);
						
						System.out.println("consumer 发送的消息是:" + response);
					}
					catch (RuntimeException e){
						e.printStackTrace();
					}finally {
						channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
						channel.basicAck(envelope.getDeliveryTag(), false);
					}
				}
			};
			channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
		} catch (Exception e) {
			
			connection.close();
			e.printStackTrace();
		} 
	}
	private static String handleMsg(String msg) {
		
		Date date = new Date();
		String response = "";
		switch (msg) {
		case "时间":
			response = new SimpleDateFormat("HH:mm").format(date);
			break;
		case "日期":
			response = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒").format(date);
			break;
		default:
			response = "未知信息";
			break;
		}
		return response;
	}
 
}

producer:

package rpc;
 
import java.io.IOException;
import java.util.UUID;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
 
public class RpcProducer {
	private Connection connection;
	private Channel channel;
	private String requestQueueName = "rpc_queue";
	private String replyQueueName;
	
	
	public static void main(String[] args) throws Exception {
		
		RpcProducer producer = new RpcProducer();
		producer.call("时间");
		producer.call("日期");
		producer.call("什么都不是~");
		
		
		//producer.close();
		
	}
 
	public RpcProducer() throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		//rabbitmq监听IP
		factory.setHost("192.168.249.128");
		//rabbitmq监听默认端口
		factory.setPort(5672);
		//设置访问的用户
		factory.setUsername("test");
		factory.setPassword("test");
 
		connection = factory.newConnection();
		channel = connection.createChannel();
 
		//生成一个临时的接受队列名
		replyQueueName = channel.queueDeclare().getQueue();
	}
 
	public void call(String message) throws Exception {
		
		//生成一个唯一的字符串
		final String corrId = UUID.randomUUID().toString();
 
		//将corrId、replyQueueName打包发送给consumer
		AMQP.BasicProperties props = new AMQP.BasicProperties
				.Builder()
				.correlationId(corrId)
				.replyTo(replyQueueName)
				.build();
 
		System.out.println("producer 发送的消息是 :" + message);
		channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
 
		channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				//获取corrId相同的消息
				if (properties.getCorrelationId().equals(corrId)) {
					System.out.println("producer 接收的消息是 :" + new String(body, "UTF-8"));
				}
			}
		});
 
	}
 
	public void close() throws Exception {
		channel.close();
		connection.close();
	}
 
}

运行结果:

QQ截图20190826154721.png

QQ截图20190826154734.png

原理简单易懂,但还是有很多注意的地方(摘抄自RabbitMQ官方文档)

A note on RPC


Although RPC is a pretty common pattern in computing, it's often criticised. The problems arise when a programmer is not aware whether a function call is local or if it's a slow RPC. Confusions like that result in an unpredictable system and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.

Bearing that in mind, consider the following advice:

  • Make sure it's obvious which function call is local and which is remote.

  • Document your system. Make the dependencies between components clear.

  • Handle error cases. How should the client react when the RPC server is down for a long time?

When in doubt avoid RPC. If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.


To:RPC是阻塞式的,性能上是很慢的,误用RPC会添加系统的复杂度,使代码变得更加难以维护,所以也给出了几点建议


  • 确定那些function该用本地的, 那些该用RPC

  • 书写清晰的说明文档

  • 优雅的处理好因RPC带来的Error问题,

  • 尽量的使用asynchronous pipeline的方式代替RPC , 能不用就别用

(翻译有点烂 , 见谅~~)


最后附上消息属性的说明:

Message properties


The AMQP protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:

  • deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.

  • contentType: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.

  • replyTo: Commonly used to name a callback queue.

  • correlationId: Useful to correlate RPC responses with requests.

 

祝生活愉快!

「创作不易,你的支持是本站持续更新最大的动力!」

赞(0) 打赏

谢谢你请我喝奶茶*^_^*

支付宝
微信
1

谢谢你请我喝奶茶*^_^*

支付宝
微信

上一篇:

下一篇:

共有 0 条评论 - RabbitMQ 之 RPC 初探(Java)

博客简介

云诺说是一个致力于分享互联网编程技术交流、小程序开发、小程序源码分享、软件服务定制和生活记录的技术服务型学习博客网站。

微信 :LGY178888

职业 :小程序开发、软件定制

现居 :广东省-广州市-天河区

友情链接

欢迎与本博客交换友情链接,本博客对交换链接的网站没有要求。如果您是本博客的友情链接网站,在遇到网站运行问题时,可以随时联系,我们将免费提供技术类支持! 申请交换友链

站点统计

  • 文章总数:155 篇
  • 草稿数目:0 篇
  • 分类数目:14 个
  • 独立页面:165 个
  • 评论总数:0 条
  • 访问总量: 636770次
  • 最近更新:2024年07月27日