Skip to content

Aio

AIO用来解决数据复制阶段的阻塞问题:
- 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置 - 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows系统通过IOCP实现了真正的异步IO
  • Linux系统异步IO在2.6版本引入,但其底层实现还是用多路复用模拟了异步IO,性能没有优势。在2007年引入2.6.22+版本:通过io_uring原生实现了真正的内核级异步I/O

1. 文件AIO

在Java7中,Java NIO中添加了AsynchronousFileChannel,也就是是异步地将数据写入文件。需要注意的是默认文件AIO使用的线程都是守护线程,也就是主线程不会等待文件读取完毕再结束。

1.1 创建AsynchronousFileChannel

通过静态方法open()创建

java
Path path = Paths.get("d:\\jack\\01.txt");
// 方式1
AsynchronousFileChannel fileChannel = 
AsynchronousFileChannel.open(path, StandardOpenOption.READ);
// 方式2 由于是异步,底层就涉及到使用多线程,可以自定义线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
AsynchronousFileChannel.open(path, Set.of(StandardOpenOption.READ), executorService);

1.2 通过Future读取数据

通过调用返回Future的get()方法

java
public static void main(String[] args) {
    Path path = Paths.get("d:\\jack\\01.txt");
    try {
        AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
        ByteBuffer buffer = ByteBuffer.allocate(4096);
        long position = 0;
        // 发起异步读取操作,返回 Future 对象
        fileChannel.size();
        Future<Integer> result = fileChannel.read(buffer, position);
        while (!result.isDone()) {
            // 可以在此处执行其他任务
            System.out.println("文件读取中...");
        }
        int bytesRead = result.get();
        if (bytesRead > 0) {
            buffer.flip();
            byte[] data = new byte[buffer.limit()];
            buffer.get(data);
            System.out.println(new String(data));
            buffer.clear();
        }
        fileChannel.close();
    } catch (IOException | InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

1.3 通过CompletionHandler读取数据

调用read()方法,该方法将一个CompletionHandler作为参数传入。

java
public static void main(String[] args) {
    Path path = Paths.get("d:\\jack\\01.txt");
    try (AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
        ByteBuffer buffer = ByteBuffer.allocate(4096);
        long position = 0;
        StringBuilder content = new StringBuilder();

        readFile(fileChannel, buffer, position, content);

        // 主线程可以继续执行其他任务
        System.out.println("主线程继续执行其他任务...");

        // 为了让主线程等待读取完成,简单休眠一段时间,实际应用中可使用更优雅的方式
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("文件内容读取完毕:\n" + content.toString());
    } catch (IOException e) {
        // 捕获异常并使用更健壮的日志记录方式
        System.err.println("打开文件时出错: " + e.getMessage());
    }
}

private static void readFile(AsynchronousFileChannel fileChannel, ByteBuffer buffer, long position, StringBuilder content) {
    fileChannel.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer bytesRead, ByteBuffer attachment) {
            if (bytesRead > 0) {
                attachment.flip();
                byte[] data = new byte[attachment.remaining()];
                attachment.get(data);
                content.append(new String(data));
                attachment.clear();
                readFile(fileChannel, attachment, position + bytesRead, content);
            } else {
                try {
                    fileChannel.close();
                } catch (IOException e) {
                    System.err.println("关闭文件时出错: " + e.getMessage());
                }
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            System.err.println("读取文件时出错: " + exc.getMessage());
            try {
                fileChannel.close();
            } catch (IOException e) {
                System.err.println("关闭文件时出错: " + e.getMessage());
            }
        }
    });
}

1.4 通过Future写数据

java
Path path = Paths.get("d:\\jack\\001.txt");
AsynchronousFileChannel fileChannel = null;
try {
    fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
} catch (IOException e) {
    e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;
buffer.put("jack data".getBytes());
buffer.flip();
Future<Integer> operation = fileChannel.write(buffer, position);
buffer.clear();
while(!operation.isDone());
System.out.println("Write over");

首先,AsynchronousFileChannel以写模式打开。然后创建一个ByteBuffer,并将一些数据写入其中。然后ByteBuffer中的数据被写入到文件中。最后,示例检查返回的Future,以查看写操作完成时的情况。注意,文件必须已经存在。如果该文件不存在,那么write()方法将抛出一个java.nio.file.NoSuchFileException。

1.5 通过CompletionHandler写数据

java
Path path = Paths.get("d:\\jack\\001.txt");
if(!Files.exists(path)){
    try {
        Files.createFile(path);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
AsynchronousFileChannel fileChannel = null;
try {
    fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
} catch (IOException e) {
    e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;
buffer.put("jack data".getBytes());
buffer.flip();
fileChannel.write(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            System.out.println("bytes written: " + result);
    }
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.out.println("Write failed");
        exc.printStackTrace();
    }
});

当写操作完成时,将会调用CompletionHandler的completed()方法。如果写失败,则会调用failed()方法。

2. 网络AIO

Java7的NIO2中提供AsynchronousServerSocketChannel和AsynchronousSocketChannel这两个网络AIO核心类。

java
public class AioServer {
    public static void main(String[] args) throws IOException {
        AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.accept(null, new AcceptHandler(ssc));
        System.in.read();
    }

    private static void closeChannel(AsynchronousSocketChannel sc) {
        try {
            System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
            sc.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel sc;

        public ReadHandler(AsynchronousSocketChannel sc) {
            this.sc = sc;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            try {
                if (result == -1) {
                    closeChannel(sc);
                    return;
                }
                System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
                attachment.flip();
                System.out.println(Charset.defaultCharset().decode(attachment));
                attachment.clear();
                // 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
                sc.read(attachment, attachment, this);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            closeChannel(sc);
            exc.printStackTrace();
        }
    }

    private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel sc;

        private WriteHandler(AsynchronousSocketChannel sc) {
            this.sc = sc;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            // 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
            if (attachment.hasRemaining()) {
                sc.write(attachment);
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            closeChannel(sc);
        }
    }

    private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
        private final AsynchronousServerSocketChannel ssc;

        public AcceptHandler(AsynchronousServerSocketChannel ssc) {
            this.ssc = ssc;
        }

        @Override
        public void completed(AsynchronousSocketChannel sc, Object attachment) {
            try {
                System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
            } catch (IOException e) {
                e.printStackTrace();
            }
            ByteBuffer buffer = ByteBuffer.allocate(16);
            // 读事件由 ReadHandler 处理
            sc.read(buffer, buffer, new ReadHandler(sc));
            // 写事件由 WriteHandler 处理
            sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
            // 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
            ssc.accept(null, this);
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            exc.printStackTrace();
        }
    }
}