我有非?;镜?apache beam 管道,它在 GCP Dataflow 上運(yùn)行并從 PubSub 讀取一些數(shù)據(jù),對(duì)其進(jìn)行轉(zhuǎn)換并將其寫(xiě)入 Postgres 數(shù)據(jù)庫(kù)。所有這些都是通過(guò) Apache Beam 的標(biāo)準(zhǔn)讀取器/寫(xiě)入器組件完成的。問(wèn)題是當(dāng)我的管道開(kāi)始接收大量數(shù)據(jù)時(shí),我的 Postgres 端由于等待 ShareLocks 而出現(xiàn)死鎖錯(cuò)誤。很明顯,這種事情的發(fā)生是因?yàn)?Postgres 端溢出。我的管道試圖一次寫(xiě)得太快和太多東西,所以為了避免這種情況,它應(yīng)該放慢速度。因此,我們可以使用諸如背壓之類(lèi)的機(jī)制。我試圖挖掘出有關(guān) Apache Beam 背壓配置的任何信息,不幸的是,官方文檔似乎對(duì)此類(lèi)問(wèn)題只字未提。我對(duì)以下類(lèi)型的異常感到不知所措:java.sql.BatchUpdateException: Batch entry <NUMBER><MY_STATEMENT> was aborted: ERROR: deadlock detected Detail: Process 87768 waits for ShareLock on transaction 1939992; blocked by process 87769.Process 87769 waits for ShareLock on transaction 1939997; blocked by process 87768. Hint: See server log for query details. Where: while inserting index tuple (5997152,9) in relation "<MY_TABLE>" Call getNextException to see other errors in the batch.我想知道是否有任何背壓工具包或類(lèi)似的東西可以幫助我在不編寫(xiě)自己的PostgresIO.Writer.非常感謝。
1 回答

守著一只汪
TA貢獻(xiàn)1872條經(jīng)驗(yàn) 獲得超4個(gè)贊
假設(shè)您使用JdbcIO
寫(xiě)入 Postgres,您可以嘗試增加批處理大?。ㄕ?qǐng)參閱 參考資料withBatchSize(long batchSize)
),默認(rèn)情況下為 1K 條記錄,這可能是不夠的。
此外,如果出現(xiàn) SQL 異常,并且您想要重試,那么您需要確保使用正確的重試策略(參見(jiàn) 參考資料withRetryStrategy(RetryStrategy retryStrategy)
)。在這種情況下,FluentBackoff
將被應(yīng)用。
添加回答
舉報(bào)
0/150
提交
取消