Netty 通訊協(xié)議功能實(shí)現(xiàn)
1. 前言
上節(jié)內(nèi)容,我們主要講解了 Netty 通訊協(xié)議設(shè)計(jì),其實(shí)思路很簡(jiǎn)單就是核心的四個(gè)字段,分別是協(xié)議標(biāo)識(shí)符、數(shù)據(jù)長(zhǎng)度、指令、數(shù)據(jù)。還有其中涉及的技術(shù)主要是序列化和反序列化技術(shù)以及字節(jié)容器。那么本節(jié)主要是基于這個(gè)思想去實(shí)現(xiàn)我們的自定義協(xié)議,并且測(cè)試客戶端循環(huán) 1000 遍發(fā)送數(shù)據(jù)是否還會(huì)出現(xiàn)粘包和拆包問(wèn)題。
技術(shù)棧說(shuō)明
- 主要是使用對(duì)象流進(jìn)行序列化和反序列化(ObjectInputStream 和 ObjectOutputStream);
- 字節(jié)容器主要是以 Netty 的 ByteBuf 來(lái)管理字節(jié)。
2. 實(shí)現(xiàn)流程
3. 功能實(shí)現(xiàn)
3.1 編碼實(shí)現(xiàn)
實(shí)例:
public class MyEncoder extends MessageToByteEncoder<User> {
protected void encode(ChannelHandlerContext channelHandlerContext,
User user,
ByteBuf byteBuf) throws Exception {
//1.創(chuàng)建一個(gè)內(nèi)存輸出流
ByteArrayOutputStream os = new ByteArrayOutputStream();
//2.創(chuàng)建一個(gè)對(duì)象輸出流
ObjectOutputStream oos = new ObjectOutputStream(os);
//3.把user對(duì)象寫到內(nèi)存流里面
oos.writeObject(user);
//4.通過(guò)內(nèi)存流獲取user對(duì)象轉(zhuǎn)換后的字節(jié)數(shù)字
byte[] bytes=os.toByteArray();
//5.關(guān)閉流
oos.close();
os.close();
//6.根據(jù)協(xié)議組裝數(shù)據(jù)
byteBuf.writeInt(1);//標(biāo)識(shí)
byteBuf.writeByte(1);//指令
byteBuf.writeInt(bytes.length);//長(zhǎng)度
byteBuf.writeBytes(bytes);//數(shù)據(jù)內(nèi)容
}
}
代碼說(shuō)明:
- 自定義一個(gè)編碼器,把客戶端向服務(wù)端發(fā)送的數(shù)據(jù)進(jìn)行加工,主要是轉(zhuǎn)換字節(jié)流,然后根據(jù)自定義協(xié)議來(lái)組裝數(shù)據(jù);
- 標(biāo)識(shí)占用四個(gè)字節(jié),使用 writeInt (),一個(gè) int 表示四個(gè)字節(jié);
- 指令占用一個(gè)字節(jié),因此使用 writeByte () 即可;
- 數(shù)據(jù)長(zhǎng)度占用四個(gè)字節(jié),因此使用 writeByte (),int 表示的最大值一般來(lái)說(shuō)足夠表示數(shù)據(jù)的內(nèi)容了,除非特別特別大的數(shù)據(jù)(比如:超級(jí)大文件的傳輸)則可以使用 writeLong () 來(lái)表示數(shù)據(jù)長(zhǎng)度。
3.2 解碼實(shí)現(xiàn)
實(shí)例:
public class MyDecoder extends ByteToMessageDecoder {
protected void decode(
ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf, List<Object> list) throws Exception {
//1.根據(jù)協(xié)議分別取出對(duì)應(yīng)的數(shù)據(jù)
int tag=byteBuf.readInt();//標(biāo)識(shí)符
byte code=byteBuf.readByte();//指令
int len=byteBuf.readInt();//長(zhǎng)度
byte[] bytes=new byte[len];//定義一個(gè)字節(jié)數(shù)據(jù),長(zhǎng)度是數(shù)據(jù)的長(zhǎng)度
byteBuf.readBytes(bytes);//往字節(jié)數(shù)組讀取數(shù)據(jù)
//2.通過(guò)對(duì)象流來(lái)轉(zhuǎn)換字節(jié)流,轉(zhuǎn)換成User對(duì)象
ByteArrayInputStream is=new ByteArrayInputStream(bytes);
ObjectInputStream iss=new ObjectInputStream(is);
User user=(User)iss.readObject();
is.close();
iss.close();
list.add(user);
}
}
代碼說(shuō)明:
這里主要是實(shí)現(xiàn)了解碼器,主要目的是通過(guò)自定義協(xié)議來(lái)分別讀取對(duì)應(yīng)的數(shù)據(jù),并且通過(guò)對(duì)象流來(lái)反序列化字節(jié)流。
3.3 發(fā)送方 Handler
public class ClientTestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=0;i<1000;i++){
User user=new User();
user.setName(i+"->zwy");
user.setAge(18);
//注意,這里直接寫user對(duì)象,無(wú)需再手工轉(zhuǎn)換字節(jié)流了,編碼器會(huì)自動(dòng)幫忙處理。
ctx.channel().writeAndFlush(user);
}
}
}
代碼說(shuō)明:
客戶端在鏈接就緒時(shí),使用 for 循環(huán)給服務(wù)端發(fā)送數(shù)據(jù),主要目的是檢測(cè)是否會(huì)產(chǎn)生數(shù)據(jù)粘包和拆包問(wèn)題。
3.4 接受方 Handler
實(shí)例:
public class ServerTestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
User user=(User)msg;
System.out.println(user.toString());
}
}
3.5 加入 Pipeline
客戶端
//1.拆包器
ch.pipeline().addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,5,4)
);
//2.自定義編碼器
ch.pipeline().addLast(new MyDecoder());
ch.pipeline().addLast(new MyEncoder());
//3.業(yè)務(wù)處理Handler
ch.pipeline().addLast(new ClientTestHandler());
服務(wù)端
//1.Netty內(nèi)置拆包器
ch.pipeline().addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,5,4)
);
//2.自定義解碼器
ch.pipeline().addLast(new MyDecoder());
ch.pipeline().addLast(new MyEncoder());
//3.業(yè)務(wù)Handler
ch.pipeline().addLast(new ServerTestHandler());
代碼說(shuō)明:
- 需要往雙向鏈表里面加入三個(gè)特殊的 Handler,分別是
LengthFieldBasedFrameDecoder
和自定義的編碼器、解碼器; LengthFieldBasedFrameDecoder
拆包器的構(gòu)造函數(shù)字段說(shuō)明,分別如下所示:
2.1 第一個(gè)參數(shù),maxFrameLength:解碼時(shí),處理每個(gè)幀數(shù)據(jù)的最大長(zhǎng)度,一般來(lái)說(shuō)直接賦予Integer.MAX_VALUE
即可;
2.2 第二個(gè)參數(shù),lengthFieldOffset :存放幀數(shù)據(jù)的長(zhǎng)度數(shù)據(jù)的起始位(偏移位),通俗點(diǎn)說(shuō),就是表示數(shù)據(jù)長(zhǎng)度的字段在整個(gè)協(xié)議里面所處的位置,由于協(xié)議的結(jié)果是:協(xié)議標(biāo)識(shí)(4 個(gè)字節(jié))、指令(1 個(gè)字節(jié))、數(shù)據(jù)長(zhǎng)度(4 個(gè)字節(jié)),因此數(shù)據(jù)長(zhǎng)度處于第 5 個(gè)位置;
2.3 第三個(gè)參數(shù),lengthFieldLength:長(zhǎng)度屬性的長(zhǎng)度,即存放整個(gè)大數(shù)據(jù)包長(zhǎng)度的字節(jié)所占的長(zhǎng)度,這里是 4 個(gè)字節(jié)。
疑問(wèn):為什么需要加
LengthFieldBasedFrameDecoder
呢?回答:自定義協(xié)議它是無(wú)法知道數(shù)據(jù)包是什么時(shí)候應(yīng)該結(jié)束,需要依賴 Netty 提供的拆包器。
3.6 運(yùn)行效果
先啟動(dòng)服務(wù)端,然后啟動(dòng)客戶端,打印結(jié)果沒(méi)有出現(xiàn)粘包和拆包問(wèn)題,證明我們自定義的協(xié)議有效,最終運(yùn)行效果如下所示:
4. LengthFieldBaseFrameDecoder
這里,主要簡(jiǎn)單的介紹該拆包器,因?yàn)樗俏覀兤綍r(shí)開(kāi)發(fā)當(dāng)中最常用的拆包器, 幾乎所有和長(zhǎng)度相關(guān)的二進(jìn)制協(xié)議都可以通過(guò)它來(lái)實(shí)現(xiàn),因此在這里簡(jiǎn)單的介紹一下它的原理。
思考:如果讓我們簡(jiǎn)單實(shí)現(xiàn)一個(gè)自己的拆包器,那么我們應(yīng)該如何去實(shí)現(xiàn)呢?
其實(shí)原理很簡(jiǎn)單,就是不斷從 TCP 緩沖區(qū)中讀取數(shù)據(jù),每次讀取完都需要判斷是否是一個(gè)完整的數(shù)據(jù)包。
- 如果當(dāng)前讀取的數(shù)據(jù)不足以拼接成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,那就保留該數(shù)據(jù),繼續(xù)從 tcp 緩沖區(qū)中讀取,直到得到一個(gè)完整的數(shù)據(jù)包;
- 如果當(dāng)前讀到的數(shù)據(jù)加上已經(jīng)讀取的數(shù)據(jù)足夠拼接成一個(gè)數(shù)據(jù)包,那就將已經(jīng)讀取的數(shù)據(jù)拼接上本次讀取的數(shù)據(jù),夠成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包傳遞到下一個(gè)節(jié)點(diǎn)進(jìn)行處理。如果拼接完一個(gè)數(shù)據(jù)包時(shí)還有多余的數(shù)據(jù)仍然保留,以便和下次讀到的數(shù)據(jù)進(jìn)行拼接;
- Netty 中的拆包也是如上這個(gè)原理,內(nèi)部會(huì)有一個(gè)累加器,每次讀取到數(shù)據(jù)都會(huì)不斷累加,然后嘗試對(duì)累加到的數(shù)據(jù)進(jìn)行拆包,拆成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,這個(gè)基類叫做
ByteToMessageDecoder
。
5. 小結(jié)
本節(jié),主要是根據(jù)上節(jié)設(shè)計(jì)的通訊協(xié)議來(lái)具體的實(shí)現(xiàn)效果,主要掌握的核心步驟是:
- 需要依賴
LengthFieldBaseFrameDecoder
拆包器,并且需要了解該拆包器的參數(shù)定義和大概原理; - 掌握編碼器和解碼器的實(shí)現(xiàn),主要是在編碼器和解碼器里面實(shí)現(xiàn)協(xié)議的數(shù)據(jù)粘包和數(shù)據(jù)拆包。