3 回答

TA貢獻(xiàn)1789條經(jīng)驗(yàn) 獲得超10個(gè)贊
根據(jù)要求,這是我最終使用的代碼。
工作在主 - 詳細(xì)配置中設(shè)置,每個(gè)主服務(wù)器作為批處理進(jìn)行處理。每個(gè)工作單元都以這種方式排隊(duì):
var success = true;// Start processing all the master records.Master master;while (null != (master = await StoredProcedures.ClaimRecordsAsync(...))){ await masterBuffer.SendAsync(master);}// Finished sending master recordsmasterBuffer.Complete();// Now, wait for all the batches to complete.await batchAction.Completion;return success;
Masters一次緩沖一個(gè),以節(jié)省其他外部進(jìn)程的工作。每個(gè)主人的詳細(xì)信息都通過(guò)以下方式發(fā)送給工作人員masterTransform
TransformManyBlock
。BatchedJoinBlock
還創(chuàng)建了A 以在一批中收集詳細(xì)信息。
實(shí)際工作是以detailTransform
TransformBlock
異步方式完成的,每次150個(gè)。BoundedCapacity
設(shè)置為300以確保太多的Masters不會(huì)在鏈的開(kāi)頭進(jìn)行緩沖,同時(shí)還留出足夠的空間來(lái)排列足夠的詳細(xì)記錄以允許一次處理150條記錄。該塊輸出object
到它的目標(biāo),因?yàn)樗钦麄€(gè)取決于它是否是一個(gè)鏈接過(guò)濾Detail
或Exception
。
所述batchAction
ActionBlock
收集來(lái)自所有批次的輸出,并且執(zhí)行散裝數(shù)據(jù)庫(kù)更新,錯(cuò)誤日志等。對(duì)于每個(gè)批次。
將有幾個(gè)BatchedJoinBlock
s,每個(gè)主人一個(gè)。由于每個(gè)ISourceBlock
都是按順序輸出的,并且每個(gè)批次只接受與一個(gè)主數(shù)據(jù)相關(guān)聯(lián)的詳細(xì)記錄的數(shù)量,因此將按順序處理批次。每個(gè)塊僅輸出一個(gè)組,并在完成時(shí)取消鏈接。只有最后一個(gè)批處理塊將其完成傳播到最終ActionBlock
。
數(shù)據(jù)流網(wǎng)絡(luò):
// The dataflow networkBufferBlock<Master> masterBuffer = null;TransformManyBlock<Master, Detail> masterTransform = null;TransformBlock<Detail, object> detailTransform = null;ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;// Buffer master records to enable efficient throttling.masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });// Sequentially transform master records into a stream of detail records.masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>{ var records = await StoredProcedures.GetObjectsAsync(masterRecord); // Filter the master records based on some criteria here var filteredRecords = records; // Only propagate completion to the last batch var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0; // Create a batch join block to encapsulate the results of the master record. var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 }); // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block. var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail); var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception); var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion }); // Unlink batchjoinblock upon completion. // (the returned task does not need to be awaited, despite the warning.) batchjoinblock.Completion.ContinueWith(task => { detailLink1.Dispose(); detailLink2.Dispose(); batchLink.Dispose(); }); return filteredRecords;}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });// Process each detail record asynchronously, 150 at a time.detailTransform = new TransformBlock<Detail, object>(async detail => { try { // Perform the action for each detail here asynchronously await DoSomethingAsync(); return detail; } catch (Exception e) { success = false; return e; }}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });// Perform the proper action for each batchbatchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>{ var details = batch.Item1.Cast<Detail>(); var errors = batch.Item2.Cast<Exception>(); // Do something with the batch here}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });
- 3 回答
- 0 關(guān)注
- 435 瀏覽
添加回答
舉報(bào)