NIO
NIO相关概念
- Channel:使用双向的Channel代替单向的Stream,而且可以是非阻塞式的,也就是单个线程中可以处理多个Channel的IO
- Buffer:其实Channel双向操作就是通过Buffer来完成的,从Buffer中读取数据到Channel,从Channel写入数据到Buffer
- Selector:使用Selector监控多条Channel的状态,当某个Channel已处于某个状态时,就会触发对应的状态事件,再逐个处理即可
Channel
- 相比Stream来说,Stream是单向的分为输入流输出流,而Channel是双向的既可以读也可以写
- Channel的双向操作依赖于Buffer,但是Channel之间也是可以
transfer()
方法直接进行数据交互 - Channel可以通过
configureBlocking()
方法选择阻塞式或非阻两种模式(ServerSocketChannel
和SocketChannel
有这个方法,FileChannel
中没有)
常用的Channel类如下:
FileChannel
:文件通道ServerSocketChannel
:服务端Socket通道SocketChannel
:Socket通道
Buffer
- Channel的双向操作依赖于Buffer,当向Channel中读写数据时,一定是要对Buffer进行读写操作
- Buffer就是内存中的一块可读写的连续空间,这有点向数组,当初始化Buffer时需要先规定好容量
Buffer的结构
Buffer中有以下几个重要指针,当Buffer初始化后各个指针的位置应该如图所示
- position:当前读写位置
- limit:可读写到的最大位置,也就是limit后的区域不能读写
- capacity:最大容量位置
- mark:标记位置,调用
mark()
方法在某个位置进行标记,调用reset()
方法可将position指针恢复到标记位置
Buffer基本操作
可通过
hasRemaining()
方法判断这次读是否将数据读完
flip()
:从写模式转化到读模式clear()
:清空Buffer,从读模式转化到写模式compact()
:压缩Buffer,从读模式转化到写模式
Selector
若Channel是非阻塞的,那么就需要监听该通道是否处于可操作状态,在Selector中注册Channel,就可以让Selector起到监听Channel的作用(可选择性的监听,只监听我们感兴趣的状态)
Channel的状态有如下几种(不包括不可操作状态):这里说的是ServerSocketChannel
和SocketChannel
- CONNECT:客户端与服务端建立了连接
- ACCEPT:服务端接受了客户端连接请求
- READ:可读取数据
- WRITE:可写入数据
在监听到某个Channel正处于我们所监听的状态时,再获取该Channel对应的SelectionKey
对象,通过SelectionKey
对象执行相应的操作
SelectionKey对象
SelectionKey
对象就相当于在Selector中注册的Channel的ID,每一个Channel都对应着一个SelectionKey
对象
方法名 | 描述 |
---|---|
int interestOps() |
返回该Channel所有监听着的感兴趣状态 |
int readyOps() |
返回该Channel监听着的感兴趣状态,当前是已经准备好的 |
SelectableChannel channel() |
返回该Channel对象 |
Selector selector() |
返回该Selector对象 |
Object attach(Object ob) |
为该SelectionKey附加一个对象 |
Object attachment() |
获取该SelectionKey附加的对象 |
Selector的使用流程
- 在Selector上注册Channel,监听其想监听的状态
- 调用Selector的
select()
方法(该方法是阻塞式的),若有某个Channel处于想监听的状态时该方法就会被唤醒,返回有几个已经监听到的感兴趣状态 - 调用Selector的
selectedKeys()
方法,返回一个装着SelectionKey
对象的Set集合,该集合中只是已经监听到的感兴趣状态的Channel的SelectionKey
- 使用
SelectionKey
对象进行相应的处理(类似BIO的,若ServerSocketChannel
触发ACCEPT客户端连接状态,使用accept()
方法来生产与之通信的SocketChannel
) - 处理完后,Selector不会自动将Channel置为不可操作状态,需手动置为不可操作状态
NIO与BIO性能对比
interface copyFile {
void copyFile(File source, File target);
}
class NoBufferStreamCopy implements copyFile {
@Override
public void copyFile(File source, File target) {
try (FileInputStream fileInputStream = new FileInputStream(source);
FileOutputStream fileOutputStream = new FileOutputStream(target)) {
int result;
while ((result = fileInputStream.read()) != -1) { //每读一个字节
fileOutputStream.write(result); //就写一个字节
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class BufferStreamCopy implements copyFile {
@Override
public void copyFile(File source, File target) {
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(source));
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(target))) {
byte[] buffer = new byte[1024]; //缓冲区大小
int result;
while ((result = bufferedInputStream.read(buffer)) != -1) { //每读多少到缓冲区
bufferedOutputStream.write(buffer, 0, result); //就写写多少到缓冲区
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class NioBufferCopy implements copyFile {
@Override
public void copyFile(File source, File target) {
try (FileChannel sourceFileChannel = new FileInputStream(source).getChannel();
FileChannel targetFileChannel = new FileOutputStream(target).getChannel()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //创建一个1024byte容量的Buffer对象
while ((sourceFileChannel.read(byteBuffer)) != -1) { //每从源文件读多少到缓冲区,对于Buffer来说是写模式
byteBuffer.flip(); //将Buffer转化为读模式
targetFileChannel.write(byteBuffer); //就从缓冲区写到目标文件,对于Buffer来说是读模式
if (byteBuffer.hasRemaining()) { //若Buffer中还有剩余可读的数据,也就是未读完
byteBuffer.compact(); //就压缩Buffer,并从读模式转化到写模式
} else { //若Buffer中没有剩余可读数据,也就是全部读完
byteBuffer.clear(); //就清空Buffer,并从读模式转化到写模式
}
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class NioTransferCopy implements copyFile {
@Override
public void copyFile(File source, File target) {
try (FileChannel sourceFileChannel = new FileInputStream(source).getChannel();
FileChannel targetFileChannel = new FileOutputStream(target).getChannel()) {
long position = 0L;
long size = sourceFileChannel.size();
while (position != size) {
//transferTo()方法接收的参数分别是:此次起始位置、此次最大位置、此次目标通道
//返回值是:此次复制到的位置,有可能是0也就是此次未复制,源码注释中已经说明
position += sourceFileChannel.transferTo(position, size, targetFileChannel);
//System.out.println(position); //经过测试我的电脑,当文件大于2GB时一次复制不完,进入下一个循环
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) {
File source = new File("src/main/resources/test.txt");
File target = new File("src/main/resources/test-copy.txt");
try (FileOutputStream fileOutputStream = new FileOutputStream(source)) {
source.createNewFile(); //创建源文件
byte[] bytes = new byte[1024 * 1024]; //1KB
fileOutputStream.write(bytes); //写入点内容
/*进行测试*/
testCopy(new NoBufferStreamCopy(), source, target);
testCopy(new BufferStreamCopy(), source, target);
testCopy(new NioBufferCopy(), source, target);
testCopy(new NioTransferCopy(), source, target);
} catch (IOException e) {
e.printStackTrace();
}
source.delete(); //文件的输入流关闭后在进行测试文件的删除,否则会删除失败
}
public static void testCopy(copyFile copyFile, File source, File target) {
System.out.println("--------" + copyFile.getClass() + "testCopy--------");
long start = System.currentTimeMillis();
copyFile.copyFile(source, target);
long end = System.currentTimeMillis();
System.out.println(copyFile.getClass() + ":" + (end - start));
target.delete(); //拷贝完毕删除复制的文件
}
}
NIO模型图解
NIO+Selector的IO多路复用,Selector的select()
方法本身是阻塞式的,若监听到某个IO发生状态变化才会返回,从而减少非阻塞式的不断询问操作系统内核的开销,该方法返回后根据IO的状态变化在进行相应的处理
多人聊天室改进思路
服务端
- 通过监听Channel的状态对其进行相应的处理,单个线程内完成
- 使用两个Buffer分别用于发送消息和接收消息
forwardMessage()
方法改进思路:由于受到Buffer大小的影响,可能会出现Buffer溢出情况,所以添加repeatedlyWrite()
方法多次进行对Buffer写入操作receive()
方法改进思路:由于受到Buffer大小的影响,可能会出现Buffer溢出情况,所以使用每次读出来的数据使用字符串拼接形式进行多次接收
客户端
- 使用两个Buffer分别用于发送消息和接收消息
- 由于受到Buffer大小的影响,可能会出现Buffer溢出情况,处理与服务端一致,多次分批发送消息,可多次接收消息进行字符串拼接
userInputHandler()
与原来一样,创建与客户交互的线程,与响应服务端消息的线程不同,避免由于输入的阻塞而无法即时受到服务端消息
多人聊天室实现代码
Server
public class Server {
private final static int PORT = 8080;
private static final String QUIT = "quit";
private static final int BUFFER_SIZE = 1024; //可调整小一点进行分批写入Channel测试
ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
ByteBuffer writeBuffer = ByteBuffer.allocate(BUFFER_SIZE);
public boolean isQuit(String message) {
return QUIT.equals(message);
}
public String getId(SocketChannel socketChannel) {
Socket socket = socketChannel.socket(); //socket()会返回该Channel的socket对象
return socket.getInetAddress() + ":" + socket.getPort();
}
public void forwardMessage(SelectionKey selectionKey, String message) throws IOException {
SocketChannel clientChannel = (SocketChannel) selectionKey.channel(); //该对象一定是SocketChannel,获取该Channel
Set<SelectionKey> selectionKeys = selectionKey.selector().keys(); //keys()方法返回所有已注册在该Selector上的Channel的selectionKey
for (SelectionKey key : selectionKeys) {
SelectableChannel currentChannel = key.channel(); //当前遍历到的Channel
if (currentChannel instanceof ServerSocketChannel) { //若是服务器的ServerSocketChannel
continue; //就跳过
}
if (key.isValid() && !clientChannel.equals(currentChannel)) { //若当前Channel处于有效状态,并且排除自己,才进行转发
SocketChannel currentSocketChannel = (SocketChannel) currentChannel; //将当前遍历到的Channel转型成SocketChannel
message = getId(clientChannel) + ":" + message + "\n"; //增加消息头
repeatedlyWrite(message, currentSocketChannel); //将消息分批传入当前Channel
}
}
}
private void repeatedlyWrite(String message, SocketChannel socketChannel) throws IOException {
byte[] bytes = message.getBytes(StandardCharsets.UTF_8); //将String转化为字节
writeBuffer.clear(); //清除Buffer,转为写模式
int offset = 0; //偏移量
int capacity = writeBuffer.capacity(); //写Buffer最大容量
while (offset < bytes.length) { //若偏移量小于要写入Buffer的数组大小,就要写入
if (bytes.length > capacity) { //若一次写不完
writeBuffer.put(bytes, offset, capacity); //分批写入,一次写满
offset += capacity; //偏移量递增容量
} else { //若一次可以写完
writeBuffer.put(bytes, offset, bytes.length); //直接写完
offset += bytes.length; //偏移量递增写入的值
}
if (offset + capacity > bytes.length) { //若未来偏移量会越界
capacity = bytes.length - offset; //就改变要写入的容量,通过修改capacity间接影响下一次写入的量
}
writeBuffer.flip(); //改变Buffer为读模式
while (writeBuffer.hasRemaining()) { //若Buffer还有数据为被读完
socketChannel.write(writeBuffer); //就一直向Channel中写
}
writeBuffer.clear(); //改变Buffer为写模式
}
}
public String receive(SocketChannel socketChannel) throws IOException {
String message = "";
while (socketChannel.read(readBuffer) > 0) { //若从socketChannel能读出数据
readBuffer.flip(); //将Buffer转化为读模式
Charset charset = StandardCharsets.UTF_8;
CharBuffer charBuffer = charset.decode(readBuffer);
message += charBuffer.toString(); //将已写入Buffer中的数据读出,拼接message
readBuffer.clear(); //就清空Buffer,并从读模式转化到写模式
}
return message;
}
private void handler(SelectionKey selectionKey) throws IOException {
if (selectionKey.isAcceptable()) { //若当前Channel是ACCEPT状态,说明有客户端要与服务器建立连接
ServerSocketChannel ServerSocketChannel = (ServerSocketChannel) selectionKey.channel(); //该对象一定是ServerSocketChannel,获取该Channel
SocketChannel socketChannel = ServerSocketChannel.accept(); //返回对应生成与之通讯的成客户端Channel
socketChannel.configureBlocking(false); //将Channel设置为非阻塞模式
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ); //向Selector中注册该Channel的READ状态,若处于该状态说明客户端发来消息了
System.out.println("客户端[" + getId(socketChannel) + "], 已连接");
}
if (selectionKey.isReadable()) { //若当前Channel是READ状态,说明有客户端发来消息了
SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //该对象一定是SocketChannel,获取该Channel
String message = receive(socketChannel); //接收消息
System.out.println(getId(socketChannel) + ":" + message); //服务端打印消息
forwardMessage(selectionKey, message); //转发消息
if (isQuit(message)) { //检查该客户端是否要退出
selectionKey.cancel(); //取消该Channel的注册
selectionKey.selector().wakeup(); //由于Selector的select()方法是阻塞式,使用wakeup()可通知Selector更新
System.out.println(getId(socketChannel) + "已断开");
}
}
}
public void start() {
try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //创建ServerSocketChannel对象
Selector selector = Selector.open()) { //创建Selector对象
serverSocketChannel.configureBlocking(false); //将Channel设置为非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(PORT)); //绑定端口,socket()会返回该Channel的socket对象
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //向Selector中注册ServerSocketChannel的ACCEPT状态
System.out.println("服务器监听在" + PORT + "端口");
while (true) {
selector.select(); //监听已注册的Channel状态
Set<SelectionKey> selectionKeys = selector.selectedKeys(); //获取已经监听到的感兴趣状态的Channel的SelectionKey集合
for (SelectionKey selectionKey : selectionKeys) {
handler(selectionKey);
}
selectionKeys.clear(); //手动将这些Channel置为不可操作状态
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Server().start();
}
}
Client
public class Client {
private static final String HOST = "localhost";
private static final int PORT = 8080;
private static final String QUIT = "quit";
private static final int BUFFER_SIZE = 1024; //可调整小一点进行分批写入Channel测试,会将消息分隔
ByteBuffer writeBuffer = ByteBuffer.allocate(BUFFER_SIZE);
ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
public boolean isQuit(String message) {
return QUIT.equals(message);
}
public void userInputHandler(SelectionKey selectionKey) {
Runnable runnable = () -> {
try (BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in))) {
while (true) {
String inputMessage = consoleReader.readLine(); //读取客户端输入消息
if (inputMessage != null) { //发送消息
send(selectionKey, inputMessage);
}
if (isQuit(inputMessage)) { //检查用户是否退出连接
selectionKey.selector().close();
break;
}
}
} catch (IOException e) {
e.printStackTrace();
System.out.println("[" + HOST + ":" + PORT + "]服务器未启动");
}
};
new Thread(runnable).start();
}
private void send(SelectionKey selectionKey, String inputMessage) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
repeatedlyWrite(inputMessage, socketChannel); //将输入消息分批传入当前Channel
}
private void repeatedlyWrite(String message, SocketChannel socketChannel) throws IOException {
byte[] bytes = message.getBytes(StandardCharsets.UTF_8); //将String转化为字节
writeBuffer.clear(); //清除Buffer,转为写模式
int offset = 0; //偏移量
int capacity = writeBuffer.capacity(); //写Buffer最大容量
while (offset < bytes.length) { //若偏移量小于要写入Buffer的数组大小,就要写入
if (bytes.length > capacity) { //若一次写不完
writeBuffer.put(bytes, offset, capacity); //分批写入,一次写满
offset += capacity; //偏移量递增容量
} else { //若一次可以写完
writeBuffer.put(bytes, offset, bytes.length); //直接写完
offset += bytes.length; //偏移量递增写入的值
}
if (offset + capacity > bytes.length) { //若未来偏移量会越界
capacity = bytes.length - offset; //就改变要写入的容量,通过修改capacity间接影响下一次写入的量
}
writeBuffer.flip(); //改变Buffer为读模式
while (writeBuffer.hasRemaining()) { //若Buffer还有数据为被读完
socketChannel.write(writeBuffer); //就一直向Channel中写
}
writeBuffer.clear(); //改变Buffer为写模式
}
}
public String receive(SocketChannel socketChannel) throws IOException {
String message = "";
while (socketChannel.read(readBuffer) > 0) { //若从socketChannel能读出数据
readBuffer.flip(); //将Buffer转化为读模式
Charset charset = StandardCharsets.UTF_8;
CharBuffer charBuffer = charset.decode(readBuffer);
message += charBuffer.toString(); //将已写入Buffer中的数据读出,拼接message
readBuffer.clear(); //就清空Buffer,并从读模式转化到写模式
}
return message;
}
private void handler(SelectionKey selectionKey) throws IOException {
if (selectionKey.isConnectable()) { //若当前Channel是CONNECT状态,说明正在与服务器建立连接
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
if (socketChannel.isConnectionPending()) { //若当前连接已经建立完成
socketChannel.finishConnect(); //完成建立连接的过程
userInputHandler(selectionKey); //处理用户输入
}
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ); //向Selector中注册该Channel的READ状态,若处于该状态说明服务器发来消息了
}
if (selectionKey.isReadable()) { //若当前Channel是READ状态,说明服务器发来了消息
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
String message = receive(socketChannel); //接收服务端消息
System.out.print(message); //客户端打印消息
}
}
public void start() {
try (SocketChannel socketChannel = SocketChannel.open(); //创建SocketChannel对象
Selector selector = Selector.open()) { //创建Selector对象
socketChannel.configureBlocking(false); //将Channel设置为非阻塞模式
socketChannel.register(selector, SelectionKey.OP_CONNECT); //向Selector中注册SocketChannel的CONNECT状态
socketChannel.connect(new InetSocketAddress(HOST, PORT)); //连接到主机和端口
while (true) {
selector.select(); //监听已注册的Channel状态
Set<SelectionKey> selectionKeys = selector.selectedKeys(); //获取已经监听到的感兴趣状态的Channel的SelectionKey集合
for (SelectionKey selectionKey : selectionKeys) {
handler(selectionKey);
}
selectionKeys.clear(); //手动将这些Channel置为不可操作状态
}
} catch (ClosedSelectorException e) { //若用户userInputHandler线程中关闭了Selector,这里就正常退出即可
System.out.println("正常退出");
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Client().start();
}
}
NIO缺点
从操作系统内核层面看,应用程序一定要和操作系统内核进行交互,才能真正的实现不同进程、不同机器之间信息交互,以网络编程为例,应用程序获取数据需要经历下面步骤
- 通过物理连接到达机器(网卡接收数据)
- 将数据拷贝到操作系统内核缓冲区
- 从内核缓冲区复制到应用程序缓冲区,此后才能从应用程序层面取到数据
操作系统层面的IO模型有
- 阻塞式IO:应用程序通过系统调用向操作系统内核发出IO请求,若IO还为准备好,该系统调用就会被阻塞,直到IO已经准备好并且已经拷贝到应用程序缓冲区,此系统调用才会成功返回
- 非阻塞式IO:应用程序通过系统调用向操作系统内核发出IO请求,该系统调用会立刻失败返回,告知应用程序IO还未准备好,应用程序需要多次询问操作系统内核IO是否已经准备就绪,直到IO已经准备好并且已经拷贝到应用程序缓冲区后,此系统调用才会成功返回
- IO多路复用:应用程序通过系统调用向操作系统内核发出监听一些IO状态变化的请求,若这些IO的状态未发生变化,该系统调用就会被阻塞,知道某个IO发生状态变化,此系统调用就会返回,此时应用程序询问操作系统内核IO是否已经准备就绪,从而减少了无目的性的询问操作系统内核所带来的开销
无论是NIO还是BIO都属于同步模型,所以若应用程序只要不主动系统调用,那么应用程序就没有办法收到数据,以至于该主动的系统调用是无法省略的,而异步IO可以解决此问题
异步IO:应用程序通过系统调用向操作系统内核发出IO请求,该系统调用会立刻失败返回,当IO已经准备好并且已经拷贝到应用程序缓冲区后,操作系统内核会给该应用程序传递一个信号,来通知应用程序IO已经准备好
Comments NOTHING