Aio
AIO用来解决数据复制阶段的阻塞问题:
- 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置 - 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果
异步模型需要底层操作系统(Kernel)提供支持
- Windows系统通过IOCP实现了真正的异步IO
- Linux系统异步IO在2.6版本引入,但其底层实现还是用多路复用模拟了异步IO,性能没有优势。在2007年引入2.6.22+版本:通过io_uring原生实现了真正的内核级异步I/O
1. 文件AIO
在Java7中,Java NIO中添加了AsynchronousFileChannel,也就是是异步地将数据写入文件。需要注意的是默认文件AIO使用的线程都是守护线程,也就是主线程不会等待文件读取完毕再结束。
1.1 创建AsynchronousFileChannel
通过静态方法open()
创建
java
Path path = Paths.get("d:\\jack\\01.txt");
// 方式1
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.READ);
// 方式2 由于是异步,底层就涉及到使用多线程,可以自定义线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
AsynchronousFileChannel.open(path, Set.of(StandardOpenOption.READ), executorService);
1.2 通过Future读取数据
通过调用返回Future的get()
方法
java
public static void main(String[] args) {
Path path = Paths.get("d:\\jack\\01.txt");
try {
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(4096);
long position = 0;
// 发起异步读取操作,返回 Future 对象
fileChannel.size();
Future<Integer> result = fileChannel.read(buffer, position);
while (!result.isDone()) {
// 可以在此处执行其他任务
System.out.println("文件读取中...");
}
int bytesRead = result.get();
if (bytesRead > 0) {
buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
System.out.println(new String(data));
buffer.clear();
}
fileChannel.close();
} catch (IOException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
1.3 通过CompletionHandler读取数据
调用read()
方法,该方法将一个CompletionHandler作为参数传入。
java
public static void main(String[] args) {
Path path = Paths.get("d:\\jack\\01.txt");
try (AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
ByteBuffer buffer = ByteBuffer.allocate(4096);
long position = 0;
StringBuilder content = new StringBuilder();
readFile(fileChannel, buffer, position, content);
// 主线程可以继续执行其他任务
System.out.println("主线程继续执行其他任务...");
// 为了让主线程等待读取完成,简单休眠一段时间,实际应用中可使用更优雅的方式
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("文件内容读取完毕:\n" + content.toString());
} catch (IOException e) {
// 捕获异常并使用更健壮的日志记录方式
System.err.println("打开文件时出错: " + e.getMessage());
}
}
private static void readFile(AsynchronousFileChannel fileChannel, ByteBuffer buffer, long position, StringBuilder content) {
fileChannel.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer bytesRead, ByteBuffer attachment) {
if (bytesRead > 0) {
attachment.flip();
byte[] data = new byte[attachment.remaining()];
attachment.get(data);
content.append(new String(data));
attachment.clear();
readFile(fileChannel, attachment, position + bytesRead, content);
} else {
try {
fileChannel.close();
} catch (IOException e) {
System.err.println("关闭文件时出错: " + e.getMessage());
}
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.err.println("读取文件时出错: " + exc.getMessage());
try {
fileChannel.close();
} catch (IOException e) {
System.err.println("关闭文件时出错: " + e.getMessage());
}
}
});
}
1.4 通过Future写数据
java
Path path = Paths.get("d:\\jack\\001.txt");
AsynchronousFileChannel fileChannel = null;
try {
fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;
buffer.put("jack data".getBytes());
buffer.flip();
Future<Integer> operation = fileChannel.write(buffer, position);
buffer.clear();
while(!operation.isDone());
System.out.println("Write over");
首先,AsynchronousFileChannel以写模式打开。然后创建一个ByteBuffer,并将一些数据写入其中。然后ByteBuffer中的数据被写入到文件中。最后,示例检查返回的Future,以查看写操作完成时的情况。注意,文件必须已经存在。如果该文件不存在,那么write()
方法将抛出一个java.nio.file.NoSuchFileException。
1.5 通过CompletionHandler写数据
java
Path path = Paths.get("d:\\jack\\001.txt");
if(!Files.exists(path)){
try {
Files.createFile(path);
} catch (IOException e) {
e.printStackTrace();
}
}
AsynchronousFileChannel fileChannel = null;
try {
fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;
buffer.put("jack data".getBytes());
buffer.flip();
fileChannel.write(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("bytes written: " + result);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("Write failed");
exc.printStackTrace();
}
});
当写操作完成时,将会调用CompletionHandler的completed()
方法。如果写失败,则会调用failed()
方法。
2. 网络AIO
Java7的NIO2中提供AsynchronousServerSocketChannel和AsynchronousSocketChannel这两个网络AIO核心类。
java
public class AioServer {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.accept(null, new AcceptHandler(ssc));
System.in.read();
}
private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;
public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
closeChannel(sc);
return;
}
System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(Charset.defaultCharset().decode(attachment));
attachment.clear();
// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
sc.read(attachment, attachment, this);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}
private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;
private WriteHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
if (attachment.hasRemaining()) {
sc.write(attachment);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
closeChannel(sc);
}
}
private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;
public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}
@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(16);
// 读事件由 ReadHandler 处理
sc.read(buffer, buffer, new ReadHandler(sc));
// 写事件由 WriteHandler 处理
sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
}