第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會有你想問的

Dask - 是否可以通過自定義函數(shù)使用每個(gè)工作線程中的所有線程?

Dask - 是否可以通過自定義函數(shù)使用每個(gè)工作線程中的所有線程?

開心每一天1111 2023-06-27 18:11:17
就我而言,我在 S3 中有多個(gè)文件和一個(gè)自定義函數(shù),該函數(shù)讀取每個(gè)文件并使用所有線程處理它。為了簡化示例,我只生成一個(gè)數(shù)據(jù)幀df,并且假設(shè)我的函數(shù)是tsfresh.extract_features使用多重處理。生成數(shù)據(jù)import pandas as pdfrom tsfresh import extract_featuresfrom tsfresh.examples.robot_execution_failures import download_robot_execution_failures, \load_robot_execution_failuresdownload_robot_execution_failures()ts, y = load_robot_execution_failures()df = []for i in range(5):    tts = ts.copy()    tts["id"] += 88 * i    df.append(tts)    df = pd.concat(df, ignore_index=True)功能def fun(df, n_jobs):    extracted_features = extract_features(df,                                      column_id="id",                                      column_sort="time",                                      n_jobs=n_jobs)簇import daskfrom dask.distributed import Client, progressfrom dask import compute, delayedfrom dask_cloudprovider import FargateClustermy_vpc = # your vpcmy_subnets = # your subnetscpu = 2 ram = 4cluster = FargateCluster(n_workers=1,                         image='rpanai/feats-worker:2020-08-24',                         vpc=my_vpc,                         subnets=my_subnets,                         worker_cpu=int(cpu * 1024),                         worker_mem=int(ram * 1024),                         cloudwatch_logs_group="my_log_group",                         task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess'],                         scheduler_timeout='20 minutes'                        )cluster.adapt(minimum=1,              maximum=4)client = Client(cluster)client使用所有工作線程(失敗)to_process = [delayed(fun)(df, cpu) for i in range(10)]out = compute(to_process)AssertionError: daemonic processes are not allowed to have children僅使用一個(gè)線程(OK)在這種情況下,它工作正常,但我浪費(fèi)資源。to_process = [delayed(fun)(df, 0) for i in range(10)]out = compute(to_process)問題我知道對于這個(gè)特定的功能,我最終可以使用多線程和其他一些技巧編寫一個(gè)自定義分配器,但我想分配一個(gè)工作,讓每個(gè)工作人員都可以利用所有資源,而不必?fù)?dān)心太多。
查看完整描述

1 回答

?
元芳怎么了

TA貢獻(xiàn)1798條經(jīng)驗(yàn) 獲得超7個(gè)贊

我可以幫助回答您的具體問題tsfresh,但 iftsfresh只是一個(gè)簡單的玩具示例,可能不是您想要的。

對于tsfresh,您通常不會混合使用tsfreshdask 和 dask 的多重處理,而是讓 dask 執(zhí)行所有處理。這意味著,您從一個(gè)單一的開始dask.DataFrame(在您的測試用例中,您可以將 pandas 數(shù)據(jù)幀轉(zhuǎn)換為 dask 數(shù)據(jù)幀 - 對于您的讀取用例,您可以直接從S3?docu讀取),然后在 dask 數(shù)據(jù)幀中分發(fā)特征提?。ㄌ卣魈崛〉暮锰幨?,它在每個(gè)時(shí)間序列上獨(dú)立工作。因此我們可以為每個(gè)時(shí)間序列生成一個(gè)作業(yè))。


我不確定這是否有助于解決您更普遍的問題。在我看來,你(在大多數(shù)情況下)不想混合dask的分布函數(shù)和“本地”多核計(jì)算,而只是讓dask處理一切。因?yàn)槿绻挥?dask 集群上,您甚至可能不知道每臺機(jī)器上有多少個(gè)核心(或者每個(gè)作業(yè)可能只獲得一個(gè)核心)。

這意味著,如果您的作業(yè)可以分發(fā) N 次,并且每個(gè)作業(yè)將啟動 M 個(gè)子作業(yè),您只需將“N x M”作業(yè)交給 dask 并讓它計(jì)算其余部分(包括數(shù)據(jù)局部性)。


查看完整回答
反對 回復(fù) 2023-06-27
  • 1 回答
  • 0 關(guān)注
  • 148 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號