NIO:non-blocking io 非阻塞 IO
三大组件(概念)
Channel & Buffer
Channel 类似于 Stream,它是读写数据的双向通道,可以从 Channel 将数据读入 Buffer,也可以把 Buffer 中的数据写入 Channel;Buffer 则用来缓冲读写数据。
常见的 Channel:FileChannel、DatagramChannel、SocketChannel、ServerSocketChannel
常见的 Buffer:ByteBuffer(MappedByteBuffer、DirctByteBuffer、HeapByteBuffer)、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer、CharBuffer;最常用的 ByteBuffer
案例
@Slf4j
public class TestByteBuffer {
public static void main(String[] args) {
// 从文件中读取数据
try (FileChannel channel = new FileInputStream("data.txt").getChannel()) {
// 准备缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(10); // 给他分配了 10 个字节的空间
while (true) {
// 从 channel 中读取数据,向 byteBuffer 中写
int read = channel.read(byteBuffer);
// 切换为读模式
byteBuffer.flip();
// 打印 byteBuffer 中的数据
while (byteBuffer.hasRemaining()) { // 是否还有剩余数据
log.debug("{}", (char) byteBuffer.get());
}
// 如果为 -1 就代表没有数据了
if (read == -1) break;
// 切换为写模式
byteBuffer.clear();
// byteBuffer.compact();
}
} catch (IOException e) {
log.error("error", e);
}
}
}上面是通过 输入流 获取 data.txt 文件中内容的一个案例,上面通过 ByteBuffer 把 FileChannel 中的数据读出来。
通过 ByteBuffer.allocate(10) 创建出来一个 HeapByteBuffer 的实例,并且只给它分配 10 个字节的空间。这代表着他一次性只能缓冲 10 个字节。
写完之后通过调用 flip() 方法让 ByteBuffer 变化 读模式 开始输出内容
ByteBuffer
结构图
+---------------------------+
| ByteBuffer |
+---------------------------+
| - mark (标记位置) | // 标记某个特定的位置
| - position (当前位置) | // 当前读/写的位置
| - limit (限制位置) | // 可读/写的最大位置
| - capacity (容量) | // 缓冲区的总容量
| - data (实际存储的数据) | // 存储的字节数组或直接内存
+---------------------------+mark:用于标记当前的 position 值,方便后续通过 reset() 方法恢复到该位置;默认值为 -1,表示未设置
position:当前读/写操作的位置;初始值为 0,随着读写操作逐渐增加
limit:表示缓冲区中可读/写的最大位置。在写模式下,limit 通常等于 capacity;在读模式下,limit 表示有效数据的末尾 capacity:缓冲区的总容量,表示缓冲区最多可以容纳多少字节。创建时确定,不可更改 data:实际存储的字节数据;数据可以存储在堆内存(HeapByteBuffer)或直接内存(DirectByteBuffer)中
现在假设创建了一个 10 字节的 ByteBuffer 状态会经历以下变化:
# 初始状态
+----+----+----+----+----+----+----+----+----+----+
| | | | | | | | | | |
+----+----+----+----+----+----+----+----+----+----+
position = 0, limit = 10, capacity = 10
# 写入 6 字节数据
+----+----+----+----+----+----+----+----+----+----+
| A | B | C | D | E | F | | | | |
+----+----+----+----+----+----+----+----+----+----+
position = 6, limit = 10, capacity = 10
# 切换到读模式
+----+----+----+----+----+----+----+----+----+----+
| A | B | C | D | E | F | | | | |
+----+----+----+----+----+----+----+----+----+----+
position = 0, limit = 6, capacity = 10
# 读取 3 字节后
+----+----+----+----+----+----+----+----+----+----+
| A | B | C | D | E | F | | | | |
+----+----+----+----+----+----+----+----+----+----+
position = 3, limit = 6, capacity = 10
# 重置为写模式(clear)
+----+----+----+----+----+----+----+----+----+----+
| A | B | C | D | E | F | | | | |
+----+----+----+----+----+----+----+----+----+----+
position = 0, limit = 10, capacity = 10
# 重置为写模式(compact)
+----+----+----+----+----+----+----+----+----+----+
| D | E | F | | | | | | | |
+----+----+----+----+----+----+----+----+----+----+
position = 3, limit = 10, capacity = 10clear 的重置写是把 position 重置为 0 ,到时候新数据来的时候把原来的内容覆盖掉;而 compact 写模式则是把没有读到的部分全部向前压缩,然后切换为写模式之前的数据还是保留的
常用方法
put(byte b):写入一个字节(也可以调用 channel 的 read() 方法写入)
get():读取一个字节(也可以调用 channel 的 write() 方法读取);直接通过 get() 读取字节会让 position 后移,但是使用 get(Integer) 的时候不会后移
hasRemaining():判断当前元素和极限位置之间是否还存在元素
flip():切换到读模式,将 limit 设置为当前 position,并将 position 置为 0
clear():重置缓冲区为写模式,position 置为 0,limit 置为 capacity
rewind():将 position 置为 0,保持 limit 不变
mark() & reset():标记和恢复 position;调用 mark 标记当前 position 的位置,然后进行后续操作,如果想回复原本的位置则调用 reset 即可
实例化
因为 ByteBuffer 本身是一个抽象类所以只能实例化子类,可以通过以下方法:
ByteBuffer.allocate(Number)-HeapByteBufferByteBuffer.allocateDirect(Number)-DirectByteBuffer
allocate() 使用 Java 的 堆内存,allocateDirect() 使用的是 直接内存
区别:
- 堆内存:读写效率低、会受到 GC 的影响、分配内存的效率高
- 直接内存:读写效率高(少一次拷贝)、不会受 GC 影响、分配内存的效率低
Selector
Selector 是一个选择器,用来配合一个线程来管理多个 Channel,获取这些 Channel 上发生的事件,这些 Channel 工作在非阻塞模式下,不会让线程死磕在一个 Channel 上,适合连接数特别多但流量低的场景。
调用 Selector 的 select() 会阻塞直到 Channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理
注意:Channel 必须得是非阻塞模式才能正常让 Selector 使用
网络编程
API的具体使用
ServerSocketChannel
它是服务端专用;通过调用 ServerSocketChannel 的 open() 静态方法来创建一个服务器
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
ServerSocketChannel ssc = ServerSocketChannel.open(); // 创建服务器
ssc.configureBlocking(false); // 非阻塞模式(默认阻塞)
ssc.bind(new InetSocketAddress(8080)); // 绑定端口
// 通过 accept 方法获取客户端的连接
SocketChannel accept = open.accept();
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
accept.read(byteBuffer);
byteBuffer.flip();
System.out.println(new String(byteBuffer.array()));以上是一个简易服务器案例,只能被连接一次
accept.read() 方法是一个阻塞方法,也可以像 ServerSocketChannel 一样设置 configureBlocking 方法让其成为非阻塞
连接方法 accept()
这个方法时阻塞的,当有客户端的连接的时候才会继续往下执行;可以通过设置 ServerSocketChannel 实例的 configureBlocking() 方法为 false 让其变为非阻塞(默认为 true);但是如果设置为非阻塞的情况下又没有客户端的连接时方法会返回 null
对于连接成功后它会返回一个 SocketChannel 对象,这个对象表示一个客户端的连接;可以通过这个对象进行数据传输,比如以上案例就是通过 ByteBuffer 进行数据读取然后展示
SocketChannel
这个时客户端和服务器都可以用,这里以客户端来使用
// 创建 SocketChannel
SocketChannel sc = SocketChannel.open();
// 绑定服务器连接地址和端口
sc.connect(new InetSocketAddress("localhost", 8080));
// 向服务器发送数据
sc.write(ByteBuffer.wrap("hello world".getBytes()));跟上面的 ServerSocketChannel 一样都是调用自己的 open() 静态类来完成实例的创建,通过 connect() 方法配置信息然后连接
Selector
这是一个选择器,它主要是用来在一个线程中处理多个连接;案例:
// 打开一个选择器(Selector),用于监听多个通道(Channel)上的事件,如连接、读写等
Selector selector = Selector.open();
// 打开一个服务器套接字通道(ServerSocketChannel),用于监听客户端连接
ServerSocketChannel scc = ServerSocketChannel.open();
scc.bind(new InetSocketAddress(8080));
// 设置通道为非阻塞模式,这是使用 Selector 的前提条件
scc.configureBlocking(false);
// 将 ServerSocketChannel 注册到 Selector 上,并监听“接受连接”事件(OP_ACCEPT)
scc.register(selector, SelectionKey.OP_ACCEPT);
// 阻塞等待至少一个注册的事件发生(即有客户端连接)
selector.select();
// 获取所有已发生的事件对应的 SelectionKey 集合的迭代器
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 遍历所有事件
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 判断事件是否是“接受连接”事件
if (key.isAcceptable()) {
// 获取触发事件的通道(ServerSocketChannel)
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
// 接受客户端连接,得到与客户端通信的 SocketChannel
SocketChannel sc = channel.accept();
// 获取客户端数据并输出
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
sc.read(byteBuffer);
byteBuffer.flip();
System.out.println(new String(byteBuffer.array()));
}
}创建好 Selector 之后还需要调用 Channel 的注册方法去把自己注册到 Selector 中;scc.register() 方法会返回一个 SelectionKey 实例,可以通过这个实例来配置事件类型等
在注册 Channel 的时候必须选择一个事件类型,或者通过返回的 SelectionKey 实例选择事件类型,必须得有一个,否则 Selector 不会帮你干任何事情
以 连接事件 举例,在事件来临之后,可以调用 accept() 方法来处理事件。如果没有处理事件的话 selector.select() 方法是不会阻塞的,而是会一直尝试让你处理这个事件。
对于不想处理的事件可以调用 key.cancel() 来取消这个事件,当事件被取消之后并且没有别的事件后 selector.select() 将被重新阻塞
四种事件类型
| 事件 | Java 静态常量 | 描述 |
|---|---|---|
| accept | SelectionKey.OP_ACCEPT | (服务端)有连接请求时触发 |
| connect | SelectionKey.OP_CONNECT | (客户端)连接建立后触发 |
| read | SelectionKey.OP_READ | 可读事件 |
| write | SelectionKey.OP_WRITE | 可写事件 |
Selector 内部集合机制
Selector 作为多路复用核心组件,其内部维护着两个关键集合:
- allKeys:存储所有已注册的
SelectionKey(包含 channel 与 event 的映射关系) - selectedKeys:存储当前触发事件的
SelectionKey
当调用 Channel.register() 方法时,系统会将 SelectionKey 添加到 allKeys 集合。当某个 channel 触发事件时,对应的 SelectionKey 会被自动添加到 selectedKeys 集合(注意:两个集合中的对象是同一个实例)。
⚠️ 关键注意事项:
selectedKeys()方法只负责添加事件标记为已处理,但不会自动清除已处理的 Key。若不手动移除,会导致:
- 下次 select 时重复处理相同事件
- 可能引发空指针异常(NPE)
示例代码
// 创建 Selector 并注册 ServerSocketChannel
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 等待事件触发
selector.select();
// 获取已触发事件的集合
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
// 处理 Accept 事件
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
// 注册新连接的读事件
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 处理 Read 事件
// ......
}
// 必须手动清除已处理的 Key
keyIterator.remove();
}
}