04-AIO

nobility 发布于 2021-08-05 952 次阅读


AIO

AIO相关概念

Linux系统的AIO还不成熟(2021年),可从这里看到,Windows系统的AIO比较成熟,可从Windows的JDK源码包看到有实现,但是服务器多数都是Linux系统

Java中的AIO是异步非阻塞式,所以使用的是Channel,与BIO和NIO类似的,使用AsyncSocketChannelAsyncServerSoketChannel两个类分别表示双端,若使用AsyncServerSoketChannel发现客户端连接使用accept()方法来生成与之通信的AsyncSocketChannel

Future

像调用同步Channel的方法那样调用异步Channel的方法,会返回一个多线程编程中常使用的Future对象,使用该对象来获取未来该异步调用的返回结果

CompletionHandler

在调用异步Channel时传入一个CompletionHandler接口实现类,该接口要求实现两个回调函数,分别是成功的回调completed()和失败的回调failed(),回调函数的参数中有此次异步调用的返回结果,这有点像JavaScript中的Promise

回调函数会在另一个守护线程中执行,为了保证守护线程不会被销毁,所以需要在此处将主线程阻塞,这些守护线程运行在,一个异步通道组,其实就类似运行在一个线程池中,若不指定则会使用默认的异步通道组

在使用open()方法创建Socket对象时,可指定异步通道组AsynchronousChannelGroup,在使用异步通道组类的withCachedThreadPool()静态方法可将线程池包装成AsynchronousChannelGroup对象

AIO实现CS程序

Server

使用CompletionHandler异步模式

public class Server {
  private final static int PORT = 8080;
  private static final int BUFFER_SIZE = 1024;

  /**
   * 在回调的内部定义读回调和写回调,从而使得可在回调的回调中使用到内部定义的回调,达到循环调用的目的
   */
  private void planOne(AsynchronousSocketChannel asynchronousSocketChannel) {
    ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);  //在回调中创建Buffer保证线程安全,一个用户使用一块Buffer
    CompletionHandler<Integer, Object> readCallback = new CompletionHandler<>() {  //读取Channel内容到Buffer
      @Override
      public void completed(Integer result, Object attachment) {
        if (result <= 0) return;  //若未读到内容,说明客户端已经断开连接,无需在回调,直接返回即可
        CompletionHandler<Integer, Object> readCallback = this;  //this  == readCallback
        CompletionHandler<Integer, Object> writeCallback = new CompletionHandler<>() {  //将Buffer中内容写入Channel
          @Override
          public void completed(Integer result, Object attachment) {
            if (result <= 0) return;  //若未写入内容,说明客户端已经断开连接,无需在回调,直接返回即可
            buffer.clear();  //将Buffer清空,转换为写模式
            asynchronousSocketChannel.read(buffer, null, readCallback);  //读取Channel内容到Buffer,之后在写
          }

          @Override
          public void failed(Throwable exc, Object attachment) {
            exc.printStackTrace();  //错误处理
          }
        };
        buffer.flip();  //将Buffer转换为读模式
        asynchronousSocketChannel.write(buffer, null, writeCallback);
      }

      @Override
      public void failed(Throwable exc, Object attachment) {
        exc.printStackTrace();  //错误处理
      }
    };
    asynchronousSocketChannel.read(buffer, null, readCallback);
  }

  /**
   * 使用附加对象进行互相回调的互相调用,向读回调传递写回调,向写回调传递读回调,达到循环调用的目的
   */
  private void planTwo(AsynchronousSocketChannel asynchronousSocketChannel) {
    ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);  //在回调中创建Buffer保证线程安全,一个用户使用一块Buffer
    CompletionHandler<Integer, Object> readCallback = new CompletionHandler<>() {  //读取Channel内容到Buffer
      @Override
      public void completed(Integer result, Object writeCallback) {
        if (result <= 0) return;  //若未读到内容,说明客户端已经断开连接,无需在回调,直接返回即可
        buffer.flip();  //将Buffer转换为读模式
        //this == readCallback
        asynchronousSocketChannel.write(buffer, this, (CompletionHandler<Integer, Object>) writeCallback);
      }

      @Override
      public void failed(Throwable exc, Object attachment) {
        exc.printStackTrace();  //错误处理
      }
    };
    CompletionHandler<Integer, Object> writeCallback = new CompletionHandler<>() {  //将Buffer中内容写入Channel
      @Override
      public void completed(Integer result, Object readCallback) {
        if (result <= 0) return;  //若未写入内容,说明客户端已经断开连接,无需在回调,直接返回即可
        buffer.clear();  //将Buffer清空,转换为写模式
        //this == writeCallback
        asynchronousSocketChannel.read(buffer, this, (CompletionHandler<Integer, Object>) readCallback);  //读取Channel内容到Buffer,之后在读
      }

      @Override
      public void failed(Throwable exc, Object attachment) {
        exc.printStackTrace();  //错误处理
      }
    };
    asynchronousSocketChannel.read(buffer, writeCallback, readCallback);
  }


  public void handler(AsynchronousSocketChannel asynchronousSocketChannel) {
    planOne(asynchronousSocketChannel);  //方案1
    //planTwo(asynchronousSocketChannel);  //方案2
  }

  public void start() {
    try (AsynchronousServerSocketChannel asynchronousServerSocketChannel =
             AsynchronousServerSocketChannel.open()) {  //创建AsynchronousServerSocketChannel对象
      asynchronousServerSocketChannel.bind(new InetSocketAddress(PORT));  //绑定端口
      System.out.println("服务器监听在" + PORT + "端口");
      while (true) {
        /**
         * accept()方法的CompletionHandler使用方式:
         * 第一个参数类似SelectionKey的attachment()方法的作用,为了更方便回调函数的执行,所以附加一个对象传递给回调函数
         * 第二个参数才是CompletionHandler接口的实现类
         * 回调函数会在另一个守护线程中执行,为了保证守护线程不会被销毁,所以需要在此处将主线程阻塞
         * 这些守护线程运行在一个线程池中,使用asynchronousServerSocketChannel.open()方法可指定线程池,若不指定则会使用默认的线程池
         */
        asynchronousServerSocketChannel.accept(null, new CompletionHandler<>() {
          /**
           * 该接口拥有两个泛型,分别是:异步Channel方法的返回值类型和附加对象的类型
           */
          @Override
          public void completed(AsynchronousSocketChannel result, Object attachment) {
            if (asynchronousServerSocketChannel.isOpen()) {  //主线程已经阻塞,若此时AsynchronousServerSocketChannel对象还未关闭
              asynchronousServerSocketChannel.accept(null, this);  //就需要继续使用该回调处理下一个客户端连接,相当于在此处循环
              //一直调用可能会有栈溢出的风险,但是好像Java底层已经处理过了,无需担心
            }
            if (result != null && result.isOpen()) {  //若与客户端通信的Channel没有关闭
              handler(result);  //就处理该客户端
            }
          }

          @Override
          public void failed(Throwable exc, Object attachment) {
            exc.printStackTrace();  //错误处理
          }
        });
        /*以下代码仅用于阻塞主线程,不至于accept()方法被调用的过于频繁,也不让回调函数所在守护线程终止*/
        synchronized (asynchronousServerSocketChannel) {
          try {
            asynchronousServerSocketChannel.wait();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) {
    new Server().start();
  }
}

Client

使用Future异步模式

public class Client {
  private static final String HOST = "localhost";
  private static final int PORT = 8080;
  private static final String QUIT = "quit";

  public boolean isQuit(String message) {
    return QUIT.equals(message);
  }

  public void handler(AsynchronousSocketChannel asynchronousSocketChannel) throws ExecutionException, InterruptedException {
    try (BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in))) {
      while (true) {
        String inputMessage = consoleReader.readLine();  //读取客户端输入消息
        if (inputMessage != null) {  //发送消息
          byte[] bytes = inputMessage.getBytes();  //将用户输入信息转换为字节
          ByteBuffer buffer = ByteBuffer.wrap(bytes);  //将该用户输入信息转换为Buffer
          Future<Integer> write = asynchronousSocketChannel.write(buffer);  //将Buffer中内容写入Channel
          write.get();  //等待写入完毕
          buffer.flip();  //将Buffer转为读模式
          Future<Integer> read = asynchronousSocketChannel.read(buffer);  //读取Channel内容到Buffer
          read.get(); //等待读取完毕
          System.out.println(new String(buffer.array()));  //将Buffer转换为字节数组,构建字符串,客户端打印消息
          buffer.clear(); //清空Buffer,并从读模式转化到写模式
        }
        if (isQuit(inputMessage)) {  //检查用户是否退出连接
          break;
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
      System.out.println("[" + HOST + ":" + PORT + "]服务器未启动");
    }
  }

  public void start() {
    try (AsynchronousSocketChannel asynchronousSocketChannel =
             AsynchronousSocketChannel.open()) {  //创建AsynchronousSocketChannel对象
      Future<Void> future = asynchronousSocketChannel.connect(new InetSocketAddress(HOST, PORT));
      future.get();  //等待建立连接
      handler(asynchronousSocketChannel);
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) {
    new Client().start();
  }
}

AIO模型图解

AIO通信模型

多人聊天室改进思路

服务端

  • 使用自定义异步线程组
  • 使用一个Map集合存储在线的用户,使用ConcurrentHashMap来保证线程安全
  • 使用读写标志来区分不同操作调用的回调函数对象,来减少对象的创建

客户端

  • 使用Future方式进行异步操作
  • 使用userInputHandler()创建与客户交互的线程,与响应服务端消息的线程不同,避免由于输入的阻塞而无法即时受到服务端消息

多人聊天室实现代码

Server

public class Server {
  private final static int PORT = 8080;
  private static final String QUIT = "quit";
  private static final int BUFFER_SIZE = 1024;
  private static final int THREADED_SIZE = 10;

  private ConcurrentHashMap<String, AsynchronousSocketChannel> connectedClients = new ConcurrentHashMap<>();

  private static final Integer WRITE = 0;  //读标志
  private static final Integer READ = 1;  //写标志

  private class ClientCallBack implements CompletionHandler<Integer, Integer> {
    private AsynchronousSocketChannel asynchronousSocketChannel;
    private ByteBuffer buffer;

    public ClientCallBack(AsynchronousSocketChannel asynchronousSocketChannel, ByteBuffer buffer) {
      this.asynchronousSocketChannel = asynchronousSocketChannel;
      this.buffer = buffer;
    }

    private void forwardMessage(AsynchronousSocketChannel clientChannel, String message) {
      String id = getId(clientChannel);  //获取自己ID
      for (String forwardId : connectedClients.keySet()) {  //遍历在线列表
        if (!forwardId.equals(id)) {  //排除自己
          byte[] bytes = (id + ":" + message).getBytes();  //添加消息头,转为字节数组
          connectedClients.get(forwardId).write(ByteBuffer.wrap(bytes), WRITE, this);  //异步将Buffer内容写入通道
        }
      }
    }

    private String receive() {
      buffer.flip();  //改变Buffer为读模式
      Charset charset = StandardCharsets.UTF_8;
      CharBuffer charBuffer = charset.decode(buffer);
      buffer.clear();  //清空Buffer,并转换为写模式
      return charBuffer.toString();  //将已写入Buffer中的数据读出
    }

    @Override
    public void completed(Integer result, Integer attachment) {
      if (WRITE.equals(attachment)) {  //若是write()的回调
        return;  //什么也不做
      }
      if (READ.equals(attachment)) {  //若是read()的回调
        if (result <= 0) {  //若未读取到消息,说明客户端已经离线
          removeClient(asynchronousSocketChannel);  //就从在线列表中删除用户
        } else {  //若还能读取到客户端消息
          String message = receive();  //就接收客户端消息
          System.out.println(getId(asynchronousSocketChannel) + ":" + message);  //服务端打印
          forwardMessage(asynchronousSocketChannel, message);  //转发给其他客户端
          if (isQuit(message)) {  //检查用户是否决定退出
            removeClient(asynchronousSocketChannel);  //退出就从在线列表中删除用户
          } else {  //不退出就继续读取用户传来的消息
            asynchronousSocketChannel.read(buffer, READ, this);
          }
        }
      }
    }

    @Override
    public void failed(Throwable exc, Integer attachment) {
      exc.printStackTrace();
    }
  }

  public boolean isQuit(String message) {
    return QUIT.equals(message);
  }

  public String getId(AsynchronousSocketChannel asynchronousSocketChannel) {
    InetSocketAddress remoteAddress = null;
    try {
      remoteAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress();
      //该方法返回远端地址,需要强转为InetSocketAddress对象即可获得IP和断开
    } catch (IOException e) {
      e.printStackTrace();
      return "获取客户端ID失败";
    }
    return remoteAddress.getAddress() + ":" + remoteAddress.getPort();
  }

  private void addClient(AsynchronousSocketChannel socketChannel) {
    String id = getId(socketChannel);  //计算ID
    connectedClients.put(id, socketChannel);  //添加该用户
    System.out.println(id + "已连接");
  }

  private void removeClient(AsynchronousSocketChannel socketChannel) {
    String id = getId(socketChannel);  //计算ID
    connectedClients.remove(id);  //删除该用户
    System.out.println(id + "已断连接");
  }

  public void handler(AsynchronousSocketChannel asynchronousSocketChannel) {
    addClient(asynchronousSocketChannel);  //将当前用户添加到在线列表
    ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);  //每个用户独享Buffer
    asynchronousSocketChannel.read(buffer, READ,  //异步读取用户消息
        new ClientCallBack(asynchronousSocketChannel, buffer));  //传入客户端回调函数对象
  }

  public void start() {
    try (AsynchronousServerSocketChannel asynchronousServerSocketChannel =  //创建AsynchronousServerSocketChannel对象
             AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.  //使用自定义异步线程组
                 withThreadPool(Executors.  //包装线程池为异步线程组对象
                 newFixedThreadPool(THREADED_SIZE)))) {
      asynchronousServerSocketChannel.bind(new InetSocketAddress(PORT));  //绑定端口
      System.out.println("服务器监听在" + PORT + "端口");
      while (true) {
        asynchronousServerSocketChannel.accept(null, new CompletionHandler<>() {
          @Override
          public void completed(AsynchronousSocketChannel result, Object attachment) {
            if (asynchronousServerSocketChannel.isOpen()) {  //主线程已经阻塞,若此时AsynchronousServerSocketChannel对象还未关闭
              asynchronousServerSocketChannel.accept(null, this);  //就需要继续使用该回调处理下一个客户端连接,相当于在此处循环
            }
            if (result != null && result.isOpen()) {  //若与客户端通信的Channel没有关闭
              handler(result);  //就处理与该客户端通信
            }
          }

          @Override
          public void failed(Throwable exc, Object attachment) {
            exc.printStackTrace();  //错误处理
          }
        });
        /*以下代码仅用于阻塞主线程,不至于accept()方法被调用的过于频繁,也不让回调函数所在守护线程终止*/
        synchronized (asynchronousServerSocketChannel) {
          try {
            asynchronousServerSocketChannel.wait();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) {
    new Server().start();
  }
}

Client

public class Client {
  private static final String HOST = "localhost";
  private static final int PORT = 8080;
  private static final String QUIT = "quit";
  private static final int BUFFER_SIZE = 1024;

  public boolean isQuit(String message) {
    return QUIT.equals(message);
  }

  public void userInputHandler(AsynchronousSocketChannel asynchronousSocketChannel) {
    Runnable runnable = () -> {
      try (BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in))) {
        while (true) {
          String inputMessage = consoleReader.readLine();  //读取客户端输入消息
          if (inputMessage != null) {  //发送消息
            send(inputMessage, asynchronousSocketChannel);  //向asynchronousSocketChannel发送消息
          }
          if (isQuit(inputMessage)) {  //检查用户是否退出连接
            break;
          }
        }
      } catch (IOException e) {
        e.printStackTrace();
        System.out.println("[" + HOST + ":" + PORT + "]服务器未启动");
      }
    };
    new Thread(runnable).start();
  }

  private void send(String inputMessage, AsynchronousSocketChannel asynchronousSocketChannel) {
    ByteBuffer buffer = ByteBuffer.wrap(inputMessage.getBytes());  //将用户输入消息转成Buufer
    Future<Integer> future = asynchronousSocketChannel.write(buffer);  //向Buffer写入Channel
    try {
      future.get();  //等待发送完毕
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
  }

  private String receive(ByteBuffer buffer) {
    buffer.flip();  //改变Buffer为读模式
    Charset charset = StandardCharsets.UTF_8;
    CharBuffer charBuffer = charset.decode(buffer);
    buffer.clear();  //清空Buffer,并转换为写模式
    return charBuffer.toString();  //将已写入Buffer中的数据读出
  }

  public void start() {
    try (AsynchronousSocketChannel asynchronousSocketChannel =
             AsynchronousSocketChannel.open()) {  //创建AsynchronousSocketChannel对象
      Future<Void> future = asynchronousSocketChannel.connect(new InetSocketAddress(HOST, PORT));  //连接服务器
      future.get();  //等待建立连接
      ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);  //用于接收Channel的Buffer
      userInputHandler(asynchronousSocketChannel);  //处理用户输入
      while (true) {  //接收服务端发送的消息
        Future<Integer> readResult = asynchronousSocketChannel.read(buffer);  //读取Channel中的数据到Buffer
        int result = readResult.get();  //等待读取完毕
        if (result <= 0) {  //若没有读到消息
          asynchronousSocketChannel.close();  //服务端已经关闭,关闭客户端通道
          break;  //跳出循环
        } else {
          String message = receive(buffer);  //接收Buffer中的消息
          System.out.println(message);  //客户端打印
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) {
    new Client().start();
  }
}
此作者没有提供个人介绍
最后更新于 2021-08-05