使 Spring Kafka 重新加載 SSL 上下文的推薦方法是什么?我需要在不停機(jī)的情況下將新證書插入到我的 Kafka 生產(chǎn)者使用的信任庫中。但是我發(fā)現(xiàn),一旦應(yīng)用程序啟動(dòng)并創(chuàng)建了 Kafka 生產(chǎn)者,就會(huì)創(chuàng)建并緩存 SSLContext 的實(shí)例。有一種方法可以重新配置它,但到目前為止我發(fā)現(xiàn)的唯一方法是通過調(diào)用 destroy 方法DefaultKafkaProducerFactory(在證書更新后)銷毀任何現(xiàn)有的生產(chǎn)者,這會(huì)導(dǎo)致任何后續(xù)調(diào)用KafkaTemplate.send強(qiáng)制創(chuàng)建一個(gè)新的生產(chǎn)者,這反過來重新加載 SSL 上下文。我覺得這就像用大錘來解決這個(gè)問題,可能會(huì)有更優(yōu)雅的解決方案。我還注意到,如果在發(fā)送消息時(shí)調(diào)用destroy,我會(huì)得到以下異常,當(dāng)我們無法承受丟失任何事件時(shí),它看起來不是很積極。java.util.concurrent.CompletionException: org.apache.kafka.common.KafkaException: Producer closed while send in progress at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1592) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)Caused by: org.apache.kafka.common.KafkaException: Producer closed while send in progress at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:826) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444) at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:372) at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:190) at org.springframework.kafka.core.KafkaOperations$send.call(Unknown Source) at com.example.event.publisher.kafka.KafkaEventPublisher.doPublish(KafkaEventPublisher.groovy:57)
2 回答

開滿天機(jī)
TA貢獻(xiàn)1786條經(jīng)驗(yàn) 獲得超13個(gè)贊
Spring Kafka 2.6.5、Kafka 2.4.1,我改用 KafkaProducerFactory.reset()。
@Autowired
private final KafkaTemplate<String, byte[]> kafkaTemplate;
private void reloadProducer() {
kafkaTemplate.getProducerFactory().reset();
}
下次調(diào)用 send() 時(shí),Spring 將使用新證書重新創(chuàng)建一個(gè)新的 KafkaConsumer 品牌。

眼眸繁星
TA貢獻(xiàn)1873條經(jīng)驗(yàn) 獲得超9個(gè)贊
看起來簡單地重置某些值的配置將觸發(fā) SSL 引擎工廠的重建。他們甚至?xí){(diào)出會(huì)導(dǎo)致熱重載的文件密鑰配置。
添加回答
舉報(bào)
0/150
提交
取消