02-BIO

nobility 发布于 2021-07-29 2477 次阅读


BIO

BIO模型图解

阻塞式IO模型(一客户端一线程)

BIO通信模型

多人聊天室实现思路

服务端

  • 使用一个Map集合存储在线的用户,使用ConcurrentHashMap来保证线程安全
  • 该类有addClient()向集合中添加在线用户
  • 该类有removeClient()从集合中删除在线用户并关闭连接
  • 该类有forwardMessage()转发消息,给除啦自己以外的其他客户端
  • 该类有clientHandler()来创建与客户端交互的线程,使用线程池来复用线程,从而提高效率

客户端

  • 该类有send()向服务端发送消息
  • 该类有receive()接收服务端消息
  • 该类有userInputHandler()创建与客户交互的线程,与响应服务端消息的线程不同,避免由于输入的阻塞而无法即时受到服务端消息

多人聊天室实现代码

Server

public class Server {
  private final static int PORT = 8080;
  private static final String QUIT = "quit";
  private final ConcurrentHashMap<String, Writer> connectedClients = new ConcurrentHashMap<>();  //在线列表,保证线程安全
  private final ExecutorService executorService = Executors.newFixedThreadPool(10);  //使用线程池提高效率

  public String getId(Socket clientSocket) {
    return clientSocket.getInetAddress() + ":" + clientSocket.getPort();
  }
  
  public boolean isQuit(String message) {
    return QUIT.equals(message);
  }
  
  public void addClient(Socket clientSocket) throws IOException {
    if (Objects.nonNull(clientSocket)) {
      String id = getId(clientSocket);
      BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(clientSocket.getOutputStream()));
      connectedClients.put(id, writer);
      System.out.println("客户端[" + id + "], 已连接");
    }
  }

  public void removeClient(Socket clientSocket) throws IOException {
    if (Objects.nonNull(clientSocket)) {
      String id = getId(clientSocket);
      if (connectedClients.containsKey(id)) {
        Writer writer = connectedClients.remove(id);  //删除
        writer.close();  //并关闭客户端
      }
      System.out.println("客户端[" + id + "], 已断开");
    }
  }

  public void forwardMessage(Socket clientSocket, String message) throws IOException {
    if (Objects.isNull(clientSocket) || Objects.isNull(message)) {
      return;
    }
    String id = getId(clientSocket);
    for (String forwardId : connectedClients.keySet()) {
      if (!forwardId.equals(id)) {  //排除自己
        Writer writer = connectedClients.get(forwardId);
        writer.write(message + "\n");  //进行消息转发,最后加一个换行符,为了客户端能使用readLine()方法
        writer.flush();
      }
    }
  }

  public void clientHandler(Socket clientSocket) {
    Runnable runnable = () -> {
      String id = getId(clientSocket);
      try {
        addClient(clientSocket);
        BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
        String message;
        while ((message = reader.readLine()) != null) {  //若能读取到客户端一行内容
          System.out.println(id + "说: " + message);
          forwardMessage(clientSocket, id + "说: " + message);  //转发给其他客户端
          if (isQuit(message)) {  //若客户端说要退出则不再接收消息
            break;  //若此处没有判断客户端是否退出则无法关闭客户端连接,导致此处死循环
          }
        }
      } catch (IOException e) {
        e.printStackTrace();
      } finally {
        try {
          removeClient(clientSocket);  //客户端退出后从在线列表中删除
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    };
    //new Thread(runnable).start();  //启动新线程
    executorService.execute(runnable); //向线程池中提交任务
  }

  public void start() {
    try (ServerSocket serverSocket = new ServerSocket(PORT)) {
      System.out.println("服务器监听在" + PORT + "端口");
      while (true) {
        Socket clientSocket = serverSocket.accept();  //等待客户端连接
        clientHandler(clientSocket);  //处理客户端连接
      }
    } catch (IOException e) {
      e.printStackTrace();
      System.out.println(PORT + "端口被占用");
    }
  }

  public static void main(String[] args) {
    new Server().start();
  }
}

Client

public class Client {
  private static final String HOST = "localhost";
  private static final int PORT = 8080;
  private static final String QUIT = "quit";
  private Socket socket;  //客户端
  private BufferedWriter writer;  //由该客户端创建的输出流
  private BufferedReader reader;  //由该客户端创建的输入流

  public void send(String message) throws IOException {
    if (!socket.isOutputShutdown()) {  //若输出流未关闭
      writer.write(message + "\n");  //就进行数据发送,最后加一个换行符,为了服务端能使用readLine()方法
      writer.flush();
    }
  }

  public String receive() throws IOException {
    String message = null;
    if (!socket.isInputShutdown()) {  //若输入流未关闭
      message = reader.readLine();  //就读取服务端一行数据
    }
    return message;
  }

  public boolean isQuit(String message) {
    return QUIT.equals(message);
  }

  public void userInputHandler() {
    Runnable runnable = () -> {
      try (BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in))) {
        while (true) {
          String inputMessage = consoleReader.readLine();  //读取客户端输入消息
          if (inputMessage != null) {  //发送消息
            send(inputMessage);
          }
          if (isQuit(inputMessage)) {  //检查用户是否退出连接
            break;
          }
        }
      } catch (IOException e) {
        e.printStackTrace();
        System.out.println("[" + HOST + ":" + PORT + "]服务器未启动");
      }
    };
    new Thread(runnable).start();
  }

  public void start() {
    try (Socket socket = new Socket(HOST, PORT)) {
      this.socket = socket;
      this.writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
      this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
      userInputHandler();  //处理用户输入
      String message;
      while ((message = receive()) != null) {  //若能读到服务器返回的消息
        System.out.println(message);  //就输出消息
      }
    } catch (IOException e) {
      e.printStackTrace();
      System.out.println("[" + HOST + ":" + PORT + "]服务器未启动");
    }
  }

  public static void main(String[] args) {
    new Client().start();
  }
}

BIO的缺点

  • 每建立一个连接都需要创建独立的线程与客户端交互,当并发数较大时,就会创建大量的线程进行处理,系统资源大
  • 连接建立后,若该线程一直没有交互,则该线程就会处于阻塞,造成线程资源的浪费
此作者没有提供个人介绍
最后更新于 2021-07-29