Skip to content

NIO:non-blocking io 非阻塞 IO

三大组件(概念)

Channel & Buffer

Channel 类似于 Stream,它是读写数据的双向通道,可以从 Channel 将数据读入 Buffer,也可以把 Buffer 中的数据写入 Channel;Buffer 则用来缓冲读写数据。

常见的 Channel:FileChannelDatagramChannelSocketChannelServerSocketChannel

常见的 Buffer:ByteBufferMappedByteBufferDirctByteBufferHeapByteBuffer)、ShortBufferIntBufferLongBufferFloatBufferDoubleBufferCharBuffer;最常用的 ByteBuffer

案例

java
@Slf4j
public class TestByteBuffer {
    public static void main(String[] args) {
        // 从文件中读取数据
        try (FileChannel channel = new FileInputStream("data.txt").getChannel()) {
            // 准备缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(10);    // 给他分配了 10 个字节的空间
            while (true) {
                // 从 channel 中读取数据,向 byteBuffer 中写
                int read = channel.read(byteBuffer);
                // 切换为读模式
                byteBuffer.flip();
                // 打印 byteBuffer 中的数据
                while (byteBuffer.hasRemaining()) { // 是否还有剩余数据
                    log.debug("{}", (char) byteBuffer.get());
                }

                // 如果为 -1 就代表没有数据了
                if (read == -1) break;

                // 切换为写模式
                byteBuffer.clear();
                // byteBuffer.compact();
            }
        } catch (IOException e) {
            log.error("error", e);
        }
    }
}

上面是通过 输入流 获取 data.txt 文件中内容的一个案例,上面通过 ByteBufferFileChannel 中的数据读出来。

通过 ByteBuffer.allocate(10) 创建出来一个 HeapByteBuffer 的实例,并且只给它分配 10 个字节的空间。这代表着他一次性只能缓冲 10 个字节。

写完之后通过调用 flip() 方法让 ByteBuffer 变化 读模式 开始输出内容

ByteBuffer

结构图
txt
+---------------------------+
|       ByteBuffer          |
+---------------------------+
| - mark (标记位置)         |  // 标记某个特定的位置
| - position (当前位置)     |  // 当前读/写的位置
| - limit (限制位置)        |  // 可读/写的最大位置
| - capacity (容量)         |  // 缓冲区的总容量
| - data (实际存储的数据)    |  // 存储的字节数组或直接内存
+---------------------------+

mark:用于标记当前的 position 值,方便后续通过 reset() 方法恢复到该位置;默认值为 -1,表示未设置

position:当前读/写操作的位置;初始值为 0,随着读写操作逐渐增加

limit:表示缓冲区中可读/写的最大位置。在写模式下,limit 通常等于 capacity;在读模式下,limit 表示有效数据的末尾 capacity:缓冲区的总容量,表示缓冲区最多可以容纳多少字节。创建时确定,不可更改 data:实际存储的字节数据;数据可以存储在堆内存(HeapByteBuffer)或直接内存(DirectByteBuffer)中


现在假设创建了一个 10 字节的 ByteBuffer 状态会经历以下变化:

txt
# 初始状态
+----+----+----+----+----+----+----+----+----+----+
|    |    |    |    |    |    |    |    |    |    |
+----+----+----+----+----+----+----+----+----+----+
position = 0, limit = 10, capacity = 10

# 写入 6 字节数据
+----+----+----+----+----+----+----+----+----+----+
| A  | B  | C  | D  | E  | F  |    |    |    |    |
+----+----+----+----+----+----+----+----+----+----+
position = 6, limit = 10, capacity = 10

# 切换到读模式
+----+----+----+----+----+----+----+----+----+----+
| A  | B  | C  | D  | E  | F  |    |    |    |    |
+----+----+----+----+----+----+----+----+----+----+
position = 0, limit = 6, capacity = 10

# 读取 3 字节后
+----+----+----+----+----+----+----+----+----+----+
| A  | B  | C  | D  | E  | F  |    |    |    |    |
+----+----+----+----+----+----+----+----+----+----+
position = 3, limit = 6, capacity = 10

# 重置为写模式(clear)
+----+----+----+----+----+----+----+----+----+----+
| A  | B  | C  | D  | E  | F  |    |    |    |    |
+----+----+----+----+----+----+----+----+----+----+
position = 0, limit = 10, capacity = 10

# 重置为写模式(compact)
+----+----+----+----+----+----+----+----+----+----+
| D  | E  | F  |    |    |    |    |    |    |    |
+----+----+----+----+----+----+----+----+----+----+
position = 3, limit = 10, capacity = 10

clear 的重置写是把 position 重置为 0 ,到时候新数据来的时候把原来的内容覆盖掉;而 compact 写模式则是把没有读到的部分全部向前压缩,然后切换为写模式之前的数据还是保留的

常用方法

put(byte b):写入一个字节(也可以调用 channelread() 方法写入)

get():读取一个字节(也可以调用 channelwrite() 方法读取);直接通过 get() 读取字节会让 position 后移,但是使用 get(Integer) 的时候不会后移

hasRemaining():判断当前元素和极限位置之间是否还存在元素

flip():切换到读模式,将 limit 设置为当前 position,并将 position 置为 0

clear():重置缓冲区为写模式,position 置为 0,limit 置为 capacity

rewind():将 position 置为 0,保持 limit 不变

mark() & reset():标记和恢复 position;调用 mark 标记当前 position 的位置,然后进行后续操作,如果想回复原本的位置则调用 reset 即可

实例化

因为 ByteBuffer 本身是一个抽象类所以只能实例化子类,可以通过以下方法:

  1. ByteBuffer.allocate(Number) - HeapByteBuffer
  2. ByteBuffer.allocateDirect(Number) - DirectByteBuffer

allocate() 使用 Java 的 堆内存allocateDirect() 使用的是 直接内存

区别:

  • 堆内存:读写效率低、会受到 GC 的影响、分配内存的效率高
  • 直接内存:读写效率高(少一次拷贝)、不会受 GC 影响、分配内存的效率低

Selector

Selector 是一个选择器,用来配合一个线程来管理多个 Channel,获取这些 Channel 上发生的事件,这些 Channel 工作在非阻塞模式下,不会让线程死磕在一个 Channel 上,适合连接数特别多但流量低的场景。

调用 Selector 的 select() 会阻塞直到 Channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理

注意:Channel 必须得是非阻塞模式才能正常让 Selector 使用

网络编程

API的具体使用

ServerSocketChannel

它是服务端专用;通过调用 ServerSocketChannelopen() 静态方法来创建一个服务器

java
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

ServerSocketChannel ssc = ServerSocketChannel.open();   // 创建服务器
ssc.configureBlocking(false);   // 非阻塞模式(默认阻塞)
ssc.bind(new InetSocketAddress(8080));  // 绑定端口

// 通过 accept 方法获取客户端的连接
SocketChannel accept = open.accept();
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
accept.read(byteBuffer);

byteBuffer.flip();
System.out.println(new String(byteBuffer.array()));

以上是一个简易服务器案例,只能被连接一次

accept.read() 方法是一个阻塞方法,也可以像 ServerSocketChannel 一样设置 configureBlocking 方法让其成为非阻塞

连接方法 accept()

这个方法时阻塞的,当有客户端的连接的时候才会继续往下执行;可以通过设置 ServerSocketChannel 实例的 configureBlocking() 方法为 false 让其变为非阻塞(默认为 true);但是如果设置为非阻塞的情况下又没有客户端的连接时方法会返回 null

对于连接成功后它会返回一个 SocketChannel 对象,这个对象表示一个客户端的连接;可以通过这个对象进行数据传输,比如以上案例就是通过 ByteBuffer 进行数据读取然后展示

SocketChannel

这个时客户端和服务器都可以用,这里以客户端来使用

java
// 创建 SocketChannel
SocketChannel sc = SocketChannel.open();
// 绑定服务器连接地址和端口
sc.connect(new InetSocketAddress("localhost", 8080));
// 向服务器发送数据
sc.write(ByteBuffer.wrap("hello world".getBytes()));

跟上面的 ServerSocketChannel 一样都是调用自己的 open() 静态类来完成实例的创建,通过 connect() 方法配置信息然后连接

Selector

这是一个选择器,它主要是用来在一个线程中处理多个连接;案例:

java
// 打开一个选择器(Selector),用于监听多个通道(Channel)上的事件,如连接、读写等
Selector selector = Selector.open();

// 打开一个服务器套接字通道(ServerSocketChannel),用于监听客户端连接
ServerSocketChannel scc = ServerSocketChannel.open();
scc.bind(new InetSocketAddress(8080));

// 设置通道为非阻塞模式,这是使用 Selector 的前提条件
scc.configureBlocking(false);

// 将 ServerSocketChannel 注册到 Selector 上,并监听“接受连接”事件(OP_ACCEPT)
scc.register(selector, SelectionKey.OP_ACCEPT);

// 阻塞等待至少一个注册的事件发生(即有客户端连接)
selector.select();

// 获取所有已发生的事件对应的 SelectionKey 集合的迭代器
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

// 遍历所有事件
while (iterator.hasNext()) {
    SelectionKey key = iterator.next();

    // 判断事件是否是“接受连接”事件
    if (key.isAcceptable()) {
        // 获取触发事件的通道(ServerSocketChannel)
        ServerSocketChannel channel = (ServerSocketChannel) key.channel();

        // 接受客户端连接,得到与客户端通信的 SocketChannel
        SocketChannel sc = channel.accept();

        // 获取客户端数据并输出
        ByteBuffer byteBuffer = ByteBuffer.allocate(16);
        sc.read(byteBuffer);
        byteBuffer.flip();
        System.out.println(new String(byteBuffer.array()));
    }
}

创建好 Selector 之后还需要调用 Channel 的注册方法去把自己注册到 Selector 中;scc.register() 方法会返回一个 SelectionKey 实例,可以通过这个实例来配置事件类型等

在注册 Channel 的时候必须选择一个事件类型,或者通过返回的 SelectionKey 实例选择事件类型,必须得有一个,否则 Selector 不会帮你干任何事情

以 连接事件 举例,在事件来临之后,可以调用 accept() 方法来处理事件。如果没有处理事件的话 selector.select() 方法是不会阻塞的,而是会一直尝试让你处理这个事件。

对于不想处理的事件可以调用 key.cancel() 来取消这个事件,当事件被取消之后并且没有别的事件后 selector.select() 将被重新阻塞

四种事件类型

事件Java 静态常量描述
acceptSelectionKey.OP_ACCEPT(服务端)有连接请求时触发
connectSelectionKey.OP_CONNECT(客户端)连接建立后触发
readSelectionKey.OP_READ可读事件
writeSelectionKey.OP_WRITE可写事件

Selector 内部集合机制

Selector 作为多路复用核心组件,其内部维护着两个关键集合:

  1. allKeys:存储所有已注册的 SelectionKey(包含 channel 与 event 的映射关系)
  2. selectedKeys:存储当前触发事件的 SelectionKey

当调用 Channel.register() 方法时,系统会将 SelectionKey 添加到 allKeys 集合。当某个 channel 触发事件时,对应的 SelectionKey 会被自动添加到 selectedKeys 集合(注意:两个集合中的对象是同一个实例)。

⚠️ 关键注意事项:selectedKeys() 方法只负责添加事件标记为已处理,但不会自动清除已处理的 Key。若不手动移除,会导致:

  • 下次 select 时重复处理相同事件
  • 可能引发空指针异常(NPE)
示例代码
java
// 创建 Selector 并注册 ServerSocketChannel
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
    // 等待事件触发
    selector.select();
    
    // 获取已触发事件的集合
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
    
    while (keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        
        if (key.isAcceptable()) {
            // 处理 Accept 事件
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel client = server.accept();
            // 注册新连接的读事件
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) {
            // 处理 Read 事件
            // ......
        }
        
        // 必须手动清除已处理的 Key
        keyIterator.remove();
    }
}