剛開始學習Java網(wǎng)絡(luò)編程,問題可能有點小白,還請見諒。
我寫了一個簡單的Demo,運用AIO(NIO2.0)編程模型中的AsynchronousSocketChannel來發(fā)送和接收數(shù)據(jù),在客戶端與服務(wù)端之間建立一個長連接來進行通訊,然后發(fā)現(xiàn)當客戶端連續(xù)進行多次發(fā)送時,服務(wù)端收到的數(shù)據(jù)就會連在一起,并且是隨機地連在一起,感覺像是兩次read之間到達的數(shù)據(jù)都被后一次read一次性讀出來了,
在一次測試中,分別進行了三輪發(fā)送(客戶端運行了三次),每輪按順序發(fā)送1-10這10個數(shù),每次發(fā)送一個。服務(wù)端的結(jié)果如下:
服務(wù)端已啟動線程pool-1-thread-7已建立來自10.1.84.54:2381的連接908324714線程pool-1-thread-8已通過來自10.1.84.54:2381的連接908324714收到信息【12345678910】來自10.1.84.54:2381的連接908324714已斷開線程pool-1-thread-8已建立來自10.1.84.54:2387的連接1224441394線程pool-1-thread-8已通過來自10.1.84.54:2387的連接1224441394收到信息【1】線程pool-1-thread-8已通過來自10.1.84.54:2387的連接1224441394收到信息【2】線程pool-1-thread-7已通過來自10.1.84.54:2387的連接1224441394收到信息【3456】線程pool-1-thread-8已通過來自10.1.84.54:2387的連接1224441394收到信息【78】線程pool-1-thread-7已通過來自10.1.84.54:2387的連接1224441394收到信息【910】來自10.1.84.54:2387的連接1224441394已斷開線程pool-1-thread-7已建立來自10.1.84.54:2393的連接1666378193線程pool-1-thread-7已通過來自10.1.84.54:2393的連接1666378193收到信息【1】線程pool-1-thread-8已通過來自10.1.84.54:2393的連接1666378193收到信息【2345】線程pool-1-thread-7已通過來自10.1.84.54:2393的連接1666378193收到信息【678】線程pool-1-thread-7已通過來自10.1.84.54:2393的連接1666378193收到信息【9】線程pool-1-thread-8已通過來自10.1.84.54:2393的連接1666378193收到信息【10】來自10.1.84.54:2393的連接1666378193已斷開
問題是如何才能避免這種情況?使得一次read的數(shù)據(jù)正好是一次發(fā)送的數(shù)據(jù)?或者說這并不是個問題,本身就是這樣的機制,也避免不了?
以下是Demo的代碼
服務(wù)端
package server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Executors;
public class Main {
private static final String SERVER_ADDRESS = "0.0.0.0";
private static final int SERVER_PORT = 8888;
private static final int BUFFER_SIZE = 32 * 1024;
public static void main(String[] args) {
try {
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup)
.setOption(StandardSocketOptions.SO_REUSEADDR, true)
.bind(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT));
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel asynchronousSocketChannel, Object attachment) {
serverSocketChannel.accept(null, this);
try {
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, BUFFER_SIZE);
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_SNDBUF, BUFFER_SIZE);
ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
asynchronousSocketChannel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer readBuffer) {
try {
InetSocketAddress inetSocketAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress();
if (result > 0) {
readBuffer.flip();
byte[] data = new byte[readBuffer.remaining()];
readBuffer.get(data);
System.out.println("線程" + Thread.currentThread().getName() + "已通過來自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的連接" + asynchronousSocketChannel.hashCode() + "收到信息【" + new String(data) + "】");
} else if (result == -1) {
System.out.println("來自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的連接" + asynchronousSocketChannel.hashCode() + "已斷開");
asynchronousSocketChannel.close();
return;
}
} catch (IOException e) {
e.printStackTrace();
}
readBuffer.clear();
asynchronousSocketChannel.read(readBuffer, readBuffer, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("讀取信息失敗");
try {
asynchronousSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
InetSocketAddress inetSocketAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress();
System.out.println("線程" + Thread.currentThread().getName() + "已建立來自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的連接" + asynchronousSocketChannel.hashCode());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("連接建立失敗");
serverSocketChannel.accept(null, this);
}
});
System.out.println("服務(wù)端已啟動");
} catch (IOException e) {
e.printStackTrace();
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
客戶端
package client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class Main {
private static final String SERVER_ADDRESS = "10.1.84.7";
private static final int SERVER_PORT = 8888;
private static final int WRITE_BUFFER_SIZE = 32 * 1024;
private static final int WRITE_TIMES = 10;
public static void main(String[] args) {
try {
AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open();
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
asynchronousSocketChannel.connect(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT), null, new CompletionHandler<Void, Object>() {
@Override
public void completed(Void result, Object attachment) {
System.out.println("連接服務(wù)器成功");
ByteBuffer writeBuffer = ByteBuffer.allocate(WRITE_BUFFER_SIZE);
System.out.println("第1次數(shù)據(jù)由線程" + Thread.currentThread().getName() + "發(fā)送");
writeBuffer.put("1".getBytes());
writeBuffer.flip();
asynchronousSocketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
int count = 1;
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (count < WRITE_TIMES) {
System.out.println("第" + ++count + "次數(shù)據(jù)由線程" + Thread.currentThread().getName() + "發(fā)送");
String msg = "" + count;
writeBuffer.clear();
writeBuffer.put(msg.getBytes());
writeBuffer.flip();
asynchronousSocketChannel.write(writeBuffer, writeBuffer, this);
} else {
System.out.println(WRITE_TIMES + "次數(shù)據(jù)已全部發(fā)送完成");
try {
asynchronousSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
System.exit(0);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("第" + count + "次發(fā)送數(shù)據(jù)失敗");
try {
asynchronousSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
System.exit(0);
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("連接服務(wù)器失敗");
System.exit(0);
}
});
System.out.println("開始連接服務(wù)器");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
添加回答
舉報
0/150
提交
取消