第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

NFS(Netapp 服務(wù)器)-> Flink -> s3

NFS(Netapp 服務(wù)器)-> Flink -> s3

Helenr 2023-07-13 18:14:47
我是 flink (java) 的新手,并嘗試將作為文件路徑安裝的 netapp 文件服務(wù)器上的 xml 文件移動到安裝了 flink 的服務(wù)器上。如何實時進行批處理或流處理以獲取進入文件夾的文件并使用 s3 接收它。我在 flink-starter 中找不到任何從本地文件系統(tǒng)讀取文件的示例,flink 至少是這個用例的正確選擇嗎?如果是這樣,我在哪里可以找到資源來監(jiān)聽文件夾和管理檢查點/保存點?
查看完整描述

2 回答

?
料青山看我應(yīng)如是

TA貢獻1772條經(jīng)驗 獲得超8個贊

如果您的目標(biāo)只是將文件復(fù)制到 s3,那么有更簡單、更合適的工具。也許同步是合適的。

假設(shè)使用 Flink 有意義(例如,因為您想要對數(shù)據(jù)執(zhí)行一些有狀態(tài)轉(zhuǎn)換),則所有任務(wù)管理器(工作人員)都可以使用相同的 URI 訪問要處理的文件。為此,您可以使用 file:// URI。

您可以執(zhí)行以下操作來監(jiān)視目錄并在新文件出現(xiàn)時攝取它們:

StreamExecutionEnvironment env =? ??

? StreamExecutionEnvironment.getExecutionEnvironment();


// monitor directory, checking for new files

// every 100 milliseconds


TextInputFormat format = new TextInputFormat(

? new org.apache.flink.core.fs.Path("file:///tmp/dir/"));


DataStream<String> inputStream = env.readFile(

? format,?

? "file:///tmp/dir/",

? FileProcessingMode.PROCESS_CONTINUOUSLY,?

? 100,?

? FilePathFilter.createDefaultFilter());

請注意文檔中的此警告:

如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_CONTINUOUSLY,則修改文件時,將完全重新處理其內(nèi)容。這可能會破壞“僅一次”語義,因為在文件末尾附加數(shù)據(jù)將導(dǎo)致其所有內(nèi)容被重新處理。

這意味著您應(yīng)該自動將準(zhǔn)備好攝取的文件移動到正在監(jiān)視的文件夾中。

您可以使用流文件接收器寫入S3。Flink 的寫入操作(例如 )writeUsingOutputFormat()不參與檢查點,因此在這種情況下這不是一個好的選擇。


查看完整回答
反對 回復(fù) 2023-07-13
?
烙印99

TA貢獻1829條經(jīng)驗 獲得超13個贊

此問題的完整工作代碼位于以下鏈接中。您需要啟用檢查點以將 .inprogress 文件移動到實際文件

// 每 1000 毫秒啟動一個檢查點 env.enableCheckpointing(1000);

StreamingFileSink 未將數(shù)據(jù)提取到 s3


查看完整回答
反對 回復(fù) 2023-07-13
  • 2 回答
  • 0 關(guān)注
  • 198 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號