第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

Reactor - 理解 .flatMap() 中的線程池

Reactor - 理解 .flatMap() 中的線程池

函數(shù)式編程 2023-03-31 15:38:28
我試圖了解反應(yīng)式編程是如何工作的。我為此準(zhǔn)備了簡(jiǎn)單的演示:WebClient來(lái)自 Spring Framework 的 reactive 將請(qǐng)求發(fā)送到簡(jiǎn)單的 rest api,并且此客戶端在每個(gè)操作中打印線程名稱(chēng)。休息API:@RestController@SpringBootApplicationpublic class RestApiApplication {    public static void main(String[] args) {        SpringApplication.run(RestApiApplication.class, args);    }    @PostMapping("/resource")    public void consumeResource(@RequestBody Resource resource) {        System.out.println(String.format("consumed resource: %s", resource.toString()));    }}@Data@AllArgsConstructorclass Resource {    private final Long id;    private final String name;}問(wèn)題是行為與我預(yù)期的不同。.map()我預(yù)計(jì),.filter()和 的每次調(diào)用都.flatMap()將在線程上執(zhí)行,而ormain的每次調(diào)用都將在 nio 線程池中的線程上執(zhí)行。所以我希望日志看起來(lái)像:.doOnSuccess().doOnError------- map [main] --------------- filter [main] --------------- flatmap [main] --------(and so on...)------- onsuccess [reactor-http-nio-2] --------(and so on...)但我得到的日志是:------- map [main] --------------- filter [main] --------------- flatmap [main] --------------- map [main] --------------- filter [main] --------------- flatmap [main] --------------- onsuccess [reactor-http-nio-2] --------------- onsuccess [reactor-http-nio-6] --------------- onsuccess [reactor-http-nio-4] --------------- onsuccess [reactor-http-nio-8] --------------- map [reactor-http-nio-2] --------------- filter [reactor-http-nio-2] --------------- flatmap [reactor-http-nio-2] --------------- map [reactor-http-nio-2] --------每次下一次登錄.map(),.filter()都是.flatMap()在 reactor-http-nio 的線程上完成的。下一個(gè)難以理解的事實(shí)是在主線程和 reactor-http-nio 上執(zhí)行的操作之間的比率總是不同的。有時(shí)所有操作.map(),.filter()和.flatMap()都在主線程上執(zhí)行。
查看完整描述

1 回答

?
慕桂英546537

TA貢獻(xiàn)1848條經(jīng)驗(yàn) 獲得超10個(gè)贊

Reactor 和 RxJava 一樣,可以被認(rèn)為是并發(fā)不可知的。也就是說(shuō),它不強(qiáng)制執(zhí)行并發(fā)模型。相反,它讓您(開(kāi)發(fā)人員)掌握一切。但是,這并不妨礙該庫(kù)幫助您處理并發(fā)。

獲得aFlux或aMono并不一定意味著它運(yùn)行在專(zhuān)用的Thread中。相反,大多數(shù)運(yùn)算符繼續(xù)在前一個(gè)運(yùn)算符執(zhí)行的線程中工作。subscribe()除非指定,否則最頂層的運(yùn)算符(源)本身在進(jìn)行調(diào)用的線程上運(yùn)行。

從您的代碼中,以下代碼段:

webClient.post()
?????????.uri("/resource")
?????????.syncBody(res)
?????????.header("Content-Type",?"application/json")
?????????.header("Accept",?"application/json")
?????????.retrieve()
?????????.bodyToMono(Resource.class)

導(dǎo)致線程從main切換到netty 的工作池。之后,以下所有操作均由 netty 工作線程執(zhí)行。

如果你想控制這種行為,你應(yīng)該publishOn(...)在你的代碼中添加一條語(yǔ)句,例如:

webClient.post()
?????????.uri("/resource")
?????????.syncBody(res)
?????????.header("Content-Type",?"application/json")
?????????.header("Accept",?"application/json")
?????????.retrieve()
?????????.bodyToMono(Resource.class)
?????????.publishOn(Schedulers.elastic())

這樣,彈性調(diào)度程序線程池將執(zhí)行任何后續(xù)操作。

另一個(gè)例子是使用專(zhuān)用調(diào)度程序處理 HTTP 請(qǐng)求執(zhí)行后的繁重任務(wù)。

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;

import static com.github.tomakehurst.wiremock.client.WireMock.get;

import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;


import com.github.tomakehurst.wiremock.WireMockServer;

import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.web.reactive.function.client.ClientResponse;

import org.springframework.web.reactive.function.client.WebClient;

import reactor.core.publisher.Flux;

import reactor.core.publisher.Mono;

import reactor.core.scheduler.Schedulers;

import ru.lanwen.wiremock.ext.WiremockResolver;

import ru.lanwen.wiremock.ext.WiremockResolver.Wiremock;

import ru.lanwen.wiremock.ext.WiremockUriResolver;

import ru.lanwen.wiremock.ext.WiremockUriResolver.WiremockUri;


@ExtendWith({

? WiremockResolver.class,

? WiremockUriResolver.class

})

public class ReactiveThreadsControlTest {


? private static int concurrency = 1;


? private final WebClient webClient = WebClient.create();


? @Test

? public void slowServerResponsesTest(@Wiremock WireMockServer server, @WiremockUri String uri) {


? ? String requestUri = "/slow-response";


? ? server.stubFor(get(urlEqualTo(requestUri))

? ? ? .willReturn(aResponse().withStatus(200)

? ? ? ? .withFixedDelay((int) TimeUnit.SECONDS.toMillis(2)))

? ? );


? ? Flux

? ? ? .generate(() -> Integer.valueOf(1), (i, sink) -> {

? ? ? ? System.out.println(String.format("[%s] Emitting next value: %d", Thread.currentThread().getName(), i));

? ? ? ? sink.next(i);

? ? ? ? return i + 1;

? ? ? })

? ? ? .subscribeOn(Schedulers.single())

? ? ? .flatMap(i ->

? ? ? ? ? executeGet(uri + requestUri)

? ? ? ? ? ? .publishOn(Schedulers.elastic())

? ? ? ? ? ? .map(response -> {

? ? ? ? ? ? ? heavyTask();

? ? ? ? ? ? ? return true;

? ? ? ? ? ? })

? ? ? ? , concurrency)

? ? ? .subscribe();


? ? blockForever();

? }


? private void blockForever() {

? ? Object monitor = new Object();


? ? synchronized (monitor) {

? ? ? try {

? ? ? ? monitor.wait();

? ? ? } catch (InterruptedException ex) {

? ? ? }

? ? }

? }



? private Mono<ClientResponse> executeGet(String path) {

? ? System.out.println(String.format("[%s] About to execute an HTTP GET request: %s", Thread.currentThread().getName(), path));

? ? return webClient

? ? ? .get()

? ? ? .uri(path)

? ? ? .exchange();

? }


? private void heavyTask() {

? ? try {

? ? ? System.out.println(String.format("[%s] About to execute a heavy task", Thread.currentThread().getName()));

? ? ? Thread.sleep(TimeUnit.SECONDS.toMillis(20));

? ? } catch (InterruptedException ex) {

? ? }

? }

}


查看完整回答
反對(duì) 回復(fù) 2023-03-31
  • 1 回答
  • 0 關(guān)注
  • 180 瀏覽
慕課專(zhuān)欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢(xún)優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)