3 回答

TA貢獻(xiàn)1798條經(jīng)驗(yàn) 獲得超3個(gè)贊
看起來(lái)您正在將所有文件的內(nèi)容加載到內(nèi)存中,然后再將它們寫(xiě)回單個(gè)文件。這可以解釋為什么這個(gè)過(guò)程隨著時(shí)間的推移變得更慢。
優(yōu)化該過(guò)程的一種方法是將讀取部分與寫(xiě)入部分分開(kāi),并并行進(jìn)行。這稱(chēng)為生產(chǎn)者-消費(fèi)者模式。Parallel
它可以使用類(lèi)、線程或任務(wù)來(lái)實(shí)現(xiàn),但我將演示基于強(qiáng)大的TPL 數(shù)據(jù)流庫(kù)的實(shí)現(xiàn),該庫(kù)特別適合此類(lèi)作業(yè)。
private static async Task MergeFiles(IEnumerable<string> sourceFilePaths,
? ? string targetFilePath, CancellationToken cancellationToken = default,
? ? IProgress<int> progress = null)
{
? ? var readerBlock = new TransformBlock<string, string>(async filePath =>
? ? {
? ? ? ? return File.ReadAllText(filePath); // Read the small file
? ? }, new ExecutionDataflowBlockOptions()
? ? {
? ? ? ? MaxDegreeOfParallelism = 2, // Reading is parallelizable
? ? ? ? BoundedCapacity = 100, // No more than 100 file-paths buffered
? ? ? ? CancellationToken = cancellationToken, // Cancel at any time
? ? });
? ? StreamWriter streamWriter = null;
? ? int filesProcessed = 0;
? ? var writerBlock = new ActionBlock<string>(text =>
? ? {
? ? ? ? streamWriter.Write(text); // Append to the target file
? ? ? ? filesProcessed++;
? ? ? ? if (filesProcessed % 10 == 0) progress?.Report(filesProcessed);
? ? }, new ExecutionDataflowBlockOptions()
? ? {
? ? ? ? MaxDegreeOfParallelism = 1, // We can't parallelize the writer
? ? ? ? BoundedCapacity = 100, // No more than 100 file-contents buffered
? ? ? ? CancellationToken = cancellationToken, // Cancel at any time
? ? });
? ? readerBlock.LinkTo(writerBlock,
? ? ? ? new DataflowLinkOptions() { PropagateCompletion = true });
? ? // This is a tricky part. We use BoundedCapacity, so we must propagate manually
? ? // a possible failure of the writer to the reader, otherwise a deadlock may occur.
? ? PropagateFailure(writerBlock, readerBlock);
? ? // Open the output stream
? ? using (streamWriter = new StreamWriter(targetFilePath))
? ? {
? ? ? ? // Feed the reader with the file paths
? ? ? ? foreach (var filePath in sourceFilePaths)
? ? ? ? {
? ? ? ? ? ? var accepted = await readerBlock.SendAsync(filePath,
? ? ? ? ? ? ? ? cancellationToken); // Cancel at any time
? ? ? ? ? ? if (!accepted) break; // This will happen if the reader fails
? ? ? ? }
? ? ? ? readerBlock.Complete();
? ? ? ? await writerBlock.Completion;
? ? }
? ? async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
? ? {
? ? ? ? try { await block1.Completion.ConfigureAwait(false); }
? ? ? ? catch (Exception ex)
? ? ? ? {
? ? ? ? ? ? if (block1.Completion.IsCanceled) return; // On cancellation do nothing
? ? ? ? ? ? block2.Fault(ex);
? ? ? ? }
? ? }
}
使用示例:
var cts = new CancellationTokenSource();
var progress = new Progress<int>(value =>
{
? ? // Safe to update the UI
? ? Console.WriteLine($"Files processed: {value:#,0}");
});
var sourceFilePaths = Directory.EnumerateFiles(@"C:\SourceFolder", "*.log",
? ? SearchOption.AllDirectories); // Include subdirectories
await MergeFiles(sourceFilePaths, @"C:\AllLogs.log", cts.Token, progress);
BoundedCapacity
用于控制內(nèi)存使用。
如果磁盤(pán)驅(qū)動(dòng)器是SSD,您可以嘗試使用MaxDegreeOfParallelism
大于2的值讀取。
為了獲得最佳性能,您可以考慮寫(xiě)入與包含源文件的驅(qū)動(dòng)器不同的磁盤(pán)驅(qū)動(dòng)器。
TPL 數(shù)據(jù)流庫(kù)可作為.NET Framework 的包提供,并且內(nèi)置于 .NET Core。

TA貢獻(xiàn)1946條經(jīng)驗(yàn) 獲得超3個(gè)贊
當(dāng)涉及到IO操作時(shí),CPU并行是沒(méi)有用的。您的 IO 設(shè)備(磁盤(pán)、網(wǎng)絡(luò)等)是您的瓶頸。同時(shí)從設(shè)備讀取數(shù)據(jù)可能會(huì)降低性能。

TA貢獻(xiàn)1868條經(jīng)驗(yàn) 獲得超4個(gè)贊
也許您可以只使用 PowerShell 來(lái)連接文件,
另一種替代方法是編寫(xiě)一個(gè)程序,使用FileSystemWatcher類(lèi)來(lái)監(jiān)視新文件并在創(chuàng)建時(shí)追加它們。
- 3 回答
- 0 關(guān)注
- 338 瀏覽
添加回答
舉報(bào)