2 回答

TA貢獻(xiàn)1786條經(jīng)驗(yàn) 獲得超11個(gè)贊
您注入一個(gè)字符串,它將保留“classpath:”在字符串值中并將其作為屬性提供給 DefaultKafkaConsumerFactory,嘗試注入到 spring 資源中,例如:
import org.springframework.core.io.Resource;
@Value("classpath:path/to/file/in/classpath")
Resource resourceFile;
然后你可以訪問該文件,你可以獲得絕對(duì)路徑,如下所示:
resourceFile.getFile().getAbsolutePath()
這個(gè)想法是你可以提供 DefaultKafkaConsumerFactory 的絕對(duì)路徑
但是您也可以嘗試刪除“classpath:”并像當(dāng)前代碼一樣注入為 String ,這可能取決于 DefaultKafkaConsumerFactory 如何處理該屬性。但我不明白為什么上面的絕對(duì)路徑不起作用。

TA貢獻(xiàn)1827條經(jīng)驗(yàn) 獲得超4個(gè)贊
對(duì)于像我這樣使用 Spring Boot 和 Spring Kafka 并且不重寫 DefaultKafkaConsumerFactory 的人(僅使用屬性進(jìn)行配置),您可以實(shí)現(xiàn)一個(gè)BeanPostProcessor類。它提供了兩種方法:
postProcessAfterInitialization
和postProcessBeforeInitialization
工廠鉤子允許對(duì)新 bean 實(shí)例進(jìn)行自定義修改 - 例如,檢查標(biāo)記接口或使用代理包裝 bean。通常,通過標(biāo)記接口等填充 Bean 的后處理器將實(shí)現(xiàn) postProcessBeforeInitialization(java.lang.Object, java.lang.String),而用代理包裝 Bean 的后處理器通常將實(shí)現(xiàn) postProcessAfterInitialization(java.lang.Object) ,java.lang.String)。
我將 Spring Boot 與 Spring Kafka 一起使用,我只想更改本地配置文件。
在我的代碼示例中,我使用它來覆蓋 Kafka Location 屬性,因?yàn)閷?duì)于 SSL,它不會(huì)從類路徑讀取。
這就是代碼:
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import java.io.IOException;
import java.util.Arrays;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration implements BeanPostProcessor {
? @Value("${spring.kafka.ssl.key-store-location:}")
? private Resource keyStoreResource;
? @Value("${spring.kafka.properties.schema.registry.ssl.truststore.location:}")
? private Resource trustStoreResource;
? private final Environment environment;
? @SneakyThrows
? @Override
? public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
? ? if (bean instanceof KafkaProperties) {
? ? ? KafkaProperties kafkaProperties = (KafkaProperties) bean;
? ? ? if(isLocalProfileActive()) {
? ? ? ? configureStoreLocation(kafkaProperties);
? ? ? }
? ? }
? ? return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
? }
? private boolean isLocalProfileActive() {
? ? return Arrays.stream(environment.getActiveProfiles()).anyMatch(profile -> "local".equals(profile));
? }
? private void configureStoreLocation(KafkaProperties kafkaProperties) throws IOException {
? ? kafkaProperties.getSsl().setKeyStoreLocation(new FileSystemResource(keyStoreResource.getFile().getAbsolutePath()));
? ? kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreResource.getFile().getAbsolutePath());
? ? kafkaProperties.getSsl().setTrustStoreLocation(new FileSystemResource(trustStoreResource.getFile().getAbsolutePath()));
? ? kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreResource.getFile().getAbsolutePath());
? }
}
這樣我就可以在我的屬性文件中添加:
spring.kafka.ssl.key-store-location=classpath:mykeystore.jks
代碼將從中獲取絕對(duì)路徑并設(shè)置它。它還可以根據(jù)配置文件進(jìn)行過濾。
值得一提的是,BeanPostProcessor 會(huì)針對(duì)每個(gè)bean 運(yùn)行,因此請(qǐng)確保您過濾了您想要的內(nèi)容。
添加回答
舉報(bào)