1 回答

TA貢獻(xiàn)1829條經(jīng)驗(yàn) 獲得超6個(gè)贊
的coder參數(shù)WriteToText需要一個(gè)apache_beam.coders.Coder實(shí)例。您可以嘗試JsonCoder從基Coder類繼承,但我認(rèn)為您也可以使用 a 將數(shù)據(jù)轉(zhuǎn)換為字符串Map:
def expand(self, pcoll):
"""
PTransform Method run when called on Class Name
:type pcoll: PCollection
:param pcoll: A pcollection
"""
return (pcoll
| "print intermediate" >> beam.Map(print_row))
| "to_json" >> beam.Map(lambda x: json.dumps(x, default=str)))
| "write data gcs" >> beam.io.WriteToText(self.bucket, file_name_suffix=".json"))
添加回答
舉報(bào)