2 回答

TA貢獻(xiàn)1805條經(jīng)驗(yàn) 獲得超10個(gè)贊
您的基本算法是“我希望將 的前 10 個(gè)值df['Node']設(shè)置為 的第一個(gè)值ndf,將接下來的 10 個(gè)值設(shè)置為 的下一個(gè)值ndf,依此類推”。這在 Dask 中很難,因?yàn)樗恢烂總€(gè)分區(qū)中有多少行:您正在從 CSV 讀取,并且您在 X 字節(jié)中獲得的行數(shù)取決于每個(gè)部分中的數(shù)據(jù)是什么樣的. 其他格式為您提供更多信息...
因此,您肯定需要兩次遍歷數(shù)據(jù)。您可以使用索引來找出劃分并可能進(jìn)行一些排序。在我看來,你能做的最簡(jiǎn)單的事情就是測(cè)量分割長(zhǎng)度,然后得到每個(gè)開始的偏移量:
lengths = df.map_partitions(len).compute()
offsets = np.cumsum(lengths.values)
offsets -= offsets[0]
現(xiàn)在使用自定義延遲功能來處理零件
@dask.delayed
def add_node(part, offset, ndf):
index = pd.Series(range(offset, offset + len(part)) // 10,
index=part.index) # 10 is the repeat factor
part['Node'] = index.map(ndf)
return part
df2 = dd.from_delayed([add_node(d, off, ndf)
for d, off in zip(df.to_delayed(), offsets)])

TA貢獻(xiàn)1830條經(jīng)驗(yàn) 獲得超3個(gè)贊
使用相同的工作流程,您可以divisions按照此處的建議手動(dòng)設(shè)置
import dask.dataframe as dd
import pandas as pd
import numpy as np
pd.DataFrame(np.random.rand(25000, 2)).to_csv("tempfile.csv", index=False)
df = dd.read_csv("tempfile.csv")
ndf = pd.DataFrame(np.random.randint(1000, 3500, size=2500))
df.divisions = (0, len(df)-1)
df["Note"] = dd.from_array(np.repeat(ndf.values, 10))
我不認(rèn)為使用np.repeat是非常有效的,特別是對(duì)于大 df。
添加回答
舉報(bào)