3 回答

TA貢獻(xiàn)2016條經(jīng)驗(yàn) 獲得超9個(gè)贊
這里是最簡(jiǎn)單的算法,如果您只想在消息到達(dá)太快時(shí)丟棄它們(而不是排隊(duì)它們,這是有道理的,因?yàn)殛?duì)列可能會(huì)變得任意大):
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;
在這個(gè)解決方案中沒有數(shù)據(jù)結(jié)構(gòu),定時(shí)器等,它干凈利落地工作:)為了看到這一點(diǎn),'allowance'最多以每秒5/8單位的速度增長,即每8秒最多5個(gè)單位。轉(zhuǎn)發(fā)的每條消息都會(huì)扣除一個(gè)單元,因此每八秒鐘不能發(fā)送五條以上的消息。
請(qǐng)注意,rate應(yīng)該是一個(gè)整數(shù),即沒有非零小數(shù)部分,否則算法將無法正常工作(實(shí)際速率不會(huì)rate/per)。例如rate=0.5; per=1.0;不起作用,因?yàn)閍llowance永遠(yuǎn)不會(huì)增長到1.0。但rate=1.0; per=2.0;工作正常。

TA貢獻(xiàn)1871條經(jīng)驗(yàn) 獲得超13個(gè)贊
在排隊(duì)的函數(shù)之前使用此裝飾器@RateLimited(ratepersec)。
基本上,這會(huì)檢查自上次以來是否已經(jīng)過1 /速率秒,如果沒有,則等待剩余的時(shí)間,否則它不會(huì)等待。這實(shí)際上限制了你的速率/秒。裝飾器可以應(yīng)用于您想要的速率限制的任何功能。
在您的情況下,如果您希望每8秒最多發(fā)送5條消息,請(qǐng)?jiān)趕endToQueue函數(shù)之前使用@RateLimited(0.625)。
import timedef RateLimited(maxPerSecond): minInterval = 1.0 / float(maxPerSecond) def decorate(func): lastTimeCalled = [0.0] def rateLimitedFunction(*args,**kargs): elapsed = time.clock() - lastTimeCalled[0] leftToWait = minInterval - elapsed if leftToWait>0: time.sleep(leftToWait) ret = func(*args,**kargs) lastTimeCalled[0] = time.clock() return ret return rateLimitedFunction return decorate@RateLimited(2) # 2 per second at mostdef PrintNumber(num): print numif __name__ == "__main__": print "This should print 1,2,3... at about 2 per second." for i in range(1,100): PrintNumber(i)

TA貢獻(xiàn)1786條經(jīng)驗(yàn) 獲得超11個(gè)贊
令牌桶實(shí)現(xiàn)起來相當(dāng)簡(jiǎn)單。
從帶有5個(gè)令牌的桶開始。
每5/8秒:如果存儲(chǔ)桶少于5個(gè)令牌,請(qǐng)?zhí)砑右粋€(gè)。
每次要發(fā)送消息時(shí):如果存儲(chǔ)桶有≥1個(gè)令牌,請(qǐng)取出一個(gè)令牌并發(fā)送消息。否則,等待/刪除消息/等等。
(顯然,在實(shí)際代碼中,你使用整數(shù)計(jì)數(shù)器而不是真實(shí)的標(biāo)記,你可以通過存儲(chǔ)時(shí)間戳來優(yōu)化每5/8步驟)
再次閱讀問題,如果速率限制每8秒完全重置一次,那么這里是一個(gè)修改:
以時(shí)間戳開始,last_send
很久以前(例如,在紀(jì)元)。此外,從相同的5令牌桶開始。
每5/8秒執(zhí)行一次規(guī)則。
每次發(fā)送消息時(shí):首先,檢查是否last_send
≥8秒前。如果是這樣,請(qǐng)?zhí)畛渫埃▽⑵湓O(shè)置為5個(gè)令牌)。其次,如果存儲(chǔ)桶中有令牌,則發(fā)送消息(否則,丟棄/等待/等)。第三,設(shè)置last_send
為現(xiàn)在。
這應(yīng)該適用于那種情況。
我實(shí)際上是用這樣的策略編寫了一個(gè)IRC機(jī)器人(第一種方法)。它在Perl中,而不是Python,但這里有一些代碼來說明:
這里的第一部分處理向桶添加令牌。您可以看到基于時(shí)間(第2行到最后一行)添加令牌的優(yōu)化,然后最后一行將桶內(nèi)容限制為最大值(MESSAGE_BURST)
my $start_time = time; ... # Bucket handling my $bucket = $conn->{fujiko_limit_bucket}; my $lasttx = $conn->{fujiko_limit_lasttx}; $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL; ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
$ conn是一種傳遞的數(shù)據(jù)結(jié)構(gòu)。這是在一個(gè)常規(guī)運(yùn)行的方法中(它計(jì)算下次有什么事情要做,并且長時(shí)間睡眠或直到它獲得網(wǎng)絡(luò)流量)。該方法的下一部分處理發(fā)送。它相當(dāng)復(fù)雜,因?yàn)橄⒕哂信c之相關(guān)的優(yōu)先級(jí)。
# Queue handling. Start with the ultimate queue. my $queues = $conn->{fujiko_queues}; foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) { # Ultimate is special. We run ultimate no matter what. Even if # it sends the bucket negative. --$bucket; $entry->{code}(@{$entry->{args}}); } $queues->[PRIORITY_ULTIMATE] = [];
這是第一個(gè)隊(duì)列,無論如何都會(huì)運(yùn)行。即使它讓我們的連接因洪水而死亡。用于非常重要的事情,比如響應(yīng)服務(wù)器的PING。接下來,其余的隊(duì)列:
# Continue to the other queues, in order of priority. QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) { my $queue = $queues->[$pri]; while (scalar(@$queue)) { if ($bucket < 1) { # continue later. $need_more_time = 1; last QRUN; } else { --$bucket; my $entry = shift @$queue; $entry->{code}(@{$entry->{args}}); } } }
最后,存儲(chǔ)桶狀態(tài)被保存回$ conn數(shù)據(jù)結(jié)構(gòu)(實(shí)際上稍晚于該方法;它首先計(jì)算它將在多長時(shí)間內(nèi)完成更多工作)
# Save status. $conn->{fujiko_limit_bucket} = $bucket; $conn->{fujiko_limit_lasttx} = $start_time;
如您所見,實(shí)際的鏟斗處理代碼非常小 - 大約四行。其余代碼是優(yōu)先級(jí)隊(duì)列處理。機(jī)器人具有優(yōu)先級(jí)隊(duì)列,例如,與之聊天的人不能阻止它執(zhí)行其重要的啟動(dòng)/禁令職責(zé)。
添加回答
舉報(bào)