Skip to content

Selector

1. Selector 简介

Selector一般称为选择器 ,也可以翻译为多路复用器 。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。
Alt text 使用Selector的好处在于:使用更少的线程来就可以来处理通道了,相比使用多个线程,避免了线程上下文切换带来的开销。

1.2 可选择通道(SelectableChannel)

  1. 不是所有的 Channel 都可以被 Selector 复用的。比方说,FileChannel 就不能被选择器复用。判断一个 Channel 能被 Selector 复用,有一个前提:判断他是否继承了一个抽象类 SelectableChannel。
  2. SelectableChannel 类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类。所有 socket 通道,都继承了 SelectableChannel类都是可选择的,包括从管道(Pipe)对象的中获得的通道。
  3. 一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次。通道和选择器之间的关系,使用注册的方式完成。SelectableChannel可以被注册到 Selector对象上,在注册的时候需要指定通道的哪些操作。
    Alt text

1.3 Channel注册到Selector

  1. 使用 Channel.register(Selector sel,int ops)方法,将一个通道注册到一个选择器时。第一个参数,指定通道要注册的选择器。第二个参数指定选择器需要查询的通道操作。
  2. 可以供选择器查询的通道操作,从类型来分,包括以下四种:
    • 可读: SelectionKey.OP_READ
    • 可写: SelectionKey.OP_WRITE
    • 连接: SelectionKey.OP_CONNECT
    • 接收: SelectionKey.OP_ACCEPT
      如果Selector对通道的多操作类型感兴趣,可以用"位或"操作符来实现:比如:int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
  3. 选择器查询的不是通道的操作,而是通道的某个操作的一种就绪状态。比方说,某个SocketChannel通道可以连接到一个服务器,则处于"连接就绪"(OP_CONNECT)。

1.4 选择键(SelectionKey)

  1. Channel 注册到后,并且一旦通道处于某种就绪的状态,就可以被选择器查询到。这个工作使用选择器Selector的select()方法完成。select方法的作用,对感兴趣的通道操作,进行就绪状态的查询。
  2. Selector可以不断的查询Channel中发生的操作的就绪状态。并且挑选感兴趣的操作就绪状态。一旦通道有操作的就绪状态达成,并且是Selector感兴趣的操作,就会被Selector选中,放入选择键集合中。
  3. 一个选择键,首先是包含了注册在Selector的通道操作的类型,比方说SelectionKey.OP_READ。也包含了特定的通道与特定的选择器之间的注册关系。
  4. 选择键的概念,和事件的概念比较相似。一个选择键类似监听器模式里边的一个事件。由于Selector不是事件触发的模式,而是主动去查询的模式,所以不叫事件 Event,而是叫SelectionKey选择键。

2. Selector的使用方法

2.1 Selector的创建

通过调用 Selector.open()方法创建一个 Selector对象

java
// 1、获取 Selector 选择器
Selector selector = Selector.open();

2.2 注册Channel到Selector

要实现Selector管理Channel,需要将channel注册到相应的Selector上

java
// 1、获取 Selector 选择器
Selector selector = Selector.open();
// 2、获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 3.设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 4、绑定连接
serverSocketChannel.bind(new InetSocketAddress(9999));
// 5、将通道注册到选择器上,并制定监听事件为:"接收"事件
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

代码说明:

  1. 与 Selector 一起使用时,Channel必须处于非阻塞模式下,否则将抛出异常IllegalBlockingModeException。这意味着,FileChannel 不能与 Selector一起使用,因为FileChannel不能切换到非阻塞模式,而套接字相关的所有的通道都可以。
  2. 一个通道,并没有一定要支持所有的四种操作。比如服务器通道ServerSocketChannel支持Accept接受操作,而SocketChannel客户端通道则不支持。 可以通过通道上的validOps()方法,来获取特定通道下所有支持的操作集合。

2.3 轮询查询就绪操作

  1. 通过Selector的select()方法,可以查询出已经就绪的通道操作,这些就绪的状态集合,包存在一个元素是SelectionKey对象的Set集合中。
  2. 下面是Selector几个重载的查询 select()方法:
    • select():阻塞到至少有一个通道在你注册的事件上就绪了。
    • select(long timeout):和 select()一样,但最长阻塞事件为timeout毫秒。
    • selectNow():非阻塞,只要有通道就绪就立刻返回。
    • select()方法返回的int值,表示有多少通道已经就绪,更准确的说,是自前一次select方法以来到这一次select方法之间的时间段上,有多少通道变成就绪状态。
      例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的 channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。一旦调用select()方法,并且返回值不为0时,在Selector中有一个selectedKeys()方法,用来访问已选择键集合,迭代集合的每一个选择键元素,根据就绪操作的类型,完成对应的操作:
java
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
    // a connection was accepted by a ServerSocketChannel. 
    } else if (key.isConnectable()) {

    }
    keyIterator.remove();
}

2.3 停止选择的方法

选择器执行选择的过程,系统底层会依次询问每个通道是否已经就绪,这个过程可能会造成调用线程进入阻塞状态,那么我们有以下三种方式可以唤醒在select()方法中阻塞的线程。

  • wakeup()方法: 通过调用Selector对象的wakeup()方法让处在阻塞状态的select()方法立刻返回该方法使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。
  • close()方法 :通过 close()方法关闭Selector,该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似wakeup()),同时使得注册到该Selector的所有Channel被注销,所有的键将被取消,但是 Channel本身并不会关闭。

3. Selector的示例

java
public class ServerSelectorDemo {
    public static void main(String[] args) throws IOException {
        // 1. 创建一个serverchannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 2. 设定非阻塞
        serverSocketChannel.configureBlocking(false);
        // 3. 绑定端口
        serverSocketChannel.bind(new InetSocketAddress(8000));
        // 4. 创建一个selector
        Selector selector = Selector.open();
        // 5. 注册selector,关注OP_ACCEPT状态
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 6 轮询selector, 遍历得到的状态集合
        ByteBuffer byteBuffer = ByteBuffer.allocate(200);
        while (selector.select() > 0) {
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // 7. 判断有客户端连接就绪,有就注册selector状态为read
                if (selectionKey.isAcceptable()) {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (selectionKey.isReadable()) {
                    // 8. 读取客户端的数据
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    byteBuffer.clear();
                    int length = 0;
                    // 直到读完
                    while ((length = socketChannel.read(byteBuffer))>0){
                        socketChannel.read(byteBuffer);
                        byteBuffer.flip();
                        System.out.println("输入内容:"+new String(byteBuffer.array(), 0 , length));
                    }
                }
                iterator.remove();
            }
        }
    }
}
java
public class ClientSelectorDemo {
    public static void main(String[] args) throws IOException {
        // 1. 创建一个socketChannel
        SocketChannel socketChannel = SocketChannel.open();
        // 2. 请求地址和端口
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));
        ByteBuffer writeBuffer = ByteBuffer.allocate(32);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            writeBuffer.put((scanner.next()).getBytes());
            writeBuffer.flip();
            socketChannel.write(writeBuffer);
            writeBuffer.clear();
        }
    }
}