第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

Java AsynchronousSocketChannel 介紹

1. 前言

Java NIO 可以編寫高性能服務(wù)器,所依賴的 I/O 事件分發(fā)機(jī)制是 Selector。Selector 的工作原理就是有一個(gè)線程會(huì)調(diào)用 Selector 的 select 方法,然后進(jìn)入阻塞狀態(tài),等待事件的發(fā)生。一旦有 I/O 事件發(fā)生,阻塞在 select 方法上的線程會(huì)返回,然后進(jìn)行事件分發(fā)。其本質(zhì)還是一個(gè)同步實(shí)現(xiàn)。

本小節(jié)將要介紹 Java 7 中引入的完全異步的編程方法,核心組件是 AsynchronousServerSocketChannel 和 AsynchronousSocketChannel 兩個(gè)類,分別用來編寫服務(wù)器和客戶端程序。 AsynchronousServerSocketChannel 和 AsynchronousSocketChannel 是在 java.nio.channels 包中引入的。

2. 基于 Future 編寫服務(wù)器程序

創(chuàng)建一個(gè) AsynchronousServerSocketChannel 服務(wù)器的步驟如下:

  • 創(chuàng)建 AsynchronousServerSocketChannel 的實(shí)例,需要通過它提供的工廠方法 open 完成。如下:
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
  • 將 AsynchronousServerSocketChannel 綁定在一個(gè)本地 IP 地址或者端口
server.bind(new InetSocketAddress("127.0.0.1", PORT));
  • 向 AsynchronousServerSocketChannel 投遞一個(gè) accept 操作。accept 調(diào)用會(huì)立即返回,不會(huì)阻塞調(diào)用線程。accept 的返回值是一個(gè) Future 對(duì)象。
Future<AsynchronousSocketChannel> acceptFuture = server.accept();
  • 通過 Future 對(duì)象的 get 方法獲取新的連接對(duì)象,返回值是 AsynchronousSocketChannel 類型的對(duì)象。注意,F(xiàn)uture 對(duì)象的 get 方法會(huì)阻塞調(diào)用線程。get 方法接收一個(gè) timeout 參數(shù)。
 AsynchronousSocketChannel client = acceptFuture.get(10, TimeUnit.SECONDS);
  • 調(diào)用 AsynchronousSocketChannel 的 read 方法,投遞一個(gè) read 事件。注意:read 接收的參數(shù)是 ByteBuffer。read 是異步調(diào)用,不會(huì)阻塞線程。Future 的 get 調(diào)用會(huì)阻塞調(diào)用線程。
ByteBuffer inBuffer = ByteBuffer.allocate(128);
Future<Integer> readResult = client.read(inBuffer);
System.out.println("Do something");
readResult.get();
  • 調(diào)用 AsynchronousSocketChannel 的 write 方法,投遞一個(gè) write 事件。注意:write 接收的參數(shù)是 ByteBuffer。write 是異步調(diào)用,不會(huì)阻塞線程。Future 的 get 調(diào)用會(huì)阻塞調(diào)用線程。
Future<Integer> writeResult = client.write(inBuffer);
writeResult.get();

服務(wù)器完整代碼:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class AsyncServer {
    private static final int PORT =56002;

    public static void main(String[] args) {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()){
            server.bind(new InetSocketAddress("127.0.0.1", PORT));

            Future<AsynchronousSocketChannel> acceptFuture = server.accept();
            AsynchronousSocketChannel client = acceptFuture.get(10, TimeUnit.SECONDS);

            if (client != null && client.isOpen()){
                ByteBuffer inBuffer = ByteBuffer.allocate(128);
                Future<Integer> readResult = client.read(inBuffer);
                System.out.println("Do something");
                readResult.get();

                inBuffer.flip();
                Future<Integer> writeResult = client.write(inBuffer);
                writeResult.get();
            }

            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

3. 基于 Future 編寫客戶端程序

編寫客戶端程序,首先是創(chuàng)建 AsynchronousSocketChannel 實(shí)例,通過它的 open 方法完成。然后調(diào)用 AsynchronousSocketChannel 的 connect 方法連接服務(wù)器,同樣是異步調(diào)用,不會(huì)阻塞調(diào)用線程。調(diào)用 Future 的 get 方法獲取連接結(jié)果。剩下客戶端數(shù)據(jù)收發(fā)邏輯和服務(wù)器的數(shù)據(jù)收發(fā)邏輯一致。

客戶端完整代碼:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;

public class AsyncClient {
    private static final int PORT =56002;
    public static void main(String[] args) {
        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            Future<Void> result = client.connect(new InetSocketAddress("127.0.0.1", PORT));
            System.out.println("Async connect the server");
            result.get();

            String reqMessage = "Hello server!";
            ByteBuffer reqBuffer = ByteBuffer.wrap(reqMessage.getBytes());
            Future<Integer> writeResult = client.write(reqBuffer);
            System.out.println("Async send to server:" + reqMessage);
            writeResult.get();

            ByteBuffer inBuffer = ByteBuffer.allocate(128);
            Future<Integer> readResult = client.read(inBuffer);
            readResult.get();
            System.out.println("Async recv from server:" + new String(inBuffer.array()).trim());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4. 異步 I/O 操作說明

異步 Socket 編程的一個(gè)關(guān)鍵點(diǎn)是:AsynchronousServerSocketChannel 和 AsynchronousSocketChannel 提供的一組 I/O 操作是異步的,方法調(diào)用完后會(huì)立即返回,而不會(huì)關(guān)心操作是否完成,并不會(huì)阻塞調(diào)用線程。如果要想獲取 I/O 操作的結(jié)果,可以通過 Future 的方式,或者是 CompletionHandler 的方式。

下面列舉的 connect、accept、read、write 四組 I/O 方法,返回值是 Future 對(duì)象的 I/O 方法,前面已經(jīng)介紹。還有就是需要傳入一個(gè) attachment 參數(shù)和一個(gè) CompletionHandler 參數(shù),這是基于完成例程的方式。

  • connect 異步操作
public abstract Future<Void> connect(SocketAddress remote);
public abstract <A> void connect(SocketAddress remote,
                                     A attachment,
                                     CompletionHandler<Void,? super A> handler);
  • accept 異步操作
public abstract Future<AsynchronousSocketChannel> accept();
public abstract <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler);
  • read 異步操作
public abstract Future<Integer> read(ByteBuffer dst);
public final <A> void read(ByteBuffer dst,
                               A attachment,
                               CompletionHandler<Integer,? super A> handler)                               
  • write 異步操作
public abstract Future<Integer> write(ByteBuffer src); 
public final <A> void write(ByteBuffer src,
                                A attachment,
                                CompletionHandler<Integer,? super A> handler)                              

通過 Future 實(shí)現(xiàn)異步客戶端、服務(wù)器程序,盡管 I/O 相關(guān)方法調(diào)用是異步的,但是還得通過 Future 的 get 方法獲取操作的結(jié)果,而 Future 的 get 調(diào)用是同步的,所以還是沒有做到完全異步。而通過 CompletionHandler 獲取 I/O 結(jié)果,所有 I/O 操作的執(zhí)行結(jié)果都會(huì)通過 CompletionHandler 回調(diào)返回。

5. 基于 CompletionHandler 編寫服務(wù)器

基于 CompletionHandler 編寫服務(wù)器,關(guān)鍵是兩步:

  • 需要給每一個(gè) I/O 操作傳入一個(gè) attachment 參數(shù),這是用來記錄用戶上下文信息的。在示例代碼中,我們抽象了一個(gè)類表示上下文信息。
private static class AsyncIOOP {
        private int op_type;
        private ByteBuffer read_buffer;
        private AsynchronousSocketChannel client;
}
  • 還需要傳入一個(gè) CompletionHandler 參數(shù),這需要你自定義一個(gè)類并且實(shí)現(xiàn) CompletionHandler 接口。由于 accept 操作和其他三個(gè)操作不同,所以我們定義了兩個(gè)實(shí)現(xiàn) CompletionHandler 接口的類。
private static class AsyncIOOPCompletionHandler implements CompletionHandler<Integer, AsyncIOOP>
{
}

private static class AsyncAcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, syncIOOP>
{
}

每一個(gè) I/O 操作完成,系統(tǒng)都會(huì)回調(diào) CompletionHandler 的 completed 方法,你需要覆蓋此方法,然后處理返回結(jié)果。

示例代碼實(shí)現(xiàn)的是一個(gè) Echo 邏輯,關(guān)鍵步驟如下:

  • 服務(wù)器啟動(dòng)的時(shí)候,投遞一個(gè) accept 操作。
  • 當(dāng)收到 accept 操作完成,首先投遞一個(gè) accept 操作,準(zhǔn)備接收新客戶端請(qǐng)求;然后為剛接收的客戶端投遞一個(gè) read 操作,準(zhǔn)備接收數(shù)據(jù)。
  • 當(dāng)收到 read 操作完成,向客戶端投遞一個(gè) write 操作,發(fā)送響應(yīng)給客戶端;然后再次投遞一個(gè) read 操作,準(zhǔn)備接收新的消息。
  • 當(dāng)收到 write 操作完成,我們沒有處理邏輯,因?yàn)檫@是一個(gè)簡(jiǎn)單的 Echo 功能。

完整代碼如下:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AsyncServerCompletionHandler {
    private static final int PORT =56002;
    private AsynchronousServerSocketChannel server = null;
    private static final int ASYNC_READ = 1;
    private static final int ASYNC_WRITE = 2;
    private static final int ASYNC_ACCEPT = 3;
    private static final int ASYNC_CONNECT = 4;

    private static class AsyncIOOP {
        private int op_type;
        private ByteBuffer read_buffer;
        private AsynchronousSocketChannel client;

        public int getOp_type() {
            return op_type;
        }

        public void setOp_type(int op_type) {
            this.op_type = op_type;
        }

        public ByteBuffer getRead_buffer() {
            return read_buffer;
        }

        public void setRead_buffer(ByteBuffer read_buffer) {
            this.read_buffer = read_buffer;
        }

        public AsynchronousSocketChannel getClient() {
            return client;
        }

        public void setClient(AsynchronousSocketChannel client) {
            this.client = client;
        }

        public AsyncIOOP(int op) {
            this(op, null, null);
        }
        public AsyncIOOP(int op, ByteBuffer b) {
            this(op, b, null);
        }
        public AsyncIOOP(int op, ByteBuffer b, AsynchronousSocketChannel ch) {
            this.op_type = op;
            this.read_buffer = b;
            this.client = ch;
        }
    }
    private static class AsyncIOOPCompletionHandler implements CompletionHandler<Integer, AsyncIOOP>
    {
        private AsyncServerCompletionHandler server;

        public AsyncIOOPCompletionHandler(AsyncServerCompletionHandler server){
            this.server = server;
        }
        @Override
        public void completed(Integer result, AsyncIOOP attachment) {
            if (attachment.op_type == ASYNC_READ) {
                server.async_write(attachment.getClient(), "Hello Client!");

                ByteBuffer inBuffer = attachment.getRead_buffer();
                System.out.println("Recv message from client:" + new String(inBuffer.array()).trim());

                server.async_read(attachment.getClient());
            } else if (attachment.op_type == ASYNC_WRITE) {

            }
        }

        @Override
        public void failed(Throwable exc, AsyncIOOP attachment) {
            try {
                attachment.getClient().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private static class AsyncAcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncIOOP>
    {
        private AsyncServerCompletionHandler server;

        public AsyncAcceptCompletionHandler(AsyncServerCompletionHandler server) {
            this.server = server;
        }

        @Override
        public void completed(AsynchronousSocketChannel result, AsyncIOOP attachment) {
            if (attachment.op_type == ASYNC_ACCEPT) {
                server.accept_new_client();

                if (result != null && result.isOpen()) {
                    server.async_read(result);
                }
            }
        }

        @Override
        public void failed(Throwable exc, AsyncIOOP attachment) {

        }
    }

    public void start() {
        try {
            server = AsynchronousServerSocketChannel.open();
            server.bind(new InetSocketAddress("127.0.0.1", PORT));
            accept_new_client();
        } catch (Exception e) {
            e.printStackTrace();
            stop();
        }
    }

    public void accept_new_client() {
        server.accept(new AsyncIOOP(ASYNC_ACCEPT), new AsyncAcceptCompletionHandler(this));
    }

    public void async_read(AsynchronousSocketChannel client){
        ByteBuffer inBuffer = ByteBuffer.allocate(128);
        AsyncIOOP ioop = new AsyncIOOP(ASYNC_READ, inBuffer, client);
        client.read(inBuffer, ioop, new AsyncIOOPCompletionHandler(this));
    }
    public void async_write(AsynchronousSocketChannel client, String message){
        ByteBuffer outBuffer = ByteBuffer.wrap(message.getBytes());
        AsyncIOOP ioop = new AsyncIOOP(ASYNC_WRITE, outBuffer, client);
        client.write(outBuffer, ioop, new AsyncIOOPCompletionHandler(this));
    }
    public void stop(){
        if (server != null){
            try {
                server.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        AsyncServerCompletionHandler server = new AsyncServerCompletionHandler();
        server.start();

        try {
            Thread.sleep(1000*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

6. 總結(jié)

本小節(jié)重點(diǎn)是介紹 Java NIO2 中引入的異步 Socket 的功能。異步 Socket 的核心是每一個(gè) I/O 方法(connect、accept、read、write)的調(diào)用只是向系統(tǒng)投遞一個(gè)事件,方法執(zhí)行完會(huì)立即返回。如果要獲取 I/O 執(zhí)行的結(jié)果,可以通過 Future 或者 CompletionHandler 獲取。Java 的這個(gè)機(jī)制非常類似 Windows IOCP(完成端口)的功能,如果有興趣可以參考[慕課網(wǎng)專欄][1] IOCP 一節(jié),或者 [IOCP 相關(guān)實(shí)現(xiàn)代碼][2]。

7. 參考