Netty EventLoop 事件循環(huán)機(jī)制
1. 前言
前面幾節(jié)主要講解了 Netty 的幾種線程模型,基本上都是理論上的東西,那么 Netty 是如何去實(shí)現(xiàn)這些線程模型的呢?答案:核心是 EventLoop,今天我們主要介紹 EventLoop 是如何來實(shí)現(xiàn)線程模型的。
2. 什么是 EventLoop
源碼:
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
EventLoopGroup parent();
}
public interface EventLoopGroup extends EventExecutorGroup {
}
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
}
通過上面的簡單源碼,我們發(fā)現(xiàn) EventLoopGroup 就是一個(gè)線程池,它是繼承 Java 并發(fā)包下的定時(shí)線程池,而 EventLoop 則是線程池里面的一個(gè)子線程。
通過源碼查看它們之間的關(guān)系,具體如下所示:
public interface EventLoopGroup extends EventExecutorGroup {
EventLoop next();//返回線程組里面的一個(gè)線程
}
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
EventLoopGroup parent();//關(guān)聯(lián)該線程所屬的線程組
}
通過以上簡單的分析,我們需要掌握的知識(shí)點(diǎn)是,Netty 是通過線程池去實(shí)現(xiàn) Reactor 線程模型的,而線程池并不是使用 Java 內(nèi)置的線程池,而是繼承它們并且進(jìn)行了一定的擴(kuò)展。就是 EventLoopGroup 和 EventLoop。
3. EventLoop 的架構(gòu)圖
EventLoop 整體的類 UML 關(guān)系圖還是比較復(fù)雜的,Netty 底層主要是以 NIO 為主,因此核心掌握 NioEventLoop 和 NioEventLoopGroup 兩個(gè)類的結(jié)構(gòu)即可。
NioEventLoopGroup 關(guān)系圖:
以上的架構(gòu)圖,我們主要關(guān)心 NioEventLoopGroup
->MultithreadEventLoopGroup
->EventLoopGroup
,其實(shí)這種是比較典型的接口、抽象類、實(shí)現(xiàn)類的模式。
NioEventLoop 關(guān)系圖
以上的架構(gòu)圖,我們主要關(guān)心的是 NioEventLoop
->SingleThreadEventLoop
->EventLoop
,它也是典型的接口、抽象類、實(shí)現(xiàn)類的模式。
4. EventLoop 的核心原理
下面,我們將通過源碼的方式介紹 EventLoop 在 Netty 當(dāng)中是如何運(yùn)行的。
首先,我們需要了解 EventLoop 三個(gè)核心步驟,如下圖所示:
4.1 EventLoop 的核心作用
EventLoop 的核心作用是,一定客戶端連接進(jìn)來,則服務(wù)端給其分配一個(gè) Channel(連接通道),并且會(huì)給 Channel 分配一個(gè) EventLoop 和 ChannelPipeline。其中,EventLoop 主要負(fù)責(zé)該 Channel 相關(guān)的業(yè)務(wù)邏輯處理的,ChannelPipeline 則負(fù)責(zé)管理業(yè)務(wù)邏輯(雙向鏈表)。ChannelPipleline 下一個(gè)章節(jié)會(huì)詳細(xì)講解。
EventLoop 的核心功能是處理 Channel 相關(guān)的業(yè)務(wù)邏輯,它里面其實(shí)是一個(gè)死循環(huán),重復(fù)做著 3 個(gè)事件,分別是
- 監(jiān)控端口;
- 處理端口事件,將其分發(fā);
- 處理隊(duì)列事件。
核心結(jié)論:每個(gè) EventLoop 可以被綁定到多個(gè) Channel 身上,但是一個(gè) Channel 有且僅有一個(gè) EventLoop 與之進(jìn)行對(duì)應(yīng)。
4.2 NioEventLoop 初始化流程
這里,我們將通過核心源碼來梳理一下 NioEventLoopGroup
的初始化流程,也就是線程池的初始化。
實(shí)例:
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup);
//省略其它代碼
第一步: 進(jìn)入 NioEventLoopGroup 構(gòu)造函數(shù)
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
public NioEventLoopGroup() {
//構(gòu)造函數(shù)一直跟進(jìn)
this(0);
}
public NioEventLoopGroup(
int nThreads, Executor executor,
SelectorProvider selectorProvider,
SelectStrategyFactory selectStrategyFactory) {
//調(diào)用父類的構(gòu)造函數(shù),點(diǎn)進(jìn)去,查看源碼
super(nThreads, executor,
new Object[]{selectorProvider, selectStrategyFactory,
RejectedExecutionHandlers.reject()});
}
}
第二步: MultithreadEventLoopGroup 構(gòu)造函數(shù)
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//調(diào)用父類的構(gòu)造函數(shù),點(diǎn)進(jìn)去,查看源碼
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
}
第三步: MultithreadEventExecutorGroup 構(gòu)造函數(shù)
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//構(gòu)造函數(shù)
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
//點(diǎn)擊跟進(jìn)
this(nThreads,
(Executor)(threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory)),
args);
}
//核心,在這里進(jìn)行 “線程組” 初始化工作
private final EventExecutor[] children;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//省略了其它代碼,只保留核心部分
//1.創(chuàng)建一個(gè)數(shù)組,長度是 nThreads
this.children = new EventExecutor[nThreads];
//2.數(shù)組初始化值
for(int i = 0; i < nThreads; ++i) {
//通過 this.newChild() 來創(chuàng)建具體“線程”
this.children[i] = this.newChild((Executor)executor, args);
}
}
//newChild 是一個(gè)抽象類,由子類去進(jìn)行實(shí)現(xiàn)
protected abstract EventExecutor newChild(Executor var1, Object... var2) throws Exception;
}
第四步: 由子類 NioEventLoopGroup 去實(shí)現(xiàn) newChild () 抽象方法
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
//創(chuàng)建一個(gè) NioEventLoop
return new NioEventLoop(this, executor,
(SelectorProvider)args[0],
((SelectStrategyFactory)args[1]).newSelectStrategy(),
(RejectedExecutionHandler)args[2]);
}
}
到這里,我們終于看到 NioEventLoop 的身影了,在 newChild () 去進(jìn)行初始化工作。
4.3 NioEventLoop 執(zhí)行流程
上面講解了 NioEventLoop 的初始化流程,那么它到底在什么時(shí)候開始執(zhí)行的呢?
源碼入口:
serverBootstrap.bind(80);
第一步: 抽象類 AbstractBootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
public ChannelFuture bind(int inetPort) {
return this.bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
this.validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
} else {
//繼續(xù)跟進(jìn)
return this.doBind(localAddress);
}
}
private ChannelFuture doBind(final SocketAddress localAddress) {
//繼續(xù)跟進(jìn)
final ChannelFuture regFuture = this.initAndRegister();
}
final ChannelFuture initAndRegister() {
//繼續(xù)跟進(jìn)
this.init(channel);
}
//抽象方法
abstract void init(Channel var1) throws Exception;
}
第二步: 實(shí)現(xiàn)類 ServerBootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
void init(Channel channel) throws Exception {
//1.把 ChannelHandler 添加到 ChannelPipeline 里,組成一條雙向業(yè)務(wù)鏈表
p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
public void initChannel(Channel ch) throws Exception {
//1.1.管道
final ChannelPipeline pipeline = ch.pipeline();
//1.2.添加到管道
ChannelHandler handler = ServerBootstrap.this.config.handler();
if (handler != null) {
pipeline.addLast(new ChannelHandler[]{handler});
}
//1.3.執(zhí)行線程池的 “execute()”,核心入口
ch.eventLoop().execute(new Runnable() {
public void run() {
pipeline.addLast(
new ChannelHandler[]{
new ServerBootstrap.ServerBootstrapAcceptor(
currentChildGroup,
currentChildHandler,
currentChildOptions,
currentChildAttrs)
}
);
}
});
}
}});
}
}
這里是在 init () 方法里面進(jìn)行一序列的初始化工作,并且執(zhí)行上面初始化好的 NioEventLoop 的 execute () 方法。
第三步: 執(zhí)行 SingleThreadEventExecutor 的 execute () 方法
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
public void execute(Runnable task) {
//是否是當(dāng)前線程
boolean inEventLoop = this.inEventLoop();
if (inEventLoop) {
//如果是當(dāng)前線程,則添加任務(wù)到隊(duì)列
this.addTask(task);
} else {
//如果不是當(dāng)前線程,則先啟動(dòng)線程
this.startThread();
//把任務(wù)添加到任務(wù)隊(duì)列
this.addTask(task);
//如果線程已經(jīng)關(guān)閉并且該任務(wù)已經(jīng)被移除了
if (this.isShutdown() && this.removeTask(task)) {
//執(zhí)行拒絕策略
reject();
}
}
}
private void startThread() {
this.doStartThread();
}
private void doStartThread() {
this.executor.execute(new Runnable() {
public void run() {
//執(zhí)行 run() 方法
SingleThreadEventExecutor.this.run();
}
});
}
//抽象方法
protected abstract void run();
}
第四步: 子類 NioEventLoop 實(shí)現(xiàn)抽象方法 run (),這里是 run () 方法是一個(gè)死循環(huán),并且執(zhí)行三個(gè)核心事件,分別是 “監(jiān)聽端口”、“處理端口事件”、“處理隊(duì)列事件”。
public final class NioEventLoop extends SingleThreadEventLoop {
protected void run() {
while(true) {
//省略
}
}
}
run () 方法里面核心執(zhí)行了 this.processSelectedKeys()
和 this.runAllTasks()
。
4.4 核心總結(jié)
每次執(zhí)行 execute () 時(shí),都是向隊(duì)列里面添加任務(wù),當(dāng)?shù)谝淮翁砑訒r(shí)則先啟動(dòng)線程,并且執(zhí)行子類 NioEventLoop 的 run () 方法。而該 run () 是整個(gè) EventLoop 的核心,主要的核心功能如下:
this.select()
,該方法是一個(gè)堵塞方法,主要是監(jiān)聽客戶端事件;this.processSelectedKeys()
,該方法獲取監(jiān)聽到的客戶端事件,并將其進(jìn)行分發(fā);this.runAllTasks()
,執(zhí)行隊(duì)列里面的任務(wù)。
5. 小結(jié)
本節(jié)主要是講解 EventLoop,它是 Netty 的線程模型的核心,Netty 已經(jīng)幫我們進(jìn)行了高度的封裝,不需要懂得其原理也不會(huì)影響 Netty 的使用,但是如果能了解其核心原理,可以讓我們更加深刻的理解 Netty 是如何運(yùn)轉(zhuǎn)的。