RPC是远程过程调用(Remote Procedure Call),大致流程是这样的,
RabbitMQ RPC是这样的,看图:
package rpc; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class RpcConsumer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] argv) throws IOException{ ConnectionFactory factory = new ConnectionFactory(); //rabbitmq监听IP factory.setHost(""); //rabbitmq监听默认端口 factory.setPort(5672); //设置访问的用户 factory.setUsername("test"); factory.setPassword("test"); Connection connection = null; try { connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); System.out.println("等待接受producer消息...."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); String response = ""; try { String message = new String(body,"UTF-8"); System.out.println("consumer 接收的消息是:" + message); response = handleMsg(message); System.out.println("consumer 发送的消息是:" + response); } catch (RuntimeException e){ e.printStackTrace(); }finally { channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(RPC_QUEUE_NAME, false, consumer); } catch (Exception e) { connection.close(); e.printStackTrace(); } } private static String handleMsg(String msg) { Date date = new Date(); String response = ""; switch (msg) { case "时间": response = new SimpleDateFormat("HH:mm").format(date); break; case "日期": response = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒").format(date); break; default: response = "未知信息"; break; } return response; } }
package rpc; import java.io.IOException; import java.util.UUID; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class RpcProducer { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; public static void main(String[] args) throws Exception { RpcProducer producer = new RpcProducer(); producer.call("时间"); producer.call("日期"); producer.call("什么都不是~"); //producer.close(); } public RpcProducer() throws Exception { ConnectionFactory factory = new ConnectionFactory(); //rabbitmq监听IP factory.setHost(""); //rabbitmq监听默认端口 factory.setPort(5672); //设置访问的用户 factory.setUsername("test"); factory.setPassword("test"); connection = factory.newConnection(); channel = connection.createChannel(); //生成一个临时的接受队列名 replyQueueName = channel.queueDeclare().getQueue(); } public void call(String message) throws Exception { //生成一个唯一的字符串 final String corrId = UUID.randomUUID().toString(); //将corrId、replyQueueName打包发送给consumer AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); System.out.println("producer 发送的消息是 :" + message); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //获取corrId相同的消息 if (properties.getCorrelationId().equals(corrId)) { System.out.println("producer 接收的消息是 :" + new String(body, "UTF-8")); } } }); } public void close() throws Exception { channel.close(); connection.close(); } }
A note on RPC
Although RPC is a pretty common pattern in computing, it's often criticised. The problems arise when a programmer is not aware whether a function call is local or if it's a slow RPC. Confusions like that result in an unpredictable system and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.
Bearing that in mind, consider the following advice:
Make sure it's obvious which function call is local and which is remote.
Document your system. Make the dependencies between components clear.
Handle error cases. How should the client react when the RPC server is down for a long time?
When in doubt avoid RPC. If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.
确定那些function该用本地的, 那些该用RPC
尽量的使用asynchronous pipeline的方式代替RPC , 能不用就别用
(翻译有点烂 , 见谅~~)
Message properties
The AMQP protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
contentType: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
replyTo: Commonly used to name a callback queue.
correlationId: Useful to correlate RPC responses with requests.
