2 回答
TA貢獻(xiàn)1828條經(jīng)驗(yàn) 獲得超4個(gè)贊
您可以使用自定義“移動(dòng)”時(shí)間戳TimestampExtractor- 在將結(jié)果寫(xiě)回輸出主題之前,您可以使用 aTransformer并通過(guò)context.forward(key, value, To.all().withTimestamps()).
功能請(qǐng)求票:https ://issues.apache.org/jira/browse/KAFKA-7911
TA貢獻(xiàn)2051條經(jīng)驗(yàn) 獲得超10個(gè)贊
因此,為了解決這個(gè)問(wèn)題,我創(chuàng)建了自定義TimestampExtractor并使用它來(lái)更改流窗口創(chuàng)建時(shí)間以記錄來(lái)自有效負(fù)載的時(shí)間,如下所示。
public class RecordTimeStampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
JsonObject data = (JsonObject) new JsonParser().parse(record.value().toString());
Timestamp recordTimestamp = Timestamp.valueOf(data.get(Constant.SLOT).getAsString());
return recordTimestamp.getTime();
}
}
所以現(xiàn)在我已經(jīng)用我的本地時(shí)區(qū)測(cè)試了它,因?yàn)樽蛱焓?IST 05:30,它的工作正常,kafka 流也正在根據(jù)記錄時(shí)間戳創(chuàng)建窗口。也將使用其他時(shí)區(qū)進(jìn)行測(cè)試并更新答案
添加回答
舉報(bào)
