1 回答

TA貢獻(xiàn)1848條經(jīng)驗(yàn) 獲得超10個(gè)贊
Reactor 和 RxJava 一樣,可以被認(rèn)為是并發(fā)不可知的。也就是說(shuō),它不強(qiáng)制執(zhí)行并發(fā)模型。相反,它讓您(開(kāi)發(fā)人員)掌握一切。但是,這并不妨礙該庫(kù)幫助您處理并發(fā)。
獲得aFlux
或aMono
并不一定意味著它運(yùn)行在專(zhuān)用的Thread中。相反,大多數(shù)運(yùn)算符繼續(xù)在前一個(gè)運(yùn)算符執(zhí)行的線程中工作。subscribe()
除非指定,否則最頂層的運(yùn)算符(源)本身在進(jìn)行調(diào)用的線程上運(yùn)行。
從您的代碼中,以下代碼段:
webClient.post() ?????????.uri("/resource") ?????????.syncBody(res) ?????????.header("Content-Type",?"application/json") ?????????.header("Accept",?"application/json") ?????????.retrieve() ?????????.bodyToMono(Resource.class)
導(dǎo)致線程從main切換到netty 的工作池。之后,以下所有操作均由 netty 工作線程執(zhí)行。
如果你想控制這種行為,你應(yīng)該publishOn(...)
在你的代碼中添加一條語(yǔ)句,例如:
webClient.post() ?????????.uri("/resource") ?????????.syncBody(res) ?????????.header("Content-Type",?"application/json") ?????????.header("Accept",?"application/json") ?????????.retrieve() ?????????.bodyToMono(Resource.class) ?????????.publishOn(Schedulers.elastic())
這樣,彈性調(diào)度程序線程池將執(zhí)行任何后續(xù)操作。
另一個(gè)例子是使用專(zhuān)用調(diào)度程序處理 HTTP 請(qǐng)求執(zhí)行后的繁重任務(wù)。
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import com.github.tomakehurst.wiremock.WireMockServer;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import ru.lanwen.wiremock.ext.WiremockResolver;
import ru.lanwen.wiremock.ext.WiremockResolver.Wiremock;
import ru.lanwen.wiremock.ext.WiremockUriResolver;
import ru.lanwen.wiremock.ext.WiremockUriResolver.WiremockUri;
@ExtendWith({
? WiremockResolver.class,
? WiremockUriResolver.class
})
public class ReactiveThreadsControlTest {
? private static int concurrency = 1;
? private final WebClient webClient = WebClient.create();
? @Test
? public void slowServerResponsesTest(@Wiremock WireMockServer server, @WiremockUri String uri) {
? ? String requestUri = "/slow-response";
? ? server.stubFor(get(urlEqualTo(requestUri))
? ? ? .willReturn(aResponse().withStatus(200)
? ? ? ? .withFixedDelay((int) TimeUnit.SECONDS.toMillis(2)))
? ? );
? ? Flux
? ? ? .generate(() -> Integer.valueOf(1), (i, sink) -> {
? ? ? ? System.out.println(String.format("[%s] Emitting next value: %d", Thread.currentThread().getName(), i));
? ? ? ? sink.next(i);
? ? ? ? return i + 1;
? ? ? })
? ? ? .subscribeOn(Schedulers.single())
? ? ? .flatMap(i ->
? ? ? ? ? executeGet(uri + requestUri)
? ? ? ? ? ? .publishOn(Schedulers.elastic())
? ? ? ? ? ? .map(response -> {
? ? ? ? ? ? ? heavyTask();
? ? ? ? ? ? ? return true;
? ? ? ? ? ? })
? ? ? ? , concurrency)
? ? ? .subscribe();
? ? blockForever();
? }
? private void blockForever() {
? ? Object monitor = new Object();
? ? synchronized (monitor) {
? ? ? try {
? ? ? ? monitor.wait();
? ? ? } catch (InterruptedException ex) {
? ? ? }
? ? }
? }
? private Mono<ClientResponse> executeGet(String path) {
? ? System.out.println(String.format("[%s] About to execute an HTTP GET request: %s", Thread.currentThread().getName(), path));
? ? return webClient
? ? ? .get()
? ? ? .uri(path)
? ? ? .exchange();
? }
? private void heavyTask() {
? ? try {
? ? ? System.out.println(String.format("[%s] About to execute a heavy task", Thread.currentThread().getName()));
? ? ? Thread.sleep(TimeUnit.SECONDS.toMillis(20));
? ? } catch (InterruptedException ex) {
? ? }
? }
}
添加回答
舉報(bào)