网络IO-IO模型的演变

代码、资料来自于马士兵MAC课程。

本文主要讲解了IO模型,由 BIONIO,再演变到多路复用 select/pollepoll 的过程。

从本文中你可以了解到不同模型是如何解决之前模型所产生的问题,并且会带来什么样的新问题。

BIO

BIO 服务器端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(9090, 20);

System.out.println("step1: new ServerSocket(9090) ");

while (true) {
//阻塞
Socket client = server.accept();
System.out.println("step2:client\t" + client.getPort());

new Thread(new Runnable() {

@Override
public void run() {
InputStream in = null;
try {
in = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
while (true) {
//阻塞
String dataline = reader.readLine();

if (null != dataline) {
System.out.println(dataline);
} else {
client.close();
break;
}
}
System.out.println("客户端断开");
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}

如何查看java所在位置

1
2
3
4
5
6
7
8
9
whereis java
java: /usr/bin/java /usr/lib/java /etc/java /usr/share/java /usr/share/man/man1/java.1.gz

# -l 查看详细 -r 倒叙排序 -t 按时间排序
ls -lrt /usr/bin/java
lrwxrwxrwx. 1 root root 22 5月 25 11:11 /usr/bin/java -> /etc/alternatives/java

/etc/alternatives/java
lrwxrwxrwx. 1 root root 73 5月 25 11:11 /etc/alternatives/java -> /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64/jre/bin/java

使用strace追踪程序

1
2
# -ff 监听所有请求,使用 1.4 以前版本才能看到最早 BIO 的系统调用函数
strace -ff -o out /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64/jre/bin/java SocketBIO

这时当前目录下就会生成对应文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ll
总用量 832
-rw-r--r--. 1 root root 13685 5月 26 12:40 out.8696
-rw-r--r--. 1 root root 195208 5月 26 12:40 out.8697
-rw-r--r--. 1 root root 33347 5月 26 12:43 out.8698
-rw-r--r--. 1 root root 975 5月 26 12:40 out.8699
-rw-r--r--. 1 root root 1098 5月 26 12:40 out.8700
-rw-r--r--. 1 root root 1019 5月 26 12:40 out.8701
-rw-r--r--. 1 root root 12260 5月 26 12:43 out.8702
-rw-r--r--. 1 root root 9956 5月 26 12:43 out.8703
-rw-r--r--. 1 root root 974 5月 26 12:40 out.8704
-rw-r--r--. 1 root root 545800 5月 26 12:43 out.8705
-rw-r--r--. 1 root root 1175 5月 26 11:47 SocketBIO$1.class
-rw-r--r--. 1 root root 1104 5月 26 11:47 SocketBIO.class
-rw-r--r--. 1 root root 1505 5月 26 11:47 SocketBIO.java

之后便可以观察具体发生了哪些系统调用

总结

  1. 系统调用socket(…)=3
  2. 系统调用bind(3, …8090…)绑定端口号
  3. 系统调用listen(3,…)来监听此端口,此时 netstat -napt 才会显示对应的socket在监听8090端口
  4. 主线程阻塞在系统调用accept(3, 处
  5. 调用 nc 命令去连接,则主线程通过系统调用clone(…)抛出一个线程去接收信息,此时主线程再次循环阻塞在accept(3, 处,而子线程阻塞在 recv(fd, 处

NIO

NIO 服务器端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public static void main(String[] args) throws Exception {

LinkedList<SocketChannel> clients = new LinkedList<>();

// 服务端开启监听:接受客户端
ServerSocketChannel ss = ServerSocketChannel.open();
ss.bind(new InetSocketAddress(9090));
// 重点 OS NONBLOCKING!!! 只让接受客户端时不阻塞
ss.configureBlocking(false);

while (true) {
// 接受客户端的连接
Thread.sleep(1000);
// 不会阻塞? -1 NULL
SocketChannel client = ss.accept();
// accept 调用内核了:1,没有客户端连接进来,返回值?在BIO 的时候一直卡着,但是在NIO ,不卡着,返回-1,NULL
// 如果来客户端的连接,accept 返回的是这个客户端的fd 5,client object

if (client == null) {
System.out.println("null.....");
} else {
//重点 socket(服务端的listen socket<连接请求三次握手后,往我这里扔,我去通过accept得到连接的socket>,连接socket<连接后的数据读写使用的> )
client.configureBlocking(false);
int port = client.socket().getPort();
System.out.println("client..port: " + port);
clients.add(client);
}
//可以在堆里 堆外
ByteBuffer buffer = ByteBuffer.allocateDirect(4096);

//遍历已经链接进来的客户端能不能读写数据
for (SocketChannel c : clients) {
// 不会阻塞,num 的返回值有 >0 -1 0
int num = c.read(buffer);
if (num > 0) {
buffer.flip();
byte[] aaa = new byte[buffer.limit()];
buffer.get(aaa);

String b = new String(aaa);
System.out.println(c.socket().getPort() + " : " + b);
buffer.clear();
}
}
}
}

使用strace追踪程序

1
2
# -ff 监听所有请求,使用 1.4 以前版本才能看到最早 BIO 的系统调用函数
strace -ff -o out java SocketNIO

观察 out 文件,可以发现系统调用 accept(5, 0x7f970c13bc70, [28]) = -1 EAGAIN (资源暂时不可用) 并未发生阻塞,当无服务端连接时直接返回 -1

1
2
3
4
5
6
7
8
9
10
11
tail out.9301
mprotect(0x7f970c22d000, 4096, PROT_READ|PROT_WRITE) = 0
futex(0x7f970c04c354, FUTEX_WAIT_BITSET_PRIVATE, 1, {tv_sec=19890, tv_nsec=261222528}, 0xffffffff) = -1 ETIMEDOUT (连接超时)
futex(0x7f970c04c328, FUTEX_WAKE_PRIVATE, 1) = 0
accept(5, 0x7f970c13bc70, [28]) = -1 EAGAIN (资源暂时不可用)
write(1, "null.....", 9) = 9
write(1, "\n", 1) = 1
futex(0x7f970c115954, FUTEX_WAKE_OP_PRIVATE, 1, 1, 0x7f970c115950, FUTEX_OP_SET<<28|0<<12|FUTEX_OP_CMP_GT<<24|0x1) = 1
futex(0x7f970c115928, FUTEX_WAKE_PRIVATE, 1) = 0
mprotect(0x7f970c22e000, 4096, PROT_READ|PROT_WRITE) = 0
futex(0x7f970c04c354, FUTEX_WAIT_BITSET_PRIVATE, 1, {tv_sec=19891, tv_nsec=272261757}, 0xffffffff

通过命令 nc localhost 9090 进行连接,再次观察 out 文件

1
2
3
4
5
6
7
8
9
10
11
tail out.9301
write(1, "null.....", 9) = 9
write(1, "\n", 1) = 1
read(6, 0x7f970c2c7a90, 4096) = -1 EAGAIN (资源暂时不可用)
futex(0x7f970c04c354, FUTEX_WAIT_BITSET_PRIVATE, 1, {tv_sec=20045, tv_nsec=470392585}, 0xffffffff) = -1 ETIMEDOUT (连接超时)
futex(0x7f970c04c328, FUTEX_WAKE_PRIVATE, 1) = 0
accept(5, 0x7f970c13e760, [28]) = -1 EAGAIN (资源暂时不可用)
write(1, "null.....", 9) = 9
write(1, "\n", 1) = 1
read(6, 0x7f970c2c8aa0, 4096) = -1 EAGAIN (资源暂时不可用)
futex(0x7f970c04c354, FUTEX_WAIT_BITSET_PRIVATE, 1, {tv_sec=20046, tv_nsec=479021706}, 0xffffffff

可以发现会调用 read(6, 0x7f970c2c8aa0, 4096) = -1 EAGAIN (资源暂时不可用) 尝试获取客户端数据,当前没数据时不阻塞直接返回 -1

当客户端输入数据12345时,观察 out 文件

1
2
3
4
5
6
7
8
9
10
11
tail out.9301
read(6, 0x7f970c3f7de0, 4096) = -1 EAGAIN (资源暂时不可用)
futex(0x7f970c04c354, FUTEX_WAIT_BITSET_PRIVATE, 1, {tv_sec=20350, tv_nsec=802512162}, 0xffffffff) = -1 ETIMEDOUT (连接超时)
futex(0x7f970c04c328, FUTEX_WAKE_PRIVATE, 1) = 0
accept(5, 0x7f970c13bc70, [28]) = -1 EAGAIN (资源暂时不可用)
write(1, "null.....", 9) = 9
write(1, "\n", 1) = 1
read(6, "12345\n", 4096) = 6
write(1, "34706 : 12345\n", 14) = 14
write(1, "\n", 1) = 1
futex(0x7f970c04c354, FUTEX_WAIT_BITSET_PRIVATE, 1, {tv_sec=20351, tv_nsec=812756397}, 0xffffffff

可以发现 read(6, “12345\n”, 4096) = 6 成功接收数据

总结

  • ServerSocketChannel 设置 configureBlocking 为 false 时,系统调用 accept(5, 0x7f970c13e760, [28]) = -1 EAGAIN (资源暂时不可用) 将不会被阻塞,当前没有客户端连接时会直接返回 -1
  • SocketChannel 设置 configureBlocking 为 false 时,系统调用 read(6, 0x7f970c2c8aa0, 4096) = -1 EAGAIN (资源暂时不可用) 尝试获取客户端数据,当前没数据时不阻塞直接返回 -1
  • NIO 在一个线程内,循环遍历询问是否有新的客户端连接,若有连接将其放进集合 clients 中,再遍历 client 查询这些客户端连接是否有传入数据,如果有则获取到对应数据,没有则返回 -1
  • NIO 的优势:能够解决 BIO 多次创建线程造成的系统调用频繁的问题
  • NIO 的问题:在循环 clients 集合是,多次进行 read 系统调用导致内核态用户态频繁切换

多路复用 POLL/SELECT

计算机组成原理之系统来消息了

image-20210601154546121

  1. 当系统接受到消息了,会产生 IO 中断
  2. 中断会导致调用 callback,将网卡中发来的数据走网络协议栈最终关联到 FD 的 buffer
  3. 所以在某一时间,如果从 app 询问内核某一个或者某些 FD 是否有可 R/W,会有状态返回

poll/select 原理

image-20210601155531168

  1. app 发起软中断,调用内核 select/poll 方法
  2. 内核通过 select/poll 方法,轮询 app 传入的参数 fds,会返回可用的 fds
  3. app 获取可用的 fds,再调用内核 read 方法获取数据

总结

  1. select/poll 解决了 NIO 对暂无数据的 fd 调用内核 read 方法导致用户态内核态切换频繁的问题
  2. select/poll 在内核层面筛选有数据的 fd 的时间复杂度为 O(n),且每次调用 select 方法会将所有需要监听的 fd 传给内核

EPOLL

epoll_create、epoll_ctl 和 epoll_wait

  • epoll_create - open an epoll file descriptor

    epoll_create() 返回一个引用新 epoll 实例的文件描述符。该文件描述符用于对epoll接口的所有后续调用。其本质是在内核中开辟一块内存空间,并返回描述该内存空间的 fd。

  • epoll_ctl - control interface for an epoll descriptor

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

    epfd:即为 epoll_create 返回的 fd

    op:表示要进行什么操作,例如 EPOLL_CTL_ADD、EPOLL_CTL_ADD、EPOLL_CTL_DEL

    fd:表示这些操作是对这个 fd 进行的

    *event:表示这个 fd 可用于什么操作,如

    ​ EPOLLIN - The associated file is available for read(2) operations.

    ​ EPOLLOUT - The associated file is available for write(2) operations.

  • epoll_wait - wait for an I/O event on an epoll file descriptor

    epoll_wait 表示等待返回一个可操作的 fd 链表

epoll 原理

image-20210601174205090

  1. 系统调用 epoll_create 在内核中开辟一块内存,并将 FD6 指向该空间
  2. 在客户端和服务器端三次握手结束后,并将该 socket 分配给 app 后,app 的线程会产生一个 FD5,并调用 epoll_ctl(fd6,ADD,fd5… 把 fd5 放入 fd6 指向的内核空间中
  3. 当系统接受到 IO 中断后,不仅将数据从网卡的 buffer 复制到 fd 的 buffer 中,还会将该 fd 复制到 fd 的链表里
  4. 当调用 epoll_wait 方法后,会将 fd 的链表返回。链表里的 fd 都是有数据的

fd4 表示处于 listen 时的 socket,也是会进入红黑树中的

java 代码实现多路复用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
public class SocketMultiplexingSingleThreadv1 {

private ServerSocketChannel server = null;
// linux 多路复用器(select poll epoll kqueue) nginx event{}
private Selector selector = null;
int port = 9090;

public void initServer() {
try {
server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(port));


// 如果在epoll模型下,open--》 epoll_create -> fd3
// select poll *epoll 优先选择:epoll 但是可以 -D修正
selector = Selector.open();

// server 约等于 listen状态的 fd4
/*
register
如果:
select,poll:jvm里开辟一个数组 fd4 放进去
epoll: epoll_ctl(fd3,ADD,fd4,EPOLLIN
*/
server.register(selector, SelectionKey.OP_ACCEPT);


} catch (IOException e) {
e.printStackTrace();
}
}

public void start() {
initServer();
System.out.println("服务器启动了。。。。。");
try {
while (true) {

Set<SelectionKey> keys = selector.keys();
System.out.println(keys.size() + " size");


// 1.调用多路复用器(select,poll or epoll (epoll_wait))
/*
select()是啥意思:
1.select,poll 其实 内核的select(fd4) poll(fd4)
2.epoll: 其实 内核的 epoll_wait()
参数可以带时间:没有时间,0 : 阻塞,有时间设置一个超时
selector.wakeup() 结果返回0

懒加载:
其实再触碰到selector.select()调用的时候触发了epoll_ctl的调用
*/
while (selector.select() > 0) {
// 返回的有状态的fd集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectionKeys.iterator();
// so,管你啥多路复用器,你呀只能给我状态,我还得一个一个的去处理他们的R/W。同步好辛苦!!!!!!!!
// NIO 自己对着每一个fd调用系统调用,浪费资源,那么你看,这里是不是调用了一次select方法,知道具体的那些可以R/W了?
while (iter.hasNext()) {
SelectionKey key = iter.next();
//set 不移除会重复循环处理
iter.remove();
if (key.isAcceptable()) {
// 看代码的时候,这里是重点,如果要去接受一个新的连接
// 语义上,accept接受连接且返回新连接的FD对吧?
// 那新的FD怎么办?
// select,poll,因为他们内核没有空间,那么在jvm中保存和前边的fd4那个listen的一起
// epoll: 我们希望通过epoll_ctl把新的客户端fd注册到内核空间
acceptHandler(key);
} else if (key.isReadable()) {
readHandler(key);
//在当前线程,这个方法可能会阻塞 ,如果阻塞了十年,其他的IO早就没电了。。。
//所以,为什么提出了 IO THREADS
//redis 是不是用了epoll,redis是不是有个io threads的概念 ,redis是不是单线程的
//tomcat 8,9 异步的处理方式 IO 和 处理上 解耦
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

public void acceptHandler(SelectionKey key) {
try {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//来啦,目的是调用accept接受客户端 fd7
SocketChannel client = ssc.accept();
client.configureBlocking(false);

ByteBuffer buffer = ByteBuffer.allocate(8192);

//你看,调用了register
/*
select,poll:jvm里开辟一个数组 fd7 放进去
epoll: epoll_ctl(fd3,ADD,fd7,EPOLLIN
*/
client.register(selector, SelectionKey.OP_READ, buffer);
System.out.println("-------------------------------------------");
System.out.println("新客户端:" + client.getRemoteAddress());
System.out.println("-------------------------------------------");

} catch (IOException e) {
e.printStackTrace();
}
}

public void readHandler(SelectionKey key) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.clear();
int read = 0;
try {
while (true) {
read = client.read(buffer);
if (read > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
client.write(buffer);
}
buffer.clear();
} else if (read == 0) {
break;
} else {
client.close();
break;
}
}
} catch (IOException e) {
e.printStackTrace();

}
}

public static void main(String[] args) {
SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();
service.start();
}
}
  • Selector selector = Selector.open(); 获取多路复用器模型,可以是 epoll、poll、select
  • server.register(selector, SelectionKey.OP_ACCEPT); 如果是 select/poll 模型:jvm里开辟一个数组 fd4 放进去;如果是 epoll 模型:epoll_ctl(fd3,ADD,fd4,EPOLLIN
  • client.register(selector, SelectionKey.OP_READ, buffer); 如果是 select/poll 模型:fd7 放进 jvm 里的一个数组;如果是 epoll 模型:epoll_ctl(fd3,ADD,fd7,EPOLLIN
  • selector.select() 如果是 select,poll 模型其实是调用内核的 *select(fd4)/poll(fd4)*,如果是 epoll 模型则是调用 epoll_wait()