第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

在獨(dú)立運(yùn)行的 python 腳本之間共享 python 對(duì)象

在獨(dú)立運(yùn)行的 python 腳本之間共享 python 對(duì)象

忽然笑 2022-07-05 19:04:56
這是我在這里的第一個(gè)問(wèn)題,我希望我不會(huì)提出與已經(jīng)存在的問(wèn)題非常相似的問(wèn)題。如果是這樣,請(qǐng)?jiān)徫遥∫虼?,我遇到了一些麻煩的情況如下:我想并行運(yùn)行獨(dú)立的 python 腳本,它可以訪問(wèn)相同的 python 對(duì)象,在我的例子中是 Pandas Dataframe。我的想法是,一個(gè)腳本基本上一直在運(yùn)行并訂閱一個(gè)數(shù)據(jù)流(這里是通過(guò) websocket 推送的數(shù)據(jù)),然后將其附加到一個(gè)共享的 Dataframe 中。第二個(gè)腳本應(yīng)該能夠獨(dú)立于第一個(gè)腳本啟動(dòng),并且仍然可以訪問(wèn)由第一個(gè)腳本不斷更新的 Dataframe。在第二個(gè)腳本中,我想在預(yù)定義的時(shí)間間隔內(nèi)執(zhí)行不同類(lèi)型的分析,或者對(duì)實(shí)時(shí)數(shù)據(jù)執(zhí)行其他相對(duì)時(shí)間密集的操作。我已經(jīng)嘗試從一個(gè)腳本中運(yùn)行所有操作,但我一直與 websocket 斷開(kāi)連接。將來(lái)還會(huì)有多個(gè)腳本可以實(shí)時(shí)訪問(wèn)共享數(shù)據(jù)。與其在腳本 1 中的每次更新后保存一個(gè) csv 文件或 pickle,我寧愿有一個(gè)解決方案,兩個(gè)腳本基本上共享相同的內(nèi)存。我也只需要一個(gè)腳本來(lái)寫(xiě)入并附加到 Dataframe,另一個(gè)只需要從中讀取。多處理模塊似乎有一些有趣的功能,這可能會(huì)有所幫助,但到目前為止我還沒(méi)有真正理解它。我還查看了全局變量,但這似乎也不是在這種情況下使用的正確方法。我想象的是這樣的(我知道,代碼完全錯(cuò)誤,這只是為了說(shuō)明目的):第一個(gè)腳本應(yīng)繼續(xù)將數(shù)據(jù)流中的新數(shù)據(jù)分配給數(shù)據(jù)幀并共享此對(duì)象。from share_data import shareshared_df = pd.DataFrame()for data from datastream:        shared_df.append(data)        share(shared_df)然后第二個(gè)腳本應(yīng)該能夠執(zhí)行以下操作:from share_data import getdf = get(shared_df)這完全有可能嗎?或者您對(duì)如何以簡(jiǎn)單的方式完成此任務(wù)有任何想法嗎?或者您有什么建議可以使用哪些軟件包?
查看完整描述

1 回答

?
茅侃侃

TA貢獻(xiàn)1842條經(jīng)驗(yàn) 獲得超22個(gè)贊

您已經(jīng)非常清楚可以做什么來(lái)使用您的數(shù)據(jù)。

最佳解決方案取決于您的實(shí)際需求,因此我將嘗試通過(guò)一個(gè)可行的示例來(lái)介紹這些可能性。

你想要什么

如果我完全理解你的需要,你想

  • 不斷更新 DataFrame(來(lái)自 websocket)

  • 在對(duì)同一個(gè) DataFrame 進(jìn)行一些計(jì)算時(shí)

  • 使 DataFrame 在計(jì)算工作者上保持最新,

  • 一項(xiàng)計(jì)算是 CPU 密集型的

  • 另一個(gè)不是。

你需要什么

正如您所說(shuō),您將需要一種方法來(lái)運(yùn)行不同的線程或進(jìn)程以保持計(jì)算運(yùn)行。

線程怎么樣

實(shí)現(xiàn)您想要的最簡(jiǎn)單的方法是使用線程庫(kù)。由于線程可以共享內(nèi)存,并且您只有一個(gè)工作人員實(shí)際更新 DataFrame,因此很容易提出一種方法來(lái)管理最新數(shù)據(jù):

import time

from dataclasses import dataclass


import pandas

from threading import Thread



@dataclass

class DataFrameHolder:

    """This dataclass holds a reference to the current DF in memory.

    This is necessary if you do operations without in-place modification of

    the DataFrame, since you will need replace the whole object.

    """

    dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])


    def update(self, data):

        self.dataframe = self.dataframe.append(data, ignore_index=True)



class StreamLoader:

    """This class is our worker communicating with the websocket"""


    def __init__(self, df_holder: DataFrameHolder) -> None:

        super().__init__()

        self.df_holder = df_holder


    def update_df(self):

        # read from websocket and update your DF.

        data = {

            'A': [1, 2, 3],

            'B': [4, 5, 6],

        }

        self.df_holder.update(data)


    def run(self):

        # limit loop for the showcase

        for _ in range(5):

            self.update_df()

            print("[1] Updated DF %s" % str(self.df_holder.dataframe))

            time.sleep(3)



class LightComputation:

    """This class is a random computation worker"""


    def __init__(self, df_holder: DataFrameHolder) -> None:

        super().__init__()

        self.df_holder = df_holder


    def compute(self):

        print("[2] Current DF %s" % str(self.df_holder.dataframe))


    def run(self):

        # limit loop for the showcase

        for _ in range(5):

            self.compute()

            time.sleep(5)



def main():

    # We create a DataFrameHolder to keep our DataFrame available.

    df_holder = DataFrameHolder()


    # We create and start our update worker

    stream = StreamLoader(df_holder)

    stream_process = Thread(target=stream.run)

    stream_process.start()


    # We create and start our computation worker

    compute = LightComputation(df_holder)

    compute_process = Thread(target=compute.run)

    compute_process.start()


    # We join our Threads, i.e. we wait for them to finish before continuing

    stream_process.join()

    compute_process.join()



if __name__ == "__main__":

    main()

請(qǐng)注意,我們使用一個(gè)類(lèi)來(lái)保存當(dāng)前 DataFrame 的引用,因?yàn)槟承┎僮鱝ppend不一定是就地的,因此,如果我們直接將引用發(fā)送到 DataFrame 對(duì)象,則修改將丟失在 worker 上。這里DataFrameHolder對(duì)象將在內(nèi)存中保持相同的位置,因此工作人員仍然可以訪問(wèn)更新的 DataFrame。


流程可能更強(qiáng)大

現(xiàn)在,如果您需要更多的計(jì)算能力,進(jìn)程可能會(huì)更有用,因?yàn)樗鼈兪鼓軌驅(qū)⒛墓ぷ魅藛T隔離在不同的核心上。要在 python 中啟動(dòng)進(jìn)程而不是線程,可以使用多處理庫(kù)。兩個(gè)對(duì)象的 API 是相同的,你只需要改變構(gòu)造函數(shù)如下


from threading import Thread

# I create a thread

compute_process = Thread(target=compute.run)



from multiprocessing import Process

# I create a process that I can use the same way

compute_process = Process(target=compute.run)

現(xiàn)在,如果您嘗試更改上述腳本中的值,您將看到 DataFrame 沒(méi)有正確更新。


為此,您將需要更多的工作,因?yàn)檫@兩個(gè)進(jìn)程不共享內(nèi)存,并且您有多種方式在它們之間進(jìn)行通信(https://en.wikipedia.org/wiki/Inter-process_communication)


@SimonCrane 的參考在這方面非常有趣,并展示了使用multiprocessing.manager在兩個(gè)進(jìn)程之間使用共享內(nèi)存。


這是一個(gè)工人使用共享內(nèi)存的單獨(dú)進(jìn)程的版本:


import logging

import multiprocessing

import time

from dataclasses import dataclass

from multiprocessing import Process

from multiprocessing.managers import BaseManager

from threading import Thread


import pandas



@dataclass

class DataFrameHolder:

    """This dataclass holds a reference to the current DF in memory.

    This is necessary if you do operations without in-place modification of

    the DataFrame, since you will need replace the whole object.

    """

    dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])


    def update(self, data):

        self.dataframe = self.dataframe.append(data, ignore_index=True)


    def retrieve(self):

        return self.dataframe



class DataFrameManager(BaseManager):

    """This dataclass handles shared DataFrameHolder.

    See https://docs.python.org/3/library/multiprocessing.html#examples

    """

    # You can also use a socket file '/tmp/shared_df'

    MANAGER_ADDRESS = ('localhost', 6000)

    MANAGER_AUTH = b"auth"


    def __init__(self) -> None:

        super().__init__(self.MANAGER_ADDRESS, self.MANAGER_AUTH)

        self.dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])


    @classmethod

    def register_dataframe(cls):

        BaseManager.register("DataFrameHolder", DataFrameHolder)



class DFWorker:

    """Abstract class initializing a worker depending on a DataFrameHolder."""


    def __init__(self, df_holder: DataFrameHolder) -> None:

        super().__init__()

        self.df_holder = df_holder



class StreamLoader(DFWorker):

    """This class is our worker communicating with the websocket"""


    def update_df(self):

        # read from websocket and update your DF.

        data = {

            'A': [1, 2, 3],

            'B': [4, 5, 6],

        }

        self.df_holder.update(data)


    def run(self):

        # limit loop for the showcase

        for _ in range(4):

            self.update_df()

            print("[1] Updated DF\n%s" % str(self.df_holder.retrieve()))

            time.sleep(3)



class LightComputation(DFWorker):

    """This class is a random computation worker"""


    def compute(self):

        print("[2] Current DF\n%s" % str(self.df_holder.retrieve()))


    def run(self):

        # limit loop for the showcase

        for _ in range(4):

            self.compute()

            time.sleep(5)



def main():

    logger = multiprocessing.log_to_stderr()

    logger.setLevel(logging.INFO)


    # Register our DataFrameHolder type in the DataFrameManager.

    DataFrameManager.register_dataframe()

    manager = DataFrameManager()

    manager.start()

    # We create a managed DataFrameHolder to keep our DataFrame available.

    df_holder = manager.DataFrameHolder()


    # We create and start our update worker

    stream = StreamLoader(df_holder)

    stream_process = Thread(target=stream.run)

    stream_process.start()


    # We create and start our computation worker

    compute = LightComputation(df_holder)

    compute_process = Process(target=compute.run)

    compute_process.start()


    # The managed dataframe is updated in every Thread/Process

    time.sleep(5)

    print("[0] Main process DF\n%s" % df_holder.retrieve())


    # We join our Threads, i.e. we wait for them to finish before continuing

    stream_process.join()

    compute_process.join()



if __name__ == "__main__":

    main()

如您所見(jiàn),線程和處理之間的差異非常小。


通過(guò)一些調(diào)整,如果您想使用不同的文件來(lái)處理 CPU 密集型處理,您可以從那里開(kāi)始連接到同一個(gè)管理器。


查看完整回答
反對(duì) 回復(fù) 2022-07-05
  • 1 回答
  • 0 關(guān)注
  • 183 瀏覽
慕課專(zhuān)欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)