第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在基于 db 的多線程通知/電子郵件發(fā)件人中減少 CPU 使用率

如何在基于 db 的多線程通知/電子郵件發(fā)件人中減少 CPU 使用率

C#
慕妹3242003 2022-06-12 15:29:11
我正在嘗試開發(fā)一個 Windows 服務(wù)來向訂閱發(fā)送通知。數(shù)據(jù)保存在 SQL 服務(wù)器數(shù)據(jù)庫中。通知是通過向 REST API 端點發(fā)出 Web POST 請求來創(chuàng)建的,并保存在數(shù)據(jù)庫表中。該服務(wù)啟動一個任務(wù),該任務(wù)不斷從該數(shù)據(jù)庫表中讀取通知并將它們添加到隊列中。該服務(wù)還啟動了一些任務(wù),這些任務(wù)不斷從隊列中讀取并執(zhí)行實際的發(fā)送過程。代碼運行良好并完成了所需的工作,但問題是運行服務(wù)時 CPU 使用率為 100%。我嘗試使用 Thread.Sleep 或 Task.Delay 但都沒有幫助我減少 CPU 使用率。我已在此 codeprojct頁面中讀到,我需要使用等待處理程序并且應(yīng)該在某些條件下等待。我無法正常工作。那么誰能建議我可以做些什么來減少EnqueueTask和的CPU使用率DequeueTask?這是發(fā)件人代碼:static class NotificationSender{    static ConcurrentQueue<NotificationDelivery> deliveryQueue = null;    static Task enqueueTask = null;    static Task[] dequeueTasks = null;    public static void StartSending(ServiceState serviceState)    {        PushService.InitServices();        enqueueTask = Task.Factory.StartNew(EnqueueTask, serviceState);        deliveryQueue = new ConcurrentQueue<NotificationDelivery>();        int dequeueTasksCount = 10;        dequeueTasks = new Task[dequeueTasksCount];        for (int i = 0; i < dequeueTasksCount; i++)        {            dequeueTasks[i] = Task.Factory.StartNew(DequeueTask, serviceState);        }    }    public static void EnqueueTask(object state)    {        ServiceState serviceState = (ServiceState)state;        using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())        {            while (!serviceState.CancellationTokenSource.Token.IsCancellationRequested)            {                int toEnqueue = 100 - deliveryQueue.Count;
查看完整描述

1 回答

?
絕地?zé)o雙

TA貢獻1946條經(jīng)驗 獲得超4個贊

我想我終于可以使用等待處理程序和計時器進行良好的更改以維持 CPU 使用率。


EnqueueTask如果沒有獲取到通知,將等待 5 秒,然后再嘗試從通知表中獲取數(shù)據(jù)。如果沒有獲取到通知,它將啟動計時器并重置等待句柄。然后計時器經(jīng)過的回調(diào)將設(shè)置等待句柄。


現(xiàn)在也DequeueTask正在使用等待句柄。如果隊列中沒有更多項目,它將重置等待句柄以停止出隊空隊列。EnqueueTask將項目添加到隊列時將設(shè)置此等待句柄。


CPU 使用率現(xiàn)在 <= 10%


這是更新的NotificationSender代碼:


static class NotificationSender

{

    static ConcurrentQueue<NotificationDelivery> deliveryQueue = null;

    static Task enqueueTask = null;

    static Task[] dequeueTasks = null;


    static ManualResetEvent enqueueSignal = null;

    static ManualResetEvent dequeueSignal = null;


    static System.Timers.Timer enqueueTimer = null;


    public static void StartSending(CancellationToken token)

    {

        PushService.InitServices();


        using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())

        {

            NotificationDelivery[] queuedDeliveries = db.NotificationDeliveries

                        .Where(nd => nd.Status == NotificationDeliveryStatus.Queued)

                        .ToArray();


            foreach (NotificationDelivery delivery in queuedDeliveries)

            {

                delivery.Status = NotificationDeliveryStatus.Pending;

            }


            db.SaveChanges();

        }


        enqueueSignal = new ManualResetEvent(true);

        dequeueSignal = new ManualResetEvent(false);


        enqueueTimer = new System.Timers.Timer();

        enqueueTimer.Elapsed += EnqueueTimerCallback;

        enqueueTimer.Interval = 5000;

        enqueueTimer.AutoReset = false;

        enqueueTimer.Stop();


        enqueueTask = new Task(EnqueueTask, token, TaskCreationOptions.LongRunning);

        enqueueTask.Start();


        deliveryQueue = new ConcurrentQueue<NotificationDelivery>();


        int dequeueTasksCount = 10;

        dequeueTasks = new Task[dequeueTasksCount];

        for (int i = 0; i < dequeueTasksCount; i++)

        {

            dequeueTasks[i] = new Task(DequeueTask, token, TaskCreationOptions.LongRunning);

            dequeueTasks[i].Start();

        }

    }


    public static void EnqueueTimerCallback(Object source, ElapsedEventArgs e)

    {

        enqueueSignal.Set();

        enqueueTimer.Stop();

    }


    public static void EnqueueTask(object state)

    {

        CancellationToken token = (CancellationToken)state;


        using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())

        {

            while (!token.IsCancellationRequested)

            {

                if (enqueueSignal.WaitOne())

                {

                    int toEnqueue = 100 - deliveryQueue.Count;


                    if (toEnqueue > 0)

                    {

                        // fetch some records from db to be enqueued

                        NotificationDelivery[] deliveries = db.NotificationDeliveries

                            .Include("Subscription")

                            .Include("Notification")

                            .Include("Notification.NotificationLanguages")

                            .Include("Notification.NotificationLanguages.Language")

                            .Where(nd => nd.Status == NotificationDeliveryStatus.Pending && DateTime.Now >= nd.StartSendingAt)

                            .OrderBy(nd => nd.StartSendingAt)

                            .Take(toEnqueue)

                            .ToArray();


                        foreach (NotificationDelivery delivery in deliveries)

                        {

                            delivery.Status = NotificationDeliveryStatus.Queued;

                            deliveryQueue.Enqueue(delivery);

                        }


                        if (deliveries.Length > 0)

                        {

                            // save Queued state, so not fetched again the next loop

                            db.SaveChanges();


                            // signal the DequeueTask

                            dequeueSignal.Set();

                        }

                        else

                        {

                            // no more notifications, wait 5 seconds before try fetching again

                            enqueueSignal.Reset();

                            enqueueTimer.Start();

                        }

                    }


                    // save any changes made by the DequeueTask

                    // an event may be used here to know if any changes made

                    db.SaveChanges();

                }

            }


            Task.WaitAll(dequeueTasks);

            db.SaveChanges();

        }

    }


    public async static void DequeueTask(object state)

    {

        CancellationToken token = (CancellationToken)state;


        while (!token.IsCancellationRequested)

        {

            if (dequeueSignal.WaitOne()) // block untill we have items in the queue

            {

                NotificationDelivery delivery = null;


                if (deliveryQueue.TryDequeue(out delivery))

                {

                    NotificationDeliveryStatus ns = NotificationDeliveryStatus.Pending;

                    if (delivery.Subscription.Status == SubscriptionStatus.Subscribed)

                    {

                        PushResult result = await PushService.DoPushAsync(delivery);


                        switch (result)

                        {

                            case PushResult.Pushed:

                                ns = NotificationDeliveryStatus.Delivered;

                                break;

                            case PushResult.Error:

                                ns = NotificationDeliveryStatus.FailureError;

                                break;

                            case PushResult.NotSupported:

                                ns = NotificationDeliveryStatus.FailureNotSupported;

                                break;

                            case PushResult.UnSubscribed:

                                ns = NotificationDeliveryStatus.FailureUnSubscribed;

                                delivery.Subscription.Status = SubscriptionStatus.UnSubscribed;

                                break;

                        }

                    }

                    else

                    {

                        ns = NotificationDeliveryStatus.FailureUnSubscribed;

                    }


                    delivery.Status = ns;

                    delivery.DeliveredAt = DateTime.Now;

                }

                else

                {

                    // empty queue, no more items

                    // stop dequeueing untill new items added by EnqueueTask

                    dequeueSignal.Reset();

                }

            }

        }

    }


    public static void Wait()

    {

        Task.WaitAll(enqueueTask);

        Task.WaitAll(dequeueTasks);


        enqueueTask.Dispose();

        for(int i = 0; i < dequeueTasks.Length; i++)

        {

            dequeueTasks[i].Dispose();

        }

    }

}



查看完整回答
反對 回復(fù) 2022-06-12
  • 1 回答
  • 0 關(guān)注
  • 111 瀏覽

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號