第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

首頁 慕課教程 Netty 教程 Netty 教程 Netty EventLoop 事件循環(huán)機(jī)制

Netty EventLoop 事件循環(huán)機(jī)制

1. 前言

前面幾節(jié)主要講解了 Netty 的幾種線程模型,基本上都是理論上的東西,那么 Netty 是如何去實(shí)現(xiàn)這些線程模型的呢?答案:核心是 EventLoop,今天我們主要介紹 EventLoop 是如何來實(shí)現(xiàn)線程模型的。

2. 什么是 EventLoop

源碼:

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    EventLoopGroup parent();
}

public interface EventLoopGroup extends EventExecutorGroup {
    
}

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
    
}

通過上面的簡單源碼,我們發(fā)現(xiàn) EventLoopGroup 就是一個(gè)線程池,它是繼承 Java 并發(fā)包下的定時(shí)線程池,而 EventLoop 則是線程池里面的一個(gè)子線程。

通過源碼查看它們之間的關(guān)系,具體如下所示:

public interface EventLoopGroup extends EventExecutorGroup {
    EventLoop next();//返回線程組里面的一個(gè)線程
}

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    EventLoopGroup parent();//關(guān)聯(lián)該線程所屬的線程組
}

通過以上簡單的分析,我們需要掌握的知識(shí)點(diǎn)是,Netty 是通過線程池去實(shí)現(xiàn) Reactor 線程模型的,而線程池并不是使用 Java 內(nèi)置的線程池,而是繼承它們并且進(jìn)行了一定的擴(kuò)展。就是 EventLoopGroup 和 EventLoop。

3. EventLoop 的架構(gòu)圖

EventLoop 整體的類 UML 關(guān)系圖還是比較復(fù)雜的,Netty 底層主要是以 NIO 為主,因此核心掌握 NioEventLoop 和 NioEventLoopGroup 兩個(gè)類的結(jié)構(gòu)即可。

NioEventLoopGroup 關(guān)系圖:
圖片描述

以上的架構(gòu)圖,我們主要關(guān)心 NioEventLoopGroup->MultithreadEventLoopGroup->EventLoopGroup,其實(shí)這種是比較典型的接口、抽象類、實(shí)現(xiàn)類的模式。

NioEventLoop 關(guān)系圖
圖片描述

以上的架構(gòu)圖,我們主要關(guān)心的是 NioEventLoop->SingleThreadEventLoop->EventLoop,它也是典型的接口、抽象類、實(shí)現(xiàn)類的模式。

4. EventLoop 的核心原理

下面,我們將通過源碼的方式介紹 EventLoop 在 Netty 當(dāng)中是如何運(yùn)行的。
首先,我們需要了解 EventLoop 三個(gè)核心步驟,如下圖所示:
圖片描述

4.1 EventLoop 的核心作用

EventLoop 的核心作用是,一定客戶端連接進(jìn)來,則服務(wù)端給其分配一個(gè) Channel(連接通道),并且會(huì)給 Channel 分配一個(gè) EventLoop 和 ChannelPipeline。其中,EventLoop 主要負(fù)責(zé)該 Channel 相關(guān)的業(yè)務(wù)邏輯處理的,ChannelPipeline 則負(fù)責(zé)管理業(yè)務(wù)邏輯(雙向鏈表)。ChannelPipleline 下一個(gè)章節(jié)會(huì)詳細(xì)講解。

EventLoop 的核心功能是處理 Channel 相關(guān)的業(yè)務(wù)邏輯,它里面其實(shí)是一個(gè)死循環(huán),重復(fù)做著 3 個(gè)事件,分別是

  1. 監(jiān)控端口;
  2. 處理端口事件,將其分發(fā);
  3. 處理隊(duì)列事件。

核心結(jié)論:每個(gè) EventLoop 可以被綁定到多個(gè) Channel 身上,但是一個(gè) Channel 有且僅有一個(gè) EventLoop 與之進(jìn)行對(duì)應(yīng)。

4.2 NioEventLoop 初始化流程

這里,我們將通過核心源碼來梳理一下 NioEventLoopGroup 的初始化流程,也就是線程池的初始化。

實(shí)例:

NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
    .group(bossGroup, workerGroup);
//省略其它代碼

第一步: 進(jìn)入 NioEventLoopGroup 構(gòu)造函數(shù)

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    public NioEventLoopGroup() {
        //構(gòu)造函數(shù)一直跟進(jìn)
        this(0);
    }

    public NioEventLoopGroup(
        int nThreads, Executor executor, 
        SelectorProvider selectorProvider, 
        SelectStrategyFactory selectStrategyFactory) {

        //調(diào)用父類的構(gòu)造函數(shù),點(diǎn)進(jìn)去,查看源碼
        super(nThreads, executor, 
              new Object[]{selectorProvider, selectStrategyFactory, 
                           RejectedExecutionHandlers.reject()});

	}
}

第二步: MultithreadEventLoopGroup 構(gòu)造函數(shù)

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        //調(diào)用父類的構(gòu)造函數(shù),點(diǎn)進(jìn)去,查看源碼
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
}

第三步: MultithreadEventExecutorGroup 構(gòu)造函數(shù)

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    
   	//構(gòu)造函數(shù)
	protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        
        //點(diǎn)擊跟進(jìn)
        this(nThreads, 
             (Executor)(threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory)), 
             args);
    }
    
    //核心,在這里進(jìn)行 “線程組” 初始化工作
    private final EventExecutor[] children;
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, 
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        //省略了其它代碼,只保留核心部分
        
        //1.創(chuàng)建一個(gè)數(shù)組,長度是 nThreads
		this.children = new EventExecutor[nThreads];
        //2.數(shù)組初始化值
        for(int i = 0; i < nThreads; ++i) {
            //通過 this.newChild() 來創(chuàng)建具體“線程”
            this.children[i] = this.newChild((Executor)executor, args);
        }
    }
    
    //newChild 是一個(gè)抽象類,由子類去進(jìn)行實(shí)現(xiàn)
    protected abstract EventExecutor newChild(Executor var1, Object... var2) throws Exception;
}

第四步: 由子類 NioEventLoopGroup 去實(shí)現(xiàn) newChild () 抽象方法

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
	protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    	//創(chuàng)建一個(gè) NioEventLoop
        return new NioEventLoop(this, executor, 
                                (SelectorProvider)args[0], 	
                                ((SelectStrategyFactory)args[1]).newSelectStrategy(), 
                                (RejectedExecutionHandler)args[2]);
    } 
}
	

到這里,我們終于看到 NioEventLoop 的身影了,在 newChild () 去進(jìn)行初始化工作。

4.3 NioEventLoop 執(zhí)行流程

上面講解了 NioEventLoop 的初始化流程,那么它到底在什么時(shí)候開始執(zhí)行的呢?

源碼入口:

serverBootstrap.bind(80);

第一步: 抽象類 AbstractBootstrap

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    
    public ChannelFuture bind(int inetPort) {
        return this.bind(new InetSocketAddress(inetPort));
    }
    
    public ChannelFuture bind(SocketAddress localAddress) {
        this.validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        } else {
            //繼續(xù)跟進(jìn)
            return this.doBind(localAddress);
        }
    }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
        //繼續(xù)跟進(jìn)
        final ChannelFuture regFuture = this.initAndRegister();
    }
    
    final ChannelFuture initAndRegister() {
        //繼續(xù)跟進(jìn)
        this.init(channel);
    }
    
    //抽象方法
    abstract void init(Channel var1) throws Exception;
}

第二步: 實(shí)現(xiàn)類 ServerBootstrap

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    void init(Channel channel) throws Exception {
        
        //1.把 ChannelHandler 添加到 ChannelPipeline 里,組成一條雙向業(yè)務(wù)鏈表
        p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
            public void initChannel(Channel ch) throws Exception {
                //1.1.管道
                final ChannelPipeline pipeline = ch.pipeline();
                //1.2.添加到管道
                ChannelHandler handler = ServerBootstrap.this.config.handler();
                if (handler != null) {
                    pipeline.addLast(new ChannelHandler[]{handler});
                }
				//1.3.執(zhí)行線程池的 “execute()”,核心入口
                ch.eventLoop().execute(new Runnable() {
                    public void run() {
                        pipeline.addLast(
                            new ChannelHandler[]{
                                new ServerBootstrap.ServerBootstrapAcceptor(
                                    currentChildGroup, 
                                    currentChildHandler, 
                                    currentChildOptions, 
                                    currentChildAttrs)
                            }
                        );
                    }
                });
            }
        }});
    }
}

這里是在 init () 方法里面進(jìn)行一序列的初始化工作,并且執(zhí)行上面初始化好的 NioEventLoop 的 execute () 方法。

第三步: 執(zhí)行 SingleThreadEventExecutor 的 execute () 方法

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    
	public void execute(Runnable task) {
        //是否是當(dāng)前線程
		boolean inEventLoop = this.inEventLoop();
		if (inEventLoop) {
            //如果是當(dāng)前線程,則添加任務(wù)到隊(duì)列
			this.addTask(task);
		} else {
            //如果不是當(dāng)前線程,則先啟動(dòng)線程
			this.startThread();
            //把任務(wù)添加到任務(wù)隊(duì)列
			this.addTask(task);
            //如果線程已經(jīng)關(guān)閉并且該任務(wù)已經(jīng)被移除了
			if (this.isShutdown() && this.removeTask(task)) {
                //執(zhí)行拒絕策略
				reject();
			}
         }
    }
    
    private void startThread() {
        this.doStartThread();
    }
    
    private void doStartThread() {
        this.executor.execute(new Runnable() {
            public void run() {
                //執(zhí)行 run() 方法
                SingleThreadEventExecutor.this.run();
            }
        });
    }
    
    //抽象方法
    protected abstract void run();
}

第四步: 子類 NioEventLoop 實(shí)現(xiàn)抽象方法 run (),這里是 run () 方法是一個(gè)死循環(huán),并且執(zhí)行三個(gè)核心事件,分別是 “監(jiān)聽端口”、“處理端口事件”、“處理隊(duì)列事件”。

public final class NioEventLoop extends SingleThreadEventLoop {
    protected void run() {
        while(true) {
            //省略
        }
    }
}

run () 方法里面核心執(zhí)行了 this.processSelectedKeys()this.runAllTasks()。

4.4 核心總結(jié)

每次執(zhí)行 execute () 時(shí),都是向隊(duì)列里面添加任務(wù),當(dāng)?shù)谝淮翁砑訒r(shí)則先啟動(dòng)線程,并且執(zhí)行子類 NioEventLoop 的 run () 方法。而該 run () 是整個(gè) EventLoop 的核心,主要的核心功能如下:

  1. this.select(),該方法是一個(gè)堵塞方法,主要是監(jiān)聽客戶端事件;
  2. this.processSelectedKeys(),該方法獲取監(jiān)聽到的客戶端事件,并將其進(jìn)行分發(fā);
  3. this.runAllTasks(),執(zhí)行隊(duì)列里面的任務(wù)。

5. 小結(jié)

本節(jié)主要是講解 EventLoop,它是 Netty 的線程模型的核心,Netty 已經(jīng)幫我們進(jìn)行了高度的封裝,不需要懂得其原理也不會(huì)影響 Netty 的使用,但是如果能了解其核心原理,可以讓我們更加深刻的理解 Netty 是如何運(yùn)轉(zhuǎn)的。