kafka消费消息并对消息进行RSA公钥解密

news/发布时间2024/5/15 11:09:13

SpringBoot版本2.x.x 具体是几,忘记了,是支持application.yml或者是application.properties配置的。当然也可以使用Java配置类。

以下是使用Java配置类来配置的。

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaConsumerConfig  {//指定kafka 代理集群地址,多个地址用英文逗号隔开private String bootstrapServers="x.x.x.x:xxxx,x.x.x.x:xxxx";//指定默认消费者group id,消费者监听到的也是这个private String groupId="xxx";//消费者在读取一个没有offset的分区或者offset无效时的策略,默认earliest是从头读,latest不是从头读private String autoOffsetReset="earliest";//是否自动提交偏移量offset,默认为true,一般是false,如果为false,则auto-commit-interval属性就会无效private boolean  enableAutoCommit=true;//自动提交间隔时间,接收到消息后多久会提交offset,前提需要开启自动提交,也就是enable-auto-commit设置为true,默认单位是毫秒(ms),如果写10s,最后加载的显示值为10000ms,需要符合特定时间格式:1000ms,1S,1M,1H,1D(毫秒,秒,分,小时,天)private String autoCommitInterval="1000";//指定消息key和消息体的编解码方式private String keyDeserializerClass="org.apache.kafka.common.serialization.StringDeserializer";private String valueDeserializerClass ="org.apache.kafka.common.serialization.StringDeserializer";//批量消费每次最多消费多少条信息 可以自己根据业务来配置//private String maxPollRecords="50";//协议类型,为SASL类型private String securityProtocol="SASL_PLAINTEXT";//协议private String saslMechanism="SCRAM-SHA-512";//用户名密码配置private String saslJaas="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxxx\" password=\"xxxx\";";@BeanConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG//factory.setBatchListener(false);//这里为true的时候,KafkaConsumer那里需要使用批量消费方法,不然报错return factory;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);//props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);props.put(SaslConfigs.SASL_JAAS_CONFIG,saslJaas);return new DefaultKafkaConsumerFactory<>(props);}
}

消息的代码

@Slf4j
@Service
public class KafkaConsumerService {//单条消费@KafkaListener(topics = "xxxx", groupId = "xxxx")public void consume(ConsumerRecord<String, String> record) {String value = record.value();//业务逻辑...}//批量消费@KafkaListener(topics = "xxxx", groupId = "xxxx")public void consume(List<ConsumerRecord<?, ?>> consumerRecords, Acknowledgment ack) {//...//db.batchSave(consumerRecords);//批量插入或者批量更新数据//手动提交ack.acknowledge();}
}

注意点:spring-kafka,和 kafka-clients 的版本兼容问题。

RSA公钥解密:

	/*** 加密算法RSA*/public static final String KEY_ALGORITHM = "RSA";/*** RSA最大解密密文大小*/private static final int MAX_DECRYPT_BLOCK = 128;/*** RSA公钥解密** @param encryptedData 已加密数据(base64编码)* @param publicKey     公钥(BASE64编码)* @return* @throws Exception*/public byte[] decryptByPublicKey(byte[] encryptedData, String publicKey)throws Exception {byte[] keyBytes = Base64Utils.decode(publicKey.getBytes());X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(keyBytes);KeyFactory keyFactory = KeyFactory.getInstance(KEY_ALGORITHM);Key publicK = keyFactory.generatePublic(x509KeySpec);Cipher cipher = Cipher.getInstance(keyFactory.getAlgorithm());cipher.init(Cipher.DECRYPT_MODE, publicK);int inputLen = encryptedData.length;ByteArrayOutputStream out = new ByteArrayOutputStream();int offSet = 0;byte[] cache;int i = 0;// 对数据分段解密while (inputLen - offSet > 0) {if (inputLen - offSet > MAX_DECRYPT_BLOCK) {cache = cipher.doFinal(encryptedData, offSet, MAX_DECRYPT_BLOCK);} else {cache = cipher.doFinal(encryptedData, offSet, inputLen - offSet);}out.write(cache, 0, cache.length);i++;offSet = i * MAX_DECRYPT_BLOCK;}byte[] decryptedData = out.toByteArray();out.close();return decryptedData;}

下面是批量消费的实例,可以自己学习一下:
https://cloud.tencent.com/developer/article/2223134?areaSource=102001.4&traceId=KErHitYK-asG0MQLNTqos

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.bcls.cn/akBX/7771.shtml

如若内容造成侵权/违法违规/事实不符,请联系编程老四网进行投诉反馈email:xxxxxxxx@qq.com,一经查实,立即删除!

相关文章

光伏预测 | Matlab基于CNN-SE-Attention-ITCN的多特征变量光伏预测

光伏预测 | Matlab基于CNN-SE-Attention-ITCN的多特征变量光伏预测 目录 光伏预测 | Matlab基于CNN-SE-Attention-ITCN的多特征变量光伏预测预测效果基本描述模型简介程序设计参考资料 预测效果 基本描述 Matlab基于CNN-SE-Attention-ITCN的多特征变量光伏预测 运行环境: Matla…

设计模式——抽象工厂模式

定义: 抽象工厂模式&#xff08;Abstract Factory Pattern&#xff09;提供一个创建一系列或相互依赖对象的接口&#xff0c;而无须指定它们具体的类。 概述:一个工厂可以提供创建多种相关产品的接口&#xff0c;而无需像工厂方法一样&#xff0c;为每一个产品都提供一个具体…

Folx GO+ 5.27 Mac上优秀好用的下载工具

Folx Pro 5 for Mac是Mac平台上公认的最好的下载工具&#xff0c;功能可以与迅雷相媲美。目前Folx Pro 5 特别版正式上线&#xff0c;新版的Folx整体界面更加简洁漂亮&#xff0c;支持HTTP FTP下载&#xff0c;torrent种子下载&#xff0c;高速下载&#xff0c;定时下载&#x…

git实操gitee

第一步&#xff1a;远程连接远程仓库 git remote add origin gitgitee.com:name/project.git 上面信息可以在下面看到。 第二步&#xff1a;拉取项目 git pull origin master --allow-unrelated-histories 第三步&#xff1a;修改项目&#xff0c;保持后提交 git add . gi…

k8s-heml管理 17

Helm是Kubernetes 应用的包管理工具&#xff0c;主要用来管理 Charts&#xff0c;类似Linux系统的 yum。Helm Chart 是用来封装 Kubernetes 原生应用程序的一系列 YAML 文件。可以在你部署应用的时候自定义应用程序的一些 Metadata&#xff0c;以便于应用程序的分发。 对于应用…

激光条纹中心线提取算法FPGA实现方案

1 概述 激光条纹中心线提取是3D线激光测量领域一个较为基础且重要的算法。目前&#xff0c;激光条纹中心线提取已有多种成熟的算法&#xff0c;有很多相关的博客和论文。 激光条纹中心线提取的真实意义在于工程化和产品化的实际应用&#xff0c;而很多算法目前只能用于学术研究…

【pytorch】常用代码

文章目录 条件与概率torch.tensor()torch.rand()torch.randn()torch.randint()torch.multinominal() 逻辑运算torch.argmax()torch.max()torch.sum()torch.tanh()torch.pow() 功能性操作 torch.nn.functionalF.normalize()F.elu()F.relu()F.softmax() 张量计算torch.zeros()tor…

Diffbot 小记

文章目录 关于 Diffbot产品和价格 关于 Diffbot 官网&#xff1a;https://www.diffbot.com官方文档&#xff1a;https://docs.diffbot.com/docs/getting-started-with-diffbotAPI 手册 : https://docs.diffbot.com/reference/introduction-to-diffbot-apis 产品和价格 两周内…

【Android】坐标系

Android 系统中有两种坐标系&#xff0c;分别为 Android 坐标系和 View 坐标系。了解这两种坐标系能够帮助我们实现 View 的各种操作&#xff0c;比如我们要实现 View 的滑动&#xff0c;你连这个 View 的位置都不知道&#xff0c;那如何去操作呢&#xff1f; 一、Android 坐标…

【Flink精讲】Flink状态及Checkpoint调优

RocksDB大状态调优 RocksDB 是基于 LSM Tree 实现的&#xff08;类似 HBase&#xff09; &#xff0c;写数据都是先缓存到内存中&#xff0c; 所以 RocksDB 的写请求效率比较高。 RocksDB 使用内存结合磁盘的方式来存储数据&#xff0c;每 次获取数据时&#xff0c;先从内存中 …

软件工程复习笔记

一、软件工程概述 软件 = 程序 + 数据 + 相关文档 软件危机(Software Crisis) 指由于落后的软件生产方式无法满足迅速增长的计算机软件需求,从而导致软件开发与维护过程中出现一系列严重问题的现象。 软件工程三要素 方法、工具、过程 软件工程目标 在给定成本、进度的…

【Linux】部署前后端分离项目---(Nginx自启,负载均衡)

目录 前言 一 Nginx&#xff08;自启动&#xff09; 2.1 Nginx的安装 2.2 设置自启动Nginx 二 Nginx负载均衡tomcat 2.1 准备两个tomcat 2.1.1 复制tomcat 2.1.2 修改server.xml文件 2.1.3 开放端口 2.2 Nginx配置 2.2.1 修改nginx.conf文件 2.2.2 重启Nginx服务 2…

STM32 4位数码管和74HC595

4位数码管 在使用一位数码管的时候&#xff0c;会用到8个IO口&#xff0c;那如果使用4位数码管&#xff0c;难道要使用32个IO口吗&#xff1f;肯定是不行的&#xff0c;太浪费了IO口了。把四个数码管全部接一起共用8个IO口&#xff0c;然后分别给他们一个片选。所以4位数码管共…

本地项目如何上传到gitee

文章目录 一、在gitee上新建远程仓库二、初始化本地仓库三、执行git命令上传代码 一、在gitee上新建远程仓库 仓库名称必填&#xff0c;路径自动跟仓库名称保持一致 解释说明&#xff1a; 仓库名称&#xff1a;必填&#xff0c;每个仓库都需要有一个名称&#xff0c;同一个码…

关于Kinect 互动沙盘 深度图 Shader Graph 分层

把Kinect的深度图穿给Shader Graph using com.rfilkov.kinect; using UnityEngine; using UnityEngine.UI; public class GetDepthTex : MonoBehaviour { public Material Mat_SandTable; void Update() { Mat_SandTable.SetTexture("_MainTex"…

Ubuntu上Jenkins自动化部署Gitee上SpringBoot项目

文章目录 安装安装JDK安装Maven安装GitNodeJS安装&#xff08;可选&#xff09;安装Jenkins 配置Jenkins为Jenkins更换插件源设置jenkins时区安装插件全局工具配置添加Gitee凭证Gitee项目配置 部署后端1.新建任务2.配置源码管理3.构建触发器4.到Gitee中添加WebHook5.构建环境6.…

Python爬虫实战第一例【一】

前情提要 今天我们开始更新Python爬虫实战例子&#xff0c;该系列预计会更很多很多期&#xff0c;因为实在有太多了&#xff01;&#xff01; 同样作为新人0&#xff0c;作者尽量在自己完全理解的基础上尽可能通俗易懂的讲解给大家&#xff0c;还望大家多多支持&#xff01; …

今日必读的7篇大模型论文

1.Google DeepMind&#xff1a;大模型能做多跳推理吗&#xff1f; 来自 Google DeepMind、伦敦大学学院、Google Research 和特拉维夫大学的研究团队探讨了大型语言模型&#xff08;LLMs&#xff09;是否能够对复杂的提示执行多跳推理&#xff0c;如“The mother of the singe…

Vue.js+SpringBoot开发生活废品回收系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容三、界面展示3.1 登录注册3.2 资源类型&资源品类模块3.3 回收机构模块3.4 资源求购/出售/交易单模块3.5 客服咨询模块 四、免责说明 一、摘要 1.1 项目介绍 生活废品回收系统是可持续发展的解决方案&#xff0c;旨在鼓…

openGauss学习笔记-229 openGauss性能调优-系统调优-配置Ustore

文章目录 openGauss学习笔记-229 openGauss性能调优-系统调优-配置Ustore229.1 设计原理229.2 核心优势229.3 使用指导 openGauss学习笔记-229 openGauss性能调优-系统调优-配置Ustore Ustore存储引擎&#xff0c;又名In-place Update存储引擎&#xff08;原地更新&#xff09…
推荐文章