3 Answers

TA contributed 1824 experience points, earned 5+ upvotes
I think that, since the answer to this question and the answer to another similar question seem to be in direct conflict, it's best to go straight to the source with pdb.
Summary
boto3 uses multiple threads (10) by default
However, it is not async, because it waits on (joins) these threads before returning, rather than using a fire-and-forget technique
So, in this sense, read/write thread safety is in place if you're trying to talk to an s3 bucket from multiple clients.
Details
One aspect I'm trying to hammer out here is that multiple (sub)threads do not imply that the top-level method itself is non-blocking: if the calling thread kicks off the upload to multiple subthreads, but then waits for those threads to finish and return, I would venture to say that's still a blocking call. The flip side of this, in asyncio speak, is if the method call is a fire-and-forget call. With threading, this effectively comes down to whether x.join() is ever called.
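To make that distinction concrete, here is a minimal sketch of the two patterns with plain threading (the names are illustrative, not boto3's):

import threading
import time

def work():
    time.sleep(1)  # stand-in for an upload

# Blocking pattern: start a worker thread, then join it.
# The caller doesn't get control back until the work is done.
t = threading.Thread(target=work)
t.start()
t.join()  # this join() is what makes the overall call blocking

# Fire-and-forget pattern: start the thread and move on immediately.
t2 = threading.Thread(target=work)
t2.start()  # no join(); control returns to the caller right away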
Here is the initial code, taken from Victor Val, to kick off the debugger:
import io
import pdb
import boto3
# From dd if=/dev/zero of=100mb.txt bs=50M count=1
buf = io.BytesIO(open('100mb.txt', 'rb').read())
bucket = boto3.resource('s3').Bucket('test-threads')
pdb.run("bucket.upload_fileobj(buf, '100mb')")
This stack frame is from Boto 1.9.134.
Now jumping into pdb:
.upload_fileobj() first calls a nested method; not much to see yet.
(Pdb) s
--Call--
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(542)bucket_upload_fileobj()
-> def bucket_upload_fileobj(self, Fileobj, Key, ExtraArgs=None,
(Pdb) s
(Pdb) l
574
575 :type Config: boto3.s3.transfer.TransferConfig
576 :param Config: The transfer configuration to be used when performing the
577 upload.
578 """
579 -> return self.meta.client.upload_fileobj(
580 Fileobj=Fileobj, Bucket=self.name, Key=Key, ExtraArgs=ExtraArgs,
581 Callback=Callback, Config=Config)
582
583
584
So the top-level method does return something, but it's not yet clear how that something eventually becomes None.
So we step into that.
Now, .upload_fileobj() does have a config parameter, which is None by default:
(Pdb) l 531
526
527 subscribers = None
528 if Callback is not None:
529 subscribers = [ProgressCallbackInvoker(Callback)]
530
531 config = Config
532 if config is None:
533 config = TransferConfig()
534
535 with create_transfer_manager(self, config) as manager:
536 future = manager.upload(
This means that config becomes the default TransferConfig():
use_threads: If True, threads will be used when performing S3 transfers. If False, no threads will be used in performing transfers: all logic will be run in the main thread.
max_concurrency: The maximum number of threads that will be making requests to perform a transfer. If use_threads is set to False, the value provided is ignored as the transfer will only ever use the main thread.
And voilà, here they are:
(Pdb) unt 534
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(535)upload_fileobj()
-> with create_transfer_manager(self, config) as manager:
(Pdb) config
<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>
(Pdb) config.use_threads
True
(Pdb) config.max_concurrency
10
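As an aside, those defaults can be overridden by passing your own TransferConfig. A minimal sketch, reusing bucket and buf from the snippet above:

from boto3.s3.transfer import TransferConfig

# Run all transfer logic in the main (calling) thread ...
config = TransferConfig(use_threads=False)
# ... or keep threads but cap the worker count, e.g. at 4 instead of 10:
# config = TransferConfig(max_concurrency=4)

bucket.upload_fileobj(buf, '100mb', Config=config)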
Now we descend a level in the call stack to the TransferManager (a context manager). At this point, max_concurrency has been put to use as the similarly-named argument max_request_concurrency:
# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/manager.py#L223
# The executor responsible for making S3 API transfer requests
self._request_executor = BoundedExecutor(
    max_size=self._config.max_request_queue_size,
    max_num_threads=self._config.max_request_concurrency,
    tag_semaphores={
        IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
            self._config.max_in_memory_upload_chunks),
        IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
            self._config.max_in_memory_download_chunks)
    },
    executor_cls=executor_cls
)
At least in this boto3 version, that class comes from the separate library s3transfer.
(Pdb) n
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(536)upload_fileobj()
-> future = manager.upload(
(Pdb) manager
<s3transfer.manager.TransferManager object at 0x7f178db437f0>
(Pdb) manager._config
<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>
(Pdb) manager._config.use_threads
True
(Pdb) manager._config.max_concurrency
10
Next, let's step into manager.upload(). Here's the full body of that method:
(Pdb) l 290, 303
290 -> if extra_args is None:
291 extra_args = {}
292 if subscribers is None:
293 subscribers = []
294 self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
295 call_args = CallArgs(
296 fileobj=fileobj, bucket=bucket, key=key, extra_args=extra_args,
297 subscribers=subscribers
298 )
299 extra_main_kwargs = {}
300 if self._bandwidth_limiter:
301 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
302 return self._submit_transfer(
303 call_args, UploadSubmissionTask, extra_main_kwargs)
(Pdb) unt 301
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(302)upload()
-> return self._submit_transfer(
(Pdb) extra_main_kwargs
{}
(Pdb) UploadSubmissionTask
<class 's3transfer.upload.UploadSubmissionTask'>
(Pdb) call_args
<s3transfer.utils.CallArgs object at 0x7f178db5a5f8>
(Pdb) l 300, 5
300 if self._bandwidth_limiter:
301 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
302 -> return self._submit_transfer(
303 call_args, UploadSubmissionTask, extra_main_kwargs)
304
305 def download(self, bucket, key, fileobj, extra_args=None,
Ah, lovely. So we'll need to get down at least one more level to see the actual underlying upload.
(Pdb) s
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(303)upload()
-> call_args, UploadSubmissionTask, extra_main_kwargs)
(Pdb) s
--Call--
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(438)_submit_transfer()
-> def _submit_transfer(self, call_args, submission_task_cls,
(Pdb) s
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(440)_submit_transfer()
-> if not extra_main_kwargs:
(Pdb) l 440, 10
440 -> if not extra_main_kwargs:
441 extra_main_kwargs = {}
442
443 # Create a TransferFuture to return back to the user
444 transfer_future, components = self._get_future_with_components(
445 call_args)
446
447 # Add any provided done callbacks to the created transfer future
448 # to be invoked on the transfer future being complete.
449 for callback in get_callbacks(transfer_future, 'done'):
450 components['coordinator'].add_done_callback(callback)
Okay, so now we have a TransferFuture, defined in s3transfer/futures.py. There's no definitive proof yet that threads have been kicked off, but it sure sounds like it once futures get involved.
(Pdb) l
444 transfer_future, components = self._get_future_with_components(
445 call_args)
446
447 # Add any provided done callbacks to the created transfer future
448 # to be invoked on the transfer future being complete.
449 -> for callback in get_callbacks(transfer_future, 'done'):
450 components['coordinator'].add_done_callback(callback)
451
452 # Get the main kwargs needed to instantiate the submission task
453 main_kwargs = self._get_submission_task_main_kwargs(
454 transfer_future, extra_main_kwargs)
(Pdb) transfer_future
<s3transfer.futures.TransferFuture object at 0x7f178db5a780>
The last line below, taken from the TransferCoordinator class, seems important at first glance:
class TransferCoordinator(object):
    """A helper class for managing TransferFuture"""
    def __init__(self, transfer_id=None):
        self.transfer_id = transfer_id
        self._status = 'not-started'
        self._result = None
        self._exception = None
        self._associated_futures = set()
        self._failure_cleanups = []
        self._done_callbacks = []
        self._done_event = threading.Event()  # < ------ !!!!!!
You typically see threading.Event used by one thread to signal an event's status, while other threads wait for that event to happen.
TransferCoordinator is what gets used by TransferFuture.result().
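Here's a minimal sketch of that signal/wait pattern in isolation (plain threading, not boto3 code):

import threading
import time

done = threading.Event()

def worker():
    time.sleep(1)  # stand-in for the actual transfer work
    done.set()     # signal that the event has happened

threading.Thread(target=worker).start()
done.wait()  # block here until worker() calls .set()
print("worker finished")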
Okay, circling back from above, we're now at s3transfer.futures.BoundedExecutor and its max_num_threads attribute:
class BoundedExecutor(object):
    EXECUTOR_CLS = futures.ThreadPoolExecutor
    # ...
    def __init__(self, max_size, max_num_threads, tag_semaphores=None,
                 executor_cls=None):
        self._max_num_threads = max_num_threads
        if executor_cls is None:
            executor_cls = self.EXECUTOR_CLS
        self._executor = executor_cls(max_workers=self._max_num_threads)
which is basically the equivalent of:
from concurrent import futures
_executor = futures.ThreadPoolExecutor(max_workers=10)
But one question still remains: is this fire-and-forget, or does the call actually wait for the threads to finish and return?
It seems to be the latter: .result() calls self._done_event.wait(MAXINT).
# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/futures.py#L249
def result(self):
    self._done_event.wait(MAXINT)
    # Once done waiting, raise an exception if present or return the
    # final result.
    if self._exception:
        raise self._exception
    return self._result
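That's the same shape as the standard library's futures: submit() returns immediately, but result() blocks. A minimal sketch:

from concurrent import futures
import time

def upload_chunk():
    time.sleep(1)  # stand-in for one S3 request
    return "ok"

with futures.ThreadPoolExecutor(max_workers=10) as executor:
    future = executor.submit(upload_chunk)  # returns instantly
    print(future.result())  # blocks until the worker finishes

Note that exiting the with block also waits for any outstanding work, much like the create_transfer_manager context manager seen above.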
Finally, re-running Victor Val's test seems to confirm the above:
>>> import boto3
>>> import time
>>> import io
>>>
>>> buf = io.BytesIO(open('100mb.txt', 'rb').read())
>>>
>>> bucket = boto3.resource('s3').Bucket('test-threads')
>>> start = time.time()
>>> print("starting to upload...")
starting to upload...
>>> bucket.upload_fileobj(buf, '100mb')
>>> print("finished uploading")
finished uploading
>>> end = time.time()
>>> print("time: {}".format(end-start))
time: 2.6030001640319824
(This execution time is probably shorter when this example is run on a network-optimized instance, but 2.5 seconds is still a noticeably big chunk of time, and not at all indicative of threads being kicked off and not waited on.)
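If you actually want fire-and-forget behavior, nothing stops you from wrapping the blocking call in your own executor. A sketch, reusing bucket and buf from above:

from concurrent import futures

executor = futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(bucket.upload_fileobj, buf, '100mb')
# ... do other work here while the upload runs in the background ...
future.result()  # only blocks if/when you need the upload to be done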
Lastly, here's an example of a Callback for .upload_fileobj(). It follows along with an example from the docs.
First, a little helper for getting a buffer's size efficiently:
def get_bufsize(buf, chunk=1024) -> int:
    start = buf.tell()
    try:
        size = 0
        while True:
            out = buf.read(chunk)
            if out:
                size += len(out)  # count the bytes actually read; the final read may be short
            else:
                break
        return size
    finally:
        buf.seek(start)
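For a seekable buffer you could alternatively get the size by seeking to the end; a sketch (the function name is just illustrative):

import io

def get_bufsize_seek(buf) -> int:
    start = buf.tell()
    size = buf.seek(0, io.SEEK_END)  # seek() returns the new absolute position
    buf.seek(start)  # restore the original position
    return size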
And the class itself:
import sys
import threading
import time

class ProgressPercentage(object):
    def __init__(self, filename, buf):
        self._filename = filename
        self._size = float(get_bufsize(buf))
        self._seen_so_far = 0
        self._lock = threading.Lock()
        self.start = None

    def __call__(self, bytes_amount):
        with self._lock:
            if not self.start:
                self.start = time.monotonic()
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                "\r%s %s of %s (%.2f%% done, %.2fs elapsed\n" % (
                    self._filename, self._seen_so_far, self._size,
                    percentage, time.monotonic() - self.start))
            # Use sys.stdout.flush() to update on one line
            # sys.stdout.flush()
An example:
In [19]: import io
...:
...: from boto3.session import Session
...:
...: s3 = Session().resource("s3")
...: bucket = s3.Bucket("test-threads")
...: buf = io.BytesIO(open('100mb.txt', 'rb').read())
...:
...: bucket.upload_fileobj(buf, 'mykey', Callback=ProgressPercentage("mykey", buf))
mykey 262144 of 104857600.0 (0.25% done, 0.00s elapsed
mykey 524288 of 104857600.0 (0.50% done, 0.00s elapsed
mykey 786432 of 104857600.0 (0.75% done, 0.01s elapsed
mykey 1048576 of 104857600.0 (1.00% done, 0.01s elapsed
mykey 1310720 of 104857600.0 (1.25% done, 0.01s elapsed
mykey 1572864 of 104857600.0 (1.50% done, 0.02s elapsed

TA contributed 1829 experience points, earned 7+ upvotes
upload_fileobj accepts a Config parameter. This is a boto3.s3.transfer.TransferConfig object, which in turn has a parameter called use_threads (default True): if True, threads will be used when performing S3 transfers; if False, no threads will be used, and all the logic will run in the main thread.
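A minimal sketch of passing it (the bucket name and key are hypothetical):

import boto3
from boto3.s3.transfer import TransferConfig

bucket = boto3.resource('s3').Bucket('my-bucket')
with open('100mb.txt', 'rb') as f:
    # With use_threads=False, all transfer logic runs in the calling thread.
    bucket.upload_fileobj(f, '100mb', Config=TransferConfig(use_threads=False))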
Hope this helps.

TA contributed 1818 experience points, earned 11+ upvotes
Testing whether the method is blocking:
I empirically tested this behavior myself. First, I generated a 100MB file:
dd if=/dev/zero of=100mb.txt bs=100M count=1
Then I tried to upload the file the same way you did and measured the time it took:
import boto3
import time
import io
file = open('100mb.txt', 'rb')
buf = io.BytesIO(file.read())
bucket = boto3.resource('s3').Bucket('testbucket')
start = time.time()
print("starting to upload...")
bucket.upload_fileobj(buf, '100mb')
print("finished uploading")
end = time.time()
print("time: {}".format(end-start))
It took more than 8 seconds for the upload_fileobj() method to finish and for the next Python line to be reached (50 seconds for a 1GB file), so I assume this method is blocking.
Testing with threading:
When using multiple threads, I can verify that the method supports multiple transfers at the same time, even with the option use_threads=False. I started uploading a 200MB file and then a 100MB one, and the 100MB one finished first. This confirms that the concurrency in TransferConfig is about multipart transfers.
The code:
import boto3
import time
import io
from boto3.s3.transfer import TransferConfig
import threading
config = TransferConfig(use_threads=False)
bucket = boto3.resource('s3').Bucket('testbucket')
def upload(filename):
    file = open(filename, 'rb')
    buf = io.BytesIO(file.read())
    start = time.time()
    print("starting to upload file {}".format(filename))
    bucket.upload_fileobj(buf, filename, Config=config)
    end = time.time()
    print("finished uploading file {}. time: {}".format(filename, end-start))
x1 = threading.Thread(target=upload, args=('200mb.txt',))
x2 = threading.Thread(target=upload, args=('100mb.txt',))
x1.start()
time.sleep(2)
x2.start()
The output:
starting to upload file 200mb.txt
starting to upload file 100mb.txt
finished uploading file 100mb.txt. time: 46.35254502296448
finished uploading file 200mb.txt. time: 61.70564889907837
Testing with sessions:
This is what you need if you want the upload methods to finish in the order they are called.
The code:
import boto3
import time
import io
from boto3.s3.transfer import TransferConfig
import threading
config = TransferConfig(use_threads=False)
session = boto3.session.Session()
s3 = session.resource('s3')
bucket = s3.Bucket('testbucket')
def upload(filename):
    file = open(filename, 'rb')
    buf = io.BytesIO(file.read())
    start = time.time()
    print("starting to upload file {}".format(filename))
    bucket.upload_fileobj(buf, filename)
    end = time.time()
    print("finished uploading file {}. time: {}".format(filename, end-start))
x1 = threading.Thread(target=upload, args=('200mb.txt',))
x2 = threading.Thread(target=upload, args=('100mb.txt',))
x1.start()
time.sleep(2)
x2.start()
The output:
starting to upload file 200mb.txt
starting to upload file 100mb.txt
finished uploading file 200mb.txt. time: 46.62478971481323
finished uploading file 100mb.txt. time: 50.515950202941895
Some resources I found:
- This is a question asked here on SO about a method being blocking or non-blocking. It's not conclusive, but there may be relevant information there.
- There is an open issue on GitHub to allow asynchronous transfers in boto3.
- There are also tools like aioboto and aiobotocore, made specifically to allow asynchronous downloads and uploads from/to s3 and other aws services (see the sketch after this list).
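For instance, a rough sketch with aiobotocore, assuming a recent version where get_session lives in aiobotocore.session (the bucket name is hypothetical):

import asyncio
from aiobotocore.session import get_session

async def upload():
    session = get_session()
    async with session.create_client('s3') as client:
        with open('100mb.txt', 'rb') as f:
            # put_object is awaitable here, so other coroutines can run meanwhile
            await client.put_object(Bucket='testbucket', Key='100mb', Body=f.read())

asyncio.run(upload())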
Regarding my previous answer:
You can read about file transfer configuration in boto3 here. In particular:
Transfer operations use threads to implement concurrency. Thread use can be disabled by setting the use_threads attribute to False.
Initially I thought this was related to multiple transfers being executed concurrently. However, reading the source, the comment on the max_concurrency parameter of TransferConfig explains that the concurrency is not about multiple transfers, but about "the number of threads that will be making requests to perform a transfer". So it's something used to speed up a single transfer. The use_threads attribute is only used to allow concurrency in multipart transfers.
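In other words, those threads parallelize the parts of a single multipart transfer. A sketch of tuning that, with illustrative values:

from boto3.s3.transfer import TransferConfig

config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,  # files above 8MB are sent as multipart
    multipart_chunksize=8 * 1024 * 1024,  # size of each part
    max_concurrency=10,  # threads sending parts of ONE transfer, not separate transfers
)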