BIO
BIO模型图解
阻塞式IO模型(一客户端一线程)
多人聊天室实现思路
服务端
- 使用一个Map集合存储在线的用户,使用
ConcurrentHashMap
来保证线程安全 - 该类有
addClient()
向集合中添加在线用户 - 该类有
removeClient()
从集合中删除在线用户并关闭连接 - 该类有
forwardMessage()
转发消息,给除啦自己以外的其他客户端 - 该类有
clientHandler()
来创建与客户端交互的线程,使用线程池来复用线程,从而提高效率
客户端
- 该类有
send()
向服务端发送消息 - 该类有
receive()
接收服务端消息 - 该类有
userInputHandler()
创建与客户交互的线程,与响应服务端消息的线程不同,避免由于输入的阻塞而无法即时受到服务端消息
多人聊天室实现代码
Server
public class Server {
private final static int PORT = 8080;
private static final String QUIT = "quit";
private final ConcurrentHashMap<String, Writer> connectedClients = new ConcurrentHashMap<>(); //在线列表,保证线程安全
private final ExecutorService executorService = Executors.newFixedThreadPool(10); //使用线程池提高效率
public String getId(Socket clientSocket) {
return clientSocket.getInetAddress() + ":" + clientSocket.getPort();
}
public boolean isQuit(String message) {
return QUIT.equals(message);
}
public void addClient(Socket clientSocket) throws IOException {
if (Objects.nonNull(clientSocket)) {
String id = getId(clientSocket);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(clientSocket.getOutputStream()));
connectedClients.put(id, writer);
System.out.println("客户端[" + id + "], 已连接");
}
}
public void removeClient(Socket clientSocket) throws IOException {
if (Objects.nonNull(clientSocket)) {
String id = getId(clientSocket);
if (connectedClients.containsKey(id)) {
Writer writer = connectedClients.remove(id); //删除
writer.close(); //并关闭客户端
}
System.out.println("客户端[" + id + "], 已断开");
}
}
public void forwardMessage(Socket clientSocket, String message) throws IOException {
if (Objects.isNull(clientSocket) || Objects.isNull(message)) {
return;
}
String id = getId(clientSocket);
for (String forwardId : connectedClients.keySet()) {
if (!forwardId.equals(id)) { //排除自己
Writer writer = connectedClients.get(forwardId);
writer.write(message + "\n"); //进行消息转发,最后加一个换行符,为了客户端能使用readLine()方法
writer.flush();
}
}
}
public void clientHandler(Socket clientSocket) {
Runnable runnable = () -> {
String id = getId(clientSocket);
try {
addClient(clientSocket);
BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
String message;
while ((message = reader.readLine()) != null) { //若能读取到客户端一行内容
System.out.println(id + "说: " + message);
forwardMessage(clientSocket, id + "说: " + message); //转发给其他客户端
if (isQuit(message)) { //若客户端说要退出则不再接收消息
break; //若此处没有判断客户端是否退出则无法关闭客户端连接,导致此处死循环
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
removeClient(clientSocket); //客户端退出后从在线列表中删除
} catch (IOException e) {
e.printStackTrace();
}
}
};
//new Thread(runnable).start(); //启动新线程
executorService.execute(runnable); //向线程池中提交任务
}
public void start() {
try (ServerSocket serverSocket = new ServerSocket(PORT)) {
System.out.println("服务器监听在" + PORT + "端口");
while (true) {
Socket clientSocket = serverSocket.accept(); //等待客户端连接
clientHandler(clientSocket); //处理客户端连接
}
} catch (IOException e) {
e.printStackTrace();
System.out.println(PORT + "端口被占用");
}
}
public static void main(String[] args) {
new Server().start();
}
}
Client
public class Client {
private static final String HOST = "localhost";
private static final int PORT = 8080;
private static final String QUIT = "quit";
private Socket socket; //客户端
private BufferedWriter writer; //由该客户端创建的输出流
private BufferedReader reader; //由该客户端创建的输入流
public void send(String message) throws IOException {
if (!socket.isOutputShutdown()) { //若输出流未关闭
writer.write(message + "\n"); //就进行数据发送,最后加一个换行符,为了服务端能使用readLine()方法
writer.flush();
}
}
public String receive() throws IOException {
String message = null;
if (!socket.isInputShutdown()) { //若输入流未关闭
message = reader.readLine(); //就读取服务端一行数据
}
return message;
}
public boolean isQuit(String message) {
return QUIT.equals(message);
}
public void userInputHandler() {
Runnable runnable = () -> {
try (BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in))) {
while (true) {
String inputMessage = consoleReader.readLine(); //读取客户端输入消息
if (inputMessage != null) { //发送消息
send(inputMessage);
}
if (isQuit(inputMessage)) { //检查用户是否退出连接
break;
}
}
} catch (IOException e) {
e.printStackTrace();
System.out.println("[" + HOST + ":" + PORT + "]服务器未启动");
}
};
new Thread(runnable).start();
}
public void start() {
try (Socket socket = new Socket(HOST, PORT)) {
this.socket = socket;
this.writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
userInputHandler(); //处理用户输入
String message;
while ((message = receive()) != null) { //若能读到服务器返回的消息
System.out.println(message); //就输出消息
}
} catch (IOException e) {
e.printStackTrace();
System.out.println("[" + HOST + ":" + PORT + "]服务器未启动");
}
}
public static void main(String[] args) {
new Client().start();
}
}
BIO的缺点
- 每建立一个连接都需要创建独立的线程与客户端交互,当并发数较大时,就会创建大量的线程进行处理,系统资源大
- 连接建立后,若该线程一直没有交互,则该线程就会处于阻塞,造成线程资源的浪费
Comments NOTHING