Netty ChannelPipeline 數(shù)據(jù)管道
1. 前言
前面,我們也提到了 ChannelPipeline,它是管道或者說(shuō)管理 Handler 的集合,很多同學(xué)很容易搞混 Channel、ChannelPipeline 和 ChannelHandler 之間關(guān)系。
本節(jié)內(nèi)容我們需要理清并且掌握以下知識(shí)點(diǎn):
- Channel、ChannelPipeline、ChannelHandler 之間的關(guān)系;
- 了解 ChannelPipeline 如何管理 ChannelHandler。
2. 三者之間的關(guān)系
Channel 是一個(gè)連接通道,客戶端和服務(wù)端連接成功之后,會(huì)維持一個(gè) Channel,可以通過(guò) Channel 來(lái)發(fā)送數(shù)據(jù)。Channel 有且僅有一個(gè) ChannelPipeline 與之相對(duì)應(yīng),ChannelPipeline 又維護(hù)著一個(gè)由多個(gè) ChannelHandlerContext 組成的雙向鏈表,ChannelHandlerContext 又關(guān)聯(lián)著一個(gè) ChannelHandler。
它們之間的關(guān)系,大概如下圖所示:
3. ChannelPipeline 核心方法
ChannelPipeline 的最常用方法:
方法 | 描述 |
---|---|
addFirst(…) | 添加 ChannelHandler 在 ChannelPipeline 的第一個(gè)位置 |
addBefore(…) | 在 ChannelPipeline 中指定的 ChannelHandler 名稱之前添加 ChannelHandler |
addAfter(…) | 在 ChannelPipeline 中指定的 ChannelHandler 名稱之后添加 ChannelHandler |
addLast(…) | 在 ChannelPipeline 的末尾添加 ChannelHandler |
remove(…) | 刪除 ChannelPipeline 中指定的 ChannelHandler |
replace(…) | 替換 ChannelPipeline 中指定的 ChannelHandler |
ChannelHandler first() | 獲取鏈表當(dāng)中的第一個(gè)節(jié)點(diǎn) |
ChannelHandler last() | 獲取鏈表當(dāng)中的最后一個(gè)節(jié)點(diǎn) |
實(shí)例:
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new Handler1());
ch.pipeline().addLast(new Handler2());
ch.pipeline().addLast(new Handler3());
ch.pipeline().addLast(new Handler4());
}
});
總結(jié),ChannelPipeline 的用法比較固定,雖然方法很多,但是一般常用的就是 addLast。
4. ChannelPipeline 管理鏈表
實(shí)例:
//1.創(chuàng)建ChannelPipeline
ChannelPipeline pipeline = ch.pipeline();
//2.創(chuàng)建Handler
FirstHandler firstHandler = new FirstHandler();
SecondHandler secondHandler=new SecondHandler();
ThirdHandler thirdHandler=new ThirdHandler();
FourthHandler fourthHandler=new FourthHandler();
//3.操作
pipeline.addLast("handler1", firstHandler);
pipeline.addFirst("handler2", secondHandler);//在最開始添加
pipeline.addLast("handler3", thirdHandler);//在最后添加
pipeline.remove("handler3"); //根據(jù)名稱刪除
pipeline.remove(firstHandler);//根據(jù)對(duì)象刪除
pipeline.replace("handler2", "handler4", fourthHandler);//替換
輸出結(jié)果:
FourthHandler
5. 入站和出站執(zhí)行順序
在真實(shí)的項(xiàng)目開發(fā)當(dāng)中,inboundHandler 和 outboundHandler 都是多個(gè)的,一般是一個(gè)業(yè)務(wù)處理對(duì)應(yīng)一個(gè) Handler。那么多個(gè)的情況下,Pipeline 的執(zhí)行順序又是怎么樣的呢?
5.1 Inbound 不往下傳遞
實(shí)例:
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound1>>>>>>>>>");
}
}
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound2>>>>>>>>>");
}
}
執(zhí)行結(jié)果:
inbound1>>>>>>>>>
思考:為什么不執(zhí)行 InboundHandler2 呢?
原因:InboundHandler1 沒(méi)有手工往下傳遞執(zhí)行。
5.2 Inbound 流轉(zhuǎn)順序
實(shí)例:
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound1>>>>>>>>>");
//往下傳遞
super.channelRead(ctx, msg);
}
}
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound2>>>>>>>>>");
}
}
執(zhí)行結(jié)果:
inbound1>>>>>>>>>
inbound2>>>>>>>>>
InboundHandler 之間可以通過(guò) super.channelRead(ctx, msg);
往下傳遞。
5.3 Inbound 執(zhí)行順序
實(shí)例:
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//1.往下傳遞
super.channelRead(ctx, msg);
//2.打印信息
System.out.println("inbound1>>>>>>>>>");
}
}
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound2>>>>>>>>>");
}
}
執(zhí)行結(jié)果:
inbound2>>>>>>>>>
inbound1>>>>>>>>>
InboundHandler1 先往下傳遞,在執(zhí)行自己的業(yè)務(wù),那么 InboundHandler2 就會(huì)比 InboundHandler1 先執(zhí)行。
總結(jié):Inbound 是按順序進(jìn)行傳遞,但是邏輯的執(zhí)行并非是按順序執(zhí)行,而是由
super.channelRead(ctx, msg);
去決定。
5.4 流轉(zhuǎn)到 Outbound
InboundHandler 往 OutboundHandler 流轉(zhuǎn),需要手工調(diào)用 ctx.channel().writeAndFlush()
,否則無(wú)法執(zhí)行 OutboundHandler 的業(yè)務(wù)邏輯。
實(shí)例:
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound2>>>>>>>>>");
//傳遞到OutboundHandler
ctx.channel().writeAndFlush("hello world");
}
}
5.5 Outbound 內(nèi)部流轉(zhuǎn)
跟 InboundHandler 一樣,需要手工往下傳遞,否則無(wú)法流轉(zhuǎn)到下一個(gè) OutboundHandler。
實(shí)例:
public class OutboundHandler2 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("outbound2>>>>>>>>>");
//往下流轉(zhuǎn)
super.write(ctx, msg, promise);
}
}
總結(jié):OutboundHandler 是按逆向來(lái)流轉(zhuǎn),但是業(yè)務(wù)邏輯的執(zhí)行順序則是由
super.write(ctx, msg, promise);
決定。
5.6 ctx.writeAndFlush 和 ctx.channel ().writeAndFlush 的區(qū)別
很多同學(xué)很容易遇到以下問(wèn)題,并且會(huì)想不通。
實(shí)例:
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
ch.pipeline().addLast(new OutboundHandler1());
ch.pipeline().addLast(new OutboundHandler2());
InboundHandler2 流轉(zhuǎn)代碼
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound2>>>>>>>>>");
//流轉(zhuǎn)到OutboundHandler2
ctx.channel().writeAndFlush("hello world");
}
}
執(zhí)行結(jié)果:
inbound1>>>>>>>>>
inbound2>>>>>>>>>
outbound2>>>>>>>>>
outbound1>>>>>>>>>
修改 InboundHandler2 流轉(zhuǎn)代碼
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("inbound2>>>>>>>>>");
//【注意,調(diào)整這里了】
ctx.writeAndFlush("hello world");
}
}
執(zhí)行結(jié)果
inbound1>>>>>>>>>
inbound2>>>>>>>>>
思考:為什么這里使用 ctx.writeAndFlush 就流程不下去了呢?
ctx.writeAndFlush();
最終源碼
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while(!ctx.outbound);
return ctx;
}
通過(guò)源碼,我們發(fā)現(xiàn)它是從當(dāng)前 InboundHandler 開始往前執(zhí)行。
ctx.channel().writeAndFlush();
最終源碼
public final ChannelFuture writeAndFlush(Object msg) {
return this.tail.writeAndFlush(msg);
}
通過(guò)源碼,我們發(fā)現(xiàn)它是從鏈表的最后一個(gè)節(jié)點(diǎn)開始往前面執(zhí)行。
總結(jié),如果是 OutboundHandler 放在 InboundHandler 之后,使用不同的 writeAndFlush 則得到的結(jié)果不一樣。
5.7 規(guī)律總結(jié)
Inbound 的順序
- 流轉(zhuǎn)順序: 多個(gè) Inbound 不會(huì)自動(dòng)往下流轉(zhuǎn),需要手工調(diào)用
ctx.fireChannelRead(msg);
才能流轉(zhuǎn)到下一個(gè); - 執(zhí)行順序: 業(yè)務(wù)邏輯的執(zhí)行順序,則根據(jù)
ctx.fireChannelRead(msg);
和邏輯的先后順序所決定; - Inbound 往 Outbound 流轉(zhuǎn),則需要手工
ctx.channel().writeAndFlush()
。
Outbound 的順序
- 流轉(zhuǎn)順序: 多個(gè) Outbound 不會(huì)自動(dòng)往下流轉(zhuǎn),需要手工調(diào)用
ctx.write(msg, promise);
才能流轉(zhuǎn)到下一個(gè); - 執(zhí)行順序: 業(yè)務(wù)邏輯的執(zhí)行順序,則根據(jù)
ctx.write(msg, promise);
和邏輯的先后順序所決定。
6. 小結(jié)
本文主要講解的知識(shí)點(diǎn)
- Channel、ChannelPipeline、ChannelHandlerContext、ChannelHandler 之間的關(guān)系;
- ChannelPipeline 的核心方法,以及它是如何管理 Handler 的,主要通過(guò) addLast () 去組裝 Handler;
- 入站和出站的流轉(zhuǎn)順序和業(yè)務(wù)邏輯的執(zhí)行順序。