對于大數(shù)據(jù)中的許多情況,最好一次處理一小塊記錄緩沖區(qū),而不是一次處理一條記錄。自然的例子是調(diào)用一些支持批處理以提高效率的外部 API。我們?nèi)绾卧?Kafka Streams 中做到這一點(diǎn)?我在 API 中找不到任何看起來像我想要的東西。到目前為止,我有:builder.stream[String, String]("my-input-topic").mapValues(externalApiCall).to("my-output-topic")我想要的是:builder.stream[String, String]("my-input-topic").batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")在 Scala 和 Akka Streams 中,該函數(shù)被稱為groupedor batch。在 Spark Structured Streaming 中,我們可以做到mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))。
添加回答
舉報(bào)
0/150
提交
取消