第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

限制異步任務(wù)

限制異步任務(wù)

C#
梵蒂岡之花 2019-11-15 12:10:13
限制異步任務(wù)我想運(yùn)行一堆異步任務(wù),并限制在任何給定時(shí)間可以完成的任務(wù)數(shù)量。假設(shè)您有1000個(gè)網(wǎng)址,并且您只希望一次打開(kāi)50個(gè)請(qǐng)求; 但只要一個(gè)請(qǐng)求完成,您就會(huì)打開(kāi)與列表中下一個(gè)URL的連接。這樣,一次只打開(kāi)50個(gè)連接,直到URL列表用完為止。如果可能的話,我也想利用給定數(shù)量的線程。我提出了一種擴(kuò)展方法,ThrottleTasksAsync可以實(shí)現(xiàn)我想要的功能。那里有更簡(jiǎn)單的解決方案嗎?我認(rèn)為這是一種常見(jiàn)的情況。用法:class Program{     static void Main(string[] args)     {         Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();         Console.WriteLine("Press a key to exit...");         Console.ReadKey(true);     }}這是代碼:static class IEnumerableExtensions{     public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)     {         var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());         var semaphore = new SemaphoreSlim(maxConcurrentTasks);         // Run the throttler on a separate thread.         var t = Task.Run(() =>         {             foreach (var item in enumerable)             {                 // Wait for the semaphore                 semaphore.Wait();                 blockingQueue.Add(item);             }             blockingQueue.CompleteAdding();         });         var taskList = new List<Task<Result_T>>();         Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },         _ =>但是,線程池快速耗盡,你不能做async/ await。額外: 為了解決調(diào)用時(shí)BlockingCollection拋出異常的問(wèn)題,我正在使用帶超時(shí)的重載。如果我沒(méi)有使用超時(shí),它會(huì)破壞使用的目的,因?yàn)椴粫?huì)阻止。有沒(méi)有更好的辦法?理想情況下,會(huì)有一種方法。Take()CompleteAdding()TryTakeTryTakeBlockingCollectionTryTakeTakeAsync
查看完整描述

3 回答

?
至尊寶的傳說(shuō)

TA貢獻(xiàn)1789條經(jīng)驗(yàn) 獲得超10個(gè)贊

根據(jù)要求,這是我最終使用的代碼。

工作在主 - 詳細(xì)配置中設(shè)置,每個(gè)主服務(wù)器作為批處理進(jìn)行處理。每個(gè)工作單元都以這種方式排隊(duì):

var success = true;// Start processing all the master records.Master master;while (null != (master = await StoredProcedures.ClaimRecordsAsync(...))){
    await masterBuffer.SendAsync(master);}// Finished sending master recordsmasterBuffer.Complete();// Now, wait for all the batches to complete.await batchAction.Completion;return success;

Masters一次緩沖一個(gè),以節(jié)省其他外部進(jìn)程的工作。每個(gè)主人的詳細(xì)信息都通過(guò)以下方式發(fā)送給工作人員masterTransform TransformManyBlock。BatchedJoinBlock還創(chuàng)建了A 以在一批中收集詳細(xì)信息。

實(shí)際工作是以detailTransform TransformBlock異步方式完成的,每次150個(gè)。BoundedCapacity設(shè)置為300以確保太多的Masters不會(huì)在鏈的開(kāi)頭進(jìn)行緩沖,同時(shí)還留出足夠的空間來(lái)排列足夠的詳細(xì)記錄以允許一次處理150條記錄。該塊輸出object到它的目標(biāo),因?yàn)樗钦麄€(gè)取決于它是否是一個(gè)鏈接過(guò)濾DetailException。

所述batchAction ActionBlock收集來(lái)自所有批次的輸出,并且執(zhí)行散裝數(shù)據(jù)庫(kù)更新,錯(cuò)誤日志等。對(duì)于每個(gè)批次。

將有幾個(gè)BatchedJoinBlocks,每個(gè)主人一個(gè)。由于每個(gè)ISourceBlock都是按順序輸出的,并且每個(gè)批次只接受與一個(gè)主數(shù)據(jù)相關(guān)聯(lián)的詳細(xì)記錄的數(shù)量,因此將按順序處理批次。每個(gè)塊僅輸出一個(gè)組,并在完成時(shí)取消鏈接。只有最后一個(gè)批處理塊將其完成傳播到最終ActionBlock。

數(shù)據(jù)流網(wǎng)絡(luò):

// The dataflow networkBufferBlock<Master> masterBuffer = null;TransformManyBlock<Master, Detail> masterTransform = null;TransformBlock<Detail, object> detailTransform = null;ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;// Buffer master records to enable efficient throttling.masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });// Sequentially transform master records into a stream of detail records.masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });// Process each detail record asynchronously, 150 at a time.detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });// Perform the proper action for each batchbatchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });



查看完整回答
反對(duì) 回復(fù) 2019-11-16
  • 3 回答
  • 0 關(guān)注
  • 435 瀏覽

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢(xún)優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)