AIO
AIO相关概念
Linux系统的AIO还不成熟(2021年),可从这里看到,Windows系统的AIO比较成熟,可从Windows的JDK源码包看到有实现,但是服务器多数都是Linux系统
Java中的AIO是异步非阻塞式,所以使用的是Channel,与BIO和NIO类似的,使用AsyncSocketChannel
和AsyncServerSoketChannel
两个类分别表示双端,若使用AsyncServerSoketChannel
发现客户端连接使用accept()
方法来生成与之通信的AsyncSocketChannel
Future
像调用同步Channel的方法那样调用异步Channel的方法,会返回一个多线程编程中常使用的Future对象,使用该对象来获取未来该异步调用的返回结果
CompletionHandler
在调用异步Channel时传入一个CompletionHandler
接口实现类,该接口要求实现两个回调函数,分别是成功的回调completed()
和失败的回调failed()
,回调函数的参数中有此次异步调用的返回结果,这有点像JavaScript中的Promise
回调函数会在另一个守护线程中执行,为了保证守护线程不会被销毁,所以需要在此处将主线程阻塞,这些守护线程运行在,一个异步通道组,其实就类似运行在一个线程池中,若不指定则会使用默认的异步通道组
在使用open()
方法创建Socket对象时,可指定异步通道组AsynchronousChannelGroup
,在使用异步通道组类的withCachedThreadPool()
静态方法可将线程池包装成AsynchronousChannelGroup
对象
AIO实现CS程序
Server
使用CompletionHandler异步模式
public class Server {
private final static int PORT = 8080;
private static final int BUFFER_SIZE = 1024;
/**
* 在回调的内部定义读回调和写回调,从而使得可在回调的回调中使用到内部定义的回调,达到循环调用的目的
*/
private void planOne(AsynchronousSocketChannel asynchronousSocketChannel) {
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); //在回调中创建Buffer保证线程安全,一个用户使用一块Buffer
CompletionHandler<Integer, Object> readCallback = new CompletionHandler<>() { //读取Channel内容到Buffer
@Override
public void completed(Integer result, Object attachment) {
if (result <= 0) return; //若未读到内容,说明客户端已经断开连接,无需在回调,直接返回即可
CompletionHandler<Integer, Object> readCallback = this; //this == readCallback
CompletionHandler<Integer, Object> writeCallback = new CompletionHandler<>() { //将Buffer中内容写入Channel
@Override
public void completed(Integer result, Object attachment) {
if (result <= 0) return; //若未写入内容,说明客户端已经断开连接,无需在回调,直接返回即可
buffer.clear(); //将Buffer清空,转换为写模式
asynchronousSocketChannel.read(buffer, null, readCallback); //读取Channel内容到Buffer,之后在写
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace(); //错误处理
}
};
buffer.flip(); //将Buffer转换为读模式
asynchronousSocketChannel.write(buffer, null, writeCallback);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace(); //错误处理
}
};
asynchronousSocketChannel.read(buffer, null, readCallback);
}
/**
* 使用附加对象进行互相回调的互相调用,向读回调传递写回调,向写回调传递读回调,达到循环调用的目的
*/
private void planTwo(AsynchronousSocketChannel asynchronousSocketChannel) {
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); //在回调中创建Buffer保证线程安全,一个用户使用一块Buffer
CompletionHandler<Integer, Object> readCallback = new CompletionHandler<>() { //读取Channel内容到Buffer
@Override
public void completed(Integer result, Object writeCallback) {
if (result <= 0) return; //若未读到内容,说明客户端已经断开连接,无需在回调,直接返回即可
buffer.flip(); //将Buffer转换为读模式
//this == readCallback
asynchronousSocketChannel.write(buffer, this, (CompletionHandler<Integer, Object>) writeCallback);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace(); //错误处理
}
};
CompletionHandler<Integer, Object> writeCallback = new CompletionHandler<>() { //将Buffer中内容写入Channel
@Override
public void completed(Integer result, Object readCallback) {
if (result <= 0) return; //若未写入内容,说明客户端已经断开连接,无需在回调,直接返回即可
buffer.clear(); //将Buffer清空,转换为写模式
//this == writeCallback
asynchronousSocketChannel.read(buffer, this, (CompletionHandler<Integer, Object>) readCallback); //读取Channel内容到Buffer,之后在读
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace(); //错误处理
}
};
asynchronousSocketChannel.read(buffer, writeCallback, readCallback);
}
public void handler(AsynchronousSocketChannel asynchronousSocketChannel) {
planOne(asynchronousSocketChannel); //方案1
//planTwo(asynchronousSocketChannel); //方案2
}
public void start() {
try (AsynchronousServerSocketChannel asynchronousServerSocketChannel =
AsynchronousServerSocketChannel.open()) { //创建AsynchronousServerSocketChannel对象
asynchronousServerSocketChannel.bind(new InetSocketAddress(PORT)); //绑定端口
System.out.println("服务器监听在" + PORT + "端口");
while (true) {
/**
* accept()方法的CompletionHandler使用方式:
* 第一个参数类似SelectionKey的attachment()方法的作用,为了更方便回调函数的执行,所以附加一个对象传递给回调函数
* 第二个参数才是CompletionHandler接口的实现类
* 回调函数会在另一个守护线程中执行,为了保证守护线程不会被销毁,所以需要在此处将主线程阻塞
* 这些守护线程运行在一个线程池中,使用asynchronousServerSocketChannel.open()方法可指定线程池,若不指定则会使用默认的线程池
*/
asynchronousServerSocketChannel.accept(null, new CompletionHandler<>() {
/**
* 该接口拥有两个泛型,分别是:异步Channel方法的返回值类型和附加对象的类型
*/
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
if (asynchronousServerSocketChannel.isOpen()) { //主线程已经阻塞,若此时AsynchronousServerSocketChannel对象还未关闭
asynchronousServerSocketChannel.accept(null, this); //就需要继续使用该回调处理下一个客户端连接,相当于在此处循环
//一直调用可能会有栈溢出的风险,但是好像Java底层已经处理过了,无需担心
}
if (result != null && result.isOpen()) { //若与客户端通信的Channel没有关闭
handler(result); //就处理该客户端
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace(); //错误处理
}
});
/*以下代码仅用于阻塞主线程,不至于accept()方法被调用的过于频繁,也不让回调函数所在守护线程终止*/
synchronized (asynchronousServerSocketChannel) {
try {
asynchronousServerSocketChannel.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Server().start();
}
}
Client
使用Future异步模式
public class Client {
private static final String HOST = "localhost";
private static final int PORT = 8080;
private static final String QUIT = "quit";
public boolean isQuit(String message) {
return QUIT.equals(message);
}
public void handler(AsynchronousSocketChannel asynchronousSocketChannel) throws ExecutionException, InterruptedException {
try (BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in))) {
while (true) {
String inputMessage = consoleReader.readLine(); //读取客户端输入消息
if (inputMessage != null) { //发送消息
byte[] bytes = inputMessage.getBytes(); //将用户输入信息转换为字节
ByteBuffer buffer = ByteBuffer.wrap(bytes); //将该用户输入信息转换为Buffer
Future<Integer> write = asynchronousSocketChannel.write(buffer); //将Buffer中内容写入Channel
write.get(); //等待写入完毕
buffer.flip(); //将Buffer转为读模式
Future<Integer> read = asynchronousSocketChannel.read(buffer); //读取Channel内容到Buffer
read.get(); //等待读取完毕
System.out.println(new String(buffer.array())); //将Buffer转换为字节数组,构建字符串,客户端打印消息
buffer.clear(); //清空Buffer,并从读模式转化到写模式
}
if (isQuit(inputMessage)) { //检查用户是否退出连接
break;
}
}
} catch (IOException e) {
e.printStackTrace();
System.out.println("[" + HOST + ":" + PORT + "]服务器未启动");
}
}
public void start() {
try (AsynchronousSocketChannel asynchronousSocketChannel =
AsynchronousSocketChannel.open()) { //创建AsynchronousSocketChannel对象
Future<Void> future = asynchronousSocketChannel.connect(new InetSocketAddress(HOST, PORT));
future.get(); //等待建立连接
handler(asynchronousSocketChannel);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Client().start();
}
}
AIO模型图解
多人聊天室改进思路
服务端
- 使用自定义异步线程组
- 使用一个Map集合存储在线的用户,使用
ConcurrentHashMap
来保证线程安全 - 使用读写标志来区分不同操作调用的回调函数对象,来减少对象的创建
客户端
- 使用Future方式进行异步操作
- 使用
userInputHandler()
创建与客户交互的线程,与响应服务端消息的线程不同,避免由于输入的阻塞而无法即时受到服务端消息
多人聊天室实现代码
Server
public class Server {
private final static int PORT = 8080;
private static final String QUIT = "quit";
private static final int BUFFER_SIZE = 1024;
private static final int THREADED_SIZE = 10;
private ConcurrentHashMap<String, AsynchronousSocketChannel> connectedClients = new ConcurrentHashMap<>();
private static final Integer WRITE = 0; //读标志
private static final Integer READ = 1; //写标志
private class ClientCallBack implements CompletionHandler<Integer, Integer> {
private AsynchronousSocketChannel asynchronousSocketChannel;
private ByteBuffer buffer;
public ClientCallBack(AsynchronousSocketChannel asynchronousSocketChannel, ByteBuffer buffer) {
this.asynchronousSocketChannel = asynchronousSocketChannel;
this.buffer = buffer;
}
private void forwardMessage(AsynchronousSocketChannel clientChannel, String message) {
String id = getId(clientChannel); //获取自己ID
for (String forwardId : connectedClients.keySet()) { //遍历在线列表
if (!forwardId.equals(id)) { //排除自己
byte[] bytes = (id + ":" + message).getBytes(); //添加消息头,转为字节数组
connectedClients.get(forwardId).write(ByteBuffer.wrap(bytes), WRITE, this); //异步将Buffer内容写入通道
}
}
}
private String receive() {
buffer.flip(); //改变Buffer为读模式
Charset charset = StandardCharsets.UTF_8;
CharBuffer charBuffer = charset.decode(buffer);
buffer.clear(); //清空Buffer,并转换为写模式
return charBuffer.toString(); //将已写入Buffer中的数据读出
}
@Override
public void completed(Integer result, Integer attachment) {
if (WRITE.equals(attachment)) { //若是write()的回调
return; //什么也不做
}
if (READ.equals(attachment)) { //若是read()的回调
if (result <= 0) { //若未读取到消息,说明客户端已经离线
removeClient(asynchronousSocketChannel); //就从在线列表中删除用户
} else { //若还能读取到客户端消息
String message = receive(); //就接收客户端消息
System.out.println(getId(asynchronousSocketChannel) + ":" + message); //服务端打印
forwardMessage(asynchronousSocketChannel, message); //转发给其他客户端
if (isQuit(message)) { //检查用户是否决定退出
removeClient(asynchronousSocketChannel); //退出就从在线列表中删除用户
} else { //不退出就继续读取用户传来的消息
asynchronousSocketChannel.read(buffer, READ, this);
}
}
}
}
@Override
public void failed(Throwable exc, Integer attachment) {
exc.printStackTrace();
}
}
public boolean isQuit(String message) {
return QUIT.equals(message);
}
public String getId(AsynchronousSocketChannel asynchronousSocketChannel) {
InetSocketAddress remoteAddress = null;
try {
remoteAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress();
//该方法返回远端地址,需要强转为InetSocketAddress对象即可获得IP和断开
} catch (IOException e) {
e.printStackTrace();
return "获取客户端ID失败";
}
return remoteAddress.getAddress() + ":" + remoteAddress.getPort();
}
private void addClient(AsynchronousSocketChannel socketChannel) {
String id = getId(socketChannel); //计算ID
connectedClients.put(id, socketChannel); //添加该用户
System.out.println(id + "已连接");
}
private void removeClient(AsynchronousSocketChannel socketChannel) {
String id = getId(socketChannel); //计算ID
connectedClients.remove(id); //删除该用户
System.out.println(id + "已断连接");
}
public void handler(AsynchronousSocketChannel asynchronousSocketChannel) {
addClient(asynchronousSocketChannel); //将当前用户添加到在线列表
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); //每个用户独享Buffer
asynchronousSocketChannel.read(buffer, READ, //异步读取用户消息
new ClientCallBack(asynchronousSocketChannel, buffer)); //传入客户端回调函数对象
}
public void start() {
try (AsynchronousServerSocketChannel asynchronousServerSocketChannel = //创建AsynchronousServerSocketChannel对象
AsynchronousServerSocketChannel.open(AsynchronousChannelGroup. //使用自定义异步线程组
withThreadPool(Executors. //包装线程池为异步线程组对象
newFixedThreadPool(THREADED_SIZE)))) {
asynchronousServerSocketChannel.bind(new InetSocketAddress(PORT)); //绑定端口
System.out.println("服务器监听在" + PORT + "端口");
while (true) {
asynchronousServerSocketChannel.accept(null, new CompletionHandler<>() {
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
if (asynchronousServerSocketChannel.isOpen()) { //主线程已经阻塞,若此时AsynchronousServerSocketChannel对象还未关闭
asynchronousServerSocketChannel.accept(null, this); //就需要继续使用该回调处理下一个客户端连接,相当于在此处循环
}
if (result != null && result.isOpen()) { //若与客户端通信的Channel没有关闭
handler(result); //就处理与该客户端通信
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace(); //错误处理
}
});
/*以下代码仅用于阻塞主线程,不至于accept()方法被调用的过于频繁,也不让回调函数所在守护线程终止*/
synchronized (asynchronousServerSocketChannel) {
try {
asynchronousServerSocketChannel.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Server().start();
}
}
Client
public class Client {
private static final String HOST = "localhost";
private static final int PORT = 8080;
private static final String QUIT = "quit";
private static final int BUFFER_SIZE = 1024;
public boolean isQuit(String message) {
return QUIT.equals(message);
}
public void userInputHandler(AsynchronousSocketChannel asynchronousSocketChannel) {
Runnable runnable = () -> {
try (BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in))) {
while (true) {
String inputMessage = consoleReader.readLine(); //读取客户端输入消息
if (inputMessage != null) { //发送消息
send(inputMessage, asynchronousSocketChannel); //向asynchronousSocketChannel发送消息
}
if (isQuit(inputMessage)) { //检查用户是否退出连接
break;
}
}
} catch (IOException e) {
e.printStackTrace();
System.out.println("[" + HOST + ":" + PORT + "]服务器未启动");
}
};
new Thread(runnable).start();
}
private void send(String inputMessage, AsynchronousSocketChannel asynchronousSocketChannel) {
ByteBuffer buffer = ByteBuffer.wrap(inputMessage.getBytes()); //将用户输入消息转成Buufer
Future<Integer> future = asynchronousSocketChannel.write(buffer); //向Buffer写入Channel
try {
future.get(); //等待发送完毕
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
private String receive(ByteBuffer buffer) {
buffer.flip(); //改变Buffer为读模式
Charset charset = StandardCharsets.UTF_8;
CharBuffer charBuffer = charset.decode(buffer);
buffer.clear(); //清空Buffer,并转换为写模式
return charBuffer.toString(); //将已写入Buffer中的数据读出
}
public void start() {
try (AsynchronousSocketChannel asynchronousSocketChannel =
AsynchronousSocketChannel.open()) { //创建AsynchronousSocketChannel对象
Future<Void> future = asynchronousSocketChannel.connect(new InetSocketAddress(HOST, PORT)); //连接服务器
future.get(); //等待建立连接
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); //用于接收Channel的Buffer
userInputHandler(asynchronousSocketChannel); //处理用户输入
while (true) { //接收服务端发送的消息
Future<Integer> readResult = asynchronousSocketChannel.read(buffer); //读取Channel中的数据到Buffer
int result = readResult.get(); //等待读取完毕
if (result <= 0) { //若没有读到消息
asynchronousSocketChannel.close(); //服务端已经关闭,关闭客户端通道
break; //跳出循环
} else {
String message = receive(buffer); //接收Buffer中的消息
System.out.println(message); //客户端打印
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Client().start();
}
}
Comments NOTHING