Java 服務(wù)器多線程編程
1. 前言
前面小節(jié)介紹的 Java TCP Socket 程序是單線程模型,也是阻塞式模型。我們調(diào)用 java.net.ServerSocket 的 accept 方法,此時(shí)線程會(huì)被阻塞,等待客戶端連接。當(dāng)有新客戶端連接到服務(wù)器以后,accept 方法會(huì)返回一個(gè) java.net.Socket 類型的對(duì)象,此對(duì)象代表了客戶端和服務(wù)器完成了三次握手后,建立的新連接。 調(diào)用 java.net.Socket 的 recv 和 send 方法和客戶端進(jìn)行數(shù)據(jù)收發(fā)。由于我們采用的是阻塞式 Socket 編程,java.net.ServerSocket 的 accept 方法會(huì)阻塞線程,java.net.Socket 的 recv 和 send 方法也會(huì)阻塞線程。因此,如果采用此模型,在同一時(shí)刻,服務(wù)器只能和一個(gè)客戶端通信。
要想服務(wù)器同時(shí)和多個(gè)客戶端進(jìn)行通信,要么采用非阻塞式 Socket 編程,通過 I/O 多路復(fù)用機(jī)制 實(shí)現(xiàn)此目的;要么采用多線程編程模型。當(dāng)然,在非阻塞式 Socket 編程模型下,往往也采用多線程編程。因?yàn)槟壳暗挠?jì)算機(jī)都是多核處理器,采用多線程編碼模型,可以充分利用 CPU 多核的優(yōu)勢(shì),最大化 CPU 資源的利用。
本節(jié)主要介紹阻塞式 Socket 編程中常用的兩種線程模型:
- 每線程模型
- 線程池模型
2. Java 多線程編程方法
由于本節(jié)會(huì)涉及到 Java 多線程編程,所以需要你能預(yù)先掌握 Java 多線程編程的方法。比如,線程的創(chuàng)建,線程的啟動(dòng),線程之間的同步和線程之間的通信。
在 Java 平臺(tái)下,創(chuàng)建線程的方法有兩種:
-
第一,是創(chuàng)建一個(gè)用戶自定義的線程類,然后繼承 java.leng.Thread 類,同時(shí)要覆寫它的 run 方法,調(diào)用它的 start 方法啟動(dòng)線程。例如:
class MyThread extends Thread { @Override public void run() { super.run(); } } new MyThread().start();
-
第二,是創(chuàng)建一個(gè)任務(wù)類。
首先,實(shí)現(xiàn) Runnable 接口,并且重寫它的 run 方法。然后,創(chuàng)建 java.leng.Thread 類的對(duì)象,同時(shí)將 Runnable 的實(shí)例通過 java.lang.Thread 的構(gòu)造方法傳入。最后,調(diào)用 java.lang.Thread 的 start 方法啟動(dòng)線程。例如:class MyTask implements Runnable { @Override public void run() { } } new Thread(new MyTask()).start();
3. 每線程模型
下圖展示了每線程模型的結(jié)構(gòu)。
從圖中可以看出,每線程模型的程序結(jié)構(gòu)如下:
- 創(chuàng)建一個(gè)監(jiān)聽線程,通常會(huì)采用 Java 主線程作為監(jiān)聽線程。
- 創(chuàng)建一個(gè) java.net.ServerSocket 實(shí)例,調(diào)用它的 accept 方法等待客戶端的連接。
- 當(dāng)有新的客戶端和服務(wù)器建立連接,accept 方法會(huì)返回,創(chuàng)建一個(gè)新的線程和客戶端通信。此時(shí)監(jiān)聽線程返回,繼續(xù)調(diào)用 accept 方法,等待新的客戶端連接。
- 在新線程中調(diào)用 java.net.Socket 的 recv 和 send 方法和客戶端進(jìn)行數(shù)據(jù)收發(fā)。
- 當(dāng)數(shù)據(jù)收發(fā)完成后,調(diào)用 java.net.Socket 的 close 方法關(guān)閉連接,同時(shí)線程退出。
下來,我們通過一個(gè)簡(jiǎn)單的示例程序演示一下每線程模型服務(wù)器的編寫方法。示例程序的基本功能如下:
- 客戶端每隔 1 秒向服務(wù)器發(fā)送一個(gè)消息。
- 服務(wù)器收到客戶端的消息后,向客戶端發(fā)送一個(gè)響應(yīng)消息。
- 客戶端發(fā)送完 10 個(gè)消息后,關(guān)閉 Socket 連接,程序退出。
- 服務(wù)器檢測(cè)到客戶端關(guān)閉連接后,同樣關(guān)閉 Socket 連接,并且負(fù)責(zé)和客戶端通信的線程也退出。
客戶端代碼:
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
public class TCPClientMultiThread {
// 服務(wù)器監(jiān)聽的端口號(hào)
private static final int PORT = 56002;
// 連接超時(shí)時(shí)間
private static final int TIMEOUT = 15000;
// 客戶端執(zhí)行次數(shù)
private static final int TEST_TIMES = 10;
public static void main(String[] args) {
Socket client = null;
try {
// 測(cè)試次數(shù)
int testCount = 0;
// 調(diào)用無參構(gòu)造方法
client = new Socket();
// 構(gòu)造服務(wù)器地址結(jié)構(gòu)
SocketAddress serverAddr = new InetSocketAddress("192.168.0.101", PORT);
// 連接服務(wù)器,超時(shí)時(shí)間是 15 毫秒
client.connect(serverAddr, TIMEOUT);
System.out.println("Client start:" + client.getLocalSocketAddress().toString());
while (true) {
// 向服務(wù)器發(fā)送數(shù)據(jù)
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(client.getOutputStream()));
String req = "Hello Server!";
out.writeInt(req.getBytes().length);
out.write(req.getBytes());
// 不能忘記 flush 方法的調(diào)用
out.flush();
System.out.println("Send to server:" + req);
// 接收服務(wù)器的數(shù)據(jù)
DataInputStream in = new DataInputStream(
new BufferedInputStream(client.getInputStream()));
int msgLen = in.readInt();
byte[] inMessage = new byte[msgLen];
in.read(inMessage);
System.out.println("Recv from server:" + new String(inMessage));
// 如果執(zhí)行次數(shù)已經(jīng)達(dá)到上限,結(jié)束測(cè)試。
if (++testCount >= TEST_TIMES) {
break;
}
// 等待 1 秒然后再執(zhí)行
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (client != null){
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
服務(wù)器代碼:
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
public class TCPServerPerThread implements Runnable{
private static final int PORT =56002;
private Socket sock = null;
TCPServerPerThread(Socket sock){
this.sock = sock;
}
@Override
public void run() {
// 讀取客戶端數(shù)據(jù)
try {
while (true){
// 讀取客戶端數(shù)據(jù)
DataInputStream in = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
int msgLen = in.readInt();
byte[] inMessage = new byte[msgLen];
in.read(inMessage);
System.out.println("Recv from client:" + new String(inMessage) + "length:" + msgLen);
// 向客戶端發(fā)送數(shù)據(jù)
String rsp = "Hello Client!\n";
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(sock.getOutputStream()));
out.writeInt(rsp.getBytes().length);
out.write(rsp.getBytes());
out.flush();
System.out.println("Send to client:" + rsp + " length:" + rsp.getBytes().length);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (sock != null){
try {
sock.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ServerSocket ss = null;
try {
// 創(chuàng)建一個(gè)服務(wù)器 Socket
ss = new ServerSocket(PORT);
while (true){
// 監(jiān)聽新的連接請(qǐng)求
Socket conn = ss.accept();
System.out.println("Accept a new connection:"
+ conn.getRemoteSocketAddress().toString());
Thread t = new Thread(new TCPServerPerThread(conn));
t.start();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (ss != null){
try {
ss.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客戶端采用單線程模型。服務(wù)器采用每線程模型,我們采用實(shí)現(xiàn) Runnable 接口的方式實(shí)現(xiàn)多線程邏輯。從示例代碼可以看出,每線程模型的優(yōu)點(diǎn)就是結(jié)構(gòu)簡(jiǎn)單,相比單線程模型,也沒有增加復(fù)雜度。缺點(diǎn)就是針對(duì)每個(gè)客戶端都創(chuàng)建線程,當(dāng)和客戶端通信結(jié)束后,線程要退出。頻繁的創(chuàng)建、銷毀線程,對(duì)系統(tǒng)的資源消耗比較大,只能用在簡(jiǎn)單的業(yè)務(wù)場(chǎng)景下。
3. 線程池模型
線程池模型的結(jié)構(gòu)如下:
從圖中可以看出,線程池模型的程序結(jié)構(gòu)如下:
- 創(chuàng)建一個(gè)監(jiān)聽線程,通常會(huì)采用 Java 主線程作為監(jiān)聽線程。
- 創(chuàng)建一個(gè) java.net.ServerSocket 實(shí)例,調(diào)用它的 accept 方法等待客戶端的連接。
- 服務(wù)器預(yù)先創(chuàng)建一組線程,叫做線程池。線程池中的線程,在服務(wù)運(yùn)行過程中,一直運(yùn)行,不會(huì)退出。
- 當(dāng)有新的客戶端和服務(wù)器建立連接,accept 方法會(huì)返回 java.net.Socket 對(duì)象,表示新的連接。服務(wù)器一般會(huì)創(chuàng)建一個(gè)處理 java.net.Socket 邏輯的任務(wù),并且將此任務(wù)投遞給線程池去處理。然后,監(jiān)聽線程返回,繼續(xù)調(diào)用 accept 方法,等待新的客戶端連接。
- 線程池調(diào)度空閑的線程去處理任務(wù)。
- 在新新任務(wù)中調(diào)用 java.net.Socket 的 recv 和 send 方法和客戶端進(jìn)行數(shù)據(jù)收發(fā)。
- 當(dāng)數(shù)據(jù)收發(fā)完成后,調(diào)用 java.net.Socket 的 close 方法關(guān)閉連接,任務(wù)完成。
- 線程重新回歸線程池,等待調(diào)度。
下來,我們同樣通過示例代碼演示一下線程池模型的編寫方法。程序功能和每線程模型完全一致,所以我們只編寫服務(wù)端程序,客戶端程序采用每線程模型的客戶端。
示例代碼如下:
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TCPServerThreadPool{
// 服務(wù)監(jiān)聽端口號(hào)
private static final int PORT =56002;
// 開啟線程數(shù)
private static final int THREAD_NUMS = 20;
private static ExecutorService pool = null;
// 創(chuàng)建一個(gè) socket Task 類,處理數(shù)據(jù)收發(fā)
private static class SockTask implements Callable<Void> {
private Socket sock = null;
public SockTask(Socket sock){
this.sock = sock;
}
@Override
public Void call() throws Exception {
try {
while (true){
// 讀取客戶端數(shù)據(jù)
DataInputStream in = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
int msgLen = in.readInt();
byte[] inMessage = new byte[msgLen];
in.read(inMessage);
System.out.println("Recv from client:" + new String(inMessage) + "length:" + msgLen);
// 向客戶端發(fā)送數(shù)據(jù)
String rsp = "Hello Client!\n";
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(sock.getOutputStream()));
out.writeInt(rsp.getBytes().length);
out.write(rsp.getBytes());
out.flush();
System.out.println("Send to client:" + rsp + " length:" + rsp.getBytes().length);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (sock != null){
try {
sock.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return null;
}
}
public static void main(String[] args) {
ServerSocket ss = null;
try {
pool = Executors.newFixedThreadPool(THREAD_NUMS);
// 創(chuàng)建一個(gè)服務(wù)器 Socket
ss = new ServerSocket(PORT);
while (true){
// 監(jiān)聽新的連接請(qǐng)求
Socket conn = ss.accept();
System.out.println("Accept a new connection:"
+ conn.getRemoteSocketAddress().toString());
pool.submit(new SockTask(conn));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (ss != null){
try {
ss.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
4. 小結(jié)
本節(jié)主要介紹的是 Java 服務(wù)器編程中比較簡(jiǎn)單的兩種線程模型,每線程模型和線程池模型。示例程序都采用了阻塞式 Socket 編程。編寫 Java 服務(wù)器程序,通常需要采用多線程模型。對(duì)于非常簡(jiǎn)單的業(yè)務(wù)場(chǎng)景,可以采用每線程模型。對(duì)于比較復(fù)雜的業(yè)務(wù)場(chǎng)景,通常需要采用線程池模型。