我有一個監(jiān)聽 kafka 主題的消費(fèi)者應(yīng)用程序,在異常情況下,它會按預(yù)期將記錄發(fā)送到 dlq(死信隊(duì)列)。在拋出異常之前,我在消費(fèi)者應(yīng)用程序中設(shè)置標(biāo)頭,并且它正在設(shè)置,但沒有發(fā)送到死信主題,死信主題只有內(nèi)置標(biāo)頭,例如“x-exception-message”、“x-”原始分區(qū)'等...而不是我設(shè)置的那個。以下是我的消費(fèi)者應(yīng)用程序中的一段代碼:modifiedMessage = MessageBuilder.fromMessage(consumedMessage).setHeader("x-ecode", new Integer(100)).setHeader(BinderHeaders.PARTITION_OVERRIDE,consumedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)).build();System.out.println("error header:"+modifiedMessage.getHeaders().get("x-ecode",Integer.class)); //100throw new RuntimeException(modifiedMessage.toString());注意:我在 spring.cloud.stream.kafka.binder.header=x-ecode 下的 application.yml 中設(shè)置 x-code在上面的代碼中,我能夠設(shè)置標(biāo)頭,并實(shí)際驗(yàn)證它已設(shè)置但未發(fā)送到死信主題。有效負(fù)載已正確發(fā)送,我如何將該標(biāo)頭發(fā)送到死信主題?我是否需要在 application.yml 中添加任何屬性才能使其發(fā)送?
如何添加額外的自定義標(biāo)頭,同時(shí)拋出異常以將消息存儲在 kafka 上的 dlq 中?
幕布斯7119047
2023-11-10 15:51:19