第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

首頁 慕課教程 Netty 教程 Netty 教程 Netty ChannelFuture 異步監(jiān)聽

Netty ChannelFuture 異步監(jiān)聽

1. 前言

本節(jié)主要講解 ChannelFuture ,它的作用是用來保存 Channel 異步操作的結(jié)果,可以看作是一個異步操作結(jié)果的占位符。

2. 概念

在 Netty 中所有的 IO 操作都是異步的,不能立刻得到 IO 操作的執(zhí)行結(jié)果,但是可以通過注冊一個監(jiān)聽器來監(jiān)聽其執(zhí)行結(jié)果。在 Java 的并發(fā)編程當中可以通過 Future 來進行異步結(jié)果的監(jiān)聽,但是在 Netty 當中是通過 ChannelFuture 來實現(xiàn)異步結(jié)果的監(jiān)聽。通過注冊一個監(jiān)聽的方式進行監(jiān)聽,當操作執(zhí)行成功或者失敗時監(jiān)聽會自動觸發(fā)注冊的監(jiān)聽事件。

3. 應(yīng)用場景

ChannelFture 在開發(fā)當中經(jīng)常需要用到,可以用來監(jiān)聽客戶端連接服務(wù)端的結(jié)果反饋,Netty 是異步操作,無法知道什么時候執(zhí)行完成,因此可以通過 ChannelFuture 來進行執(zhí)行結(jié)果的監(jiān)聽。在 Netty 當中 Bind 、Write 、Connect 等操作會簡單的返回一個 ChannelFuture。

4. 核心方法

序號 方法 描述
1 addListener 注冊監(jiān)聽器,當操作已完成 (isDone 方法返回完成),將會通知指定的監(jiān)聽器;如果 Future 對象已完成,則通知指定的監(jiān)聽器
2 removeListener 移除監(jiān)聽器
3 sync 等待異步操作執(zhí)行完畢
4 await 等待異步操作執(zhí)行完畢
5 isDone 判斷當前操作是否完成
6 isSuccess 判斷已完成的當前操作是否成功
7 isCancellable 判斷已完成的當前操作是否被取消
8 cause 獲取已完成的當前操作失敗的原因

sync () 和 await () 都是等待異步操作執(zhí)行完成,那么它們有什么區(qū)別呢?

  1. sync () 會拋出異常,建議使用 sync ();
  2. await () 不會拋出異常,主線程無法捕捉子線程執(zhí)行拋出的異常。

5. 深入了解 ChannelFuture

5.1 生命周期說明

Future 可以通過四個核心方法來判斷任務(wù)的執(zhí)行情況。

狀態(tài) 說明
isDone() 任務(wù)是否執(zhí)行完成,無論成功還是失敗
isSuccess() 任務(wù)是否執(zhí)行采購
isCancelled() 任務(wù)是否被取消
cause() 獲取執(zhí)行異常信息

執(zhí)行過程狀態(tài)的改變說明

當一個異步任務(wù)操作開始的時候,一個新的 future 對象就會被創(chuàng)建。在開始的時候該 future 是處于未完成的狀態(tài),也就是說,isDone ()=false、isSuccess ()=false、isCancelled ()=false;只要該任務(wù)中任何一種狀態(tài)結(jié)束了,無論是說成功、失敗、或者被取消,那么整個 Future 就會被標記為已完成。注意的是,如果執(zhí)行失敗那么 cause () 方法會返回異常信息的內(nèi)容。

圖片描述

實例:

ChannelFuture channelFuture=bootstrap.connect("127.0.0.1",80);
channelFuture.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
        if(future.isDone()){
            if(future.isSuccess()){
                System.out.println("執(zhí)行成功...");
            }else if(future.isCancelled()){
                System.out.println("任務(wù)被取消...");
            }else if(future.cause()!=null){
                System.out.println("執(zhí)行出錯:"+future.cause().getMessage());
            }
        }
    }
});

5.2 ChannelFuture 父接口說明

ChannelFuture 的類繼承結(jié)構(gòu),具體如下所示:

public interface ChannelFuture extends Future<Void> {
    
}
public interface Future<V> extends java.util.concurrent.Future<V> {
    
}

通過上面的繼承關(guān)系,我們可以清晰的知道 ChannelFuture 其實最頂層的接口是來自 java 并發(fā)包的 Future,java 并發(fā)包下的 Future 需要手工檢查執(zhí)行結(jié)果是否已經(jīng)完成,非常的繁瑣,因此 Netty 把它進行了封裝和完善,變成了自動的監(jiān)聽,用起來變的非常的簡單。

java 并發(fā)包下的 Future 主要存在以下幾個缺陷:

  1. 只允許手動通過 get () 來檢查對應(yīng)的操作是否已經(jīng)完成,它是堵塞直到子線程完成執(zhí)行并且返回結(jié)果;
  2. 只有 isDone () 方法判斷一個異步操作是否完成,但是對于完成的定義過于模糊,JDK 文檔指出正常終止、拋出異常、用戶取消都會使 isDone () 方法返回真。并不能很好的區(qū)分到底是哪種狀態(tài)。

get () 方法是堵塞的,必須等待子線程執(zhí)行完成才能往下執(zhí)行。

實例:

//1.定義一個子線程,實現(xiàn) Callable 接口
public class ThreadTest implements Callable<Integer>{
    @Override
    public Integer call(){
	    //打印
    	System.out.println(">>>>>>>>子線程休眠之前");
	    //休眠5秒
	    Thread.sleep(5000);
    	//打印
    	System.out.println(">>>>>>>>子線程休眠之后");
        return 1;
	}
}
//2.調(diào)用子線程處理
public static void main(String[] args){
    ThreadTest t=new ThreadTest();
    FutureTask<Integer> future=new FutureTask<Integer>(t);
    //2.1.開始執(zhí)行子線程
    new Thread(future).start();
    
  	//2.2.手工返回結(jié)果
    int result=future.get();
    System.out.println(">>>>>>>>執(zhí)行結(jié)果:"+result);
    //2.3.操作數(shù)據(jù)庫
    userDao.updateStatus("1");
}

執(zhí)行結(jié)果:

>>>>>>>>子線程休眠之前
>>>>>>>>子線程休眠之后
>>>>>>>>執(zhí)行結(jié)果:1

結(jié)論總結(jié):

  1. 說明了 Java 并發(fā)包的 Future 要想獲取異步執(zhí)行結(jié)果,必須手工調(diào)用 get () 方法,此時雖然能獲取執(zhí)行結(jié)果,但是無法知道執(zhí)行結(jié)果是成功還是失??;
  2. 使用 get () 獲取執(zhí)行結(jié)果,但是 get () 后面的業(yè)務(wù)則被堵塞,直到后面執(zhí)行完畢才會往下執(zhí)行,失去了異步操作提高執(zhí)行效率的意義了。

6. ChannelFuture 原理

6.1 線程堵塞

思考:sync () 和 await () 方法如何同步等待執(zhí)行完成并獲取執(zhí)行結(jié)果的呢?

源碼分析如下所示:

private short waiters;//計數(shù)器

@Override
public Promise<V> await() throws InterruptedException {
    //1.判斷是否執(zhí)行完成,如果執(zhí)行完成則返回
    if (isDone()) {
        return this;
    }

    //2.線程是否已經(jīng)中斷,如果中斷則拋異常
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }

    //3.檢查死鎖
    checkDeadLock();

    //4.同步代碼塊->while循環(huán)不斷的監(jiān)聽執(zhí)行結(jié)果
    synchronized (this) {
        while (!isDone()) {
            incWaiters();//waiters遞增
            try {
                wait();//JDK 的 Object 方法,線程等待【核心】
            } finally {
                decWaiters();//waiters 遞減
            }
        }
    }
    return this;
}

//遞增函數(shù)
private void incWaiters() {
    if (waiters == Short.MAX_VALUE) {
        throw new IllegalStateException("too many waiters: " + this);
    }
    ++waiters;
}

//遞減函數(shù)
private void decWaiters() {
    --waiters;
}

通過以上代碼,我們發(fā)現(xiàn) await () 的核心其實就是調(diào)用 Object 的 wait () 方法進行線程休眠,普通的 Java 多線程知識點。

6.2 線程喚醒

思考:當前線程休眠了,那么什么時候進行喚醒呢?

源碼分析如下所示:

@Override
public Promise<V> setSuccess(V result) {
    //1.setSuccess0 賦值操作
    if (setSuccess0(result)) {
        //2.通知執(zhí)行監(jiān)聽器
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

private boolean setSuccess0(V result) {
    //繼續(xù)進入方法
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        
        //繼續(xù)進入方法
        checkNotifyWaiters();
        return true;
    }
    return false;
}

private synchronized void checkNotifyWaiters() {
    if (waiters > 0) {
        //核心:喚醒之前休眠的線程
        notifyAll();
    }
}

源碼分析總結(jié):

  1. 堵塞的核心是通過 Object.wait () 方法進行休眠當前線程,普通的 Java 多線程知識;
  2. 執(zhí)行完成之后給不同狀態(tài)(setSuccess、setFailure)賦值的時候喚醒休眠的線程;
  3. 喚醒線程之后調(diào)用監(jiān)聽器的方法 l.operationComplete(future);

7. 小結(jié)

通過本節(jié)的學習,我們需要掌握以下幾個核心知識點:

  1. 掌握異步的概念,傳統(tǒng) I/O 是同步堵塞的,執(zhí)行 I/O 操作后線程會被阻塞住,直到操作完成;異步處理的好處是不會造成線程阻塞,可以通過 Future 來監(jiān)聽異步執(zhí)行的結(jié)果;
  2. ChannelFuture 的幾種狀態(tài),以及它的值變化時機;
  3. ChannelFuture 的堵塞和喚醒源碼分析。