第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何同時計算大文件中的詞頻?

如何同時計算大文件中的詞頻?

慕工程0101907 2021-11-30 19:26:02
我需要計算一個3GB的gzip純文本英文句子文件的詞頻,解壓時大約30GB。我有一個帶有collections.Counterand 的單線程腳本gzip.open,需要幾個小時才能完成。由于逐行讀取文件比拆分和計數(shù)快得多,因此我正在考慮使用文件讀取器生成行和多個消費者進行拆分和計數(shù)的生產(chǎn)者-消費者流程,最后,將Counters合并到獲取單詞出現(xiàn)。但是,我找不到ProcessPoolExecutor將隊列發(fā)送到的示例Executor,它們只是map列表中的單個項目。. 只有單線程示例asyncio.Queue。這是一個巨大的文件,所以我無法讀取整個文件并獲得list之前的計數(shù),因此我無法使用concurrent.futures.Executor.map. 但是我閱讀的所有示例都使用固定列表作為開始。拆分和計算一個句子的時間相當于fork一個進程,所以我必須讓每個消費者進程的壽命更長。我不認為mapcan merge Counters,所以我不能使用chunksize> 1。因此我必須給消費者一個隊列并讓他們繼續(xù)計數(shù)直到整個文件完成。但大多數(shù)示例只向消費者發(fā)送一件物品并使用chunksize=1000以減少fork時間。你能幫我寫一個例子嗎?我希望代碼向后兼容 Python 3.5.3,因為 PyPy 更快。我的真實案例是針對更具體的文件格式:chr1    10011   141     0       157     4       41      50chr1    10012   146     1       158     4       42      51chr1    10013   150     0       163     4       43      53chr1    10014   164     3       167     4       44      54我需要計算第 3 到第 8 列的單列的每個直方圖。所以我將詞頻作為一個更簡單的例子。csv.DictReader 花費大部分時間。我的問題是,雖然 gzip 閱讀器很快,但 csv 閱讀器很快,我需要計算數(shù)十億行。而且 csv 閱讀器肯定比 gzip 閱讀器慢。因此,我需要將行傳播到 csv 讀取器的不同工作進程并分別進行下游計數(shù)。在一個生產(chǎn)者和許多消費者之間使用隊列很方便。由于我使用的是 Python,而不是 C,是否有一些抽象的多處理和隊列包裝器?這可以ProcessPoolExecutor與Queue類一起使用嗎?
查看完整描述

3 回答

?
智慧大石

TA貢獻1946條經(jīng)驗 獲得超3個贊

一個 30 GB 的文本文件足以將您的問題放入大數(shù)據(jù)領域。所以為了解決這個問題,我建議使用大數(shù)據(jù)工具,比如 Hadoop 和 Spark。您所解釋的“生產(chǎn)者-消費者流”基本上就是MapReduce算法的設計目的。字數(shù)頻率是典型的 MapReduce 問題。查一查,你會發(fā)現(xiàn)很多例子。


查看完整回答
反對 回復 2021-11-30
?
有只小跳蛙

TA貢獻1824條經(jīng)驗 獲得超8個贊

我在周末學習了多處理庫。


停止按 Ctrl+C 并寫入當前結果功能仍然無效。


主要功能現(xiàn)在很好。


#!/usr/bin/env pypy3

import sys

from collections import Counter

from multiprocessing import Pool, Process, Manager, current_process, freeze_support


SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')


ChunkSize = 1024 * 128

verbose = 0

Nworkers = 16


def main():

    import math


    if len(sys.argv) < 3 :

        print('Usage:',sys.argv[0],'<samtools.depth.gz> <out.tsv> [verbose=0]',file=sys.stderr,flush=True)

        exit(0)

    try:

        verbose = int(sys.argv[3])

    except: # `except IndexError:` and `except ValueError:`

        verbose = 0


    inDepthFile = sys.argv[1]

    outFile = sys.argv[2]

    print('From:[{}], To:[{}].\nVerbose: [{}].'.format(inDepthFile,outFile,verbose),file=sys.stderr,flush=True)

    RecordCnt,MaxDepth,cDepthCnt,cDepthStat = CallStat(inDepthFile)

    for k in SamplesList:

        cDepthStat[k][2] = cDepthStat[k][0] / RecordCnt # E(X)

        cDepthStat[k][3] = cDepthStat[k][1] / RecordCnt # E(X^2)

        cDepthStat[k][4] = math.sqrt(cDepthStat[k][3] - cDepthStat[k][2]*cDepthStat[k][2])   # E(X^2)-E(X)^2

    tsvout = open(outFile, 'wt')

    print('#{}\t{}'.format('Depth','\t'.join(SamplesList)),file=tsvout)

    #RecordCntLength = len(str(RecordCnt))

    print( '#N={},SD:\t{}'.format(RecordCnt,'\t'.join(str(round(cDepthStat[col][4],1)) for col in SamplesList)),file=tsvout)

    for depth in range(0,MaxDepth+1):

        #print( '{}\t{}'.format(depth,'\t'.join(str(DepthCnt[col][depth]) for col in SamplesList)) )

        #print( '{}\t{}'.format(depth,'\t'.join(str(yDepthCnt[depth][col]) for col in SamplesList)) )

        print( '{}\t{}'.format(depth,'\t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)),file=tsvout)

        #pass

    #print('#MaxDepth={}'.format(MaxDepth),file=tsvout)

    tsvout.close()

    pass


def CallStat(inDepthFile):

    import gzip

    import itertools

    RecordCnt = 0

    MaxDepth = 0

    cDepthCnt = {key:Counter() for key in SamplesList}

    cDepthStat = {key:[0,0,0,0,0] for key in SamplesList} # x and x^2

    #lines_queue = Queue()

    manager = Manager()

    lines_queue = manager.Queue()

    stater_pool = Pool(Nworkers)

    TASKS = itertools.repeat((lines_queue,SamplesList),Nworkers)

    #ApplyResult = [stater_pool.apply_async(iStator,x) for x in TASKS]

    #MapResult = stater_pool.map_async(iStator,TASKS,1)

    AsyncResult = stater_pool.imap_unordered(iStator,TASKS,1)

    try:

        with gzip.open(inDepthFile, 'rt') as tsvfin:

            while True:

                lines = tsvfin.readlines(ChunkSize)

                lines_queue.put(lines)

                if not lines:

                    for i in range(Nworkers):

                        lines_queue.put(b'\n\n')

                    break

    except KeyboardInterrupt:

        print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)

        for i in range(Nworkers):

            lines_queue.put(b'\n\n')

        pass

    #for results in ApplyResult:

        #(iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) = results.get()

    #for (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) in MapResult.get():

    for (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) in AsyncResult:

        RecordCnt += iRecordCnt

        if iMaxDepth > MaxDepth:

            MaxDepth = iMaxDepth

        for k in SamplesList:

            cDepthCnt[k].update(icDepthCnt[k])

            cDepthStat[k][0] += icDepthStat[k][0]

            cDepthStat[k][1] += icDepthStat[k][1]

    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat


#def iStator(inQueue,inSamplesList):

def iStator(args):

    (inQueue,inSamplesList) = args

    import csv

    # Looking up things in global scope takes longer then looking up stuff in local scope. <https://stackoverflow.com/a/54645851/159695>

    cDepthCnt = {key:Counter() for key in inSamplesList}

    cDepthStat = {key:[0,0] for key in inSamplesList} # x and x^2

    RecordCnt = 0

    MaxDepth = 0

    for lines in iter(inQueue.get, b'\n\n'):

        try:

            tsvin = csv.DictReader(lines, delimiter='\t', fieldnames=('ChrID','Pos')+inSamplesList )

            for row in tsvin:

                #print(', '.join(row[col] for col in inSamplesList))

                RecordCnt += 1

                for k in inSamplesList:

                    theValue = int(row[k])

                    if theValue > MaxDepth:

                        MaxDepth = theValue

                    #DepthCnt[k][theValue] += 1  # PyPy3:30.54 ns, Python3:22.23 ns

                    #yDepthCnt[theValue][k] += 1 # PyPy3:30.47 ns, Python3:21.50 ns

                    cDepthCnt[k][theValue] += 1  # PyPy3:29.82 ns, Python3:30.61 ns

                    cDepthStat[k][0] += theValue

                    cDepthStat[k][1] += theValue * theValue

                #print(MaxDepth,DepthCnt)

        except KeyboardInterrupt:

            print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)

            pass

        #print('[!]{} Lines Read:[{}], MaxDepth is [{}].'.format(current_process().name,RecordCnt,MaxDepth),file=sys.stderr,flush=True)

    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat


if __name__ == "__main__":

    main()  # time python3 ./samdepthplot.py t.tsv.gz 1



查看完整回答
反對 回復 2021-11-30
?
子衿沉夜

TA貢獻1828條經(jīng)驗 獲得超3個贊

只是一些偽代碼:


from concurrent.futures import ProcessPoolExecutor

from multiprocessing import Manager

import traceback



WORKER_POOL_SIZE = 10  # you should set this as the number of your processes

QUEUE_SIZE = 100       # 10 times to your pool size is good enough



def main():

    with Manager() as manager:

        q = manager.Queue(QUEUE_SIZE)


        # init worker pool

        executor = ProcessPoolExecutor(max_workers=WORKER_POOL_SIZE)

        workers_pool = [executor.submit(worker, i, q) for i in range(WORKER_POOL_SIZE)]


        # start producer

        run_producer(q)


        # wait to done

        for f in workers_pool:

            try:

                f.result()

            except Exception:

                traceback.print_exc()



def run_producer(q):

    try:

        with open("your file path") as fp:

            for line in fp:

                q.put(line)

    except Exception:

        traceback.print_exc()

    finally:

        q.put(None)




def worker(i, q):

    while 1:

        line = q.get()

        if line is None:

            print(f'worker {i} is done')

            q.put(None)

            return


        # do something with this line

        # ...



查看完整回答
反對 回復 2021-11-30
  • 3 回答
  • 0 關注
  • 258 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網(wǎng)微信公眾號