4 回答

TA貢獻1827條經(jīng)驗 獲得超9個贊
這看起來簡單得多:
int numberProcessed = 0;
Parallel.ForEach(listProxies,
new ParallelOptions { MaxDegreeOfParallelism = nThreadsNum },
(p)=> {
var result = CheckProxyServer(p.sIPAddress, s.nPort, Thread.CurrentThread.ManagedThreadId);
UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);
Interlocked.Increment(numberProcessed);
});
帶插槽:
var obj = new Object();
var slots = new List<int>();
Parallel.ForEach(listProxies,
new ParallelOptions { MaxDegreeOfParallelism = nThreadsNum },
(p)=> {
int threadId = Thread.CurrentThread.ManagedThreadId;
int slot = slots.IndexOf(threadId);
if (slot == -1)
{
lock(obj)
{
slots.Add(threadId);
}
slot = slots.IndexOf(threadId);
}
var result = CheckProxyServer(p.sIPAddress, s.nPort, slot);
UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);
});
我在那里采取了一些捷徑來保證線程安全。您不必執(zhí)行正常的檢查-鎖定-檢查舞蹈,因為永遠不會有兩個線程嘗試將相同的 threadid 添加到列表中,因此第二次檢查將始終失敗并且不需要。其次,出于同樣的原因,我認(rèn)為您也不需要鎖定外部 IndexOf 。這使得它成為一個非常高效的并發(fā)例程,無論可枚舉中有多少項,都很少鎖定(它應(yīng)該只鎖定 nThreadsNum 次)。

TA貢獻1895條經(jīng)驗 獲得超7個贊
另一個解決方案是使用 aSemaphoreSlim
或使用 的生產(chǎn)者-消費者模式BlockinCollection<T>
。兩種解決方案都支持取消。
信號量瘦身
private async Task CheckProxyServerAsync(IEnumerable<object> proxies)
{
? var tasks = new List<Task>();
? int currentThreadNumber = 0;
? int maxNumberOfThreads = 8;
? using (semaphore = new SemaphoreSlim(maxNumberOfThreads, maxNumberOfThreads))
? {
? ? foreach (var proxy in proxies)
? ? {
? ? ? // Asynchronously wait until thread is available if thread limit reached
? ? ? await semaphore.WaitAsync();
? ? ? string proxyIP = proxy.IPAddress;
? ? ? int port = proxy.Port;
? ? ? tasks.Add(Task.Run(() => CheckProxyServer(proxyIP, port, Interlocked.Increment(ref currentThreadNumber)))
? ? ? ? .ContinueWith(
? ? ? ? ? (task) =>
? ? ? ? ? {
? ? ? ? ? ? ProxyAddress result = task.Result;
? ? ? ? ? ? // Method call must be thread-safe!
? ? ? ? ? ? UpdateProxyDbRecord(result.IPAddress, result.OnlineStatus);
? ? ? ? ? ? Interlocked.Decrement(ref currentThreadNumber);
? ? ? ? ? ? // Allow to start next thread if thread limit was reached
? ? ? ? ? ? semaphore.Release();
? ? ? ? ? },
? ? ? ? ? TaskContinuationOptions.OnlyOnRanToCompletion));
? ? }
? ? // Asynchronously wait until all tasks are completed
? ? // to prevent premature disposal of semaphore
? ? await Task.WhenAll(tasks);
? }
}
生產(chǎn)者-消費者模式
// Uses a fixed number of same threads
private async Task CheckProxyServerAsync(IEnumerable<ProxyInfo> proxies)
{
? var pipe = new BlockingCollection<ProxyInfo>();
? int maxNumberOfThreads = 8;
? var tasks = new List<Task>();
? // Create all threads (count == maxNumberOfThreads)
? for (int currentThreadNumber = 0; currentThreadNumber < maxNumberOfThreads; currentThreadNumber++)
? {
? ? tasks.Add(
? ? ? Task.Run(() => ConsumeProxyInfo(pipe, currentThreadNumber)));
? }
? proxies.ToList().ForEach(pipe.Add);
? pipe.CompleteAdding();
? await Task.WhenAll(tasks);
}
private void ConsumeProxyInfo(BlockingCollection<ProxyInfo> proxiesPipe, int currentThreadNumber)
{
? while (!proxiesPipe.IsCompleted)
? {
? ? if (proxiesPipe.TryTake(out ProxyInfo proxy))
? ? {
? ? ? int port = proxy.Port;
? ? ? string proxyIP = proxy.IPAddress;
? ? ? ProxyAddress result = CheckProxyServer(proxyIP, port, currentThreadNumber);?
? ? ? // Method call must be thread-safe!
? ? ? UpdateProxyDbRecord(result.IPAddress, result.OnlineStatus);
? ? }
? }
}

TA貢獻1828條經(jīng)驗 獲得超6個贊
我建議稍微改變一下你的方法。不要啟動和停止線程,而是將代理服務(wù)器數(shù)據(jù)放入并發(fā)隊列中,每個代理服務(wù)器對應(yīng)一個項目。然后創(chuàng)建固定數(shù)量的線程(或異步任務(wù))來處理隊列。在我看來,這更有可能提供平穩(wěn)的性能(您不會一遍又一遍地啟動和停止線程,這會產(chǎn)生開銷)并且更容易編碼。
一個簡單的例子:
class ProxyChecker
{
private ConcurrentQueue<ProxyInfo> _masterQueue = new ConcurrentQueue<ProxyInfo>();
public ProxyChecker(IEnumerable<ProxyInfo> listProxies)
{
foreach (var proxy in listProxies)
{
_masterQueue.Enqueue(proxy);
}
}
public async Task RunChecks(int maximumConcurrency)
{
var count = Math.Max(maximumConcurrency, _masterQueue.Count);
var tasks = Enumerable.Range(0, count).Select( i => WorkerTask() ).ToList();
await Task.WhenAll(tasks);
}
private async Task WorkerTask()
{
ProxyInfo proxyInfo;
while ( _masterList.TryDequeue(out proxyInfo))
{
DoTheTest(proxyInfo.IP, proxyInfo.Port)
}
}
}

TA貢獻1804條經(jīng)驗 獲得超8個贊
如果我正確理解你的問題,這實際上是相當(dāng)簡單的await Task.WhenAny。基本上,您保留所有正在運行的任務(wù)的集合。一旦運行的任務(wù)達到一定數(shù)量,您將等待一個或多個任務(wù)完成,然后從集合中刪除已完成的任務(wù)并繼續(xù)添加更多任務(wù)。
下面是我的意思的一個例子:
var tasks = new List<Task>();
for (int i = 0; i < 20; i++)
{
// I want my list of tasks to contain at most 5 tasks at once
if (tasks.Count == 5)
{
// Wait for at least one of the tasks to complete
await Task.WhenAny(tasks.ToArray());
// Remove all of the completed tasks from the list
tasks = tasks.Where(t => !t.IsCompleted).ToList();
}
// Add some task to the list
tasks.Add(Task.Factory.StartNew(async delegate ()
{
await Task.Delay(1000);
}));
}
- 4 回答
- 0 關(guān)注
- 163 瀏覽
添加回答
舉報