RabbitMQ学习整理————基于RabbitMQ实现RPC

news/发布时间2024/5/15 21:07:05

基于RabbitMQ实现RPC

  • 前言
  • 什么是RPC
  • RabbitMQ如何实现RPC
  • RPC简单示例
  • 通过Spring AMQP实现RPC

前言

这边参考了RabbitMQ的官网,想整理一篇关于RabbitMQ实现RPC调用的博客,打算把两种实现RPC调用的都整理一下,一个是使用官方提供的一个Java client,还有一个是Spring AMQP的整合使用。
代码路径:https://github.com/yzh19961031/blogDemo/tree/master/rabbitmq

什么是RPC

RPC是远程过程调用(Remote Procedure Call)的缩写形式,简单说就是一个节点去请求另一个节点上面的服务并获得响应结果。
我们之前总结的工作模式都是发送消息到指定的队列,再由相关的消费者进行消费,如果存在这样的场景,比如消费者消费完消息需给生产者一个具体的响应,然后生产者再根据这个响应进行其他的业务逻辑,这样就需要使用到RabbitMQ提供的RPC能力。

RabbitMQ如何实现RPC

官方有很详细的介绍文档,这边贴一下地址:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
RabbitMQ实现RPC很简单,正常的流程就是请求以及响应,我们只需要在请求的消息的属性里面添加一个响应队列的地址,这边需要使用到一个BasicProperties这个类。具体配置如下:

// 指定一个回调队列
callbackQueueName = channel.queueDeclare().getQueue();
// 设置replyTo的属性为指定的回调队列
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();channel.basicPublish("", "rpc_queue", props, message.getBytes());

BasicProperties这个类中提供了很多的属性,有14个,很多基本上很少用到,常用的就是几个,我这边也贴一下,其实在我上一篇文章中基于RabbitMQ实现的一个RPC工具里面都有用到这些属性。

  1. contentType 这个属性用来表明消息的类型,默认是"application/octet-stream"这种流的类型,还有常用的比如"application/json","text/plain"等,这些在我的RPC工具里面都有用到。
  2. replyTo 这个就是上面指定的回调队列。
  3. correlationId 这个id可以用来进行消息的确认,将相应与请求相关联。主要是可以确认服务端收到的消息是不是指定客户端发过来的,用于确认。

首先先贴一张官方提供的图,这个是RabbitMQ实现RPC的主要工作流程:
在这里插入图片描述
实现RPC的具体工作流程:

  1. 首先客户端发送一个请求消息,这个请求消息里面有两个属性,一个是replyTo回调队列的地址,一个是correlationId用于标识当前消息唯一的id信息。
  2. 这个消息是发送到指定的rpc_queue这个队列上面。
  3. 对应我们的服务端Server就会等待rpc_queue上面的请求消息,当请求消息来得时候,服务端会进行处理,处理完成会将相应的消息再发送到请求消息属性中的replyTo回调的队列上面。
  4. 客户端发送消息之后,会等待replyTo队列中的消息。当有消息来得时候,会检查响应消息中correlationId属性和请求消息中correlationId是否一致,完成一次PRC调用。

RPC简单示例

我这边根据官网上面提供的例子简单修改整理了一下,这边提供一个大小写转换的功能,就是客户端发送一段小写的字符串,服务端将字符串转为大写再响应过来。详细逻辑可以看下代码中注释,具体代码如下:
首先服务端:

/*** RPC服务端** @author yuanzhihao* @since 2020/11/21*/
public class RPCServer {public static void main(String[] args) throws IOException, TimeoutException {// 首先还是正常获得connection以及channel对象ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.1.108");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 定义一个rpc的队列String queueName = "test_rpc";channel.queueDeclare(queueName, false, false, false, null);Object monitor = new Object();// 具体的消费代码里面实现DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消费者将请求消息中的correlationId信息再作为响应传回replyTo队列AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();String response = "";try {// 提供一个大小写转换的方法String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("toUpperCase(" + message + ")");response = toUpperCase(message);} catch (RuntimeException e) {System.out.println(e.toString());} finally {// 将响应传回replyTo队列channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));// 设置了手动应答 需要手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// 执行完成会释放主线程的锁// RabbitMq consumer worker thread notifies the RPC server owner threadsynchronized (monitor) {monitor.notify();}}};// 监听"test_rpc"队列channel.basicConsume(queueName, false, deliverCallback, (consumerTag -> { }));// 这个锁对象是确保我们server的调用逻辑执行完成 首先挂起主线程// Wait and be prepared to consume the message from RPC client.while (true) {synchronized (monitor) {try {monitor.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}// 提供一个大小写转换的方法private static String toUpperCase(String msg) {return msg.toUpperCase();}
}

客户端:

/*** RPC客户端** @author yuanzhihao* @since 2020/11/21*/public class RPCClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 创建connection以及channel对象ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.1.108");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");try ( Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel()) {// 声明一个队列String queueName = "test_rpc";// 请求消息中需要带一个唯一标识ID String corrId = UUID.randomUUID().toString();// 声明一个回调队列String replayQueueName = channel.queueDeclare().getQueue();// 将correlationId以及回调队列设置在消息的属性中AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replayQueueName).build();// 具体消息内容String msg = "hello rpc";// 发送请求消息channel.basicPublish("",queueName,properties,msg.getBytes());// 设置一个阻塞队列  等待服务端的响应final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);String ctag = channel.basicConsume(replayQueueName, true, (consumerTag, message) -> {// 注意 这边根据correlationId进行下判断if (message.getProperties().getCorrelationId().equals(corrId)) {response.offer(new String(message.getBody(), StandardCharsets.UTF_8));}}, consumerTag -> {});// 获取响应结果String take = response.take();System.out.println("rpc result is "+ take);channel.basicCancel(ctag);}}
}

执行代码,具体的客户端与服务端运行结果在这里插入图片描述 在这里插入图片描述

通过Spring AMQP实现RPC

通过Spring来实现RPC也很简单,主要通过spring提供的一个RabbitTemplate对象中sendAndReceive方法来实现,这个方法是发送消息然后一直等待响应。监听器里面实现的和之前的逻辑大致相同,都需要将response响应消息发送到对应的replyTo回调队列上。下面直接贴一下代码。
首先是服务端,我这边直接是使用配置类的形式,具体一些的配置项可以参考下我之前的那篇博客或者上网搜一下~

/*** 主配置类** @author yuanzhihao* @since 2021/1/9*/
@Configuration
public class RabbitMQConfig {private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);// 注入connectionFactory对象@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses("192.168.1.108:5672");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");return connectionFactory;}// 声明队列@Beanpublic Queue rpcQueue() {return new Queue("test_rpc",false);}@Beanpublic RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}// 创建初始化RabbitAdmin对象@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}// 消息监听器@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(RabbitTemplate rabbitTemplate) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());// 监听的队列container.setQueues(rpcQueue());MessageListener messageListener = message -> {String receiveMsg = new String(message.getBody(), StandardCharsets.UTF_8);log.info("Receive a message message is {}", receiveMsg);// 执行对应逻辑String responseMsg = toUpperCase(receiveMsg);MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(message.getMessageProperties().getCorrelationId()).build();// 响应消息 这边就是如果没有绑定交换机和队列的话 消息应该直接传到对应的队列上面rabbitTemplate.send("", message.getMessageProperties().getReplyTo(), new Message(responseMsg.getBytes(StandardCharsets.UTF_8), messageProperties));};// 设置监听器container.setMessageListener(messageListener);return container;}// 提供一个大小写转换的方法private String toUpperCase(String msg) {return msg.toUpperCase();}
}

客户端我采用test单元测试的形式

/*** spring amqp rpc 测试类** @author yuanzhihao* @since 2021/1/9*/
@ContextConfiguration(classes = {RabbitMQConfig.class})
@RunWith(SpringRunner.class)
public class RabbitMQRpcTest {private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);@Autowiredprivate RabbitTemplate rabbitTemplate;// 测试RPC客户端@Testpublic void testRpcClient() {// 设置correlationIdString corrId = UUID.randomUUID().toString();String msg = "hello rpc";MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(corrId).build();// 注意 这边如果使用sendAndReceive不指定replyTo回调队列 spring会默认帮我们添加一个回调队列// 格式默认 "amq.rabbitmq.reply-to" 前缀Message message = rabbitTemplate.sendAndReceive("", "test_rpc", new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties));log.info("The response is {}", new String(message.getBody(), StandardCharsets.UTF_8));}
}

具体实现可以看下代码的注释
代码执行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.bcls.cn/bgrr/5227.shtml

如若内容造成侵权/违法违规/事实不符,请联系编程老四网进行投诉反馈email:xxxxxxxx@qq.com,一经查实,立即删除!

相关文章

MATLAB环境下一种改进的瞬时频率(IF)估计方法

相对于频率成分单一、周期性强的平稳信号来说&#xff0c;具有非平稳、非周期、非可积特性的非平稳信号更普遍地存在于自然界中。调频信号作为非平稳信号的一种&#xff0c;由于其频率时变、距离分辨率高、截获率低等特性&#xff0c;被广泛应用于雷达、地震勘测等领域。调频信…

递归与回溯(一)

递归 递归一定要有出口&#xff0c;不然会无限调用&#xff0c;死循环 string fun(int n){if(n0)return "a";if(n1)return "b";return fun(n - 1) fun(n - 2); }输出前8种结果&#xff1a; 双写数字递归例子 注意递归的return int doubleNum(int n){i…

element 季度选择器组件

效果图&#xff1a; 回传给父组件的值&#xff1a; 季度选择器组件代码&#xff1a; <template><el-form><el-form-item><markclass"mark"v-show"showSeason"click.stop"showSeason false"></mark><el-input…

OpenAI视频生成Sora技术简析

基本介绍 Sora是春节期间OpenAI发布的产品&#xff0c;主要是通过文字描述生成视频&#xff0c;通过大规模视频数据训练而成的生成模型&#xff0c;当前还没开放试用。官方发布的技术报告&#xff1a;https://openai.com/research/video-generation-models-as-world-simulators…

【开源】JAVA+Vue.js实现超市账单管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统设计3.1 总体设计3.2 前端设计3.3 后端设计在这里插入图片描述 四、系统展示五、核心代码5.1 查询供应商5.2 查询商品5.3 新增超市账单5.4 编辑超市账单5.5 查询超市账单 六、免责说明 一、摘要 1.1 项目介绍 基于…

Python进阶学习:json.dumps()和json.dump()的区别

Python进阶学习&#xff1a;json.dumps()和json.dump()的区别 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程 &#x1f448; 希望得到您…

微信小程序开启横屏调试

我们先打开小程序项目 开启真机运行 目前是一个竖屏的 然后打开全局配置文件 app.json 给下面的 window 对象 下面加一个 pageOrientation 属性 值为 landscape 运行结果如下 然后 我们开启真机运行 此时 就变成了个横屏的效果

基于java Springboot实现教务管理系统

基于java Springboot实现教务管理系统《视频版-建议收藏》 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 央顺技术团队 Java毕设项目精品实战案例《1000套》 欢迎点赞 收藏 ⭐留言 文…

nginx(二)

nginx的验证模块 输入用户名和密码 第一步先下载httpd 这个安装包 第二步编辑子配置文件 然后去网页访问192.168.68.3/admin/ 连接之后&#xff0c;会出现404&#xff0c;404出现是因为没给网页写页面 如果要写页面&#xff0c;则在/opt/html&#xff0c;建立一个admin&#x…

【k8s资源调度-StatefulSet】

1、部署对象StatefulSet资源&#xff08;无状态应用&#xff09; StatefulSet针对的是有状态应用&#xff0c;有状态应用会对我们的当前pod的网络、文件系统等有关联。 2、配置文件如下 StatefulSet资源的配置文件粗略如下&#xff0c;如下的配置信息包含了数据卷&#xff0c;…

https://htmlunit.sourceforge.io/

https://htmlunit.sourceforge.io/ 爬虫 HtmlUnit – Welcome to HtmlUnit HtmlUnit 3.11.0 API https://mvnrepository.com/artifact/net.sourceforge.htmlunit/htmlunit/2.70.0 https://s01.oss.sonatype.org/service/local/repositories/releases/content/org/htmlunit…

图搜索基础-深度优先搜索

图搜索基础-深度优先搜索 参考原理引入流程解析手推例子 代码实现运行结果结果分析 参考 理论参考&#xff1a;深蓝学院 实现参考&#xff1a;github项目 原理 引入 对于这样一个图&#xff0c;我们试图找到S到G的通路&#xff1a; 计算机程序不会像人眼一样&#xff0c;一…

鸿蒙OS应用开发之显示图片组件6

前面学习了怎么样让图片合适的大小来显示出来,达到最佳的布局显示图片。现在来学习PixelMap图片显示。PixelMap图片是指图片解码后无压缩的位图,用于图片显示或图片处理。 由于PixelMap图片是一种无压缩的图片,比较适合图片处理,比如从网络上加载图片之后,再进行处理再显示…

Spring Boot与Netty:构建高性能的网络应用

点击下载《Spring Boot与Netty&#xff1a;构建高性能的网络应用》 1. 前言 本文将详细探讨如何在Spring Boot应用中集成Netty&#xff0c;以构建高性能的网络应用。我们将首先了解Netty的原理和优势&#xff0c;然后介绍如何在Spring Boot项目中集成Netty&#xff0c;包括详…

OD(9)之Mermaid序列图(Sequence diagrams)使用详解

OD(8)之Mermaid序列图(Sequence diagrams)使用详解 Author: Once Day Date: 2024年2月21日 漫漫长路才刚刚开始… 全系列文章可参考专栏: Mermiad使用指南_Once_day的博客-CSDN博客 参考文章: 关于 Mermaid | Mermaid 中文网 (nodejs.cn)Mermaid | Diagramming and charti…

Easy-Jmeter: 性能测试平台

目录 写在开始1 系统架构2 表结构设计3 测试平台生命周期4 分布式压测5 压力机管理6 用例管理6.1 新增、编辑用例6.2 调试用例6.3 启动测试6.4 动态控量6.5 测试详情6.6 环节日志6.7 实时数据6.8 测试结果 7 测试记录7 用例分析8 系统部署8.1普通部署8.2容器化部署 写在最后 写…

Python中的functools模块详解

大家好&#xff0c;我是海鸽。 函数被定义为一段代码&#xff0c;它接受参数&#xff0c;充当输入&#xff0c;执行涉及这些输入的一些处理&#xff0c;并根据处理返回一个值&#xff08;输出&#xff09;。当一个函数将另一个函数作为输入或返回另一个函数作为输出时&#xf…

【SpringBoot】Spring常用注解总结

目录 ⭐spring springmvc和springboot的区别 Autowired 和Resource的区别和联系 1. SpringBootApplication 2. Spring Bean 相关 2.1. Autowired 2.2. Component,Repository,Service, Controller 2.3. RestController 2.4. Scope 2.5. Configuration 3. 处理常见的 HT…

基于Springboot的校园求职招聘系统(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的校园求职招聘系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构…

HTTP---------状态码

当服务端返回 HTTP 响应时&#xff0c;会带有一个状态码&#xff0c;用于表示特定的请求结果。比如 HTTP/1.1 200 OK&#xff0c;里面的 HTTP/1.1 表示协议版本&#xff0c;200 则是状态码&#xff0c;OK 则是对状态码的描述。 由协议版本、状态码、描述信息组成的行被称为起始…
推荐文章