2 回答

TA貢獻1772條經(jīng)驗 獲得超8個贊
如果您的目標(biāo)只是將文件復(fù)制到 s3,那么有更簡單、更合適的工具。也許同步是合適的。
假設(shè)使用 Flink 有意義(例如,因為您想要對數(shù)據(jù)執(zhí)行一些有狀態(tài)轉(zhuǎn)換),則所有任務(wù)管理器(工作人員)都可以使用相同的 URI 訪問要處理的文件。為此,您可以使用 file:// URI。
您可以執(zhí)行以下操作來監(jiān)視目錄并在新文件出現(xiàn)時攝取它們:
StreamExecutionEnvironment env =? ??
? StreamExecutionEnvironment.getExecutionEnvironment();
// monitor directory, checking for new files
// every 100 milliseconds
TextInputFormat format = new TextInputFormat(
? new org.apache.flink.core.fs.Path("file:///tmp/dir/"));
DataStream<String> inputStream = env.readFile(
? format,?
? "file:///tmp/dir/",
? FileProcessingMode.PROCESS_CONTINUOUSLY,?
? 100,?
? FilePathFilter.createDefaultFilter());
請注意文檔中的此警告:
如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_CONTINUOUSLY,則修改文件時,將完全重新處理其內(nèi)容。這可能會破壞“僅一次”語義,因為在文件末尾附加數(shù)據(jù)將導(dǎo)致其所有內(nèi)容被重新處理。
這意味著您應(yīng)該自動將準(zhǔn)備好攝取的文件移動到正在監(jiān)視的文件夾中。
您可以使用流文件接收器寫入S3。Flink 的寫入操作(例如 )writeUsingOutputFormat()
不參與檢查點,因此在這種情況下這不是一個好的選擇。

TA貢獻1829條經(jīng)驗 獲得超13個贊
此問題的完整工作代碼位于以下鏈接中。您需要啟用檢查點以將 .inprogress 文件移動到實際文件
// 每 1000 毫秒啟動一個檢查點 env.enableCheckpointing(1000);
StreamingFileSink 未將數(shù)據(jù)提取到 s3
添加回答
舉報