第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何從 Spark 結(jié)構(gòu)化流獲取 Kafka 輸出中的批次 ID

如何從 Spark 結(jié)構(gòu)化流獲取 Kafka 輸出中的批次 ID

慕森卡 2023-08-04 15:37:10
我正在更新模式下運行 Spark 結(jié)構(gòu)化流作業(yè),并且無法確定是否可以獲取每個更新的批次 ID。例如,當您以更新模式輸出到控制臺時,Spark 將在輸出時顯示每個批次編號:-------------------------------------------Batch: 0-------------------------------------------...-------------------------------------------Batch: 1-------------------------------------------...等等。我需要將相同的信息添加到發(fā)送到 Kafka 的每條消息中。為此,我只能使用 Spark 2.3,因此我無法使用 forEachBatch。我的工作輸出一組特定維度的聚合指標。每個觸發(fā)器,自上次觸發(fā)器以來指標可能已更新 - 具有更新指標的維度將在下一批中輸出,因為我正在更新模式下運行。當我將這些輸出到 Kafka 時,我需要知道哪個批次是最新的 - 因此需要批次號。我認為 forEachBatch 可以滿足我的需要,但不幸的是我無法訪問 Spark 2.4。我可以使用 forEach 來完成這個任務(wù)嗎?我僅限于使用更新模式,因為后期事件可能會出現(xiàn)并更新之前已輸出的指標。這是我用來測試的控制臺模式。此輸出分別顯示每個批次及其編號:StreamingQuery query = logs.writeStream()        .format("console")        .outputMode(OutputMode.Update())        .start();我想做這樣的事情StreamingQuery query = agg.WriteStream()    .format("kafka")    .outputMode(OutputMode.Update())    .option("kafka.bootstrap.servers", "myconnection")    .Option("topic", "mytopic")    .Start();但仍然保留在mytopic中判斷消息來自哪個批次的能力。這可能嗎?
查看完整描述

1 回答

?
12345678_0001

TA貢獻1802條經(jīng)驗 獲得超5個贊

我認為你可以使用版本long version號ForeachWriter


你可以像這樣實現(xiàn)你自己的KafkaCustomSink。



class KafkaCustomSink(val config: Config) extends ForeachWriter[String] {

  var producer: KafkaProducer[String, String] = _

  var _version: Long = _


  override def open(partitionId: Long, version: Long): Boolean = {

    _version = version

    val props = new Properties()

    props.put("bootstrap.servers", config(Constant.OUTPUT_BOOTSTRAP_SERVER))

    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    props.put("acks", "0")

    producer = new KafkaProducer[String, String](props)

    true

  }


  override def process(value: String): Unit = {

    //use version here

    val record = new ProducerRecord[String, String](config(Constant.OUTPUT_TOPIC), null, "version : %s, data : %s".format(_version, value))

    producer.send(record)

  }


  override def close(errorOrNull: Throwable): Unit = {

    producer.close()

  }

}


并將其分配給


      logs

          .writeStream

          .outputMode("update")

          .foreach(new KafkaCustomSink(config))

          .trigger(Trigger.ProcessingTime(config(Constant.TRIGGER_INTERVAL).toInt, TimeUnit.SECONDS))

          .option("checkpointLocation", config(Constant.CHECKPOINT_LOCATION))


查看完整回答
反對 回復(fù) 2023-08-04
  • 1 回答
  • 0 關(guān)注
  • 142 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號