第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

如果我緩存 Spark 數(shù)據(jù)幀,然后覆蓋引用,原始數(shù)據(jù)幀是否仍會(huì)被緩存?

如果我緩存 Spark 數(shù)據(jù)幀,然后覆蓋引用,原始數(shù)據(jù)幀是否仍會(huì)被緩存?

ITMISS 2022-08-02 11:00:23
假設(shè)我有一個(gè)函數(shù)來(lái)生成一個(gè)(py)spark數(shù)據(jù)幀,將數(shù)據(jù)幀緩存到內(nèi)存中作為最后一個(gè)操作。def gen_func(inputs):    df = ... do stuff...    df.cache()    df.count()       return df根據(jù)我的理解,Spark的緩存工作如下:當(dāng)在數(shù)據(jù)幀上調(diào)用一個(gè)動(dòng)作()時(shí),它將從其DAG計(jì)算并緩存到內(nèi)存中,并附加到引用它的對(duì)象上。cache/persistcount()只要存在對(duì)該對(duì)象的引用(可能在其他函數(shù)/其他作用域中),df 將繼續(xù)緩存,并且依賴于 df 的所有 DAG 都將使用內(nèi)存中緩存的數(shù)據(jù)作為起點(diǎn)。如果刪除了對(duì) df 的所有引用,Spark 會(huì)將緩存作為要進(jìn)行垃圾回收的內(nèi)存。它可能不會(huì)立即被垃圾回收,導(dǎo)致一些短期內(nèi)存塊(特別是,如果您生成緩存數(shù)據(jù)并過(guò)快地丟棄它們,則會(huì)導(dǎo)致內(nèi)存泄漏),但最終它將被清除。我的問(wèn)題是,假設(shè)我用于生成一個(gè)數(shù)據(jù)框,但隨后覆蓋原始數(shù)據(jù)框引用(可能帶有a或a)。gen_funcfilterwithColumndf=gen_func(inputs) df=df.filter("some_col = some_val")在 Spark 中,RDD/DF 是不可變的,因此在濾波器之后重新分配的 df 和在濾波器之前的 df 指的是兩個(gè)完全不同的對(duì)象。在本例中,對(duì)原始 df 的引用已被覆蓋。這是否意味著緩存的數(shù)據(jù)框不再可用,將被垃圾回收?這是否意味著新的后置過(guò)濾器將從頭開(kāi)始計(jì)算所有內(nèi)容,盡管它是從以前緩存的數(shù)據(jù)幀生成的?cache/counteddf我之所以問(wèn)這個(gè)問(wèn)題,是因?yàn)槲易罱迯?fù)了代碼中的一些內(nèi)存不足問(wèn)題,在我看來(lái),緩存可能是問(wèn)題所在。但是,我還沒(méi)有真正了解使用緩存的安全方法的全部細(xì)節(jié),以及如何意外地使緩存的內(nèi)存失效。在我的理解中缺少什么?我在執(zhí)行上述操作時(shí)是否偏離了最佳實(shí)踐?
查看完整描述

2 回答

?
叮當(dāng)貓咪

TA貢獻(xiàn)1776條經(jīng)驗(yàn) 獲得超12個(gè)贊

我做了幾個(gè)實(shí)驗(yàn),如下所示。顯然,數(shù)據(jù)幀一旦緩存,就會(huì)保持緩存狀態(tài)(如 和 查詢計(jì)劃等 所示),即使所有 Python 引用都被覆蓋或完全刪除,并且顯式調(diào)用了垃圾回收。getPersistentRDDsInMemorydel


實(shí)驗(yàn) 1:


def func():

    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

    data.cache()

    data.count()

    return data


sc._jsc.getPersistentRDDs()


df = func()

sc._jsc.getPersistentRDDs()


df2 = df.filter('col1 != 2')

del df

import gc

gc.collect()

sc._jvm.System.gc()

sc._jsc.getPersistentRDDs()


df2.select('*').explain()


del df2

gc.collect()

sc._jvm.System.gc()

sc._jsc.getPersistentRDDs()

結(jié)果:


>>> def func():

...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

...     data.cache()

...     data.count()

...     return data

...

>>> sc._jsc.getPersistentRDDs()

{}


>>> df = func()

>>> sc._jsc.getPersistentRDDs()

{71: JavaObject id=o234}


>>> df2 = df.filter('col1 != 2')

>>> del df

>>> import gc

>>> gc.collect()

93

>>> sc._jvm.System.gc()

>>> sc._jsc.getPersistentRDDs()

{71: JavaObject id=o240}


>>> df2.select('*').explain()

== Physical Plan ==

*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))

+- *(1) ColumnarToRow

   +- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]

         +- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)

               +- *(1) Project [_1#172L AS col1#174L]

                  +- *(1) Scan ExistingRDD[_1#172L]


>>> del df2

>>> gc.collect()

85

>>> sc._jvm.System.gc()

>>> sc._jsc.getPersistentRDDs()

{71: JavaObject id=o250}

實(shí)驗(yàn) 2:


def func():

    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

    data.cache()

    data.count()

    return data


sc._jsc.getPersistentRDDs()


df = func()

sc._jsc.getPersistentRDDs()


df = df.filter('col1 != 2')

import gc

gc.collect()

sc._jvm.System.gc()

sc._jsc.getPersistentRDDs()


df.select('*').explain()


del df

gc.collect()

sc._jvm.System.gc()

sc._jsc.getPersistentRDDs()

結(jié)果:


>>> def func():

...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

...     data.cache()

...     data.count()

...     return data

...

>>> sc._jsc.getPersistentRDDs()

{}


>>> df = func()

>>> sc._jsc.getPersistentRDDs()

{86: JavaObject id=o317}


>>> df = df.filter('col1 != 2')

>>> import gc

>>> gc.collect()

244

>>> sc._jvm.System.gc()

>>> sc._jsc.getPersistentRDDs()

{86: JavaObject id=o323}


>>> df.select('*').explain()

== Physical Plan ==

*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))

+- *(1) ColumnarToRow

   +- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]

         +- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)

               +- *(1) Project [_1#218L AS col1#220L]

                  +- *(1) Scan ExistingRDD[_1#218L]


>>> del df

>>> gc.collect()

85

>>> sc._jvm.System.gc()

>>> sc._jsc.getPersistentRDDs()

{86: JavaObject id=o333}

實(shí)驗(yàn)3(對(duì)照實(shí)驗(yàn),證明無(wú)孔徒有效)


def func():

    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

    data.cache()

    data.count()

    return data


sc._jsc.getPersistentRDDs()


df = func()

sc._jsc.getPersistentRDDs()


df2 = df.filter('col1 != 2')

df2.select('*').explain()


df.unpersist()

df2.select('*').explain()

結(jié)果:


>>> def func():

...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')

...     data.cache()

...     data.count()

...     return data

...

>>> sc._jsc.getPersistentRDDs()

{}


>>> df = func()

>>> sc._jsc.getPersistentRDDs()

{116: JavaObject id=o398}


>>> df2 = df.filter('col1 != 2')

>>> df2.select('*').explain()

== Physical Plan ==

*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))

+- *(1) ColumnarToRow

   +- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]

         +- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)

               +- *(1) Project [_1#310L AS col1#312L]

                  +- *(1) Scan ExistingRDD[_1#310L]


>>> df.unpersist()

DataFrame[col1: bigint]

>>> sc._jsc.getPersistentRDDs()

{}


>>> df2.select('*').explain()

== Physical Plan ==

*(1) Project [_1#310L AS col1#312L]

+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))

   +- *(1) Scan ExistingRDD[_1#310L]

回答OP的問(wèn)題:


這是否意味著緩存的數(shù)據(jù)框不再可用,將被垃圾回收?這是否意味著新的后置濾波器df將從頭開(kāi)始計(jì)算所有內(nèi)容,盡管它是從以前緩存的數(shù)據(jù)幀生成的?


實(shí)驗(yàn)表明兩者都沒(méi)有。數(shù)據(jù)幀保持緩存狀態(tài),不進(jìn)行垃圾回收,并且根據(jù)查詢計(jì)劃使用緩存的(不可引用的)數(shù)據(jù)幀計(jì)算新數(shù)據(jù)幀。


與緩存使用相關(guān)的一些有用功能(如果您不想通過(guò) Spark UI 執(zhí)行此操作)是:


sc._jsc.getPersistentRDDs(),其中顯示了緩存的 RDD/數(shù)據(jù)幀的列表,以及


spark.catalog.clearCache(),這將清除所有緩存的 RDD/數(shù)據(jù)幀。


我在執(zhí)行上述操作時(shí)是否偏離了最佳實(shí)踐?


我沒(méi)有權(quán)力對(duì)此進(jìn)行判斷,但正如其中一條評(píng)論所建議的那樣,避免重新分配,因?yàn)閿?shù)據(jù)幀是不可變的。試著想象你正在用scala編碼,你被定義為.做是不可能的。Python本身無(wú)法強(qiáng)制執(zhí)行,但我認(rèn)為最佳做法是避免覆蓋任何數(shù)據(jù)幀變量,這樣,如果您不再需要緩存的結(jié)果,則可以隨時(shí)調(diào)用。dfdfvaldf = df.filter(...)df.unpersist()


查看完整回答
反對(duì) 回復(fù) 2022-08-02
?
qq_遁去的一_1

TA貢獻(xiàn)1725條經(jīng)驗(yàn) 獲得超8個(gè)贊

想提出幾點(diǎn),希望能澄清Spark在緩存方面的行為。


當(dāng)您有


df = ... do stuff...

df.cache()

df.count()

...然后在應(yīng)用程序中的其他位置


   another_df = ... do *same* stuff...

   another_df.*some_action()*

...,您希望重用緩存的數(shù)據(jù)幀。畢竟,重用先前計(jì)算的結(jié)果是緩存的目標(biāo)。意識(shí)到這一點(diǎn),Spark開(kāi)發(fā)人員決定使用分析的邏輯計(jì)劃作為識(shí)別緩存數(shù)據(jù)幀的“關(guān)鍵”,而不是僅僅依賴于來(lái)自應(yīng)用程序端的引用。在 Spark 中,CacheManager 是跟蹤緩存計(jì)算的組件,按索引順序排列:another_dfdfcachedData


  /**

   * Maintains the list of cached plans as an immutable sequence.  Any updates to the list

   * should be protected in a "this.synchronized" block which includes the reading of the

   * existing value and the update of the cachedData var.

   */

  @transient @volatile

  private var cachedData = IndexedSeq[CachedData]()

在查詢規(guī)劃期間(在緩存管理器階段),將掃描此結(jié)構(gòu)以查找正在分析的計(jì)劃的所有子樹,以查看是否已計(jì)算出其中的任何子樹。如果找到匹配項(xiàng),Spark 會(huì)將此子樹替換為相應(yīng)的 from 。InMemoryRelationcachedData


cache()(的簡(jiǎn)單同義詞 ) 函數(shù)通過(guò)調(diào)用 cacheQuery(...) 來(lái)存儲(chǔ)具有存儲(chǔ)級(jí)別的數(shù)據(jù)幀persist()MEMORY_AND_DISKCacheManager

      /**

       * Caches the data produced by the logical representation of the given [[Dataset]].

       * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because

       * recomputing the in-memory columnar representation of the underlying table is expensive.

       */

      def cacheQuery(...

請(qǐng)注意,這與使用級(jí)別的 RDD 緩存不同。一旦緩存了數(shù)據(jù)幀,它們就會(huì)保留在內(nèi)存或本地執(zhí)行器磁盤上緩存,直到它們被顯式'ed',或者調(diào)用CacheManager。當(dāng)執(zhí)行程序存儲(chǔ)內(nèi)存完全填滿時(shí),緩存塊開(kāi)始使用 LRU(最近最少使用)推送到磁盤,但永遠(yuǎn)不會(huì)簡(jiǎn)單地“丟棄”。MEMORY_ONLYunpersistclearCache()


順便說(shuō)一句,好問(wèn)題...


查看完整回答
反對(duì) 回復(fù) 2022-08-02
  • 2 回答
  • 0 關(guān)注
  • 148 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)