我開發(fā)一個服務(wù)軟件A,A接收從通信軟件B發(fā)過來的包含很多個設(shè)備信息的數(shù)據(jù)包,B發(fā)送數(shù)據(jù)包的頻率很快(2s);我現(xiàn)在用一個單線程在服務(wù)軟件A中對B發(fā)過來的數(shù)據(jù)包進(jìn)行解析,先把解析出來的設(shè)備信息保存在內(nèi)存中的一個集合S中(只維持每個設(shè)備最新的一條數(shù)據(jù)),然后再把解析出來的數(shù)據(jù)發(fā)給不同的客戶端C(有多個C1,C2,C3。。。。。)。測試后發(fā)現(xiàn)在A中處理包的速度跟不上B發(fā)送包的速度,會導(dǎo)致B發(fā)過來的很多包得不到及時(shí)的處理。我考慮過用多線程,但還沒有什么思路,如果用多線程的話我這個內(nèi)存的集合S要怎么來管理。另外,整個過程對數(shù)據(jù)的實(shí)時(shí)性要求很高(就是B發(fā)送包后,在客戶端C中就能很快的接收到該包),望各位大蝦指教!
2 回答

呼如林
TA貢獻(xiàn)1798條經(jīng)驗(yàn) 獲得超3個贊
典型的生產(chǎn)、消費(fèi)模型,建議用 Queue<T> 非線程安全,需要自己同步。
1 public class Message 2 { 3 private string iD; 4 5 public string ID 6 { 7 get { return iD; } 8 set { iD = value; } 9 } 10 11 private int number; 12 13 public int Number 14 { 15 get { return number; } 16 set { number = value; } 17 } 18 }
1 ///<summary> 2 /// 信息處理單元,封裝了線程、狀態(tài)和同步對象 3 ///</summary> 4 class Worker 5 { 6 public AutoResetEvent State = new AutoResetEvent(false); 7 8 public Thread Thread = null; 9 10 ///<summary> 11 /// 表示線程是否在運(yùn)行著 12 ///</summary> 13 public volatile bool IsBusy = false; 14 }
1 public class MessageScheduler 2 { 3 #region Private Fields 4 5 ///<summary> 6 /// 線程對象列表 7 ///</summary> 8 private List<Worker> workerList = new List<Worker>(); 9 10 ///<summary> 11 /// 消息隊(duì)列 12 ///</summary> 13 private Queue<Message> messageList = new Queue<Message>(); 14 15 ///<summary> 16 /// 消息隊(duì)列同步對象 17 ///</summary> 18 private readonly object syncObject = new object(); 19 20 ///<summary> 21 /// 狀態(tài) 22 ///</summary> 23 private volatile bool running; 24 25 #endregion 26 27 28 #region Constructors 29 30 ///<summary> 31 /// 創(chuàng)建調(diào)度器(線程數(shù)量不要太多,否則線程的切換損耗很大) 32 ///</summary> 33 ///<param name="threadCount">要創(chuàng)建的線程數(shù)量</param> 34 public MessageScheduler(int threadCount) 35 { 36 running = false; 37 InitializeWorkerCount(threadCount); 38 } 39 40 #endregion 41 42 #region Private Methods 43 44 ///<summary> 45 /// 創(chuàng)建線程 46 ///</summary> 47 ///<param name="threadCount">要創(chuàng)建線程的數(shù)量</param> 48 private void InitializeWorkerCount(int threadCount) 49 { 50 Worker worker; 51 for (int i = 0; i < threadCount; i++) 52 { 53 worker = new Worker(); 54 worker.Thread = new Thread(new ParameterizedThreadStart(this.ProcessMessage)); 55 worker.Thread.IsBackground = true; 56 workerList.Add(worker); 57 } 58 } 59 60 ///<summary> 61 /// 消息處理程序 62 ///</summary> 63 ///<param name="param"></param> 64 private void ProcessMessage(object param) 65 { 66 Worker worker = param as Worker; 67 Message message = null; 68 worker.State.WaitOne(); 69 while (true) 70 { 71 //消息處理程序一定不要放到lock里面,否則多線程的性能可能會比單線程性能還低(線程切換會帶來損耗) 72 lock (this.syncObject) 73 { 74 if (messageList.Count > 0) 75 { 76 worker.IsBusy = true; 77 message = messageList.Dequeue(); 78 } 79 } 80 81 if (message != null) 82 { 83 //進(jìn)行消息處理,可能比較消耗CPU和時(shí)間。 84 //這里僅僅輸出消息的Number 85 Console.WriteLine(message.Number); 86 87 } 88 else 89 { 90 worker.IsBusy = false; 91 worker.State.WaitOne(); 92 } 93 } 94 } 95 96 #endregion 97 98 #region Public Methods 99 100 ///<summary> 101 /// 啟動調(diào)度器 102 ///</summary> 103 public void Start() 104 { 105 running = true; 106 foreach (Worker worker in workerList) 107 { 108 worker.Thread.Start(worker); 109 } 110 } 111 112 ///<summary> 113 /// 停止調(diào)度器 114 ///</summary> 115 public void Stop() 116 { 117 running = false; 118 foreach (Worker worker in workerList) 119 { 120 worker.Thread.Abort(); 121 } 122 } 123 124 ///<summary> 125 /// 消息調(diào)度 126 ///</summary> 127 ///<param name="message"></param> 128 public void DoWork(Message message) 129 { 130 if (!running) 131 { 132 return; 133 } 134 lock (this.syncObject) 135 { 136 this.messageList.Enqueue(message); 137 } 138 foreach (Worker worker in this.workerList) 139 { 140 //如果某線程處于等待,則通知繼續(xù) 141 if (!worker.IsBusy) 142 { 143 worker.State.Set(); 144 } 145 } 146 } 147 148 #endregion 149 }
1 class Program 2 { 3 static MessageScheduler scheduler = new MessageScheduler(5); 4 5 static void Main(string[] args) 6 { 7 scheduler.Start(); 8 System.Threading.Thread thread = new System.Threading.Thread(CreateMessage); 9 thread.IsBackground = true; 10 thread.Start(); 11 Console.ReadLine(); 12 } 13 14 static void CreateMessage() 15 { 16 Message message; 17 int i = 0; 18 while (true) 19 { 20 message = new Message(); 21 message.ID = Guid.NewGuid().ToString(); 22 message.Number = i; 23 scheduler.DoWork(message); 24 i++; 25 System.Threading.Thread.Sleep(1); 26 } 27 } 28 }

qq_笑_17
TA貢獻(xiàn)1818條經(jīng)驗(yàn) 獲得超7個贊
首先要肯定你的問題出在哪里。
是在解析數(shù)據(jù)上還是在發(fā)送到C哪里。
根據(jù)你說的情況,我初步估計(jì)你在把數(shù)據(jù)發(fā)給C時(shí)用的同步方法。
也就是說你需要等到C接收完數(shù)據(jù),你才會處理下一次發(fā)送或接收。
你只需發(fā)送到C時(shí)用異步發(fā)送即可
- 2 回答
- 0 關(guān)注
- 563 瀏覽
添加回答
舉報(bào)
0/150
提交
取消