我在堆棧溢出之前看到過這個(gè)問題的回答(https://stackoverflow.com/questions/29983621/how-to-get-filename-when-using-file-pattern-match-in-google-cloud-dataflow),但不是因?yàn)?apache beam 為 python 添加了可拆分的 dofn 功能。將文件模式傳遞給 gcs 存儲桶時(shí),如何訪問正在處理的當(dāng)前文件的文件名?我想將文件名傳遞到我的轉(zhuǎn)換函數(shù)中:with beam.Pipeline(options=pipeline_options) as p: lines = p | ReadFromText('gs://url to file') data = ( lines | 'Jsonify' >> beam.Map(jsonify) | 'Unnest' >> beam.FlatMap(unnest) | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink( 'project_id:dataset_id.table_name', schema=schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND) ) 最終,我想要做的是在轉(zhuǎn)換 json 的每一行時(shí)將文件名傳遞到我的轉(zhuǎn)換函數(shù)中(請參閱此內(nèi)容,然后使用文件名在不同的 BQ 表中進(jìn)行查找以獲取值)。我想一旦我設(shè)法知道如何獲取文件名,我將能夠找出側(cè)面輸入部分,以便在 bq 表中進(jìn)行查找并獲得唯一值。
添加回答
舉報(bào)
0/150
提交
取消