第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會有你想問的

數(shù)據(jù)流中的動態(tài)目標(biāo)問題

數(shù)據(jù)流中的動態(tài)目標(biāo)問題

我有一個(gè) Dataflow 作業(yè),它從 pubsub 讀取數(shù)據(jù)并根據(jù)時(shí)間和文件名將內(nèi)容寫入 GCS,其中文件夾路徑基于 YYYY/MM/DD。這允許根據(jù)日期在文件夾中生成文件,并使用 apache beamFileIO和Dynamic Destinations.大約兩周前,我注意到未確認(rèn)消息的異常堆積。重新啟動 df 作業(yè)后,錯(cuò)誤消失了,新文件正在 GCS 中寫入。幾天后,寫入再次停止,除了這一次,有錯(cuò)誤聲稱處理被卡住了。經(jīng)過一些可靠的 SO 研究后,我發(fā)現(xiàn)這可能是由于 Beam 2.90 之前的死鎖問題造成的,因?yàn)樗褂?Conscrypt 庫作為默認(rèn)安全提供程序。所以,我從 Beam 2.8 升級到 Beam 2.11。再一次,它起作用了,直到它沒有。我更仔細(xì)地查看了這個(gè)錯(cuò)誤,并注意到它有一個(gè) SimpleDateFormat 對象的問題,它不是線程安全的。因此,我轉(zhuǎn)而使用線程安全的 Java.time 和 DateTimeFormatter。它一直有效,直到它沒有。但是,這一次,錯(cuò)誤略有不同,并沒有指向我的代碼中的任何內(nèi)容:錯(cuò)誤如下所示。Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing in state process  at sun.misc.Unsafe.park(Native Method)  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)  at org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:202)  at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:409)  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:311)  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$BagPagingIterable$1.computeNext(WindmillStateReader.java:700)  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)此錯(cuò)誤在作業(yè)部署后大約 5 小時(shí)開始發(fā)生,并且隨著時(shí)間的推移而增加。寫作在 24 小時(shí)內(nèi)顯著減慢。我有 60 名工人,我懷疑每次出現(xiàn)錯(cuò)誤時(shí)都會有一名工人失敗,這最終會殺死工作。
查看完整描述

2 回答

?
慕田峪4524236

TA貢獻(xiàn)1875條經(jīng)驗(yàn) 獲得超5個(gè)贊

錯(cuò)誤“處理卡住...”表示某些特定操作花費(fèi)的時(shí)間超過 5m,而不是作業(yè)永久卡住。但是,由于步驟 FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles 是卡住并且作業(yè)被取消/終止的步驟,所以我會在作業(yè)寫入臨時(shí)文件時(shí)考慮一個(gè)問題。

我發(fā)現(xiàn)了與用于寫入臨時(shí)文件的第二粒度時(shí)間戳 (yyyy-MM-dd_HH-mm-ss) 相關(guān)的BEAM-7689問題。發(fā)生這種情況是因?yàn)槎鄠€(gè)并發(fā)作業(yè)可以共享同一個(gè)臨時(shí)目錄,這可能導(dǎo)致其中一個(gè)作業(yè)在其他作業(yè)完成之前將其刪除。

根據(jù)之前的鏈接,為緩解此問題,請升級到 SDK 2.14。并讓我們知道錯(cuò)誤是否消失。


查看完整回答
反對 回復(fù) 2022-10-12
?
飲歌長嘯

TA貢獻(xiàn)1951條經(jīng)驗(yàn) 獲得超3個(gè)贊

自從發(fā)布這個(gè)問題以來,我已經(jīng)優(yōu)化了數(shù)據(jù)流作業(yè)以避開瓶頸并增加并行化。就像 rsantiago 解釋的那樣,處理卡住不是錯(cuò)誤,而只是數(shù)據(jù)流傳達(dá)的一種方式,即一個(gè)步驟比其他步驟花費(fèi)的時(shí)間要長得多,這本質(zhì)上是一個(gè)瓶頸,無法用給定的資源清除。我所做的更改似乎已經(jīng)解決了這些問題。新代碼如下:


public void streamData() {


        try {

            Pipeline pipeline = Pipeline.create(options);


            pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))

            .apply(options.getWindowDuration() + " Window",

                    Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))

                          .triggering(AfterWatermark.pastEndOfWindow()) 

                          .discardingFiredPanes()

                          .withAllowedLateness(parseDuration("24h")))

            .apply(FileIO.<String,PubsubMessage>writeDynamic()

                    .by(new datePartition(options.getOutputFilenamePrefix()))

                    .via(Contextful.fn(

                            (SerializableFunction<PubsubMessage, String>) inputMsg -> new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),

                            TextIO.sink())

                    .withDestinationCoder(StringUtf8Coder.of())

                    .to(options.getOutputDirectory())

                    .withNaming(type -> new CrowdStrikeFileNaming(type))

                    .withNumShards(options.getNumShards())

                    .withTempDirectory(options.getTempLocation()));


            pipeline.run();

        }


        catch(Exception e) {


            LOG.error("Unable to deploy pipeline");

            LOG.error(e.toString(), e);

        }


    }

最大的變化涉及刪除 extractMsg() 函數(shù)并將分區(qū)更改為僅使用元數(shù)據(jù)。這兩個(gè)步驟都強(qiáng)制對消息進(jìn)行反序列化/重新序列化并嚴(yán)重影響性能。


此外,由于我的數(shù)據(jù)集是無限的,我必須設(shè)置一個(gè)非零數(shù)量的分片。我想簡化我的文件命名策略,所以我將它設(shè)置為 1 卻不知道它對性能的影響有多大。從那時(shí)起,我為我的工作找到了工人/碎片/機(jī)器類型的良好平衡(不幸的是,主要基于猜測和檢查)。


盡管在足夠大的數(shù)據(jù)負(fù)載下仍然可能觀察到瓶頸,但盡管負(fù)載很重(每天 3-5 tb),管道仍然表現(xiàn)良好。這些更改還顯著改善了自動縮放,但我不知道為什么。數(shù)據(jù)流作業(yè)現(xiàn)在對峰值和谷值的反應(yīng)要快得多。


查看完整回答
反對 回復(fù) 2022-10-12
  • 2 回答
  • 0 關(guān)注
  • 105 瀏覽

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號