2 回答

TA貢獻(xiàn)1875條經(jīng)驗(yàn) 獲得超5個(gè)贊
錯(cuò)誤“處理卡住...”表示某些特定操作花費(fèi)的時(shí)間超過 5m,而不是作業(yè)永久卡住。但是,由于步驟 FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles 是卡住并且作業(yè)被取消/終止的步驟,所以我會在作業(yè)寫入臨時(shí)文件時(shí)考慮一個(gè)問題。
我發(fā)現(xiàn)了與用于寫入臨時(shí)文件的第二粒度時(shí)間戳 (yyyy-MM-dd_HH-mm-ss) 相關(guān)的BEAM-7689問題。發(fā)生這種情況是因?yàn)槎鄠€(gè)并發(fā)作業(yè)可以共享同一個(gè)臨時(shí)目錄,這可能導(dǎo)致其中一個(gè)作業(yè)在其他作業(yè)完成之前將其刪除。
根據(jù)之前的鏈接,為緩解此問題,請升級到 SDK 2.14。并讓我們知道錯(cuò)誤是否消失。

TA貢獻(xiàn)1951條經(jīng)驗(yàn) 獲得超3個(gè)贊
自從發(fā)布這個(gè)問題以來,我已經(jīng)優(yōu)化了數(shù)據(jù)流作業(yè)以避開瓶頸并增加并行化。就像 rsantiago 解釋的那樣,處理卡住不是錯(cuò)誤,而只是數(shù)據(jù)流傳達(dá)的一種方式,即一個(gè)步驟比其他步驟花費(fèi)的時(shí)間要長得多,這本質(zhì)上是一個(gè)瓶頸,無法用給定的資源清除。我所做的更改似乎已經(jīng)解決了這些問題。新代碼如下:
public void streamData() {
try {
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply(options.getWindowDuration() + " Window",
Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(parseDuration("24h")))
.apply(FileIO.<String,PubsubMessage>writeDynamic()
.by(new datePartition(options.getOutputFilenamePrefix()))
.via(Contextful.fn(
(SerializableFunction<PubsubMessage, String>) inputMsg -> new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
TextIO.sink())
.withDestinationCoder(StringUtf8Coder.of())
.to(options.getOutputDirectory())
.withNaming(type -> new CrowdStrikeFileNaming(type))
.withNumShards(options.getNumShards())
.withTempDirectory(options.getTempLocation()));
pipeline.run();
}
catch(Exception e) {
LOG.error("Unable to deploy pipeline");
LOG.error(e.toString(), e);
}
}
最大的變化涉及刪除 extractMsg() 函數(shù)并將分區(qū)更改為僅使用元數(shù)據(jù)。這兩個(gè)步驟都強(qiáng)制對消息進(jìn)行反序列化/重新序列化并嚴(yán)重影響性能。
此外,由于我的數(shù)據(jù)集是無限的,我必須設(shè)置一個(gè)非零數(shù)量的分片。我想簡化我的文件命名策略,所以我將它設(shè)置為 1 卻不知道它對性能的影響有多大。從那時(shí)起,我為我的工作找到了工人/碎片/機(jī)器類型的良好平衡(不幸的是,主要基于猜測和檢查)。
盡管在足夠大的數(shù)據(jù)負(fù)載下仍然可能觀察到瓶頸,但盡管負(fù)載很重(每天 3-5 tb),管道仍然表現(xiàn)良好。這些更改還顯著改善了自動縮放,但我不知道為什么。數(shù)據(jù)流作業(yè)現(xiàn)在對峰值和谷值的反應(yīng)要快得多。
添加回答
舉報(bào)