1.9 Java网络编程模型
1.9.1 阻塞I/O模型
阻塞I/O模型是常见的I/O模型,在读写数据时客户端会发生阻塞。阻塞I/O模型的工作流程为:在用户线程发出I/O请求之后,内核会检查数据是否就绪,此时用户线程一直阻塞等待内存数据就绪;在内存数据就绪后,内核将数据复制到用户线程中,并返回I/O执行结果到用户线程,此时用户线程将解除阻塞状态并开始处理数据。典型的阻塞I/O模型的例子为data = socket.read(),如果内核数据没有就绪,Socket线程就会一直阻塞在read()中等待内核数据就绪。
1.9.2 非阻塞I/O模型
非阻塞I/O模型指用户线程在发起一个I/O操作后,无须阻塞便可以马上得到内核返回的一个结果。如果内核返回的结果为false,则表示内核数据还没准备好,需要稍后再发起I/O操作。一旦内核中的数据准备好了,并且再次收到用户线程的请求,内核就会立刻将数据复制到用户线程中并将复制的结果通知用户线程。
在非阻塞I/O模型中,用户线程需要不断询问内核数据是否就绪,在内存数据还未就绪时,用户线程可以处理其他任务,在内核数据就绪后可立即获取数据并进行相应的操作。典型的非阻塞I/O模型一般如下:
while(true){ data = socket.read(); if(data == true){//1:内核数据就绪 //获取并处理内核数据 break; }else{ //2:内核数据未就绪,用户线程处理其他任务 } }
1.9.3 多路复用I/O模型
多路复用I/O模型是多线程并发编程用得较多的模型,Java NIO就是基于多路复用I/O模型实现的。在多路复用I/O模型中会有一个被称为Selector的线程不断轮询多个Socket的状态,只有在Socket有读写事件时,才会通知用户线程进行I/O读写操作。
因为在多路复用I/O模型中只需一个线程就可以管理多个Socket(阻塞I/O模型和非阻塞1/O模型需要为每个Socket都建立一个单独的线程处理该Socket上的数据),并且在真正有Socket读写事件时才会使用操作系统的I/O资源,大大节约了系统资源。
Java NIO在用户的每个线程中都通过selector.select()查询当前通道是否有事件到达,如果没有,则用户线程会一直阻塞。而多路复用I/O模型通过一个线程管理多个Socket通道,在Socket有读写事件触发时才会通知用户线程进行I/O读写操作。因此,多路复用I/O模型在连接数众多且消息体不大的情况下有很大的优势。尤其在物联网领域比如车载设备实时位置、智能家电状态等定时上报状态且字节数较少的情况下优势更加明显,一般一个经过优化后的16核32GB服务器能承载约10万台设备连接。
非阻塞I/O模型在每个用户线程中都进行Socket状态检查,而在多路复用I/O模型中是在系统内核中进行Socket状态检查的,这也是多路复用I/O模型比非阻塞I/O模型效率高的原因。
多路复用I/O模型通过在一个Selector线程上以轮询方式检测在多个Socket上是否有事件到达,并逐个进行事件处理和响应。因此,对于多路复用I/O模型来说,在事件响应体(消息体)很大时,Selector线程就会成为性能瓶颈,导致后续的事件迟迟得不到处理,影响下一轮的事件轮询。在实际应用中,在多路复用方法体内一般不建议做复杂逻辑运算,只做数据的接收和转发,将具体的业务操作转发给后面的业务线程处理。
1.9.4 信号驱动I/O模型
在信号驱动I/O模型中,在用户线程发起一个I/O请求操作时,系统会为该请求对应的Socket注册一个信号函数,然后用户线程可以继续执行其他业务逻辑;在内核数据就绪时,系统会发送一个信号到用户线程,用户线程在接收到该信号后,会在信号函数中调用对应的I/O读写操作完成实际的I/O请求操作。
1.9.5 异步I/O模型
在异步I/O模型中,用户线程会发起一个asynchronous read操作到内核,内核在接收到synchronous read请求后会立刻返回一个状态,来说明请求是否成功发起,在此过程中用户线程不会发生任何阻塞。接着,内核会等待数据准备完成并将数据复制到用户线程中,在数据复制完成后内核会发送一个信号到用户线程,通知用户线程asynchronous读操作已完成。在异步I/O模型中,用户线程不需要关心整个I/O操作是如何进行的,只需发起一个请求,在接收到内核返回的成功或失败信号时说明I/O操作已经完成,直接使用数据即可。
在异步I/O模型中,I/O操作的两个阶段(请求的发起、数据的读取)都是在内核中自动完成的,最终发送一个信号告知用户线程I/O操作已经完成,用户直接使用内存写好的数据即可,不需要再次调用I/O函数进行具体的读写操作,因此在整个过程中用户线程不会发生阻塞。
在信号驱动模型中,用户线程接收到信号便表示数据已经就绪,需要用户线程调用I/O函数进行实际的I/O读写操作,将数据读取到用户线程;而在异步I/O模型中,用户线程接收到信号便表示I/O操作已经完成(数据已经被复制到用户线程),用户可以开始使用该数据了。
异步I/O需要操作系统的底层支持,在Java 7中提供了Asynchronous I/O操作。
1.9.6 Java I/O
在整个Java.io包中最重要的是5个类和1个接口。5个类指的是File、OutputStream、InputStream、Writer、Reader,1个接口指的是Serializable。具体的使用方法请参考JDK API。
1.9.7 Java NIO
Java NIO的实现主要涉及三大核心内容:Selector(选择器)、Channel(通道)和Buffer(缓冲区)。Selector用于监听多个Channel的事件,比如连接打开或数据到达,因此,一个线程可以实现对多个数据Channel的管理。传统I/O基于数据流进行I/O读写操作;而Java NIO基于Channel和Buffer进行I/O读写操作,并且数据总是被从Channel读取到Buffer中,或者从Buffer写入Channel中。
Java NIO和传统I/O的最大区别如下。
(1)I/O是面向流的,NIO是面向缓冲区的:在面向流的操作中,数据只能在一个流中连续进行读写,数据没有缓冲,因此字节流无法前后移动。而在NIO中每次都是将数据从一个Channel读取到一个Buffer中,再从Buffer写入Channel中,因此可以方便地在缓冲区中进行数据的前后移动等操作。该功能在应用层主要用于数据的粘包、拆包等操作,在网络不可靠的环境下尤为重要。
(2)传统I/O的流操作是阻塞模式的,NIO的流操作是非阻塞模式的。在传统I/O下,用户线程在调用read()或write()进行I/O读写操作时,该线程将一直被阻塞,直到数据被读取或数据完全写入。NIO通过Selector监听Channel上事件的变化,在Channel上有数据发生变化时通知该线程进行读写操作。对于读请求而言,在通道上有可用的数据时,线程将进行Buffer的读操作,在没有数据时,线程可以执行其他业务逻辑操作。对于写操作而言,在使用一个线程执行写操作将一些数据写入某通道时,只需将Channel上的数据异步写入Buffer即可,Buffer上的数据会被异步写入目标Channel上,用户线程不需要等待整个数据完全被写入目标Channel就可以继续执行其他业务逻辑。
非阻塞I/O模型中的Selector线程通常将I/O的空闲时间用于执行其他通道上的I/O操作,所以一个Selector线程可以管理多个输入和输出通道,如图1-18所示。
图1-18
1.Channel
Channel和I/O中的Stream(流)类似,只不过Stream是单向的(例如InputStream、OutputStream),而Channel是双向的,既可以用来进行读操作,也可以用来进行写操作。
NIO中Channel的主要实现有:FileChannel、DatagramChannel、SocketChannel、ServerSocketChannel,分别对应文件的I/O、UDP、TCP I/O、Socket Client和Socker Server操作。
2.Buffer
Buffer实际上是一个容器,其内部通过一个连续的字节数组存储I/O上的数据。在NIO中,Channel在文件、网络上对数据的读取或写入都必须经过Buffer。
如图1-19所示,客户端在向服务端发送数据时,必须先将数据写入Buffer中,然后将Buffer中的数据写到服务端对应的Channel上。服务端在接收数据时必须通过Channel将数据读入Buffer中,然后从Buffer中读取数据并处理。
图1-19
在NIO中,Buffer是一个抽象类,对不同的数据类型实现不同的Buffer操作。常用的Buffer实现类有:ByteBuffer、IntBuffer、CharBuffer、LongBuffer、DoubleBuffer、FloatBuffer、ShortBuffer。
3.Selector
Selector用于检测在多个注册的Channel上是否有I/O事件发生,并对检测到的I/O事件进行相应的响应和处理。因此通过一个Selector线程就可以实现对多个Channel的管理,不必为每个连接都创建一个线程,避免线程资源的浪费和多线程之间的上下文切换导致的开销。同时,Selector只有在Channel上有读写事件发生时,才会调用I/O函数进行读写操作,可极大减少系统开销,提高系统的并发量。
4.Java NIO使用
要实现Java NIO,就需要分别实现Server和Client。具体的Server实现代码如下:
public class MyServer { private int size = 1024; private ServerSocketChannel serverSocketChannel; private ByteBuffer byteBuffer; private Selector selector; private int remoteClientNum = 0; public MyServer(int port) { try { //在构造函数中初始化Channel监听 initChannel(port); } catch (IOException e) { e.printStackTrace(); System.exit(-1); } } //Channel的初始化 public void initChannel(int port) throws IOException { //打开Channel serverSocketChannel = ServerSocketChannel.open(); //设置为非阻塞模式 serverSocketChannel.configureBlocking(false); //绑定端口 serverSocketChannel.bind(new InetSocketAddress(port)); System.out.println("listener on port: " + port); //选择器的创建 selector = Selector.open(); //向选择器注册通道 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //分配缓冲区的大小 byteBuffer = ByteBuffer.allocate(size); } //监听器,用于监听Channel上的数据变化 private void listener() throws Exception { while (true) { //返回的int值表示有多少个Channel处于就绪状态 int n = selector.select(); if (n == 0) { continue; } //每个selector对应多个SelectionKey,每个SelectionKey对应一个Channel Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); //如果SelectionKey处于连接就绪状态,则开始接收客户端的连接 if (key.isAcceptable()) { //获取Channel ServerSocketChannel server = (ServerSocketChannel) key.channel(); //Channel接收连接 SocketChannel channel = server.accept(); //Channel注册 registerChannel(selector, channel, SelectionKey.OP_READ); //远程客户端的连接数 remoteClientNum++; System.out.println("online client num="+remoteClientNum); write(channel, "hello client".getBytes()); } //如果通道已经处于读就绪状态 if (key.isReadable()) { read(key); } iterator.remove(); } } } private void read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); int count; byteBuffer.clear(); //从通道中读数据到缓冲区 while ((count = socketChannel.read(byteBuffer)) > 0) { //byteBuffer写模式变为读模式 byteBuffer.flip(); while (byteBuffer.hasRemaining()) { System.out.print((char)byteBuffer.get()); } byteBuffer.clear(); } if (count < 0) { socketChannel.close(); } } private void write(SocketChannel channel, byte[] writeData) throws IOException { byteBuffer.clear(); byteBuffer.put(writeData); //byteBuffer从写模式变成读模式 byteBuffer.flip(); //将缓冲区的数据写入通道中 channel.write(byteBuffer); } private void registerChannel(Selector selector, SocketChannel channel, int opRead) throws IOException { if (channel == null) { return; } channel.configureBlocking(false); channel.register(selector, opRead); } public static void main(String[] args) { try { MyServer myServer = new MyServer(9999); myServer.listener(); } catch (Exception e) { e.printStackTrace(); } } }
在以上代码中定义了名为MyServer的服务端实现类,在该类中定义了serverSocketChannel用于ServerSocketChannel的建立和端口的绑定;byteBuffer用于不同Channel之间的数据交互;selector用于监听服务器各个Channel上数据的变化并做出响应。同时,在类构造函数中调用了初始化ServerSocketChannel的操作,定义了listener方法来监听Channel上的数据变化,解析客户端的数据并对客户端的请求做出响应。
具体的Client实现代码如下:
public class MyClient { private int size = 1024; private ByteBuffer byteBuffer; private SocketChannel socketChannel; public void connectServer() throws IOException { socketChannel = socketChannel.open(); socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999)); socketChannel.configureBlocking(false); byteBuffer = ByteBuffer.allocate(size); receive(); } private void receive() throws IOException { while (true) { byteBuffer.clear(); int count; //如果没有数据可读,则read方法一直阻塞,直到读取到新的数据 while ((count = socketChannel.read(byteBuffer)) > 0) { byteBuffer.flip(); while (byteBuffer.hasRemaining()) { System.out.print((char)byteBuffer.get()); } send2Server("say hi".getBytes()); byteBuffer.clear(); } } } private void send2Server(byte[] bytes) throws IOException { byteBuffer.clear(); byteBuffer.put(bytes); byteBuffer.flip(); socketChannel.write(byteBuffer); } public static void main(String[] args) throws IOException { new MyClient().connectServer(); } }
在以上代码中定义了MyClient类来实现客户端的Channel逻辑,其中,connectServer方法用于和服务端建立连接,receive方法用于接收服务端发来的数据,send2Server用于向服务端发送数据。