第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機(jī)立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

RxJava 相當(dāng)于簡單的 ThreadPoolExecutor 示例

RxJava 相當(dāng)于簡單的 ThreadPoolExecutor 示例

千萬里不及你 2023-09-27 16:54:17
我已經(jīng)退出 Java 游戲大約 8 年了,從那時起發(fā)生了很多變化。對我來說最大的挑戰(zhàn)是 RxJava / 反應(yīng)式。我正在尋找有關(guān)如何以完全反應(yīng)式方式執(zhí)行以下等效操作的粗略指導(dǎo)。Stuff下面使用 ThreadPoolExecutor 實現(xiàn)的基本要求是通過調(diào)用遠(yuǎn)程 Web 服務(wù)來處理大量數(shù)據(jù),該服務(wù)的記錄速率限制為 100 個請求/分鐘。我的目標(biāo)是盡可能快地處理盡可能多的數(shù)據(jù),不丟失任何數(shù)據(jù),Stuff但仍遵守下游速率限制。該代碼已被簡化,以避免錯誤、隔板、斷路器、重試邏輯等。這段代碼目前工作正常,但在所有非阻塞反應(yīng)選項的情況下,它會導(dǎo)致感覺浪費(fèi)了很多線程。甚至我用來調(diào)用服務(wù)的 HTTP 客戶端也會返回 a Flowable,我只是在執(zhí)行程序的 20 個線程中的每個線程中阻塞它。我很想了解反應(yīng)性等價物應(yīng)該是什么。我一直在努力的地方是我發(fā)現(xiàn)的幾乎所有文檔都展示了使用 Observable 的靜態(tài)源(例如Observable.fromArray(1,2,3,4,5):)。我知道解決方案可能涉及IoScheduler和groupBy,但我還沒有弄清楚如何Flowable將來自我的 HTTP 客戶端的 s 合并到某個完整的鏈中,該鏈可以進(jìn)行并行化(最多限制,例如 20)和速率限制。public class Example {    private static final int THREADS = 20;    // using https://docs.micronaut.io/latest/guide/index.html#httpClient    @Client("http://stuff-processor.internal:8080")    @Inject    RxHttpClient httpClient;    private ThreadPoolExecutor executor;    private final RateLimiter rateLimiter;    public Example() {        // up to 20 threads to process the unbounded queue        // incoming Stuff is very bursty...        // ...we could go hours without anything and then hundreds could come in        this.executor = new ThreadPoolExecutor(THREADS, THREADS,                30,TimeUnit.SECONDS, new LinkedBlockingQueue<>());        this.executor.allowCoreThreadTimeOut(true);        // using https://resilience4j.readme.io/docs/ratelimiter        RateLimiterConfig config = RateLimiterConfig.custom()                .limitRefreshPeriod(Duration.ofSeconds(60))                .limitForPeriod(100)                .timeoutDuration(Duration.ofSeconds(90))                .build();        RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);        rateLimiter = rateLimiterRegistry.rateLimiter("stuff-processor", config);    }    /**     * Called when the user takes an action that can cause 1 or 1000s of new     * Stuff to be entered into the system. Each instance of Stuff results in     * a separate call to this method. Ex: 100 Stuffs = 100 calls.     */
查看完整描述

1 回答

?
Cats萌萌

TA貢獻(xiàn)1805條經(jīng)驗 獲得超9個贊

首先,要以完全非阻塞的方式構(gòu)建它,您需要使用像 Netty 這樣的非阻塞、異步 HTTP 客戶端庫。我不確定如何RxHttpClient運(yùn)作。

假設(shè)你有一個 list stuff。我就是這樣做的:

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io())).subscribe();

flatMap合并響應(yīng)。

為了限制速率,您flatMap有第二個參數(shù),它限制它并行訂閱的內(nèi)部流的數(shù)量。假設(shè)您想同時撥打不超過 10 個電話。做這個:

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()), 10).subscribe();



查看完整回答
反對 回復(fù) 2023-09-27
  • 1 回答
  • 0 關(guān)注
  • 122 瀏覽

添加回答

舉報

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號