3 回答

TA貢獻1812條經(jīng)驗 獲得超5個贊
由于所有客戶端都必須等待一個線程來完成這項工作,因此實際上沒有必要使用隊列。所以我建議改用這個Monitor
類,特別是Wait/Pulse功能。雖然它有點低級和冗長。
class Worker<TResult> : IDisposable
{
private readonly object _outerLock = new object();
private readonly object _innerLock = new object();
private Func<TResult> _currentJob;
private TResult _currentResult;
private Exception _currentException;
private bool _disposed;
public Worker()
{
var thread = new Thread(MainLoop);
thread.IsBackground = true;
thread.Start();
}
private void MainLoop()
{
lock (_innerLock)
{
while (true)
{
Monitor.Wait(_innerLock); // Wait for client requests
if (_disposed) break;
try
{
_currentResult = _currentJob.Invoke();
_currentException = null;
}
catch (Exception ex)
{
_currentException = ex;
_currentResult = default;
}
Monitor.Pulse(_innerLock); // Notify the waiting client that the job is done
}
} // We are done
}
public TResult DoWork(Func<TResult> job)
{
TResult result;
Exception exception;
lock (_outerLock) // Accept only one client at a time
{
lock (_innerLock) // Acquire inner lock
{
if (_disposed) throw new InvalidOperationException();
_currentJob = job;
Monitor.Pulse(_innerLock); // Notify worker thread about the new job
Monitor.Wait(_innerLock); // Wait for worker thread to process the job
result = _currentResult;
exception = _currentException;
// Clean up
_currentJob = null;
_currentResult = default;
_currentException = null;
}
}
// Throw the exception, if occurred, preserving the stack trace
if (exception != null) ExceptionDispatchInfo.Capture(exception).Throw();
return result;
}
public void Dispose()
{
lock (_outerLock)
{
lock (_innerLock)
{
_disposed = true;
Monitor.Pulse(_innerLock); // Notify worker thread to exit loop
}
}
}
}
使用示例:
var worker = new Worker<int>();
int result = worker.DoWork(() => 1); // Accepts a function as argument
Console.WriteLine($"Result: {result}");
worker.Dispose();
輸出:
Result: 1
更新:之前的解決方案對等待不友好,所以這里有一個允許適當?shù)却慕鉀Q方案。它TaskCompletionSource為每個作業(yè)使用一個,存儲在一個BlockingCollection.
class Worker<TResult> : IDisposable
{
private BlockingCollection<TaskCompletionSource<TResult>> _blockingCollection
= new BlockingCollection<TaskCompletionSource<TResult>>();
public Worker()
{
var thread = new Thread(MainLoop);
thread.IsBackground = true;
thread.Start();
}
private void MainLoop()
{
foreach (var tcs in _blockingCollection.GetConsumingEnumerable())
{
var job = (Func<TResult>)tcs.Task.AsyncState;
try
{
var result = job.Invoke();
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}
}
public Task<TResult> DoWorkAsync(Func<TResult> job)
{
var tcs = new TaskCompletionSource<TResult>(job,
TaskCreationOptions.RunContinuationsAsynchronously);
_blockingCollection.Add(tcs);
return tcs.Task;
}
public TResult DoWork(Func<TResult> job) // Synchronous call
{
var task = DoWorkAsync(job);
try { task.Wait(); } catch { } // Swallow the AggregateException
// Throw the original exception, if occurred, preserving the stack trace
if (task.IsFaulted) ExceptionDispatchInfo.Capture(task.Exception.InnerException).Throw();
return task.Result;
}
public void Dispose()
{
_blockingCollection.CompleteAdding();
}
}
使用示例
var worker = new Worker<int>();
int result = await worker.DoWorkAsync(() => 1); // Accepts a function as argument
Console.WriteLine($"Result: {result}");
worker.Dispose();
輸出:
Result: 1

TA貢獻1911條經(jīng)驗 獲得超7個贊
從同步的角度來看,這工作正常。
但是這樣做似乎沒有用。如果你想一個接一個地執(zhí)行作業(yè),你可以使用鎖:
lock (lockObject) { RunJob(); }
您對這段代碼的意圖是什么?
還有一個效率問題,因為每個任務都會創(chuàng)建一個操作系統(tǒng)事件并等待它。如果您使用更現(xiàn)代TaskCompletionSource
的,如果您同步等待該任務,這將在引擎蓋下使用相同的東西。您可以使用異步等待 ( await myTCS.Task;
) 來稍微提高效率。當然,這會用 async/await 感染整個調(diào)用堆棧。如果這是一個相當?shù)偷慕灰琢坎僮?,您將不會獲得太多收益。

TA貢獻1829條經(jīng)驗 獲得超7個贊
總的來說,我認為可行,盡管當您說“許多”線程正在調(diào)用 Do() 時,這可能無法很好地擴展……掛起的線程使用資源。
這段代碼的另一個問題是,在空閑時間,您將在“workerThread”中出現(xiàn)“硬循環(huán)”,這將導致您的應用程序返回高 CPU 使用率時間。您可能希望將此代碼添加到“workerThread”:
if (concurrentQueue.IsEmpty) Thread.Sleep(1);
您可能還想為 WaitOne 調(diào)用引入超時以避免日志堵塞。
- 3 回答
- 0 關注
- 147 瀏覽
添加回答
舉報