2 Answers

Contributed 1776 experience points · earned 12+ likes
I ran a few experiments, shown below. Apparently, once a dataframe is cached, it stays cached (as shown by getPersistentRDDs and the InMemory* entries in the query plans), even if all Python references are overwritten or deleted entirely with del, and garbage collection is invoked explicitly.
Experiment 1:
def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data
sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df2 = df.filter('col1 != 2')
del df
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
df2.select('*').explain()
del df2
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
Result:
>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o234}
>>> df2 = df.filter('col1 != 2')
>>> del df
>>> import gc
>>> gc.collect()
93
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o240}
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))
+- *(1) ColumnarToRow
+- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]
+- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [_1#172L AS col1#174L]
+- *(1) Scan ExistingRDD[_1#172L]
>>> del df2
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o250}
Experiment 2:
def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data
sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df = df.filter('col1 != 2')
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
df.select('*').explain()
del df
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
Result:
>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o317}
>>> df = df.filter('col1 != 2')
>>> import gc
>>> gc.collect()
244
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o323}
>>> df.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))
+- *(1) ColumnarToRow
+- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]
+- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [_1#218L AS col1#220L]
+- *(1) Scan ExistingRDD[_1#218L]
>>> del df
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o333}
Experiment 3 (control experiment, showing that unpersist works):
def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data
sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df2 = df.filter('col1 != 2')
df2.select('*').explain()
df.unpersist()
df2.select('*').explain()
Result:
>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{116: JavaObject id=o398}
>>> df2 = df.filter('col1 != 2')
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))
+- *(1) ColumnarToRow
+- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]
+- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [_1#310L AS col1#312L]
+- *(1) Scan ExistingRDD[_1#310L]
>>> df.unpersist()
DataFrame[col1: bigint]
>>> sc._jsc.getPersistentRDDs()
{}
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Project [_1#310L AS col1#312L]
+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))
+- *(1) Scan ExistingRDD[_1#310L]
To answer the OP's questions:
Does this mean the cached dataframe is no longer available and will be garbage collected? Does this mean that the new post-filter df will compute everything from scratch, even though it was generated from a previously cached dataframe?
The experiments indicate neither. The dataframe stays cached, is not garbage collected, and, according to the query plan, the new dataframe is computed using the cached (no-longer-referenceable) one.
Some useful functions related to cache usage (if you prefer not to do this through the Spark UI) are:
sc._jsc.getPersistentRDDs(), which shows the list of cached RDDs/dataframes, and
spark.catalog.clearCache(), which clears all cached RDDs/dataframes.
Am I deviating from best practices by doing the above?
I'm in no position to judge that, but as one of the comments suggested, avoid reassignment, because dataframes are immutable. Try to imagine you are coding in Scala and df was defined as a val: doing df = df.filter(...) would be impossible. Python can't enforce that by itself, but I think the best practice is to avoid overwriting any dataframe variable, so that you can always call df.unpersist() afterwards if you no longer need the cached result.

Contributed 1725 experience points · earned 8+ likes
I'd like to make a few points in the hope of clarifying Spark's behavior with respect to caching.
When you have
df = ... do stuff...
df.cache()
df.count()
...and then somewhere else in the application
another_df = ... do *same* stuff...
another_df.some_action()
..., you would expect another_df to reuse the cached df dataframe. After all, reusing a previously computed result is the whole point of caching. Realizing this, the Spark developers decided to use the analyzed logical plan as the "key" for identifying a cached dataframe, rather than relying merely on references held on the application side. In Spark, the CacheManager is the component that keeps track of cached computations, in an indexed sequence called cachedData:
/**
 * Maintains the list of cached plans as an immutable sequence. Any updates to the list
 * should be protected in a "this.synchronized" block which includes the reading of the
 * existing value and the update of the cachedData var.
 */
@transient @volatile
private var cachedData = IndexedSeq[CachedData]()
During query planning (in the CacheManager phase), this structure is scanned for all subtrees of the plan being analyzed, to check whether any of them has already been computed. If a match is found, Spark replaces that subtree with the corresponding InMemoryRelation from cachedData.
cache() (a simple synonym for persist()) stores the dataframe with the MEMORY_AND_DISK storage level by calling cacheQuery(...) of the CacheManager:
/**
 * Caches the data produced by the logical representation of the given [[Dataset]].
 * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
 * recomputing the in-memory columnar representation of the underlying table is expensive.
 */
def cacheQuery(...
Note that this differs from RDD caching, which uses the MEMORY_ONLY level. Once dataframes are cached, they remain cached in memory or on local executor disk until they are explicitly unpersist'ed, or until the CacheManager's clearCache() is called. When executor storage memory fills up completely, cached blocks start being pushed to disk using LRU (least recently used) eviction, but they are never simply "dropped".
By the way, good question...