03-NIO

nobility 发布于 2021-07-30 1127 次阅读


NIO

NIO相关概念

  • Channel:使用双向的Channel代替单向的Stream,而且可以是非阻塞式的,也就是单个线程中可以处理多个Channel的IO
  • Buffer:其实Channel双向操作就是通过Buffer来完成的,从Buffer中读取数据到Channel,从Channel写入数据到Buffer
  • Selector:使用Selector监控多条Channel的状态,当某个Channel已处于某个状态时,就会触发对应的状态事件,再逐个处理即可

Channel

  • 相比Stream来说,Stream是单向的分为输入流输出流,而Channel是双向的既可以读也可以写
  • Channel的双向操作依赖于Buffer,但是Channel之间也是可以transfer()方法直接进行数据交互
  • Channel可以通过configureBlocking()方法选择阻塞式或非阻两种模式(ServerSocketChannelSocketChannel有这个方法,FileChannel中没有)

常用的Channel类如下:

  • FileChannel:文件通道
  • ServerSocketChannel:服务端Socket通道
  • SocketChannel:Socket通道

Buffer

  • Channel的双向操作依赖于Buffer,当向Channel中读写数据时,一定是要对Buffer进行读写操作
  • Buffer就是内存中的一块可读写的连续空间,这有点向数组,当初始化Buffer时需要先规定好容量
Buffer的结构

Buffer中有以下几个重要指针,当Buffer初始化后各个指针的位置应该如图所示

  • position:当前读写位置
  • limit:可读写到的最大位置,也就是limit后的区域不能读写
  • capacity:最大容量位置
  • mark:标记位置,调用mark()方法在某个位置进行标记,调用reset()方法可将position指针恢复到标记位置

Buffer的结构

Buffer基本操作

可通过hasRemaining()方法判断这次读是否将数据读完

  • flip():从写模式转化到读模式
  • clear():清空Buffer,从读模式转化到写模式
  • compact():压缩Buffer,从读模式转化到写模式

Buffer基本操作

Selector

若Channel是非阻塞的,那么就需要监听该通道是否处于可操作状态,在Selector中注册Channel,就可以让Selector起到监听Channel的作用(可选择性的监听,只监听我们感兴趣的状态)

Channel的状态有如下几种(不包括不可操作状态):这里说的是ServerSocketChannelSocketChannel

  • CONNECT:客户端与服务端建立了连接
  • ACCEPT:服务端接受了客户端连接请求
  • READ:可读取数据
  • WRITE:可写入数据

在监听到某个Channel正处于我们所监听的状态时,再获取该Channel对应的SelectionKey对象,通过SelectionKey对象执行相应的操作

SelectionKey对象

SelectionKey对象就相当于在Selector中注册的Channel的ID,每一个Channel都对应着一个SelectionKey对象

方法名 描述
int interestOps() 返回该Channel所有监听着的感兴趣状态
int readyOps() 返回该Channel监听着的感兴趣状态,当前是已经准备好的
SelectableChannel channel() 返回该Channel对象
Selector selector() 返回该Selector对象
Object attach(Object ob) 为该SelectionKey附加一个对象
Object attachment() 获取该SelectionKey附加的对象
Selector的使用流程
  1. 在Selector上注册Channel,监听其想监听的状态
  2. 调用Selector的select()方法(该方法是阻塞式的),若有某个Channel处于想监听的状态时该方法就会被唤醒,返回有几个已经监听到的感兴趣状态
  3. 调用Selector的selectedKeys()方法,返回一个装着SelectionKey对象的Set集合,该集合中只是已经监听到的感兴趣状态的Channel的SelectionKey
  4. 使用SelectionKey对象进行相应的处理(类似BIO的,若ServerSocketChannel触发ACCEPT客户端连接状态,使用accept()方法来生产与之通信的SocketChannel
  5. 处理完后,Selector不会自动将Channel置为不可操作状态,需手动置为不可操作状态

NIO与BIO性能对比

interface copyFile {
  void copyFile(File source, File target);
}

class NoBufferStreamCopy implements copyFile {
  @Override
  public void copyFile(File source, File target) {
    try (FileInputStream fileInputStream = new FileInputStream(source);
         FileOutputStream fileOutputStream = new FileOutputStream(target)) {
      int result;
      while ((result = fileInputStream.read()) != -1) {  //每读一个字节
        fileOutputStream.write(result);  //就写一个字节
      }
    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

class BufferStreamCopy implements copyFile {
  @Override
  public void copyFile(File source, File target) {
    try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(source));
         BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(target))) {
      byte[] buffer = new byte[1024];  //缓冲区大小
      int result;
      while ((result = bufferedInputStream.read(buffer)) != -1) {  //每读多少到缓冲区
        bufferedOutputStream.write(buffer, 0, result);  //就写写多少到缓冲区
      }
    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

class NioBufferCopy implements copyFile {
  @Override
  public void copyFile(File source, File target) {
    try (FileChannel sourceFileChannel = new FileInputStream(source).getChannel();
         FileChannel targetFileChannel = new FileOutputStream(target).getChannel()) {
      ByteBuffer byteBuffer = ByteBuffer.allocate(1024);  //创建一个1024byte容量的Buffer对象
      while ((sourceFileChannel.read(byteBuffer)) != -1) {  //每从源文件读多少到缓冲区,对于Buffer来说是写模式
        byteBuffer.flip();  //将Buffer转化为读模式
        targetFileChannel.write(byteBuffer);  //就从缓冲区写到目标文件,对于Buffer来说是读模式
        if (byteBuffer.hasRemaining()) {  //若Buffer中还有剩余可读的数据,也就是未读完
          byteBuffer.compact();  //就压缩Buffer,并从读模式转化到写模式
        } else {  //若Buffer中没有剩余可读数据,也就是全部读完
          byteBuffer.clear();  //就清空Buffer,并从读模式转化到写模式
        }
      }
    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

class NioTransferCopy implements copyFile {
  @Override
  public void copyFile(File source, File target) {
    try (FileChannel sourceFileChannel = new FileInputStream(source).getChannel();
         FileChannel targetFileChannel = new FileOutputStream(target).getChannel()) {
      long position = 0L;
      long size = sourceFileChannel.size();
      while (position != size) {
        //transferTo()方法接收的参数分别是:此次起始位置、此次最大位置、此次目标通道
        //返回值是:此次复制到的位置,有可能是0也就是此次未复制,源码注释中已经说明
        position += sourceFileChannel.transferTo(position, size, targetFileChannel);
        //System.out.println(position);  //经过测试我的电脑,当文件大于2GB时一次复制不完,进入下一个循环
      }
    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

public class Main {
  public static void main(String[] args) {
    File source = new File("src/main/resources/test.txt");
    File target = new File("src/main/resources/test-copy.txt");
    try (FileOutputStream fileOutputStream = new FileOutputStream(source)) {
      source.createNewFile();  //创建源文件
      byte[] bytes = new byte[1024 * 1024];  //1KB
      fileOutputStream.write(bytes);  //写入点内容
      /*进行测试*/
      testCopy(new NoBufferStreamCopy(), source, target);
      testCopy(new BufferStreamCopy(), source, target);
      testCopy(new NioBufferCopy(), source, target);
      testCopy(new NioTransferCopy(), source, target);
    } catch (IOException e) {
      e.printStackTrace();
    }
    source.delete();  //文件的输入流关闭后在进行测试文件的删除,否则会删除失败
  }

  public static void testCopy(copyFile copyFile, File source, File target) {
    System.out.println("--------" + copyFile.getClass() + "testCopy--------");
    long start = System.currentTimeMillis();
    copyFile.copyFile(source, target);
    long end = System.currentTimeMillis();
    System.out.println(copyFile.getClass() + ":" + (end - start));
    target.delete();  //拷贝完毕删除复制的文件
  }
}

NIO模型图解

NIO+Selector的IO多路复用,Selector的select()方法本身是阻塞式的,若监听到某个IO发生状态变化才会返回,从而减少非阻塞式的不断询问操作系统内核的开销,该方法返回后根据IO的状态变化在进行相应的处理

NIO通信模型

多人聊天室改进思路

服务端
  • 通过监听Channel的状态对其进行相应的处理,单个线程内完成
  • 使用两个Buffer分别用于发送消息和接收消息
  • forwardMessage()方法改进思路:由于受到Buffer大小的影响,可能会出现Buffer溢出情况,所以添加repeatedlyWrite()方法多次进行对Buffer写入操作
  • receive()方法改进思路:由于受到Buffer大小的影响,可能会出现Buffer溢出情况,所以使用每次读出来的数据使用字符串拼接形式进行多次接收
客户端
  • 使用两个Buffer分别用于发送消息和接收消息
  • 由于受到Buffer大小的影响,可能会出现Buffer溢出情况,处理与服务端一致,多次分批发送消息,可多次接收消息进行字符串拼接
  • userInputHandler()与原来一样,创建与客户交互的线程,与响应服务端消息的线程不同,避免由于输入的阻塞而无法即时受到服务端消息

多人聊天室实现代码

Server

public class Server {
  private final static int PORT = 8080;
  private static final String QUIT = "quit";
  private static final int BUFFER_SIZE = 1024;  //可调整小一点进行分批写入Channel测试
  ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
  ByteBuffer writeBuffer = ByteBuffer.allocate(BUFFER_SIZE);

  public boolean isQuit(String message) {
    return QUIT.equals(message);
  }

  public String getId(SocketChannel socketChannel) {
    Socket socket = socketChannel.socket();  //socket()会返回该Channel的socket对象
    return socket.getInetAddress() + ":" + socket.getPort();
  }

  public void forwardMessage(SelectionKey selectionKey, String message) throws IOException {
    SocketChannel clientChannel = (SocketChannel) selectionKey.channel();  //该对象一定是SocketChannel,获取该Channel
    Set<SelectionKey> selectionKeys = selectionKey.selector().keys();  //keys()方法返回所有已注册在该Selector上的Channel的selectionKey
    for (SelectionKey key : selectionKeys) {
      SelectableChannel currentChannel = key.channel();  //当前遍历到的Channel
      if (currentChannel instanceof ServerSocketChannel) {  //若是服务器的ServerSocketChannel
        continue;  //就跳过
      }
      if (key.isValid() && !clientChannel.equals(currentChannel)) {  //若当前Channel处于有效状态,并且排除自己,才进行转发
        SocketChannel currentSocketChannel = (SocketChannel) currentChannel;  //将当前遍历到的Channel转型成SocketChannel
        message = getId(clientChannel) + ":" + message + "\n";  //增加消息头
        repeatedlyWrite(message, currentSocketChannel);  //将消息分批传入当前Channel
      }
    }
  }

  private void repeatedlyWrite(String message, SocketChannel socketChannel) throws IOException {
    byte[] bytes = message.getBytes(StandardCharsets.UTF_8);  //将String转化为字节
    writeBuffer.clear();  //清除Buffer,转为写模式
    int offset = 0;  //偏移量
    int capacity = writeBuffer.capacity();  //写Buffer最大容量
    while (offset < bytes.length) {  //若偏移量小于要写入Buffer的数组大小,就要写入
      if (bytes.length > capacity) {  //若一次写不完
        writeBuffer.put(bytes, offset, capacity);  //分批写入,一次写满
        offset += capacity;  //偏移量递增容量
      } else {  //若一次可以写完
        writeBuffer.put(bytes, offset, bytes.length);  //直接写完
        offset += bytes.length;  //偏移量递增写入的值
      }
      if (offset + capacity > bytes.length) {  //若未来偏移量会越界
        capacity = bytes.length - offset;  //就改变要写入的容量,通过修改capacity间接影响下一次写入的量
      }
      writeBuffer.flip();  //改变Buffer为读模式
      while (writeBuffer.hasRemaining()) {  //若Buffer还有数据为被读完
        socketChannel.write(writeBuffer);  //就一直向Channel中写
      }
      writeBuffer.clear();  //改变Buffer为写模式
    }
  }

  public String receive(SocketChannel socketChannel) throws IOException {
    String message = "";
    while (socketChannel.read(readBuffer) > 0) {  //若从socketChannel能读出数据
      readBuffer.flip();  //将Buffer转化为读模式
      Charset charset = StandardCharsets.UTF_8;
      CharBuffer charBuffer = charset.decode(readBuffer);
      message += charBuffer.toString();  //将已写入Buffer中的数据读出,拼接message
      readBuffer.clear();  //就清空Buffer,并从读模式转化到写模式
    }
    return message;
  }

  private void handler(SelectionKey selectionKey) throws IOException {
    if (selectionKey.isAcceptable()) {  //若当前Channel是ACCEPT状态,说明有客户端要与服务器建立连接
      ServerSocketChannel ServerSocketChannel = (ServerSocketChannel) selectionKey.channel();  //该对象一定是ServerSocketChannel,获取该Channel
      SocketChannel socketChannel = ServerSocketChannel.accept();  //返回对应生成与之通讯的成客户端Channel
      socketChannel.configureBlocking(false);  //将Channel设置为非阻塞模式
      socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);  //向Selector中注册该Channel的READ状态,若处于该状态说明客户端发来消息了
      System.out.println("客户端[" + getId(socketChannel) + "], 已连接");
    }
    if (selectionKey.isReadable()) {  //若当前Channel是READ状态,说明有客户端发来消息了
      SocketChannel socketChannel = (SocketChannel) selectionKey.channel();  //该对象一定是SocketChannel,获取该Channel
      String message = receive(socketChannel);  //接收消息
      System.out.println(getId(socketChannel) + ":" + message);  //服务端打印消息
      forwardMessage(selectionKey, message);  //转发消息
      if (isQuit(message)) {  //检查该客户端是否要退出
        selectionKey.cancel();  //取消该Channel的注册
        selectionKey.selector().wakeup();  //由于Selector的select()方法是阻塞式,使用wakeup()可通知Selector更新
        System.out.println(getId(socketChannel) + "已断开");
      }
    }
  }

  public void start() {
    try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();  //创建ServerSocketChannel对象
         Selector selector = Selector.open()) {  //创建Selector对象
      serverSocketChannel.configureBlocking(false);  //将Channel设置为非阻塞模式
      serverSocketChannel.socket().bind(new InetSocketAddress(PORT));  //绑定端口,socket()会返回该Channel的socket对象
      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);   //向Selector中注册ServerSocketChannel的ACCEPT状态
      System.out.println("服务器监听在" + PORT + "端口");
      while (true) {
        selector.select();  //监听已注册的Channel状态
        Set<SelectionKey> selectionKeys = selector.selectedKeys();  //获取已经监听到的感兴趣状态的Channel的SelectionKey集合
        for (SelectionKey selectionKey : selectionKeys) {
          handler(selectionKey);
        }
        selectionKeys.clear();  //手动将这些Channel置为不可操作状态
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) {
    new Server().start();
  }
}

Client

public class Client {
  private static final String HOST = "localhost";
  private static final int PORT = 8080;
  private static final String QUIT = "quit";
  private static final int BUFFER_SIZE = 1024;   //可调整小一点进行分批写入Channel测试,会将消息分隔
  ByteBuffer writeBuffer = ByteBuffer.allocate(BUFFER_SIZE);
  ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);

  public boolean isQuit(String message) {
    return QUIT.equals(message);
  }

  public void userInputHandler(SelectionKey selectionKey) {
    Runnable runnable = () -> {
      try (BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in))) {
        while (true) {
          String inputMessage = consoleReader.readLine();  //读取客户端输入消息
          if (inputMessage != null) {  //发送消息
            send(selectionKey, inputMessage);
          }
          if (isQuit(inputMessage)) {  //检查用户是否退出连接
            selectionKey.selector().close();
            break;
          }
        }
      } catch (IOException e) {
        e.printStackTrace();
        System.out.println("[" + HOST + ":" + PORT + "]服务器未启动");
      }
    };
    new Thread(runnable).start();
  }

  private void send(SelectionKey selectionKey, String inputMessage) throws IOException {
    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
    repeatedlyWrite(inputMessage, socketChannel);  //将输入消息分批传入当前Channel
  }

  private void repeatedlyWrite(String message, SocketChannel socketChannel) throws IOException {
    byte[] bytes = message.getBytes(StandardCharsets.UTF_8);  //将String转化为字节
    writeBuffer.clear();  //清除Buffer,转为写模式
    int offset = 0;  //偏移量
    int capacity = writeBuffer.capacity();  //写Buffer最大容量
    while (offset < bytes.length) {  //若偏移量小于要写入Buffer的数组大小,就要写入
      if (bytes.length > capacity) {  //若一次写不完
        writeBuffer.put(bytes, offset, capacity);  //分批写入,一次写满
        offset += capacity;  //偏移量递增容量
      } else {  //若一次可以写完
        writeBuffer.put(bytes, offset, bytes.length);  //直接写完
        offset += bytes.length;  //偏移量递增写入的值
      }
      if (offset + capacity > bytes.length) {  //若未来偏移量会越界
        capacity = bytes.length - offset;  //就改变要写入的容量,通过修改capacity间接影响下一次写入的量
      }
      writeBuffer.flip();  //改变Buffer为读模式
      while (writeBuffer.hasRemaining()) {  //若Buffer还有数据为被读完
        socketChannel.write(writeBuffer);  //就一直向Channel中写
      }
      writeBuffer.clear();  //改变Buffer为写模式
    }
  }

  public String receive(SocketChannel socketChannel) throws IOException {
    String message = "";
    while (socketChannel.read(readBuffer) > 0) {  //若从socketChannel能读出数据
      readBuffer.flip();  //将Buffer转化为读模式
      Charset charset = StandardCharsets.UTF_8;
      CharBuffer charBuffer = charset.decode(readBuffer);
      message += charBuffer.toString();  //将已写入Buffer中的数据读出,拼接message
      readBuffer.clear();  //就清空Buffer,并从读模式转化到写模式
    }
    return message;
  }

  private void handler(SelectionKey selectionKey) throws IOException {
    if (selectionKey.isConnectable()) {  //若当前Channel是CONNECT状态,说明正在与服务器建立连接
      SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
      if (socketChannel.isConnectionPending()) {  //若当前连接已经建立完成
        socketChannel.finishConnect();  //完成建立连接的过程
        userInputHandler(selectionKey);  //处理用户输入
      }
      socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);  //向Selector中注册该Channel的READ状态,若处于该状态说明服务器发来消息了
    }
    if (selectionKey.isReadable()) {  //若当前Channel是READ状态,说明服务器发来了消息
      SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
      String message = receive(socketChannel);  //接收服务端消息
      System.out.print(message);  //客户端打印消息
    }
  }

  public void start() {
    try (SocketChannel socketChannel = SocketChannel.open();  //创建SocketChannel对象
         Selector selector = Selector.open()) {  //创建Selector对象
      socketChannel.configureBlocking(false);  //将Channel设置为非阻塞模式
      socketChannel.register(selector, SelectionKey.OP_CONNECT);  //向Selector中注册SocketChannel的CONNECT状态
      socketChannel.connect(new InetSocketAddress(HOST, PORT));  //连接到主机和端口
      while (true) {
        selector.select();  //监听已注册的Channel状态
        Set<SelectionKey> selectionKeys = selector.selectedKeys();  //获取已经监听到的感兴趣状态的Channel的SelectionKey集合
        for (SelectionKey selectionKey : selectionKeys) {
          handler(selectionKey);
        }
        selectionKeys.clear();  //手动将这些Channel置为不可操作状态
      }
    } catch (ClosedSelectorException e) {  //若用户userInputHandler线程中关闭了Selector,这里就正常退出即可
      System.out.println("正常退出");
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) {
    new Client().start();
  }
}

NIO缺点

从操作系统内核层面看,应用程序一定要和操作系统内核进行交互,才能真正的实现不同进程、不同机器之间信息交互,以网络编程为例,应用程序获取数据需要经历下面步骤

  1. 通过物理连接到达机器(网卡接收数据)
  2. 将数据拷贝到操作系统内核缓冲区
  3. 从内核缓冲区复制到应用程序缓冲区,此后才能从应用程序层面取到数据

操作系统层面的IO模型有

  • 阻塞式IO:应用程序通过系统调用向操作系统内核发出IO请求,若IO还为准备好,该系统调用就会被阻塞,直到IO已经准备好并且已经拷贝到应用程序缓冲区,此系统调用才会成功返回
  • 非阻塞式IO:应用程序通过系统调用向操作系统内核发出IO请求,该系统调用会立刻失败返回,告知应用程序IO还未准备好,应用程序需要多次询问操作系统内核IO是否已经准备就绪,直到IO已经准备好并且已经拷贝到应用程序缓冲区后,此系统调用才会成功返回
  • IO多路复用:应用程序通过系统调用向操作系统内核发出监听一些IO状态变化的请求,若这些IO的状态未发生变化,该系统调用就会被阻塞,知道某个IO发生状态变化,此系统调用就会返回,此时应用程序询问操作系统内核IO是否已经准备就绪,从而减少了无目的性的询问操作系统内核所带来的开销

无论是NIO还是BIO都属于同步模型,所以若应用程序只要不主动系统调用,那么应用程序就没有办法收到数据,以至于该主动的系统调用是无法省略的,而异步IO可以解决此问题

异步IO:应用程序通过系统调用向操作系统内核发出IO请求,该系统调用会立刻失败返回,当IO已经准备好并且已经拷贝到应用程序缓冲区后,操作系统内核会给该应用程序传递一个信号,来通知应用程序IO已经准备好

此作者没有提供个人介绍
最后更新于 2021-07-30