Skip to content

Channel

NIO中最重要的Channel的实现如下:

  • FileChannel:从文件中读写数据。
  • DatagramChannel:能通过UDP读写网络中的数据。
  • SocketChannel:能通过TCP读写网络中的数据。
  • ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。

正如你所看到的,上面这些通道能够涵盖网络IO,以及文件IO场景

1. FileChannel

FileChannel是一个抽象类,不能直接通过构造方法创建实例,两种办法:

  1. FileChannel类提供了静态的open()方法
  2. 通过FileInputStream、FileOutputStream和RandomAccessFile类都提供了getChannel()方法得到

1.1 读取文件

FileChannel读取数据到Buffer中的示例:

java
Path path = Paths.get("C:\\Program Files (x86)\\Sangfor\\SSL\\Log\\SangforServiceClient.exebak.log");
// 创建FileChanel
try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int bytesRead;
    int readPosition = 0;
    List<Byte> buff = new ArrayList<>();
    // 读取数据到buffer, 返回channel读取的字节数,文件较大每次返回1024
    while ((bytesRead = channel.read(buffer)) != -1) {
        // 累计读取的字节数
        readPosition += bytesRead;
        buffer.flip(); // 切换为读模式
        while (buffer.hasRemaining()) {
            byte b = buffer.get();
            buff.add(b);
        }
        buffer.clear(); // 清空缓冲区,准备下一次读取
    }
    byte[] array = new byte[readPosition];
    for (int i = 0; i < buff.size(); i++) {
        byte b = buff.get(i);
        array[i] = b;
    }
    String s = new String(array, Charset.forName("gb2312"));
    System.out.println(s);
} catch (IOException e) {
    e.printStackTrace();
}

1.2 向FileChannel写数据

java
public static void main(String[] args) {
    Path path = Paths.get("output.txt");
    String data = "Hello, FileChannel!";
    try (FileChannel channel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
        ByteBuffer buffer = ByteBuffer.wrap(data.getBytes());
        channel.write(buffer);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

1.3 某个特定位置读/写

需要某个特定位置进行数据的读/写操作, 可以通过调用position()方法获取FileChannel的当前位置。也可以通过调用 position(long pos)方法设置FileChannel的当前位置。

java
long pos = channel.position();
channel.position(pos + 123); // 当前位置向后移动123进行读写操作

如果将位置设置在文件结束符之后,然后试图从文件通道中读取数据,读方法将返回- 1 (文件结束标志)。 如果将位置设置在文件结束符之后,然后向通道中写数据,文件将撑大到当前位置并写入数据。这可能导致"文件空洞",磁盘上物理文件中写入的数据间有空隙。
空洞文件的两个应用场景:

  • 在使用迅雷下载文件时,还未下载完成,就发现该文件已经占据了全部文件大小的空间,这也是空洞文件;下载时如果没有空洞文件,多线程下载时文件就只能从一个地方写入,这就不能发挥多线程的作用了;如果有了空洞文件,可以从不同的地址同时写入,就达到了多线程的优势;
  • 在创建虚拟机时,你给虚拟机分配了100G的磁盘空间,但其实系统安装完成之后,开始也不过只用了3~4G的磁盘空间,如果一开始就把100G分配出去,资源是很大的浪费。

1.4 获取文件大小

java
// 获取文件大小
long fileSize = outChannel.size();

1.5 截取文件

可以使用 FileChannel.truncate()方法截取一个文件。截取文件时,文件将中指定长度后面的部分将被删除。如:

java
channel.truncate(1024);  //截取文件的前1024个字节

1.6 强制写到磁盘

FileChannel.force()方法将通道里尚未写入磁盘的数据强制写到磁盘上。出于性能方面的考虑,操作系统会将数据缓存在内存中,所以无法保证写入到FileChannel里的数据一定会即时写到磁盘上。要保证这一点,需要调用force()方法。force()方法有一个boolean类型的参数,指明是否同时将文件元数据(权限信息等)写到磁盘上。

1.7 通道之间的数据传输

如果两个通道中有一个是FileChannel,那你可以直接将数据从一个channel传输到另外一个channel。

  1. transferFrom(): 可以将数据从源通道传输到FileChannel中
  2. transferTo(): 将数据从FileChannel传输到其他的channel中。
java
public static void main(String[] args) throws IOException {
    RandomAccessFile aFile = new RandomAccessFile("C:\\Users\\mi\\Downloads\\aa.txt", "rw");
    FileChannel fromChannel = aFile.getChannel();
    RandomAccessFile bFile = new RandomAccessFile("C:\\Users\\mi\\Downloads\\bb.txt", "rw");
    FileChannel toChannel = bFile.getChannel();
    long size = fromChannel.size();
    // 直接覆盖
    // toChannel.transferFrom(fromChannel, 0 , size);
    // 两个文件合并, 起始位置若是toChannel.size()+1,将有一个未知字符
    toChannel.transferFrom(fromChannel, toChannel.size() , size);
    toChannel.close();
    fromChannel.close();
    System.out.println("over");
}

2. ServerSocketChannel

ServerSocketChannel负责监听传入的连接和创建新的SocketChannel对象,它本身从不传输数据, 也就是本身不实现读和写功能。ServerSocketChannel是一个基于通道的socket监听器,和java.net.ServerSocket作用一样,但额外支持非阻塞下运行。

java
public class ServerSocketChannelDemo {

    public static final String GREETING = "Hello java nio.\r\n";
    public static void main(String[] args) throws IOException, InterruptedException {
        // 打开 ServerSocketChannel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ServerSocket serverSocket = ssc.socket();
        // 服务端绑定8090端口
        serverSocket.bind(new InetSocketAddress(8090));
        // 设置为非阻塞模式
        ssc.configureBlocking(false);
        ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes());
        while (true){
            System.out.println("等待客户端连接================");
            // 监听新的连接,返回一个包含新进来的连接的 SocketChannel。
            // 在非阻塞模式下,如果没有连接进来, 立即返回null。
            SocketChannel socketChannel = ssc.accept();
            if(socketChannel==null){
                System.out.println("服务器休息2S。。。。。");
                Thread.sleep(2000);
            }else {
                // 获取远程连接地址
                System.out.println("Incoming connection from: " + socketChannel.socket().getRemoteSocketAddress());
                // 读取Buffer 中的所有数据,rewind将读取开始位置设为0
                buffer.rewind();
                socketChannel.write(buffer);
                socketChannel.close();
            }
        }
    }
}

cmd窗口输入: telent 127.0.0.1 8090, 运行结果:
Alt text 客户端接收到服务端返回消息 Alt text

3. SocketChannel

Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道,主要用途用来处理网络I/O的通道。SocketChannel可以被多路复用。

3.1 SocketChannel特点

  1. 对于已经存在的socket不能创建SocketChannel
  2. SocketChannel中提供的open接口创建的Channel并没有进行网络级联,需要使用connect接口连接到指定地址
  3. 未进行连接的SocketChannle执行I/O操作时,会抛出NotYetConnectedException
  4. SocketChannel支持两种I/O模式:阻塞式和非阻塞式
  5. SocketChannel支持异步关闭。如果SocketChannel在一个线程上read阻塞,另一个线程对该SocketChannel调用shutdownInput,则读阻塞的线程将返回-1 表示没有读取任何数据;如果SocketChannel在一个线程上write阻塞,另一个线程对该SocketChannel调用shutdownWrite,则写阻塞的线程将抛出AsynchronousCloseException
  6. SocketChannel 支持设定参数:
    • SO_SNDBUF 套接字发送缓冲区大小
    • SO_RCVBUF 套接字接收缓冲区大小
    • SO_KEEPALIVE 保活连接
    • O_REUSEADDR 复用地址
    • SO_LINGER 有数据传输时延缓关闭Channel(只有在非阻塞模式下有用)
    • TCP_NODELAY 禁用Nagle算法

3.2 SocketChannel的使用

java
  public static void main(String[] args) throws IOException {
        // 创建 SocketChannel
        // 方式一
//        SocketChannel socketChanne2 = SocketChannel.open();
//        socketChanne2.connect(new InetSocketAddress("www.baidu.com", 80));
        // 方式二
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.sina.com", 80));
        boolean open = socketChannel.isOpen();// 测试 SocketChannel 是否为 open 状态
        System.out.println("=======open============" + open);
        boolean connected = socketChannel.isConnected();//测试 SocketChannel 是否已经被连接
        System.out.println("=========connected==========" + connected);
        boolean connectionPending = socketChannel.isConnectionPending();//测试 SocketChannel 是否正在进行连接
        System.out.println("========connectionPending===========" + connectionPending);
        boolean finishConnect = socketChannel.finishConnect();//校验正在进行套接字连接的 SocketChannel是否已经完成连接
        System.out.println("=========finishConnect==========" + finishConnect);
        socketChannel.configureBlocking(false);
        ByteBuffer byteBuffer = ByteBuffer.allocate(32);
        socketChannel.read(byteBuffer);
        Boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
        Integer soRcvbuf = socketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
        // 反转读写模式
        byteBuffer.flip();
        while(byteBuffer.hasRemaining()) {
            System.out.println(byteBuffer.get());
        }
        socketChannel.close();
        System.out.println("read over");
        System.out.println(keepAlive);
        System.out.println(soRcvbuf);
    }

4. DatagramChannel

使用DatagramChannel来处理UDP的数据传输。和TCP不同,UDP不是面向连接的协议。使用UDP时,只要知道服务器的IP和端口就可以直接向对方发送数据。

java
@Test
public void testClient() throws IOException {
   // 打开 DatagramChannel
   DatagramChannel receiveChannel = DatagramChannel.open();
   //设置为非阻塞模式
   receiveChannel.configureBlocking(false);
   receiveChannel.bind(new InetSocketAddress(10086));
   ByteBuffer receiveBuffer = ByteBuffer.allocate(512);
   while (true) {
      receiveBuffer.clear();
      // 通过 receive()接收 UDP 包
      SocketAddress receiveAddr = receiveChannel.receive(receiveBuffer);
      receiveBuffer.flip();
      System.out.print(receiveAddr.toString() + " ");
      System.out.println(Charset.forName("UTF-8").decode(receiveBuffer));
   }
}

@Test
// 发送数据
public void testServer() throws IOException, InterruptedException {
   // 打开 DatagramChannel
   DatagramChannel server = DatagramChannel.open();
   //设置为非阻塞模式
   server.configureBlocking(false);
   while (true){
      // 通过 send()发送 UDP 包
      server.send(ByteBuffer.wrap("server msg:  send".getBytes()), new InetSocketAddress("127.0.0.1",10086));
      System.out.println("发包端发包");
      Thread.sleep(1000);
   }
}