Netty 通訊協(xié)議功能實現(xiàn)
1. 前言
上節(jié)內(nèi)容,我們主要講解了 Netty 通訊協(xié)議設計,其實思路很簡單就是核心的四個字段,分別是協(xié)議標識符、數(shù)據(jù)長度、指令、數(shù)據(jù)。還有其中涉及的技術主要是序列化和反序列化技術以及字節(jié)容器。那么本節(jié)主要是基于這個思想去實現(xiàn)我們的自定義協(xié)議,并且測試客戶端循環(huán) 1000 遍發(fā)送數(shù)據(jù)是否還會出現(xiàn)粘包和拆包問題。
技術棧說明
- 主要是使用對象流進行序列化和反序列化(ObjectInputStream 和 ObjectOutputStream);
- 字節(jié)容器主要是以 Netty 的 ByteBuf 來管理字節(jié)。
2. 實現(xiàn)流程
3. 功能實現(xiàn)
3.1 編碼實現(xiàn)
實例:
public class MyEncoder extends MessageToByteEncoder<User> {
protected void encode(ChannelHandlerContext channelHandlerContext,
User user,
ByteBuf byteBuf) throws Exception {
//1.創(chuàng)建一個內(nèi)存輸出流
ByteArrayOutputStream os = new ByteArrayOutputStream();
//2.創(chuàng)建一個對象輸出流
ObjectOutputStream oos = new ObjectOutputStream(os);
//3.把user對象寫到內(nèi)存流里面
oos.writeObject(user);
//4.通過內(nèi)存流獲取user對象轉(zhuǎn)換后的字節(jié)數(shù)字
byte[] bytes=os.toByteArray();
//5.關閉流
oos.close();
os.close();
//6.根據(jù)協(xié)議組裝數(shù)據(jù)
byteBuf.writeInt(1);//標識
byteBuf.writeByte(1);//指令
byteBuf.writeInt(bytes.length);//長度
byteBuf.writeBytes(bytes);//數(shù)據(jù)內(nèi)容
}
}
代碼說明:
- 自定義一個編碼器,把客戶端向服務端發(fā)送的數(shù)據(jù)進行加工,主要是轉(zhuǎn)換字節(jié)流,然后根據(jù)自定義協(xié)議來組裝數(shù)據(jù);
- 標識占用四個字節(jié),使用 writeInt (),一個 int 表示四個字節(jié);
- 指令占用一個字節(jié),因此使用 writeByte () 即可;
- 數(shù)據(jù)長度占用四個字節(jié),因此使用 writeByte (),int 表示的最大值一般來說足夠表示數(shù)據(jù)的內(nèi)容了,除非特別特別大的數(shù)據(jù)(比如:超級大文件的傳輸)則可以使用 writeLong () 來表示數(shù)據(jù)長度。
3.2 解碼實現(xiàn)
實例:
public class MyDecoder extends ByteToMessageDecoder {
protected void decode(
ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf, List<Object> list) throws Exception {
//1.根據(jù)協(xié)議分別取出對應的數(shù)據(jù)
int tag=byteBuf.readInt();//標識符
byte code=byteBuf.readByte();//指令
int len=byteBuf.readInt();//長度
byte[] bytes=new byte[len];//定義一個字節(jié)數(shù)據(jù),長度是數(shù)據(jù)的長度
byteBuf.readBytes(bytes);//往字節(jié)數(shù)組讀取數(shù)據(jù)
//2.通過對象流來轉(zhuǎn)換字節(jié)流,轉(zhuǎn)換成User對象
ByteArrayInputStream is=new ByteArrayInputStream(bytes);
ObjectInputStream iss=new ObjectInputStream(is);
User user=(User)iss.readObject();
is.close();
iss.close();
list.add(user);
}
}
代碼說明:
這里主要是實現(xiàn)了解碼器,主要目的是通過自定義協(xié)議來分別讀取對應的數(shù)據(jù),并且通過對象流來反序列化字節(jié)流。
3.3 發(fā)送方 Handler
public class ClientTestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=0;i<1000;i++){
User user=new User();
user.setName(i+"->zwy");
user.setAge(18);
//注意,這里直接寫user對象,無需再手工轉(zhuǎn)換字節(jié)流了,編碼器會自動幫忙處理。
ctx.channel().writeAndFlush(user);
}
}
}
代碼說明:
客戶端在鏈接就緒時,使用 for 循環(huán)給服務端發(fā)送數(shù)據(jù),主要目的是檢測是否會產(chǎn)生數(shù)據(jù)粘包和拆包問題。
3.4 接受方 Handler
實例:
public class ServerTestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
User user=(User)msg;
System.out.println(user.toString());
}
}
3.5 加入 Pipeline
客戶端
//1.拆包器
ch.pipeline().addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,5,4)
);
//2.自定義編碼器
ch.pipeline().addLast(new MyDecoder());
ch.pipeline().addLast(new MyEncoder());
//3.業(yè)務處理Handler
ch.pipeline().addLast(new ClientTestHandler());
服務端
//1.Netty內(nèi)置拆包器
ch.pipeline().addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,5,4)
);
//2.自定義解碼器
ch.pipeline().addLast(new MyDecoder());
ch.pipeline().addLast(new MyEncoder());
//3.業(yè)務Handler
ch.pipeline().addLast(new ServerTestHandler());
代碼說明:
- 需要往雙向鏈表里面加入三個特殊的 Handler,分別是
LengthFieldBasedFrameDecoder
和自定義的編碼器、解碼器; LengthFieldBasedFrameDecoder
拆包器的構造函數(shù)字段說明,分別如下所示:
2.1 第一個參數(shù),maxFrameLength:解碼時,處理每個幀數(shù)據(jù)的最大長度,一般來說直接賦予Integer.MAX_VALUE
即可;
2.2 第二個參數(shù),lengthFieldOffset :存放幀數(shù)據(jù)的長度數(shù)據(jù)的起始位(偏移位),通俗點說,就是表示數(shù)據(jù)長度的字段在整個協(xié)議里面所處的位置,由于協(xié)議的結果是:協(xié)議標識(4 個字節(jié))、指令(1 個字節(jié))、數(shù)據(jù)長度(4 個字節(jié)),因此數(shù)據(jù)長度處于第 5 個位置;
2.3 第三個參數(shù),lengthFieldLength:長度屬性的長度,即存放整個大數(shù)據(jù)包長度的字節(jié)所占的長度,這里是 4 個字節(jié)。
疑問:為什么需要加
LengthFieldBasedFrameDecoder
呢?回答:自定義協(xié)議它是無法知道數(shù)據(jù)包是什么時候應該結束,需要依賴 Netty 提供的拆包器。
3.6 運行效果
先啟動服務端,然后啟動客戶端,打印結果沒有出現(xiàn)粘包和拆包問題,證明我們自定義的協(xié)議有效,最終運行效果如下所示:
4. LengthFieldBaseFrameDecoder
這里,主要簡單的介紹該拆包器,因為它是我們平時開發(fā)當中最常用的拆包器, 幾乎所有和長度相關的二進制協(xié)議都可以通過它來實現(xiàn),因此在這里簡單的介紹一下它的原理。
思考:如果讓我們簡單實現(xiàn)一個自己的拆包器,那么我們應該如何去實現(xiàn)呢?
其實原理很簡單,就是不斷從 TCP 緩沖區(qū)中讀取數(shù)據(jù),每次讀取完都需要判斷是否是一個完整的數(shù)據(jù)包。
- 如果當前讀取的數(shù)據(jù)不足以拼接成一個完整的業(yè)務數(shù)據(jù)包,那就保留該數(shù)據(jù),繼續(xù)從 tcp 緩沖區(qū)中讀取,直到得到一個完整的數(shù)據(jù)包;
- 如果當前讀到的數(shù)據(jù)加上已經(jīng)讀取的數(shù)據(jù)足夠拼接成一個數(shù)據(jù)包,那就將已經(jīng)讀取的數(shù)據(jù)拼接上本次讀取的數(shù)據(jù),夠成一個完整的業(yè)務數(shù)據(jù)包傳遞到下一個節(jié)點進行處理。如果拼接完一個數(shù)據(jù)包時還有多余的數(shù)據(jù)仍然保留,以便和下次讀到的數(shù)據(jù)進行拼接;
- Netty 中的拆包也是如上這個原理,內(nèi)部會有一個累加器,每次讀取到數(shù)據(jù)都會不斷累加,然后嘗試對累加到的數(shù)據(jù)進行拆包,拆成一個完整的業(yè)務數(shù)據(jù)包,這個基類叫做
ByteToMessageDecoder
。
5. 小結
本節(jié),主要是根據(jù)上節(jié)設計的通訊協(xié)議來具體的實現(xiàn)效果,主要掌握的核心步驟是:
- 需要依賴
LengthFieldBaseFrameDecoder
拆包器,并且需要了解該拆包器的參數(shù)定義和大概原理; - 掌握編碼器和解碼器的實現(xiàn),主要是在編碼器和解碼器里面實現(xiàn)協(xié)議的數(shù)據(jù)粘包和數(shù)據(jù)拆包。