三大I/O

BIO、NIO、AIO

BIO

简介

同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销

  • Java BIO 就是传统的 java io 编程,其相关的类和接口在 java.io
  • BIO(blocking I/O) : 同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需
    要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器).
image-20230528112723566

工作机制

  1. 服务器端启动一个 ServerSocket,注册端口,调用accpet方法监听客户端的Socket连接。
  2. 客户端启动 Socket 对服务器进行通信,默认情况下服务器端需要对每个客户 建立一个线程与之通讯

image-20230528112831315

示例

​ 网络编程的基本模型是Client/Server模型,也就是两个进程之间进行相互通信,其中服务端提供位置信(绑定IP地址和端口),客户端通过连接操作向服务端监听的端口地址发起连接请求,基于TCP协议下进行三次握手连接,连接成功后,双方通过网络套接(Socket)进行通信。

​ 传统的同步阻塞模型开发中,服务端ServerSocket负责绑定IP地址,启动监听端口;客户端Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。 基于BIO模式下的通信,客户端 - 服务端是完全同步,完全耦合的。

客户端

1
2
3
4
5
6
7
8
9
10
11
12
public class Client {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 8080);
OutputStream os = socket.getOutputStream();
PrintStream printStream = new PrintStream(os);
while (true){
Scanner sc = new Scanner(System.in);
String s = sc.nextLine();
printStream.println(s);
}
}
}

服务端

那么如果服务端需要处理很多个客户端的消息通信请求,此时我们就需要在服务端引入线程了,也就是说客户端每发起一个请求,服务端就创建一个新的线程来处理这个客户端的请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Server {
public static void main(String[] args) {
try {
ServerSocket socket = new ServerSocket(8080);
while (true){
Socket accept = socket.accept();
//开启新线程去处理客户端请求
new Thread(()->{
try {
InputStream is = accept.getInputStream();
BufferedReader bs = new BufferedReader(new InputStreamReader(is));
String ws;
while((ws = bs.readLine()) != null){
System.out.println("客户端发送"+ws);
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

小结

  • 每个Socket接收到,都会创建一个线程,线程的竞争、切换上下文影响性能;
  • 每个线程都会占用栈空间和CPU资源;
  • 并不是每个socket都进行IO操作,无意义的线程处理;
  • 客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。

伪异步IO编程

​ 在上述案例中:客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。

​ 接下来我们采用一个伪异步I/O的通信框架,采用线程池和任务队列实现,当客户端接入时,将客户端的Socket封装成一个Task(该任务实现java.lang.Runnable线程任务接口)交给后端的线程池中进行处理。JDK的线程池维护一个消息队列和N个活跃的线程,对消息队列中Socket任务进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。

image-20230528153932976

示例

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Client {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 8080);

OutputStream os = socket.getOutputStream();

PrintStream printStream = new PrintStream(os);

while (true) {
Scanner scanner = new Scanner(System.in);
String postWord = scanner.nextLine();
printStream.println(postWord);
}
}
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Server {
public static void main(String[] args) {
try {
ServerSocket socket = new ServerSocket(8080);
SocketThreadPool threadPool = new SocketThreadPool(3, 10);

while (true) {
Socket accept = socket.accept();
threadPool.execute(() -> {
try {
InputStream is = accept.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String rd;
while ((rd = reader.readLine()) != null) {
System.out.println("服务器接收到==>" + rd);
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SocketThreadPool {

private ExecutorService executorService;

public SocketThreadPool(int max_pool_size,int queue_size){
executorService = new
ThreadPoolExecutor(3,
max_pool_size,
120L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queue_size));
}

public void execute(Runnable task){
executorService.execute(task);
}
}

BIO模式端口转发

示例代码

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Server {
public static List<Socket> ONLINE_USER = new ArrayList<>();
public static void main(String[] args) {
try {
ServerSocket socket = new ServerSocket(8080);
while(true){
Socket accept = socket.accept();
ONLINE_USER.add(accept);
//开启新线程
new Socket_Thread(accept).start();
}

} catch (Exception e) {
e.printStackTrace();
}
}
}

服务端线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Socket_Thread extends Thread {
private Socket socket;

public Socket_Thread(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
try {
BufferedReader bs = new BufferedReader(new InputStreamReader(socket.getInputStream()));

String msg;
while ((msg = bs.readLine()) != null) {
sendMsg(msg);
System.out.println(msg);
}
} catch (Exception e) {
Server.ONLINE_USER.remove(socket);
}

}

private void sendMsg(String msg) {
Server.ONLINE_USER.forEach(socket -> {
if (this.socket != socket){
try {
PrintStream ps = new PrintStream(socket.getOutputStream());
ps.println(msg);
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Client {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 8080);

new Thread(() -> {
PrintStream printStream = null;
try {
printStream = new PrintStream(socket.getOutputStream());
while (true) {
Scanner scanner = new Scanner(System.in);
String msg = scanner.nextLine();
printStream.println(msg);
}
} catch (IOException e) {
e.printStackTrace();
}


}).start();

new Thread(() -> {
BufferedReader br = null;
try {
br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String server_msg;
while((server_msg=br.readLine())!= null){
System.out.println(server_msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}

NIO