我已經(jīng)退出 Java 游戲大約 8 年了,從那時起發(fā)生了很多變化。對我來說最大的挑戰(zhàn)是 RxJava / 反應(yīng)式。我正在尋找有關(guān)如何以完全反應(yīng)式方式執(zhí)行以下等效操作的粗略指導(dǎo)。Stuff下面使用 ThreadPoolExecutor 實現(xiàn)的基本要求是通過調(diào)用遠(yuǎn)程 Web 服務(wù)來處理大量數(shù)據(jù),該服務(wù)的記錄速率限制為 100 個請求/分鐘。我的目標(biāo)是盡可能快地處理盡可能多的數(shù)據(jù),不丟失任何數(shù)據(jù),Stuff但仍遵守下游速率限制。該代碼已被簡化,以避免錯誤、隔板、斷路器、重試邏輯等。這段代碼目前工作正常,但在所有非阻塞反應(yīng)選項的情況下,它會導(dǎo)致感覺浪費(fèi)了很多線程。甚至我用來調(diào)用服務(wù)的 HTTP 客戶端也會返回 a Flowable,我只是在執(zhí)行程序的 20 個線程中的每個線程中阻塞它。我很想了解反應(yīng)性等價物應(yīng)該是什么。我一直在努力的地方是我發(fā)現(xiàn)的幾乎所有文檔都展示了使用 Observable 的靜態(tài)源(例如Observable.fromArray(1,2,3,4,5):)。我知道解決方案可能涉及IoScheduler和groupBy,但我還沒有弄清楚如何Flowable將來自我的 HTTP 客戶端的 s 合并到某個完整的鏈中,該鏈可以進(jìn)行并行化(最多限制,例如 20)和速率限制。public class Example { private static final int THREADS = 20; // using https://docs.micronaut.io/latest/guide/index.html#httpClient @Client("http://stuff-processor.internal:8080") @Inject RxHttpClient httpClient; private ThreadPoolExecutor executor; private final RateLimiter rateLimiter; public Example() { // up to 20 threads to process the unbounded queue // incoming Stuff is very bursty... // ...we could go hours without anything and then hundreds could come in this.executor = new ThreadPoolExecutor(THREADS, THREADS, 30,TimeUnit.SECONDS, new LinkedBlockingQueue<>()); this.executor.allowCoreThreadTimeOut(true); // using https://resilience4j.readme.io/docs/ratelimiter RateLimiterConfig config = RateLimiterConfig.custom() .limitRefreshPeriod(Duration.ofSeconds(60)) .limitForPeriod(100) .timeoutDuration(Duration.ofSeconds(90)) .build(); RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config); rateLimiter = rateLimiterRegistry.rateLimiter("stuff-processor", config); } /** * Called when the user takes an action that can cause 1 or 1000s of new * Stuff to be entered into the system. Each instance of Stuff results in * a separate call to this method. Ex: 100 Stuffs = 100 calls. */
1 回答

Cats萌萌
TA貢獻(xiàn)1805條經(jīng)驗 獲得超9個贊
首先,要以完全非阻塞的方式構(gòu)建它,您需要使用像 Netty 這樣的非阻塞、異步 HTTP 客戶端庫。我不確定如何RxHttpClient
運(yùn)作。
假設(shè)你有一個 list stuff
。我就是這樣做的:
Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io())).subscribe();
flatMap
合并響應(yīng)。
為了限制速率,您flatMap
有第二個參數(shù),它限制它并行訂閱的內(nèi)部流的數(shù)量。假設(shè)您想同時撥打不超過 10 個電話。做這個:
Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()), 10).subscribe();
添加回答
舉報
0/150
提交
取消