Netty ChannelFuture 異步監(jiān)聽
1. 前言
本節(jié)主要講解 ChannelFuture ,它的作用是用來保存 Channel 異步操作的結(jié)果,可以看作是一個異步操作結(jié)果的占位符。
2. 概念
在 Netty 中所有的 IO 操作都是異步的,不能立刻得到 IO 操作的執(zhí)行結(jié)果,但是可以通過注冊一個監(jiān)聽器來監(jiān)聽其執(zhí)行結(jié)果。在 Java 的并發(fā)編程當中可以通過 Future 來進行異步結(jié)果的監(jiān)聽,但是在 Netty 當中是通過 ChannelFuture 來實現(xiàn)異步結(jié)果的監(jiān)聽。通過注冊一個監(jiān)聽的方式進行監(jiān)聽,當操作執(zhí)行成功或者失敗時監(jiān)聽會自動觸發(fā)注冊的監(jiān)聽事件。
3. 應(yīng)用場景
ChannelFture 在開發(fā)當中經(jīng)常需要用到,可以用來監(jiān)聽客戶端連接服務(wù)端的結(jié)果反饋,Netty 是異步操作,無法知道什么時候執(zhí)行完成,因此可以通過 ChannelFuture 來進行執(zhí)行結(jié)果的監(jiān)聽。在 Netty 當中 Bind 、Write 、Connect 等操作會簡單的返回一個 ChannelFuture。
4. 核心方法
序號 | 方法 | 描述 |
---|---|---|
1 | addListener | 注冊監(jiān)聽器,當操作已完成 (isDone 方法返回完成),將會通知指定的監(jiān)聽器;如果 Future 對象已完成,則通知指定的監(jiān)聽器 |
2 | removeListener | 移除監(jiān)聽器 |
3 | sync | 等待異步操作執(zhí)行完畢 |
4 | await | 等待異步操作執(zhí)行完畢 |
5 | isDone | 判斷當前操作是否完成 |
6 | isSuccess | 判斷已完成的當前操作是否成功 |
7 | isCancellable | 判斷已完成的當前操作是否被取消 |
8 | cause | 獲取已完成的當前操作失敗的原因 |
sync () 和 await () 都是等待異步操作執(zhí)行完成,那么它們有什么區(qū)別呢?
- sync () 會拋出異常,建議使用 sync ();
- await () 不會拋出異常,主線程無法捕捉子線程執(zhí)行拋出的異常。
5. 深入了解 ChannelFuture
5.1 生命周期說明
Future 可以通過四個核心方法來判斷任務(wù)的執(zhí)行情況。
狀態(tài) | 說明 |
---|---|
isDone() | 任務(wù)是否執(zhí)行完成,無論成功還是失敗 |
isSuccess() | 任務(wù)是否執(zhí)行采購 |
isCancelled() | 任務(wù)是否被取消 |
cause() | 獲取執(zhí)行異常信息 |
執(zhí)行過程狀態(tài)的改變說明
當一個異步任務(wù)操作開始的時候,一個新的 future 對象就會被創(chuàng)建。在開始的時候該 future 是處于未完成的狀態(tài),也就是說,isDone ()=false、isSuccess ()=false、isCancelled ()=false;只要該任務(wù)中任何一種狀態(tài)結(jié)束了,無論是說成功、失敗、或者被取消,那么整個 Future 就會被標記為已完成。注意的是,如果執(zhí)行失敗那么 cause () 方法會返回異常信息的內(nèi)容。
實例:
ChannelFuture channelFuture=bootstrap.connect("127.0.0.1",80);
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isDone()){
if(future.isSuccess()){
System.out.println("執(zhí)行成功...");
}else if(future.isCancelled()){
System.out.println("任務(wù)被取消...");
}else if(future.cause()!=null){
System.out.println("執(zhí)行出錯:"+future.cause().getMessage());
}
}
}
});
5.2 ChannelFuture 父接口說明
ChannelFuture 的類繼承結(jié)構(gòu),具體如下所示:
public interface ChannelFuture extends Future<Void> {
}
public interface Future<V> extends java.util.concurrent.Future<V> {
}
通過上面的繼承關(guān)系,我們可以清晰的知道 ChannelFuture 其實最頂層的接口是來自 java 并發(fā)包的 Future,java 并發(fā)包下的 Future 需要手工檢查執(zhí)行結(jié)果是否已經(jīng)完成,非常的繁瑣,因此 Netty 把它進行了封裝和完善,變成了自動的監(jiān)聽,用起來變的非常的簡單。
java 并發(fā)包下的 Future 主要存在以下幾個缺陷:
- 只允許手動通過 get () 來檢查對應(yīng)的操作是否已經(jīng)完成,它是堵塞直到子線程完成執(zhí)行并且返回結(jié)果;
- 只有 isDone () 方法判斷一個異步操作是否完成,但是對于完成的定義過于模糊,JDK 文檔指出正常終止、拋出異常、用戶取消都會使 isDone () 方法返回真。并不能很好的區(qū)分到底是哪種狀態(tài)。
get () 方法是堵塞的,必須等待子線程執(zhí)行完成才能往下執(zhí)行。
實例:
//1.定義一個子線程,實現(xiàn) Callable 接口
public class ThreadTest implements Callable<Integer>{
@Override
public Integer call(){
//打印
System.out.println(">>>>>>>>子線程休眠之前");
//休眠5秒
Thread.sleep(5000);
//打印
System.out.println(">>>>>>>>子線程休眠之后");
return 1;
}
}
//2.調(diào)用子線程處理
public static void main(String[] args){
ThreadTest t=new ThreadTest();
FutureTask<Integer> future=new FutureTask<Integer>(t);
//2.1.開始執(zhí)行子線程
new Thread(future).start();
//2.2.手工返回結(jié)果
int result=future.get();
System.out.println(">>>>>>>>執(zhí)行結(jié)果:"+result);
//2.3.操作數(shù)據(jù)庫
userDao.updateStatus("1");
}
執(zhí)行結(jié)果:
>>>>>>>>子線程休眠之前
>>>>>>>>子線程休眠之后
>>>>>>>>執(zhí)行結(jié)果:1
結(jié)論總結(jié):
- 說明了 Java 并發(fā)包的 Future 要想獲取異步執(zhí)行結(jié)果,必須手工調(diào)用 get () 方法,此時雖然能獲取執(zhí)行結(jié)果,但是無法知道執(zhí)行結(jié)果是成功還是失??;
- 使用 get () 獲取執(zhí)行結(jié)果,但是 get () 后面的業(yè)務(wù)則被堵塞,直到后面執(zhí)行完畢才會往下執(zhí)行,失去了異步操作提高執(zhí)行效率的意義了。
6. ChannelFuture 原理
6.1 線程堵塞
思考:sync () 和 await () 方法如何同步等待執(zhí)行完成并獲取執(zhí)行結(jié)果的呢?
源碼分析如下所示:
private short waiters;//計數(shù)器
@Override
public Promise<V> await() throws InterruptedException {
//1.判斷是否執(zhí)行完成,如果執(zhí)行完成則返回
if (isDone()) {
return this;
}
//2.線程是否已經(jīng)中斷,如果中斷則拋異常
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
//3.檢查死鎖
checkDeadLock();
//4.同步代碼塊->while循環(huán)不斷的監(jiān)聽執(zhí)行結(jié)果
synchronized (this) {
while (!isDone()) {
incWaiters();//waiters遞增
try {
wait();//JDK 的 Object 方法,線程等待【核心】
} finally {
decWaiters();//waiters 遞減
}
}
}
return this;
}
//遞增函數(shù)
private void incWaiters() {
if (waiters == Short.MAX_VALUE) {
throw new IllegalStateException("too many waiters: " + this);
}
++waiters;
}
//遞減函數(shù)
private void decWaiters() {
--waiters;
}
通過以上代碼,我們發(fā)現(xiàn) await () 的核心其實就是調(diào)用 Object 的 wait () 方法進行線程休眠,普通的 Java 多線程知識點。
6.2 線程喚醒
思考:當前線程休眠了,那么什么時候進行喚醒呢?
源碼分析如下所示:
@Override
public Promise<V> setSuccess(V result) {
//1.setSuccess0 賦值操作
if (setSuccess0(result)) {
//2.通知執(zhí)行監(jiān)聽器
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
//繼續(xù)進入方法
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
//繼續(xù)進入方法
checkNotifyWaiters();
return true;
}
return false;
}
private synchronized void checkNotifyWaiters() {
if (waiters > 0) {
//核心:喚醒之前休眠的線程
notifyAll();
}
}
源碼分析總結(jié):
- 堵塞的核心是通過 Object.wait () 方法進行休眠當前線程,普通的 Java 多線程知識;
- 執(zhí)行完成之后給不同狀態(tài)(setSuccess、setFailure)賦值的時候喚醒休眠的線程;
- 喚醒線程之后調(diào)用監(jiān)聽器的方法
l.operationComplete(future);
7. 小結(jié)
通過本節(jié)的學習,我們需要掌握以下幾個核心知識點:
- 掌握異步的概念,傳統(tǒng) I/O 是同步堵塞的,執(zhí)行 I/O 操作后線程會被阻塞住,直到操作完成;異步處理的好處是不會造成線程阻塞,可以通過 Future 來監(jiān)聽異步執(zhí)行的結(jié)果;
- ChannelFuture 的幾種狀態(tài),以及它的值變化時機;
- ChannelFuture 的堵塞和喚醒源碼分析。