2 回答

TA貢獻(xiàn)1772條經(jīng)驗(yàn) 獲得超8個(gè)贊
如果您的目標(biāo)只是將文件復(fù)制到 s3,那么有更簡(jiǎn)單、更合適的工具。也許同步是合適的。
假設(shè)使用 Flink 有意義(例如,因?yàn)槟胍獙?duì)數(shù)據(jù)執(zhí)行一些有狀態(tài)轉(zhuǎn)換),則所有任務(wù)管理器(工作人員)都可以使用相同的 URI 訪問要處理的文件。為此,您可以使用 file:// URI。
您可以執(zhí)行以下操作來監(jiān)視目錄并在新文件出現(xiàn)時(shí)攝取它們:
StreamExecutionEnvironment env =? ??
? StreamExecutionEnvironment.getExecutionEnvironment();
// monitor directory, checking for new files
// every 100 milliseconds
TextInputFormat format = new TextInputFormat(
? new org.apache.flink.core.fs.Path("file:///tmp/dir/"));
DataStream<String> inputStream = env.readFile(
? format,?
? "file:///tmp/dir/",
? FileProcessingMode.PROCESS_CONTINUOUSLY,?
? 100,?
? FilePathFilter.createDefaultFilter());
請(qǐng)注意文檔中的此警告:
如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_CONTINUOUSLY,則修改文件時(shí),將完全重新處理其內(nèi)容。這可能會(huì)破壞“僅一次”語(yǔ)義,因?yàn)樵谖募┪哺郊訑?shù)據(jù)將導(dǎo)致其所有內(nèi)容被重新處理。
這意味著您應(yīng)該自動(dòng)將準(zhǔn)備好攝取的文件移動(dòng)到正在監(jiān)視的文件夾中。
您可以使用流文件接收器寫入S3。Flink 的寫入操作(例如 )writeUsingOutputFormat()
不參與檢查點(diǎn),因此在這種情況下這不是一個(gè)好的選擇。

TA貢獻(xiàn)1829條經(jīng)驗(yàn) 獲得超13個(gè)贊
此問題的完整工作代碼位于以下鏈接中。您需要啟用檢查點(diǎn)以將 .inprogress 文件移動(dòng)到實(shí)際文件
// 每 1000 毫秒啟動(dòng)一個(gè)檢查點(diǎn) env.enableCheckpointing(1000);
StreamingFileSink 未將數(shù)據(jù)提取到 s3
添加回答
舉報(bào)