第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

使 Boto3 上傳調(diào)用阻塞(單線程)

使 Boto3 上傳調(diào)用阻塞(單線程)

慕桂英546537 2022-01-18 17:47:48
編輯:我最初的假設(shè)被證明部分錯(cuò)誤。我在這里添加了一個(gè)冗長(zhǎng)的答案,我邀請(qǐng)其他人對(duì)其進(jìn)行壓力測(cè)試和糾正。我正在尋找一種以單線程方式利用 Boto3 S3 API 來(lái)模擬線程安全鍵值存儲(chǔ)的方法。簡(jiǎn)而言之,我想使用調(diào)用線程而不是新線程來(lái)進(jìn)行上傳。.upload_fileobj()據(jù)我所知, Boto3(或)中方法的默認(rèn)行為.upload_file()是將任務(wù)啟動(dòng)到新線程并None立即返回。從文檔:這是一種托管傳輸,如有必要,它將在多個(gè)線程中執(zhí)行分段上傳。(如果我對(duì)此的理解首先是錯(cuò)誤的,那么對(duì)此進(jìn)行更正也會(huì)有所幫助。這是在 Boto3 1.9.134 中。)>>> import io>>> import boto3>>> bucket = boto3.resource('s3').Bucket('my-bucket-name')>>> buf = io.BytesIO(b"test")>>> res = bucket.upload_fileobj(buf, 'testobj')>>> res is NoneTrue現(xiàn)在,假設(shè)這buf不是一個(gè)短的 4 字節(jié)字符串,而是一個(gè)巨大的文本 blob,它將花費(fèi)不可忽略的時(shí)間來(lái)完全上傳。我還使用此函數(shù)來(lái)檢查具有給定鍵的對(duì)象是否存在:def key_exists_in_bucket(bucket_obj, key: str) -> bool:    try:        bucket_obj.Object(key).load()    except botocore.exceptions.ClientError:        return False    else:        return True如果對(duì)象按名稱(chēng)存在,我的意圖是不重寫(xiě)該對(duì)象。這里的競(jìng)爭(zhēng)條件相當(dāng)明顯:異步啟動(dòng)上傳,然后快速檢查key_exists_in_bucket(),F(xiàn)alse如果對(duì)象仍在寫(xiě)入,則返回,然后不必要地再次寫(xiě)入它。有沒(méi)有辦法確保bucket.upload_fileobj()由當(dāng)前線程而不是在該方法范圍內(nèi)創(chuàng)建的新線程調(diào)用?我意識(shí)到這會(huì)減慢速度。在這種情況下,我愿意犧牲速度。
查看完整描述

3 回答

?
滄海一幻覺(jué)

TA貢獻(xiàn)1824條經(jīng)驗(yàn) 獲得超5個(gè)贊

我認(rèn)為,由于這個(gè)問(wèn)題的答案和另一個(gè)類(lèi)似問(wèn)題的答案似乎直接沖突,所以最好直接使用pdb.


概括

boto3 默認(rèn)情況下使用多個(gè)線程 (10)

但是,它不是異步的,因?yàn)樗诜祷刂暗却尤耄┻@些線程,而不是使用“即發(fā)即棄”技術(shù)

因此,以這種方式,如果您嘗試與來(lái)自多個(gè)客戶端的 s3 存儲(chǔ)桶通信,則讀/寫(xiě)線程安全性就位。

細(xì)節(jié)

我在這里努力解決的一個(gè)方面是多個(gè)(子線程)并不意味著頂級(jí)方法本身是非阻塞的:如果調(diào)用線程開(kāi)始上傳到多個(gè)子線程,然后等待這些線程完成并返回,我敢說(shuō)這仍然是一個(gè)阻塞電話。反過(guò)來(lái)asyncio說(shuō),如果方法調(diào)用是一個(gè)“即發(fā)即棄”的調(diào)用。使用threading,這實(shí)際上歸結(jié)為是否x.join()曾經(jīng)被調(diào)用過(guò)。


這是取自 Victor Val 的初始代碼,用于啟動(dòng)調(diào)試器:


import io

import pdb


import boto3


# From dd if=/dev/zero of=100mb.txt  bs=50M  count=1

buf = io.BytesIO(open('100mb.txt', 'rb').read())

bucket = boto3.resource('s3').Bucket('test-threads')

pdb.run("bucket.upload_fileobj(buf, '100mb')")

此堆棧幀來(lái)自 Boto 1.9.134。


現(xiàn)在跳入pdb:


.upload_fileobj() 首先調(diào)用一個(gè)嵌套方法——還沒(méi)有太多可看的。


(Pdb) s

--Call--

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(542)bucket_upload_fileobj()

-> def bucket_upload_fileobj(self, Fileobj, Key, ExtraArgs=None,

(Pdb) s


(Pdb) l

574     

575         :type Config: boto3.s3.transfer.TransferConfig

576         :param Config: The transfer configuration to be used when performing the

577             upload.

578         """

579  ->     return self.meta.client.upload_fileobj(

580             Fileobj=Fileobj, Bucket=self.name, Key=Key, ExtraArgs=ExtraArgs,

581             Callback=Callback, Config=Config)

582     

583     

584  

所以頂級(jí)方法確實(shí)返回了一些東西,但目前還不清楚那個(gè)東西最終會(huì)變成什么None。


所以我們進(jìn)入了那個(gè)。


現(xiàn)在,.upload_fileobj()確實(shí)有一個(gè)config參數(shù),默認(rèn)情況下是 None :


(Pdb) l 531

526     

527         subscribers = None

528         if Callback is not None:

529             subscribers = [ProgressCallbackInvoker(Callback)]

530     

531         config = Config

532         if config is None:

533             config = TransferConfig()

534     

535         with create_transfer_manager(self, config) as manager:

536             future = manager.upload(

這意味著config成為默認(rèn)值TransferConfig():


use_threads-- 如果為 True,則執(zhí)行 S3 傳輸時(shí)將使用線程。如果為 False,則不會(huì)使用線程來(lái)執(zhí)行傳輸:所有邏輯都將在主線程中運(yùn)行。

max_concurrency-- 請(qǐng)求執(zhí)行傳輸?shù)淖畲缶€程數(shù)。如果 use_threads 設(shè)置為 False,則忽略提供的值,因?yàn)閭鬏斨粫?huì)使用主線程。

哇啦,他們?cè)谶@里:


(Pdb) unt 534

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(535)upload_fileobj()

-> with create_transfer_manager(self, config) as manager:

(Pdb) config

<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>

(Pdb) config.use_threads

True

(Pdb) config.max_concurrency

10

現(xiàn)在我們?cè)谡{(diào)用堆棧中下降一個(gè)級(jí)別以使用TransferManager(上下文管理器)。此時(shí),max_concurrency已被用作類(lèi)似名稱(chēng)的參數(shù)max_request_concurrency:


# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/manager.py#L223


    # The executor responsible for making S3 API transfer requests

    self._request_executor = BoundedExecutor(

        max_size=self._config.max_request_queue_size,

        max_num_threads=self._config.max_request_concurrency,

        tag_semaphores={

            IN_MEMORY_UPLOAD_TAG: TaskSemaphore(

                self._config.max_in_memory_upload_chunks),

            IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(

                self._config.max_in_memory_download_chunks)

        },

        executor_cls=executor_cls

    )

至少在這個(gè) boto3 版本中,該類(lèi)來(lái)自單獨(dú)的庫(kù)s3transfer。


(Pdb) n

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(536)upload_fileobj()

-> future = manager.upload(

(Pdb) manager

<s3transfer.manager.TransferManager object at 0x7f178db437f0>

(Pdb) manager._config

<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>

(Pdb) manager._config.use_threads

True

(Pdb) manager._config.max_concurrency

10

接下來(lái),讓我們進(jìn)入manager.upload(). 這是該方法的全文:


(Pdb) l 290, 303

290  ->         if extra_args is None:

291                 extra_args = {}

292             if subscribers is None:

293                 subscribers = []

294             self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)

295             call_args = CallArgs(

296                 fileobj=fileobj, bucket=bucket, key=key, extra_args=extra_args,

297                 subscribers=subscribers

298             )

299             extra_main_kwargs = {}

300             if self._bandwidth_limiter:

301                 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter

302             return self._submit_transfer(

303                 call_args, UploadSubmissionTask, extra_main_kwargs)


(Pdb) unt 301

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(302)upload()

-> return self._submit_transfer(

(Pdb) extra_main_kwargs

{}


(Pdb) UploadSubmissionTask

<class 's3transfer.upload.UploadSubmissionTask'>

(Pdb) call_args

<s3transfer.utils.CallArgs object at 0x7f178db5a5f8>


(Pdb) l 300, 5

300             if self._bandwidth_limiter:

301                 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter

302  ->         return self._submit_transfer(

303                 call_args, UploadSubmissionTask, extra_main_kwargs)

304     

305         def download(self, bucket, key, fileobj, extra_args=None,

啊,太可愛(ài)了——所以我們至少需要再往下一層才能看到實(shí)際的底層上傳。


(Pdb) s

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(303)upload()

-> call_args, UploadSubmissionTask, extra_main_kwargs)

(Pdb) s

--Call--

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(438)_submit_transfer()

-> def _submit_transfer(self, call_args, submission_task_cls,

(Pdb) s

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(440)_submit_transfer()

-> if not extra_main_kwargs:


(Pdb) l 440, 10

440  ->         if not extra_main_kwargs:

441                 extra_main_kwargs = {}

442     

443             # Create a TransferFuture to return back to the user

444             transfer_future, components = self._get_future_with_components(

445                 call_args)

446     

447             # Add any provided done callbacks to the created transfer future

448             # to be invoked on the transfer future being complete.

449             for callback in get_callbacks(transfer_future, 'done'):

450                 components['coordinator'].add_done_callback(callback)

好的,所以現(xiàn)在我們有一個(gè)TransferFuture, 定義在沒(méi)有明確的證據(jù)表明線程已經(jīng)被啟動(dòng)了,但是當(dāng)涉及到期貨s3transfer/futures.py 時(shí),它肯定聽(tīng)起來(lái)像這樣。


(Pdb) l

444             transfer_future, components = self._get_future_with_components(

445                 call_args)

446     

447             # Add any provided done callbacks to the created transfer future

448             # to be invoked on the transfer future being complete.

449  ->         for callback in get_callbacks(transfer_future, 'done'):

450                 components['coordinator'].add_done_callback(callback)

451     

452             # Get the main kwargs needed to instantiate the submission task

453             main_kwargs = self._get_submission_task_main_kwargs(

454                 transfer_future, extra_main_kwargs)

(Pdb) transfer_future

<s3transfer.futures.TransferFuture object at 0x7f178db5a780>

下面的最后一行來(lái)自TransferCoordinator課堂,乍一看似乎很重要:


class TransferCoordinator(object):

    """A helper class for managing TransferFuture"""

    def __init__(self, transfer_id=None):

        self.transfer_id = transfer_id

        self._status = 'not-started'

        self._result = None

        self._exception = None

        self._associated_futures = set()

        self._failure_cleanups = []

        self._done_callbacks = []

        self._done_event = threading.Event()  # < ------ !!!!!!

您通常會(huì)看到threading.Event 一個(gè)線程用于發(fā)出事件狀態(tài)的信號(hào),而其他線程可以等待該事件發(fā)生。


TransferCoordinator是由 .使用的TransferFuture.result()。


好的,從上面循環(huán)回來(lái),我們現(xiàn)在在s3transfer.futures.BoundedExecutor它的max_num_threads屬性:


class BoundedExecutor(object):

    EXECUTOR_CLS = futures.ThreadPoolExecutor

    # ...

    def __init__(self, max_size, max_num_threads, tag_semaphores=None,

                 executor_cls=None):

    self._max_num_threads = max_num_threads

    if executor_cls is None:

        executor_cls = self.EXECUTOR_CLS

    self._executor = executor_cls(max_workers=self._max_num_threads)

這基本上相當(dāng)于:


from concurrent import futures


_executor = futures.ThreadPoolExecutor(max_workers=10)

但是仍然存在一個(gè)問(wèn)題:這是一種“即發(fā)即棄”,還是調(diào)用實(shí)際上是在等待線程完成并返回?


似乎是后者。 .result()來(lái)電self._done_event.wait(MAXINT)。


# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/futures.py#L249


def result(self):

    self._done_event.wait(MAXINT)


    # Once done waiting, raise an exception if present or return the

    # final result.

    if self._exception:

        raise self._exception

    return self._result

最后,重新運(yùn)行 Victor Val 的測(cè)試,這似乎證實(shí)了上述內(nèi)容:


>>> import boto3

>>> import time

>>> import io

>>> 

>>> buf = io.BytesIO(open('100mb.txt', 'rb').read())

>>> 

>>> bucket = boto3.resource('s3').Bucket('test-threads')

>>> start = time.time()

>>> print("starting to upload...")

starting to upload...

>>> bucket.upload_fileobj(buf, '100mb')

>>> print("finished uploading")

finished uploading

>>> end = time.time()

>>> print("time: {}".format(end-start))

time: 2.6030001640319824

(此示例在網(wǎng)絡(luò)優(yōu)化實(shí)例上運(yùn)行時(shí),此執(zhí)行時(shí)間可能更短。但 2.5 秒仍然是一個(gè)明顯的大塊時(shí)間,并且根本不表示線程被啟動(dòng)并且沒(méi)有等待。)


最后,這是一個(gè)Callbackfor的示例.upload_fileobj()。它遵循文檔中的示例。


首先,一個(gè)小幫手可以有效地獲取緩沖區(qū)的大小:


def get_bufsize(buf, chunk=1024) -> int:

    start = buf.tell()

    try:

        size = 0 

        while True: 

            out = buf.read(chunk) 

            if out: 

                size += chunk 

            else: 

                break

        return size

    finally:

        buf.seek(start)

類(lèi)本身:


import os

import sys

import threading

import time


class ProgressPercentage(object):

    def __init__(self, filename, buf):

        self._filename = filename

        self._size = float(get_bufsize(buf))

        self._seen_so_far = 0

        self._lock = threading.Lock()

        self.start = None


    def __call__(self, bytes_amount):

        with self._lock:

            if not self.start:

                self.start = time.monotonic()

            self._seen_so_far += bytes_amount

            percentage = (self._seen_so_far / self._size) * 100

            sys.stdout.write(

                "\r%s  %s of %s  (%.2f%% done, %.2fs elapsed\n" % (

                    self._filename, self._seen_so_far, self._size,

                    percentage, time.monotonic() - self.start))

            # Use sys.stdout.flush() to update on one line

            # sys.stdout.flush()

例子:


In [19]: import io 

    ...:  

    ...: from boto3.session import Session 

    ...:  

    ...: s3 = Session().resource("s3") 

    ...: bucket = s3.Bucket("test-threads") 

    ...: buf = io.BytesIO(open('100mb.txt', 'rb').read()) 

    ...:  

    ...: bucket.upload_fileobj(buf, 'mykey', Callback=ProgressPercentage("mykey", buf))                                                                                                                                                                      

mykey  262144 of 104857600.0  (0.25% done, 0.00s elapsed

mykey  524288 of 104857600.0  (0.50% done, 0.00s elapsed

mykey  786432 of 104857600.0  (0.75% done, 0.01s elapsed

mykey  1048576 of 104857600.0  (1.00% done, 0.01s elapsed

mykey  1310720 of 104857600.0  (1.25% done, 0.01s elapsed

mykey  1572864 of 104857600.0  (1.50% done, 0.02s elapsed


查看完整回答
反對(duì) 回復(fù) 2022-01-18
?
吃雞游戲

TA貢獻(xiàn)1829條經(jīng)驗(yàn) 獲得超7個(gè)贊

upload_fileobj接受一個(gè) Config 參數(shù)。這是一個(gè)boto3.s3.transfer.TransferConfig對(duì)象,它又具有一個(gè)名為use_threads(默認(rèn)為 true)的參數(shù) - 如果為 True,則執(zhí)行 S3 傳輸時(shí)將使用線程。如果為 False,則不會(huì)使用線程來(lái)執(zhí)行傳輸:所有邏輯都將在主線程中運(yùn)行。

希望這對(duì)你有用。


查看完整回答
反對(duì) 回復(fù) 2022-01-18
?
慕尼黑8549860

TA貢獻(xiàn)1818條經(jīng)驗(yàn) 獲得超11個(gè)贊

測(cè)試該方法是否阻塞:

我自己根據(jù)經(jīng)驗(yàn)測(cè)試了這種行為。首先,我生成了一個(gè) 100MB 的文件:


dd if=/dev/zero of=100mb.txt  bs=100M  count=1

然后我嘗試以與您相同的方式上傳文件并測(cè)量所花費(fèi)的時(shí)間:


import boto3

import time

import io

file = open('100mb.txt', 'rb')

buf = io.BytesIO(file.read())

bucket = boto3.resource('s3').Bucket('testbucket')

start = time.time()

print("starting to upload...")

bucket.upload_fileobj(buf, '100mb')

print("finished uploading")

end = time.time()

print("time: {}".format(end-start))

upload_fileobj() 方法完成并讀取下一個(gè) python 行(1gb 文件需要 50 秒)需要 8 秒以上,所以我假設(shè)這個(gè)方法是阻塞的。


使用線程測(cè)試:


使用多個(gè)線程時(shí),即使使用選項(xiàng) use_threads=False ,我也可以驗(yàn)證該方法是否同時(shí)支持多個(gè)傳輸。我開(kāi)始上傳一個(gè) 200mb 的文件,然后是一個(gè) 100mb 的文件,然后 100mb 的文件首先完成。這證實(shí)了TransferConfig中的并發(fā)與多部分傳輸有關(guān)。


代碼:


import boto3

import time

import io

from boto3.s3.transfer import TransferConfig

import threading


config = TransferConfig(use_threads=False)


bucket = boto3.resource('s3').Bucket('testbucket')

def upload(filename):

     file = open(filename, 'rb')

     buf = io.BytesIO(file.read())

     start = time.time()

     print("starting to upload file {}".format(filename))

     bucket.upload_fileobj(buf,filename,Config=config)

     end = time.time()

     print("finished uploading file {}. time: {}".format(filename,end-start))

x1 = threading.Thread(target=upload, args=('200mb.txt',))

x2 = threading.Thread(target=upload, args=('100mb.txt',))

x1.start()

time.sleep(2)

x2.start()

輸出:


開(kāi)始上傳文件 200mb.txt

開(kāi)始上傳文件 100mb.txt

完成上傳文件 100mb.txt。時(shí)間:46.35254502296448

完成上傳文件200mb.txt。時(shí)間:61.70564889907837


使用會(huì)話進(jìn)行測(cè)試:

如果您希望上傳方法按照調(diào)用的順序完成,這就是您所需要的。


代碼:


import boto3

import time

import io

from boto3.s3.transfer import TransferConfig

import threading


config = TransferConfig(use_threads=False)


session = boto3.session.Session()

s3 = session.resource('s3')

bucket = s3.Bucket('testbucket')

def upload(filename):

     file = open(filename, 'rb')

     buf = io.BytesIO(file.read())

     start = time.time()

     print("starting to upload file {}".format(filename))

     bucket.upload_fileobj(buf,filename)

     end = time.time()

     print("finished uploading file {}. time: {}".format(filename,end-start))

x1 = threading.Thread(target=upload, args=('200mb.txt',))

x2 = threading.Thread(target=upload, args=('100mb.txt',))

x1.start()

time.sleep(2)

x2.start()

輸出:


開(kāi)始上傳文件 200mb.txt

開(kāi)始上傳文件 100mb.txt

完成上傳文件 200mb.txt。時(shí)間:46.62478971481323

完成上傳文件100mb.txt。時(shí)間:50.515950202941895


我發(fā)現(xiàn)的一些資源:

-這是在 SO 中提出的關(guān)于阻塞或非阻塞方法的問(wèn)題。這不是決定性的,但那里可能有相關(guān)信息。

- GitHub 上存在一個(gè)開(kāi)放問(wèn)題,允許在 boto3 中進(jìn)行異步傳輸。

- 還有像aioboto和aiobotocore這樣的工具,專(zhuān)門(mén)用于允許從/到 s3 和其他 aws 服務(wù)的異步下載和上傳。


關(guān)于我之前的回答:

您可以在此處閱讀有關(guān) boto3 中的文件傳輸配置的信息。特別是:


傳輸操作使用線程來(lái)實(shí)現(xiàn)并發(fā)。可以通過(guò)將 use_threads 屬性設(shè)置為 False 來(lái)禁用線程使用。


最初我認(rèn)為這與同時(shí)執(zhí)行的多個(gè)傳輸有關(guān)。但是,閱讀源代碼時(shí),使用TransferConfig時(shí)參數(shù)max_concurrency中的注釋解釋說(shuō)并發(fā)不是指多次傳輸,而是指 “將發(fā)出請(qǐng)求以執(zhí)行傳輸?shù)木€程數(shù)”。所以這是用來(lái)加速傳輸?shù)臇|西。use_threads屬性?xún)H用于允許多部分傳輸中的并發(fā)性。


查看完整回答
反對(duì) 回復(fù) 2022-01-18
  • 3 回答
  • 0 關(guān)注
  • 356 瀏覽
慕課專(zhuān)欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢(xún)優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)