2 回答

TA貢獻1824條經驗 獲得超6個贊
python線程不會真正幫助你并行處理,因為它們是在同一個“真正的CPU線程”上執(zhí)行的,python線程在你處理異步HTTP調用時很有幫助。
關于來自文檔:ProcessPoolExecutor
concurrent.futures.ProcessPoolExecutor()
ProcessPoolExecutor 類是一個執(zhí)行器子類,它使用進程池異步執(zhí)行調用。ProcessPoolExecutor使用多處理模塊,這允許它避開全局解釋器鎖,但也意味著只能執(zhí)行和返回可拾取的對象。
如果您需要高CPU處理,它可以為您提供幫助,您可以使用:
import concurrent
def manipulate_values(k_v):
k, v = k_v
return_values = []
for i in v :
new_value = i ** 2 - 13
return_values.append(new_value)
return k, return_values
with concurrent.futures.ProcessPoolExecutor() as executor:
example_dict = dict(executor.map(manipulate_values, example_dict1.items()))
這是一個簡單的基準測試,使用一個簡單的循環(huán)來處理你的數(shù)據(jù),而不是使用,我的場景假設對于要處理的每個項目,你需要大約50ms的CPU時間:forProcessPoolExecutor
您可以看到如果要處理的每個項目的CPU時間高的真正好處ProcessPoolExecutor
from simple_benchmark import BenchmarkBuilder
import time
import concurrent
b = BenchmarkBuilder()
def manipulate_values1(k_v):
k, v = k_v
time.sleep(0.05)
return k, v
def manipulate_values2(v):
time.sleep(0.05)
return v
@b.add_function()
def test_with_process_pool_executor(d):
with concurrent.futures.ProcessPoolExecutor() as executor:
return dict(executor.map(manipulate_values1, d.items()))
@b.add_function()
def test_simple_for_loop(d):
for key, value in d.items():
d[key] = manipulate_values2((key, value))
@b.add_arguments('Number of keys in dict')
def argument_provider():
for exp in range(2, 10):
size = 2**exp
yield size, {i: [i] * 10_000 for i in range(size)}
r = b.run()
r.plot()
如果您沒有為 ProcessPoolExecutor 設置工作線程數(shù),則默認的工作線程數(shù)將等于計算機上的處理器數(shù)(對于基準測試,我使用的是一臺 CPU 為 8 的電腦)。
但在您的情況下,根據(jù)問題中提供的數(shù)據(jù),處理1個項目將需要約3 μs:
%timeit manipulate_values([367, 30, 847, 482, 887, 654, 347, 504, 413, 821])
2.32 μs ± 25.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
因此,如果要處理的一個項目的 CPU 時間較短,則最好使用簡單的 for 循環(huán)。
@user3666197提出的一個很好的觀點是,當你有巨大的項目/列表時,我使用列表中的隨機數(shù)對這兩種方法進行了基準測試:1_000_000_000
如您所見,在這種情況下更適合使用ProcessPoolExecutor
from simple_benchmark import BenchmarkBuilder
import time
import concurrent
from random import choice
b = BenchmarkBuilder()
def manipulate_values1(k_v):
k, v = k_v
return_values = []
for i in v:
new_value = i ** 2 - 13
return_values.append(new_value)
return k, return_values
def manipulate_values2(v):
return_values = []
for i in v:
new_value = i ** 2 - 13
return_values.append(new_value)
return return_values
@b.add_function()
def test_with_process_pool_executor(d):
with concurrent.futures.ProcessPoolExecutor() as executor:
return dict(executor.map(manipulate_values1, d.items()))
@b.add_function()
def test_simple_for_loop(d):
for key, value in d.items():
d[key] = manipulate_values2(value)
@b.add_arguments('Number of keys in dict')
def argument_provider():
for exp in range(2, 5):
size = 2**exp
yield size, {i: [choice(range(1000)) for _ in range(1_000_000)] for i in range(size)}
r = b.run()
r.plot()
預期,因為處理一個項目需要大約209ms:
l = [367] * 1_000_000
%timeit manipulate_values2(l)
# 209 ms ± 1.45 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
不過,最快的選擇是將numpy.arrays與循環(huán)解決方案一起使用:for
from simple_benchmark import BenchmarkBuilder
import time
import concurrent
import numpy as np
b = BenchmarkBuilder()
def manipulate_values1(k_v):
k, v = k_v
return k, v ** 2 - 13
def manipulate_values2(v):
return v ** 2 - 13
@b.add_function()
def test_with_process_pool_executor(d):
with concurrent.futures.ProcessPoolExecutor() as executor:
return dict(executor.map(manipulate_values1, d.items()))
@b.add_function()
def test_simple_for_loop(d):
for key, value in d.items():
d[key] = manipulate_values2(value)
@b.add_arguments('Number of keys in dict')
def argument_provider():
for exp in range(2, 7):
size = 2**exp
yield size, {i: np.random.randint(0, 1000, size=1_000_000) for i in range(size)}
r = b.run()
r.plot()
預計簡單循環(huán)會更快,因為處理一個numpy.array需要<1ms:for
def manipulate_value2( input_list ):
return input_list ** 2 - 13
l = np.random.randint(0, 1000, size=1_000_000)
%timeit manipulate_values2(l)
# 951 μs ± 5.7 μs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

TA貢獻1772條經驗 獲得超5個贊
問:“為什么我不能使用多個線程來執(zhí)行此計算,例如三個線程,一個用于 key1、key2 和 key3?”
你可以,但對性能沒有合理的影響 - 了解python如何處理基于線程的執(zhí)行流的所有細節(jié)在這里是基本的。了解 GIL 鎖定技巧,正確使用它,避免任何并發(fā)處理及其對性能的影響,您將獲得 WHY 部分。
Q : “concurrent.futures.ProcessPoolExecutor() 在這里工作嗎?”
愿意。
然而,它們的凈效應(如果有任何比純處理流“更快”)將取決于給定大小的“大”列表(如上文所警告的那樣,“數(shù)百萬個密鑰,并且列表同樣長”),這些列表應該被復制(RAM-I/ O)并傳遞(SER /DES處理+IPC傳輸)到生成的(基于進程的)遠程執(zhí)行器池。[SERIAL]
這些多次重復的RAM-I/O + SER/DES附加開銷成本將很快占據(jù)主導地位。
RAM-I/O 復制步驟:
>>> from zmq import Stopwatch; aClk = Stopwatch()
>>> aClk.start(); aList = [ i for i in range( int( 1E4 ) ) ]; aClk.stop()
1345 [us] to copy a List of 1E4 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E5 ) ) ]; aClk.stop()
12776 [us] to copy a List of 1E5 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E6 ) ) ]; aClk.stop()
149197 [us] to copy a List of 1E6 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E7 ) ) ]; aClk.stop()
1253792 [us] to copy a List of 1E7 elements
| |::: [us]
| +--- [ms]
+------ [ s]
SER/DES 步驟 :
>>> import pickle
>>> aClk.start(); _ = pickle.dumps( aList ); aClk.stop()
608323
615851
638821 [us] to copy pickle.dumps() a List of 1E7 elements
| |::: [us]
| +--- [ms]
+------ [ s]
因此,每個批次的附加開銷預期為 ~ 2 x ( 1253 + 608 ) [ms] + IPC 傳輸成本,只需一次 1E7 個項目
manipulate_values() 的實際有用工作有效負載非常小,以至于所有附加成本的一次性總和幾乎無法支付與在遠程工作人員池中分配工作單元相關的額外費用。矢量化計算形式有望帶來更智能的結果。這里的附加成本比少量的有用工作要大得多。
模式將更多地取決于SER/DES參數(shù)通過“那里”的開銷成本,加上SER/DES返回結果的附加成本 - 所有這些都將決定凈效應(<<1.0 x的反加速經常在用例中觀察到,但引入只是一個糟糕的設計端工程實踐, 沒有后期基準可以挽救已經燒毀的人*天,浪費在如此糟糕的設計決策中)
添加回答
舉報