剛開(kāi)始學(xué)習(xí)Java網(wǎng)絡(luò)編程,問(wèn)題可能有點(diǎn)小白,還請(qǐng)見(jiàn)諒。
我寫(xiě)了一個(gè)簡(jiǎn)單的Demo,運(yùn)用AIO(NIO2.0)編程模型中的AsynchronousSocketChannel來(lái)發(fā)送和接收數(shù)據(jù),在客戶(hù)端與服務(wù)端之間建立一個(gè)長(zhǎng)連接來(lái)進(jìn)行通訊,然后發(fā)現(xiàn)當(dāng)客戶(hù)端連續(xù)進(jìn)行多次發(fā)送時(shí),服務(wù)端收到的數(shù)據(jù)就會(huì)連在一起,并且是隨機(jī)地連在一起,感覺(jué)像是兩次read之間到達(dá)的數(shù)據(jù)都被后一次read一次性讀出來(lái)了,
在一次測(cè)試中,分別進(jìn)行了三輪發(fā)送(客戶(hù)端運(yùn)行了三次),每輪按順序發(fā)送1-10這10個(gè)數(shù),每次發(fā)送一個(gè)。服務(wù)端的結(jié)果如下:
服務(wù)端已啟動(dòng)線(xiàn)程pool-1-thread-7已建立來(lái)自10.1.84.54:2381的連接908324714線(xiàn)程pool-1-thread-8已通過(guò)來(lái)自10.1.84.54:2381的連接908324714收到信息【12345678910】來(lái)自10.1.84.54:2381的連接908324714已斷開(kāi)線(xiàn)程pool-1-thread-8已建立來(lái)自10.1.84.54:2387的連接1224441394線(xiàn)程pool-1-thread-8已通過(guò)來(lái)自10.1.84.54:2387的連接1224441394收到信息【1】線(xiàn)程pool-1-thread-8已通過(guò)來(lái)自10.1.84.54:2387的連接1224441394收到信息【2】線(xiàn)程pool-1-thread-7已通過(guò)來(lái)自10.1.84.54:2387的連接1224441394收到信息【3456】線(xiàn)程pool-1-thread-8已通過(guò)來(lái)自10.1.84.54:2387的連接1224441394收到信息【78】線(xiàn)程pool-1-thread-7已通過(guò)來(lái)自10.1.84.54:2387的連接1224441394收到信息【910】來(lái)自10.1.84.54:2387的連接1224441394已斷開(kāi)線(xiàn)程pool-1-thread-7已建立來(lái)自10.1.84.54:2393的連接1666378193線(xiàn)程pool-1-thread-7已通過(guò)來(lái)自10.1.84.54:2393的連接1666378193收到信息【1】線(xiàn)程pool-1-thread-8已通過(guò)來(lái)自10.1.84.54:2393的連接1666378193收到信息【2345】線(xiàn)程pool-1-thread-7已通過(guò)來(lái)自10.1.84.54:2393的連接1666378193收到信息【678】線(xiàn)程pool-1-thread-7已通過(guò)來(lái)自10.1.84.54:2393的連接1666378193收到信息【9】線(xiàn)程pool-1-thread-8已通過(guò)來(lái)自10.1.84.54:2393的連接1666378193收到信息【10】來(lái)自10.1.84.54:2393的連接1666378193已斷開(kāi)
問(wèn)題是如何才能避免這種情況?使得一次read的數(shù)據(jù)正好是一次發(fā)送的數(shù)據(jù)?或者說(shuō)這并不是個(gè)問(wèn)題,本身就是這樣的機(jī)制,也避免不了?
以下是Demo的代碼
服務(wù)端
package server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Executors;
public class Main {
private static final String SERVER_ADDRESS = "0.0.0.0";
private static final int SERVER_PORT = 8888;
private static final int BUFFER_SIZE = 32 * 1024;
public static void main(String[] args) {
try {
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup)
.setOption(StandardSocketOptions.SO_REUSEADDR, true)
.bind(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT));
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel asynchronousSocketChannel, Object attachment) {
serverSocketChannel.accept(null, this);
try {
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, BUFFER_SIZE);
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_SNDBUF, BUFFER_SIZE);
ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
asynchronousSocketChannel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer readBuffer) {
try {
InetSocketAddress inetSocketAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress();
if (result > 0) {
readBuffer.flip();
byte[] data = new byte[readBuffer.remaining()];
readBuffer.get(data);
System.out.println("線(xiàn)程" + Thread.currentThread().getName() + "已通過(guò)來(lái)自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的連接" + asynchronousSocketChannel.hashCode() + "收到信息【" + new String(data) + "】");
} else if (result == -1) {
System.out.println("來(lái)自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的連接" + asynchronousSocketChannel.hashCode() + "已斷開(kāi)");
asynchronousSocketChannel.close();
return;
}
} catch (IOException e) {
e.printStackTrace();
}
readBuffer.clear();
asynchronousSocketChannel.read(readBuffer, readBuffer, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("讀取信息失敗");
try {
asynchronousSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
InetSocketAddress inetSocketAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress();
System.out.println("線(xiàn)程" + Thread.currentThread().getName() + "已建立來(lái)自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的連接" + asynchronousSocketChannel.hashCode());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("連接建立失敗");
serverSocketChannel.accept(null, this);
}
});
System.out.println("服務(wù)端已啟動(dòng)");
} catch (IOException e) {
e.printStackTrace();
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
客戶(hù)端
package client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class Main {
private static final String SERVER_ADDRESS = "10.1.84.7";
private static final int SERVER_PORT = 8888;
private static final int WRITE_BUFFER_SIZE = 32 * 1024;
private static final int WRITE_TIMES = 10;
public static void main(String[] args) {
try {
AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open();
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
asynchronousSocketChannel.connect(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT), null, new CompletionHandler<Void, Object>() {
@Override
public void completed(Void result, Object attachment) {
System.out.println("連接服務(wù)器成功");
ByteBuffer writeBuffer = ByteBuffer.allocate(WRITE_BUFFER_SIZE);
System.out.println("第1次數(shù)據(jù)由線(xiàn)程" + Thread.currentThread().getName() + "發(fā)送");
writeBuffer.put("1".getBytes());
writeBuffer.flip();
asynchronousSocketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
int count = 1;
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (count < WRITE_TIMES) {
System.out.println("第" + ++count + "次數(shù)據(jù)由線(xiàn)程" + Thread.currentThread().getName() + "發(fā)送");
String msg = "" + count;
writeBuffer.clear();
writeBuffer.put(msg.getBytes());
writeBuffer.flip();
asynchronousSocketChannel.write(writeBuffer, writeBuffer, this);
} else {
System.out.println(WRITE_TIMES + "次數(shù)據(jù)已全部發(fā)送完成");
try {
asynchronousSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
System.exit(0);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("第" + count + "次發(fā)送數(shù)據(jù)失敗");
try {
asynchronousSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
System.exit(0);
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("連接服務(wù)器失敗");
System.exit(0);
}
});
System.out.println("開(kāi)始連接服務(wù)器");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
添加回答
舉報(bào)
0/150
提交
取消