SpringBoot版本2.x.x 具体是几,忘记了,是支持application.yml或者是application.properties配置的。当然也可以使用Java配置类。
以下是使用Java配置类来配置的。
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaConsumerConfig {//指定kafka 代理集群地址,多个地址用英文逗号隔开private String bootstrapServers="x.x.x.x:xxxx,x.x.x.x:xxxx";//指定默认消费者group id,消费者监听到的也是这个private String groupId="xxx";//消费者在读取一个没有offset的分区或者offset无效时的策略,默认earliest是从头读,latest不是从头读private String autoOffsetReset="earliest";//是否自动提交偏移量offset,默认为true,一般是false,如果为false,则auto-commit-interval属性就会无效private boolean enableAutoCommit=true;//自动提交间隔时间,接收到消息后多久会提交offset,前提需要开启自动提交,也就是enable-auto-commit设置为true,默认单位是毫秒(ms),如果写10s,最后加载的显示值为10000ms,需要符合特定时间格式:1000ms,1S,1M,1H,1D(毫秒,秒,分,小时,天)private String autoCommitInterval="1000";//指定消息key和消息体的编解码方式private String keyDeserializerClass="org.apache.kafka.common.serialization.StringDeserializer";private String valueDeserializerClass ="org.apache.kafka.common.serialization.StringDeserializer";//批量消费每次最多消费多少条信息 可以自己根据业务来配置//private String maxPollRecords="50";//协议类型,为SASL类型private String securityProtocol="SASL_PLAINTEXT";//协议private String saslMechanism="SCRAM-SHA-512";//用户名密码配置private String saslJaas="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxxx\" password=\"xxxx\";";@BeanConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG//factory.setBatchListener(false);//这里为true的时候,KafkaConsumer那里需要使用批量消费方法,不然报错return factory;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);//props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);props.put(SaslConfigs.SASL_JAAS_CONFIG,saslJaas);return new DefaultKafkaConsumerFactory<>(props);}
}
消息的代码
@Slf4j
@Service
public class KafkaConsumerService {//单条消费@KafkaListener(topics = "xxxx", groupId = "xxxx")public void consume(ConsumerRecord<String, String> record) {String value = record.value();//业务逻辑...}//批量消费@KafkaListener(topics = "xxxx", groupId = "xxxx")public void consume(List<ConsumerRecord<?, ?>> consumerRecords, Acknowledgment ack) {//...//db.batchSave(consumerRecords);//批量插入或者批量更新数据//手动提交ack.acknowledge();}
}
注意点:spring-kafka,和 kafka-clients 的版本兼容问题。
RSA公钥解密:
/*** 加密算法RSA*/public static final String KEY_ALGORITHM = "RSA";/*** RSA最大解密密文大小*/private static final int MAX_DECRYPT_BLOCK = 128;/*** RSA公钥解密** @param encryptedData 已加密数据(base64编码)* @param publicKey 公钥(BASE64编码)* @return* @throws Exception*/public byte[] decryptByPublicKey(byte[] encryptedData, String publicKey)throws Exception {byte[] keyBytes = Base64Utils.decode(publicKey.getBytes());X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(keyBytes);KeyFactory keyFactory = KeyFactory.getInstance(KEY_ALGORITHM);Key publicK = keyFactory.generatePublic(x509KeySpec);Cipher cipher = Cipher.getInstance(keyFactory.getAlgorithm());cipher.init(Cipher.DECRYPT_MODE, publicK);int inputLen = encryptedData.length;ByteArrayOutputStream out = new ByteArrayOutputStream();int offSet = 0;byte[] cache;int i = 0;// 对数据分段解密while (inputLen - offSet > 0) {if (inputLen - offSet > MAX_DECRYPT_BLOCK) {cache = cipher.doFinal(encryptedData, offSet, MAX_DECRYPT_BLOCK);} else {cache = cipher.doFinal(encryptedData, offSet, inputLen - offSet);}out.write(cache, 0, cache.length);i++;offSet = i * MAX_DECRYPT_BLOCK;}byte[] decryptedData = out.toByteArray();out.close();return decryptedData;}
下面是批量消费的实例,可以自己学习一下:
https://cloud.tencent.com/developer/article/2223134?areaSource=102001.4&traceId=KErHitYK-asG0MQLNTqos