SpringCloud(17)之SpringCloud Stream

news/发布时间2024/9/20 5:37:28

一、Spring Cloud Stream介绍

        Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的Spring习惯用法和最佳实践之上,包括对持久发布/子语义、使用者组和有状态分区的支持。  

        它可以基于 Spring Boot来创建独立的、可用于生产的  Spring应用程序,Spring Cloud Stream为一些供应商的消息   中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通   过使用 Spring Cloud Stream ,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。目前 Spring Cloud Stream 支持 RabbitMQ  Kafka 自动化配置。

        目前Spring Cloud Stream只适配以下中间件信息:

二、Spring Cloud Stream 工作流程

        Spring Cloud Stream应用程序由一个与中间件无关的核心组成。应用程序通过在外部代理公开的目的地和代码中的输入/输出参数之间建立绑定来与外部世界通信。建立绑定所需的特定于Broker的详细信息由特定于中间件的Binder实现来处理。

        通过Stream可以很好的屏蔽各个中间件的API差异,它统一了API,生产者通过OUTPUT向消息中间件发 送消息,此时并不需要关心消息中间件是Kafka还是RabbitMQ,不需要关注他们的API,只需要用到Stream的API,这样可以降低学习成本。消费方通过INPUT消费指定的消息,也不需要关注消息中间件 API,架构图如上图: 

        我们对上图的对象进行说明:

  • Application Core:生产者、消费者;
  • inputs:消费者;
  • ouputs:生产者;
  • Binder:绑定器,主要和消息中间件进行绑定操作;
  • Middleware:消息中间件服务;

        

        我们项目中真正应用到Stream,只需要按照如上流程图操作即可;

 生产者:

        1:使用Source绑定消息的输出管道。

        2:通过MessageChannel输出消息。

        3:通过@EnableBinding开启binder,将生产者绑定到指定的MQ服务。

消费者:      

        1:通过@EnableBinding绑定到MQ。

        2:通过Sink绑定到输入数据管道。

        3:@StreamListener监听指定管道数据。

 2.1 Spring Cloud Stream 实战

        

        如上图,当用户行程结束,用户需进入支付操作,当用户支付完成时,我们需要更新订单状态,此时我 们可以让支付系统将支付状态发送到MQ中,订单系统订阅MQ消息,根据MQ消息修改订单状态。我们 将使用 SpringCloud Stream实现该功能。

2.1.1 生产者

1)引入依赖 

   hailtaxi-pay 中引入依赖:

        <!--stream--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

2) 配置MQ服务

 修改 hailtaxi-pay  application.yml 添加如下配置:

server:port: 18083
spring:application:name: hailtaxi-paycloud:#Consul配置consul:host: localhostport: 8500discovery:#注册到Consul中的服务名字service-name: ${spring.application.name}#Streamstream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: 192.168.211.145port: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: payExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

3)消息输出管道绑定 

/**** 负责向MQ发送消息*/
@EnableBinding(Source.class)
public class MessageSender {@Resourceprivate MessageChannel output;//消息发送管道/**** 发送消息* @param message* @return*/public Boolean send(Object message) {//消息发送boolean bo = output.send(MessageBuilder.withPayload(message).build());System.out.println("*******send message: "+message);return bo;}
}

 参数说明:

Source.class:绑定一个输出消息管道Channel。

MessageChannel:发送消息对象,默认是DirectWithAttributesChannel,发消息在 AbstractMessageChannel中完成。

MessageBuilder.withPayload:构建消息。

        此时大家可能会有一个疑问?如果我们多个channel,在rabbitMQ中就是说我一个服务有多个交换机该怎么办?

        我们来看下 Source.class里面定义的内容是什么,定义的内容如下:

public interface Source {String OUTPUT = "output";@Output("output")MessageChannel output();
}

        所以说如果此时我们要新的管道的话,我们就可以参考Source来定义新的类,然后OUTPUT就定义新的管道名称,然后再配置文件中我们就定义这个新的管道名称。 

4)消息发送 

  com.itheima.pay.controller.TaxiPayController 中创建支付方法用于发送消息,代码如下:

    /**** 支付  http://localhost:18083/pay/wxpay/1* @return*/@GetMapping(value = "/wxpay/{id}")public TaxiPay pay(@PathVariable(value = "id")String id){//支付操作TaxiPay taxiPay = new TaxiPay(id,310,3);//发送消息messageSender.send(taxiPay);return taxiPay;}

2.1.2 消费者 

1)修改配置 

 修改 hailtaxi-order 的核心配置文件 application.yml ,在文件中配置要监听MQ信息:

server:port: 18082
spring:application:name: hailtaxi-orderzipkin:#zipkin服务地址base-url: http://localhost:9411sleuth:sampler:probability: 1  #采样值,0~1之间,1表示全部信息都手机,值越大,效率越低cloud:#Consul配置consul:host: localhostport: 8500discovery:#注册到Consul中的服务名字service-name: ${spring.application.name}#Streamstream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: 192.168.211.145port: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: payExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit  # 设置要绑定的消息服务的具体设置group: paygroup #所属分组

2)消息监听 

  hailtaxi-order 中创建消息监听对象 com.itheima.order.mq.MessageReceiver ,代码如下:

@EnableBinding(Sink.class)
public class MessageReceiver {@Value("${server.port}")private String port;/***** 消息监听* @param message*/@StreamListener(Sink.INPUT)public void receive(String message) {System.out.println("消息监听(增加用户积分、修改订单状态)-->" + message+"-->port:"+port);}
}

参数说明:

Sink.class:绑定消费者管道。

@StreamListener(Sink.INPUT):监听消息配置,指定了消息为application中的input


1.3 消息分组

         消息分组有2个好处,分别是集群合理消费、数据持久化。

 1.3.1集群消费下的分组

1)分组的意义

        分组在项目中是有非常重大的意义,通常应用于消息并发高、消息堆积的场景,这些场景服务消费方通 常会做集群操作,一旦做集群操作,我们又需要项目中的消费者合理消费,比如用户打车支付完成后, 我们需要增加用户积分同时修改订单状态,如果集群环境中有2台服务器都执行该消费操作,此时用户  积分会增加两次,就会造成非幂等问题。 

 

        此时集群中相同服务应该属于同一个组,同一个组中只允许有一个足节点消费某一个信息,这样就可以 避免费幂等问题的出现。

2)分组实战 

        新增一个 hailtaxi-order消费者节点:

  

        此时运行起来,  18082  18182 节点会同时消费所有数据。 

        修改 hailtaxi-order 的核心配置文件 application.yml ,添加分组: 

 

        此时再次测试,可以发现消费者不会重复消费数据。 

1.3.2 数据持久化

        我们把分组去掉,停掉 hailtaxi-order 服务,然后请求 http://localhost:18083/pay/wxpay/1 送数据,发送完数据后,再启动 hailtaxi-order服务,此时发现没有数据可以消费,这是因为数据没 有持久化,是一种广播模式,如果需要数据持久化,得给每个消费节点添加group组即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.bcls.cn/wPSg/9959.shtml

如若内容造成侵权/违法违规/事实不符,请联系编程老四网进行投诉反馈email:xxxxxxxx@qq.com,一经查实,立即删除!

相关文章

《读写算》杂志社读写算杂志社2024年第2期目录

教育资讯 教育部印发通知部署&#xff1a;做好2024年寒假期间校外培训治理工作 1《读写算》投稿&#xff1a;cn7kantougao163.com 北京提升学校心理健康工作水平——每校至少配备一名专职心理健康教育教师 1 湖北孝感&#xff1a;2026年达成小学毕业时人人会游泳 2…

C语言编程安全规范

目的 本规范旨在加强编程人员在编程过程中的安全意识&#xff0c;建立编程人员的攻击者思维&#xff0c;养成安全编码的习惯&#xff0c;编写出安全可靠的代码。 2 宏 2.1 用宏定义表达式时&#xff0c;要使用完备的括号 2.2 使用宏时&#xff0c;不允许参数发生变化 3 变量 …

关于页表,页号,物理块号的例题

课本上的图解 题目:在分页系统中地址结构长度为16位&#xff0c;页面大小为2K&#xff0c;作业地址空间为6K&#xff0c;该作业的各页依次存放在2、3、6号物理块中&#xff0c;相对地址2500处有一条指令store 1, 4500&#xff0c;请给出该作业的页表&#xff0c;该指令的物理单…

css5定位

css 一.定位1.概念&#xff08;定位定位模式边位移&#xff09;2.静态位移static&#xff08;不常用&#xff09;3.相对定位relative&#xff08;不脱标&#xff09;&#xff08;占位置&#xff09;4.绝对定位absolute&#xff08;脱标&#xff09;&#xff08;不占位置&#x…

Thomson(汤姆森)简化了其螺旋千斤顶产品的CAD选型配置

线性运动控制解决方案提供商Thomson在其在线工程设计工具中添加了独特的螺旋千斤顶配置和选择工具。新的Thomson螺旋千斤顶产品选型器可帮助设计工程师优化和选定螺旋千斤顶&#xff0c;以满足高达100吨的负载应用。 Thomson螺旋千斤顶产品系列负责人Mitch Katona说&#xff1…

Java学习--学生管理系统(残破版)

代码 Main.java import java.util.ArrayList; import java.util.Scanner;public class Main {public static void main(String[] args) {ArrayList<Student> list new ArrayList<>();loop:while (true) {System.out.println("-----欢迎来到阿宝院校学生管理系…

docker (十二)-私有仓库

docker registry 我们可以使用docker push将自己的image推送到docker hub中进行共享&#xff0c;但是在实际工作中&#xff0c;很多公司的代码不能上传到公开的仓库中&#xff0c;因此我们可以创建自己的镜像仓库。 docker 官网提供了一个docker registry的私有仓库项目&#…

MySQL的21个SQL经验

1. 写完SQL先explain查看执行计划(SQL性能优化) 日常开发写SQL的时候,尽量养成这个好习惯呀:写完SQL后,用explain分析一下,尤其注意走不走索引。 explain select userid,name,age from user where userid =10086 or age =18;2、操作delete或者update语句,加个limit(S…

C#之WPF学习之路(5)

目录 内容控件&#xff08;2&#xff09; TextBlock文字块 TextBox文本框 TextBoxBase基类 TextBox控件 RichTextBox富文本框 ToolTip控件&#xff08;提示工具&#xff09; Popup弹出窗口 Image图像控件 属性成员 事件成员 内容控件&#xff08;2&#xff09; Tex…

Maven【4】(继承)(命令行操作)

文章目录 【1】基础概念【2】继承的作用【3】创建父工程和子工程【4】在父工程中统一管理依赖 【1】基础概念 说到继承&#xff0c;我们很容易想到Java中的继承&#xff0c;有子类和父类&#xff0c;子类继承父类&#xff0c;那么我们maven中的继承是什么呢&#xff1f; Maven…

海外媒体推广发稿平台之选快速提升品牌知名度的8个方法-华媒舍

随着全球化趋势的加深&#xff0c;海外市场对于企业来说变得越来越重要。在海外媒体上宣传品牌是提升知名度和开拓新客户的关键步骤。本文将为您介绍8个使用海外媒体推广发稿平台来快速提升品牌知名度的秘籍。 1. 选择可信赖的平台 在选择推广发稿平台时&#xff0c;首先要确保…

化妆品汞含量检测FDA 21 CFR700.13 标准

化妆品汞含量检测FDA 21 CFR700.13 标准 化妆品汞含量检测FDA 21 CFR700.13 标准 汞是一种有毒的元素&#xff0c;在化妆品中是一种禁用的成分。然而&#xff0c;汞在一些非正规的产品中仍然存在&#xff0c;给消费者的健康带来了潜在的危害。因此&#xff0c;对于化妆品生产企…

正向代理的反爬虫与防DDoS攻击:保护网站免受恶意行为

目录 前言 一、正向代理的原理 二、正向代理的反爬虫功能 1. IP地址隐藏 2. 请求多样化 三、正向代理的防DDoS攻击功能 1. 均衡负载 2. IP过滤 结论 前言 在当前互联网环境下&#xff0c;网站常常受到各种恶意行为的侵袭&#xff0c;其中包括爬虫和DDoS攻击。这些行为…

Android java基础_异常

一.异常的概念 在Java中&#xff0c;异常&#xff08;Exception&#xff09;是指程序执行过程中可能出现的不正常情况或错误。它是一个事件&#xff0c;它会干扰程序的正常执行流程&#xff0c;并可能导致程序出现错误或崩溃。 异常在Java中是以对象的形式表示的&#xff0c;…

我的NPI项目之设备系统启动(八) -- Android14的GKI2.0开发步骤和注意事项

GKI是什么&#xff1f; Google为什么要推行GKI&#xff1f; GKI全称General Kernel Image。GKI在framework和kernel之间提供了标准接口&#xff0c;使得android OS能够轻松适配/维护/兼容不同的设备和linux kernel。 Google引入GKI的目的是将Framework和Kernel进一步的解耦。因…

Peter算法小课堂—动态规划

Peter来啦&#xff0c;好久没有更新了呢 今天&#xff0c;我们来讨论讨论提高组的动态规划。 动态规划 动态规划有好多经典的题&#xff0c;有什么背包问题、正整数拆分、杨辉三角……但是&#xff0c;如果考到陌生的题&#xff0c;怎么办呢&#xff1f;比如说2000年提高组的…

Nacos配置

目录 启动nacos 项目步骤 Nacos服务分级存储模型​编辑 服务跨域集群调用问题 NacosRule负载均衡 服务实例的权重设置 环境隔离-namespace Nacos环境隔离 Nacos和Eureak对比 临时实例和非临时实例 Ncaos与Eureka的共同点 Nacos与Eureka的区别 Nacos配置管理 统一配…

我承认,我低估鸿蒙了 !

2019年&#xff0c;鸿蒙刚出来的时候&#xff0c;我心里是有点犯嘀咕的&#xff0c;虽然很支持国产操作系统&#xff0c;但是我知道&#xff0c;开发操作系统也许不难&#xff0c;但是建立一个全新的生态太难了&#xff01; 如果操作系统中缺乏应用程序&#xff0c;就不会有人…

排序——希尔排序

希尔排序 希尔排序步骤 希尔排序的核心还是插入排序&#xff0c;但是把插入排序分成两部分&#xff0c;1.预排序2.插入排序。先对原数组进行预排序&#xff0c;使数组接近有序&#xff08;让更大的数字和更小的数字更快的分配到两边&#xff09;&#xff0c;然后再对已经接近有…

模拟算法题练习(二)(DNA序列修正、无尽的石头)

目录 &#xff08;一、DNA序列修正&#xff09; 问题分析 方法实现 时间复杂度和空间复杂度分析 &#xff08;二、无尽的石头&#xff09; &#xff08;一、DNA序列修正&#xff09; 问题描述 在生物学中&#xff0c;DNA序列的相似性常被用来研究物种间的亲缘关系。现在我…
推荐文章