一、写在最前面

1、误区

在我阅读了网上很多关于NIO和多路复用的文章,发现很多文章的语言存在歧义。如果不信,请你回答下列问题:

1、请问有几种IO模型

答:BIO、NIO、多路复用、信号驱动IO、异步IO

2、请问你如何理解NIO

答:你可能会说对应的select、poll甚至是epoll概念,然后引申到Netty

3、请问你如何理解多路复用

答:你就会把多路复用的基本概念说一遍,甚至还可以引申到redis的多路复用机制

4、请问NIO和多路复用IO的区别是什么

答:。。。

这是发生在我周围的例子,有人甚至给我的回答是——多路复用就是NIO,我也不能说他错,也不能说他对。

2、IO模型分类

主要是网上的博客表述有歧义,导致很多人理不清这几种IO模型的关系。回到第一个问题,有几种IO模型。

在《UNIX网络编程卷1》的第六章对IO模型给BIO、NIO、多路复用、信号驱动IO、异步IO五种IO模型给出了如下的定义





















3、概念再梳理(重点)

不知道你发现没有,NIO中根本没有提及select、poll甚至是epoll,反而是在多路复用中提到了他们。NIO的定义只包含了一个空轮询,会不断的去获取数据。

那为什么网上很多博客还在说NIO含有select呢?

这里一定要区分,我们所说的IO模型,是从网络层面来说的,而很多博客中的NIO是Java规范里面的。这就好比线程有多少种状态?在操作系统层面,你可以说有新建、可运行、运行、阻塞、死亡这五种。但在Java层面,你就会回答New(新创建)、Runnable(可运行)、Blocked(被阻塞)、Waiting(等待)、Tilme waiting(计时等待)、Terminated(已终止)这六种。

同理如果你问的是Java层面的NIO,那么我们就可以要理解为是java.nio这个包下面的代码,这里就可以说出我们NIO的三大组件——Buffer、Channel、Selector。即Java里面的NIO(NIO包下面的代码)就是封装了多路复用的机制。同理Netty是基于网络层面的多路复用机制,是基于Java层面的NIO。


二、BIO(Blocking IO)

1、客户端

@Slf4j
public class BioClient {

	public static void main(String[] args) throws IOException {
		Socket socket = new Socket("127.0.0.1", 9099);
		// 向服务端发送数据
		socket.getOutputStream().write("Hello BioServer".getBytes());
		socket.getOutputStream().flush();
		log.info("向服务端发送数据结束");
		byte[] bytes = new byte[1024];
		//接收服务端回传的数据
		socket.getInputStream().read(bytes);
		log.info("接收到服务端返回的数据:{}", new String(bytes));
		socket.close();
	}
}

2、服务端

@Slf4j
public class BioServer {
	public static void main(String[] args) throws IOException {
		ServerSocket serverSocket = new ServerSocket(9099);
		while (true) {
			log.info("等待客户端连接");
			//阻塞方法
			Socket socket = serverSocket.accept();
			log.info("客户端连接成功");
			new Thread(() -> {
				try {
					handler(socket);
				} catch (IOException e) {
					log.error(e.getMessage(), e);
				}
			}).start();

		}
	}

	private static void handler(Socket socket) throws IOException {
		byte[] bytes = new byte[1024];

		log.info("开始读取数据");
		// 接收客户端的数据,阻塞方法,没有数据可读时就阻塞
		int read = socket.getInputStream().read(bytes);
		log.info("读取数据完毕");
		if (read != -1) {
			log.info("接收到客户端的数据:{}", new String(bytes, 0, read));
		}
		// 向outputStream中回写数据
		socket.getOutputStream().write("Hello BioClient".getBytes());
		socket.getOutputStream().flush();
	}
}

3、效果展示

根据测试结果,我们不难发现,BIO的服务端是阻塞获取新请求。结合代码,没接收到一个新请求,就开启一个线程读取对应的数据。

服务端:



客户端:



4、总结

在BIO模式下,会存在一下问题:

问题一:只能以阻塞的形式读取每一次的请求。

问题二:如果客户端发送的数据为0,那么服务端在读取数据的时候也会阻塞读取,直到获取到数据才会向下执行,即下面这行代码也是阻塞的。

int read = socket.getInputStream().read(bytes);

BIO升级为每接收到一个连接,就开启一个线程

问题三:并发请求10W个连接,此时就会开启10W个线程,资源耗费极大

问题四:和问题二相同,如果其中一个线程迟迟没有获取到数据,这就会导致部分线程卡死

BIO再次升级,升级为一个线程池的使用

问题五:虽然线程池能够解决线程开启和释放的性能损耗,但是依然会出现问题四

BIO 方式适用于连接数目比较小且固定的架构, 这种方式对服务器资源要求比较高, 但程序简单易理解


三、NIO(NonBlocking IO)

1、客户端

@Slf4j
public class NioClient {
	public static void main(String[] args) throws IOException {


		SocketChannel sc = SocketChannel.open();
		sc.connect(new InetSocketAddress("localhost", 8099));

		sc.write(Charset.defaultCharset().encode("Hello NioServer"));
		log.info("发送数据成功");
		ByteBuffer buffer = ByteBuffer.allocate(16);

		int read = sc.read(buffer);
		if (read > 0) {
			log.info("读取到服务端返回的 {} 长度的数据", read);
		}

		sc.write(Charset.defaultCharset().encode("Hello NioServer2"));
		log.info("再次发送数据成功");

		System.in.read(); // 阻塞线程
	}
}

2、服务端

@Slf4j
public class NioServer {

	public static void main(String[] args) throws IOException, InterruptedException {
		ByteBuffer buffer = ByteBuffer.allocate(16);
		ServerSocketChannel ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);// accept不会阻塞
		ssc.bind(new InetSocketAddress(8099));
		List<SocketChannel> channelList = new ArrayList<>();
		while (true) {
			log.info("开始接收请求");
			SocketChannel accept = ssc.accept();
			if (accept != null) {
				log.info("获取到对应的连接" + accept);
				accept.configureBlocking(false); // read不会阻塞
				channelList.add(accept);
			}

			for (SocketChannel channel : channelList) {
				log.info("开始读取客户端的数据", channel);
				int read = channel.read(buffer);
				if (read > 0) {
					buffer.flip();
					log.info("读取到 {} 长度的数据", read);
					ByteBuffer retBuf = ByteBuffer.wrap("Hi NioClient".getBytes());
					TimeUnit.SECONDS.sleep(5); // 阻塞线程,便于观察控制台
					channel.write(retBuf);
					buffer.clear();
					log.info("数据读取完成", channel);
				}
			}
		}
	}
}

3、效果展示

NIO区别于BIO最大的特点就是是否是阻塞获取请求(一定要区分开Java的NIO包和NIO这个概念)。NIO会不断的轮询获取请求,即控制台中不断的轮询打印开始接受请求,如果接收到数据,就会像控制台那样,直接读取对应的数据。结尾的报错是因为我关闭了客户端的连接

服务端:



客户端:



4、总结

对然NIO不再像BIO那样会阻塞线程,造成网络堵塞,但是它不停的去轮询线程,会造成CPU空转,这也是我们不想看到的。如果大量连接不涉及写数据,但此时这个线程轮询获取写入数据的操作就是无意义的。


四、多路复用(Multiplexing)

1、客户端

多路复用IO的代码稍微要复杂一点,是因为它引入了一个Selector组件,我们常听到的Reactor模式、select、poll、epoll等,都是多路复用IO机制里面的名词,本文侧重案例测试,对这些概念不再进行讲解。

多路复用客户端代码结构为:

  1. 初始化对应的与服务端的连接,这里的初始化包含初始化channel和Selector,并且完成对应事件和channel的绑定关系
  2. 开启连接,内部逻辑为依靠selector进行监听对应的事件(这就是我们所说的Reactor事件处理机制),如果监听到对应的连接信息,就根据连接中的事件进行判定,然后再执行不同的业务逻辑
@Slf4j
public class MultioClient {

	private Selector selector;

	public static void main(String[] args) throws IOException {
		MultioClient client = new MultioClient();
		client.initClient("127.0.0.1", 9099);
		client.connect();
	}

	/**
	 * 获得一个Socket通道,并对该通道做一些初始化的工作
	 *
	 * @param ip   连接的服务器的ip
	 * @param port 连接的服务器的端口号
	 */
	public void initClient(String ip, int port) throws IOException {
		// 获得一个Socket通道
		SocketChannel channel = SocketChannel.open();
		// 设置通道为非阻塞
		channel.configureBlocking(false);
		// 获得一个通道管理器
		this.selector = Selector.open();
		// 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调
		// 用channel.finishConnect() 才能完成连接
		log.info("第1步:客户端初始化,发起连接,触发连接事件请求");
		channel.connect(new InetSocketAddress(ip, port));
		// 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件(连接事件)。

		channel.register(selector, SelectionKey.OP_CONNECT);
	}

	/**
	 * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
	 */
	public void connect() throws IOException {
		// 轮询访问selector
		while (true) {
			selector.select();
			// 获得selector中选中的项的迭代器
			Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
			while (it.hasNext()) {
				SelectionKey key = it.next();
				// 删除已选的key,以防重复处理
				it.remove();
				// 连接事件发生
				if (key.isConnectable()) {
					SocketChannel channel = (SocketChannel) key.channel();
					// 如果正在连接,则完成连接
					if (channel.isConnectionPending()) {
						channel.finishConnect();
					}
					// 设置成非阻塞
					channel.configureBlocking(false);
					// 在这里可以给服务端发送信息哦
					ByteBuffer buffer = ByteBuffer.wrap("Hello MultServer".getBytes());

					log.info("第3步:向服务端发送数据");
					channel.write(buffer);
					// 在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
					channel.register(this.selector, SelectionKey.OP_READ);  // 获得了可读的事件
				} else if (key.isReadable()) {
					log.info("第6步:读取返回的数据");
					read(key);
				}

			}
		}
	}

	/**
	 * 处理读取服务端发来的信息事件
	 *
	 * @param key
	 */
	public void read(SelectionKey key) throws IOException {
		//和服务端的read方法一样
		// 服务器可读取消息:得到事件发生的Socket通道
		SocketChannel channel = (SocketChannel) key.channel();
		// 创建读取的缓冲区
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		int len = channel.read(buffer);
		if (len != -1) {
			log.info("客户端收到信息:" + new String(buffer.array(), 0, len));
		}
	}
}

2、服务端

多路复用服务端代码逻辑:

  1. 同样的先初始化出channel和selector,然后完成事件的注册。需要区分的是客户端的channel是SocketChannel,服务端的channel是ServerSocketChannel。
  2. 同样的依靠Reactor事件处理机制去监听对应的请求中的事件,然后根据事件的类型做出不同的判定
@Slf4j
public class MultioServer {


	public static void main(String[] args) throws IOException {
		// 创建一个在本地端口进行监听的服务Socket通道.并设置为非阻塞方式
		ServerSocketChannel ssc = ServerSocketChannel.open();
		// 必须配置为非阻塞才能往selector上注册,否则会报错,selector模式本身就是非阻塞模式
		ssc.configureBlocking(false);
		ssc.socket().bind(new InetSocketAddress(9099));
		// 创建一个选择器selector
		Selector selector = Selector.open();
		// 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣
		ssc.register(selector, SelectionKey.OP_ACCEPT);

		while (true) {
			log.info("等待事件发生——");
			// 轮询监听channel里的key,select是阻塞的,accept()也是阻塞的
			int select = selector.select();

			log.info("有事件发生——");
			// 有客户端请求,被轮询监听到
			Iterator<SelectionKey> it = selector.selectedKeys().iterator();
			while (it.hasNext()) {
				SelectionKey key = it.next();
				//删除本次已处理的key,防止下次select重复处理
				it.remove();
				handle(key);
			}
		}
	}

	/**
	 * @param key 对应的事件key
	 */
	private static void handle(SelectionKey key) throws IOException {
		if (key.isAcceptable()) {

			log.info("第2步:有客户端[连接事件]发生");
			ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
			// NIO非阻塞体现:此处accept方法是阻塞的,但是这里因为是发生了连接事件,所以这个方法会马上执行完,不会阻塞
			// 处理完连接请求不会继续等待客户端的数据发送
			SocketChannel sc = ssc.accept();
			sc.configureBlocking(false);
			// 通过Selector监听Channel时对读事件感兴趣
			sc.register(key.selector(), SelectionKey.OP_READ);
		} else if (key.isReadable()) {

			log.info("第4步:有客户端数据[可读事件]发生");
			SocketChannel sc = (SocketChannel) key.channel();
			ByteBuffer buffer = ByteBuffer.allocate(1024);
			// NIO非阻塞体现:首先read方法不会阻塞,其次这种事件响应模型,当调用到read方法时肯定是发生了客户端发送数据的事件
			int len = sc.read(buffer);
			if (len != -1) {
				log.info("读取到客户端发送的数据,可进行相应的业务处理:",new String(buffer.array(), 0, len));
			}
			// 向客户端写入数据
			ByteBuffer bufferToWrite = ByteBuffer.wrap("Hello MultClient".getBytes());
			sc.write(bufferToWrite);
			// 会触发key.isWritable() 或 key.isReadable()的条件
			key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
		} else if (key.isWritable()) {

			log.info("第5步:有客户端数据[写入事件]发生了");
			SocketChannel sc = (SocketChannel) key.channel();
			// NIO事件触发是水平触发
			// 使用Java的NIO编程的时候,在没有数据可以往外写的时候要取消写事件,
			// 在有数据往外写的时候再注册写事件
			key.interestOps(SelectionKey.OP_READ);
		}
	}
}

3、效果展示

代码执行逻辑为:

  1. 首先客户端会初始化,然后去连接服务端
  2. 服务端接收到对应的连接事件后
  3. 然后客户端向服务端写入数据
  4. 此时服务端接收到客户端发送过来的连接,触发可读事件(key.isReadable())
  5. 读取完数据之后,服务端准备向客户端写入数据,此时触发服务端的可写事件,没有数据可以写入的时候取消写事件
  6. 最终客户端收到服务端写会的数据

客户端:



服务端:



4、总结

多路复用其实是一个相对完美的解决方案,即采用了Reactor模型,当对应的读、写、连接等事件触发的时候,再触发对应的接收请求的逻辑,然后完成后续的业务代码。既不会造成空轮询无效请求,也不会阻塞网络。

正式基于种种特点,Netty就在该IO模型的基础上进行封装。


五、Netty(基于多路复用IO)

1、客户端

Netty客户端代码如下,代码基本骨架都差不多

  1. 船舰对应的group,你可以理解为用来接收请求和分配工作线程的线程池组
  2. 创建对应的channel
  3. 创建需要对数据进行处理的handler。handler分为两大类:ChannelInboundHandler和ChannelOutboundHandler,你可以理解为是对管道中数据入口方向和出口方向数据的处理。
  4. 连接对应的服务端地址
  5. sync阻塞住线程,连接好后再继续向下执行,否则就是异步进行连接
@Slf4j
public class netty客户端 {
	public static void main(String[] args) throws InterruptedException, IOException {

		new Bootstrap()
				.group(new NioEventLoopGroup())
				.channel(NioSocketChannel.class)
				.handler(new ChannelInitializer<NioSocketChannel>() {
					@Override
					protected void initChannel(NioSocketChannel ch) throws Exception {
						ch.pipeline().addLast(new StringEncoder());
						ch.pipeline().addLast(new StringDecoder());                        // inputStream
						ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
							// 接收响应消息
							@Override
							public void channelRead(ChannelHandlerContext ctx, Object msg) {
								log.info("第4步:netty客户端接收到服务端返回的数据为:{}", msg);
							}

							@Override
							public void channelActive(ChannelHandlerContext ctx) {
								log.info("第1步:客户端启动触发事件");
								ctx.writeAndFlush("Hello NettyServer");
							}
						});
					}
				})
				.connect(new InetSocketAddress("localhost", 8099))
				.sync();
	}
}

2、服务端

服务端代码格式和客户端差不多,最开始创建的Bootstrap对象变成了ServerBootstrap。同样的,核心逻辑在initChannel方法里面。

  1. StringEncoder和StringDecoder为一个编解码器,我调用的writeAndFlush方法是直接写入的字节,需要在服务端和客户端分别对该字节进行相同格式的编解码工作,才能够收到对应的数据信息
@Slf4j
public class netty服务端 {
	public static void main(String[] args) {

		new ServerBootstrap()
				.group(new NioEventLoopGroup())
				.channel(NioServerSocketChannel.class)
				.childHandler(new ChannelInitializer<NioSocketChannel>() {
					@Override
					protected void initChannel(NioSocketChannel ch) throws Exception {
						ch.pipeline().addLast(new StringEncoder());
						ch.pipeline().addLast(new StringDecoder());
						ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {

							@Override
							public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
								log.info("第2步:服务端接收消息:{}", msg);
							}

							@Override
							public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
								log.info("第3步:服务端接收到消息后进行业务处理");
								ctx.channel().writeAndFlush("Hello NettyClient");
							}
						});
					}
				})
				.bind(8099);
	}
}

3、效果展示

服务端:



客户端:



4、总结

当你熟悉了Netty代码的编写风格之后,你会发现,我们想要对数据进行处理,只需要编写对应的ChannelHandler,其他的额外的关于数据的解析、读取等繁琐操作都被Netty封装了,即开发人员能够更加专注于业务开发,而非投身于繁琐的字节解析工作中。

想了解Netty一些细节概念的,可以参考:浅谈Netty机制


六、AIO(Asynchronous IO)

1、客户端

对应的Channel变成了AsynchronousSocketChannel

@Slf4j
public class AIOClient {

	public static void main(String... args) throws Exception {
		AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
		socketChannel.connect(new InetSocketAddress("127.0.0.1", 9000)).get();
		socketChannel.write(ByteBuffer.wrap("Hello AioServer".getBytes()));
		ByteBuffer buffer = ByteBuffer.allocate(512);
		Integer len = socketChannel.read(buffer).get();
		if (len != -1) {
			log.info("客户端收到信息:" + new String(buffer.array(), 0, len));
		}
	}
}

2、服务端

@Slf4j
public class AIOServer {
    public static void main(String[] args) throws Exception {
        final AsynchronousServerSocketChannel serverChannel =
                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(9000));

        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
                try {
                    // 再此接收客户端连接,如果不写这行代码后面的客户端连接连不上服务端
                    serverChannel.accept(attachment, this);
                    log.info(socketChannel.getRemoteAddress().toString());
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer buffer) {
                            buffer.flip();
                            log.info(new String(buffer.array(), 0, result));
                            socketChannel.write(ByteBuffer.wrap("Hello AioClient".getBytes()));
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer buffer) {
                            exc.printStackTrace();
                        }
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        Thread.sleep(Integer.MAX_VALUE);
    }
}

3、效果展示

演示的效果也很好理解,就是我们常规的理解的异步获取到数据。

客户端:



服务端:



4、总结

异步非阻塞, 由操作系统完成后回调通知服务端程序启动线程去处理, 一般适用于连接数较多且连接时间较长的应用,JDK7 开始支持。

在Linux系统上,AIO的底层实现仍使用Epoll,没有很好实现AIO,因此在性能上没有明显的优势,而且被JDK封装了一层不容易深度优化,Linux上AIO还不够成熟。Netty是异步非阻塞框架,Netty在JDK的NIO上做了很多异步的封装。

BIO NIO AIO
IO模型 同步阻塞 同步非阻塞(多路复用) 异步非阻塞
编程难度 简单 复杂 复杂
可靠性
吞吐量