EventLoop

EventLoop本质上就是一个Selector+一个单线程执行器。里面的run方法处理channel上源源不断的io事件

EventLoop它继承两个类

  • 第一个是继承netty自己的OrderedEventExecutor接口
  • 第二个是继承java.util.concurrent.ScheduledExecutorService接口

我们一般是不会直接用EventLoop 一般是使用EventLoopGroup。

因为我们肯定是需要多个EventLoop,也就相当于是需要多个线程来处理各种事件,而EventLoop本质又是一个单线程执行器。

channel一般会调用EventLoopGroup中的register()方法来绑定其中一个EventLoop,之后这个channel的io事件都又该EventLoop来处理。

创建EventLoopGroup

public static void main(String[] args) {
    // 创建EventLoopGroup,它是一个接口,常用的使用就是NioEventLoopGroup
    // 它可以处理io事件,普通任务,定时任务
    // 还有另一个实现 DefaultEventLoop 它可以处理普通任务,定时任务。适用于一些没有io事件发生的场景
    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
}

一般情况我们都没有往构造方法里面写参数,可以写一个int表示要创建几个EventLoop也就是创建几个线程。那默认会创建几个嘞?

在源码中的第一层,这里默认写了0,然后再一直点下去。

public NioEventLoopGroup() {
    this(0);
}

然后到了父类MultithreadEventLoopGroup类

public abstract class MultithreadEventLoopGroup ... {
    // 这里最后是创建了cpu核数的两倍 NettyRuntime.availableProcessors()就是cpu核数
    private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    // 如果为0就采用一个默认的静态常量的值DEFAULT_EVENT_LOOP_THREADS
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
}

可以发现,如果我们指定了EventLoop的数量那就以我们指定的为准,如果没有指定那就是采用的cou核数*2

执行普通任务和定时任务

package com.hs.nettyPrimary;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * @author hs
 * @date 2021/07/17
 */
@Slf4j
public class EventLoopGroupTest {
    public static void main(String[] args) {
        // 创建EventLoopGroup,它是一个接口,常用的使用就是NioEventLoopGroup
        // 它可以处理io事件,普通任务,定时任务
        // 还有另一个实现 DefaultEventLoop 它可以处理普通任务,定时任务。适用于一些没有io事件发生的场景
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        // 获取下一个EventLoop对象
        eventLoopGroup.next();

        // 执行普通任务,因为它还有线程池的方法,就可以使用submit(Runnable run)方法来执行一个普通任务
        // 执行普通任务的好处是可以执行一个异步处理
        eventLoopGroup.next().execute(() -> {
            log.debug("执行普通任务");
        });

        // 执行定时任务 scheduleAtFixedRate(任务对象,初始延迟时间 , 间隔时间 , 时间单位)
        // 下面的含义是,刚开始等待1秒执行,然后接下来每隔两秒执行一次
        eventLoopGroup.next().scheduleAtFixedRate(() -> {
            log.debug("执行定时任务");
        } , 1 , 2 , TimeUnit.SECONDS);

    }
}

处理io事件

服务器的代码

@Slf4j
public class EventLoopGroupServerTest2 {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel socketChannel) throws Exception {
                        // 这里就没有定义处理ByteBuf和字符串转换的Handler,仅仅定义一个,现在自己进行转换
                        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                           public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                log.debug("接收客户端的消息为:" + byteBuf.toString(Charsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

客户端的代码基本上没什么变化
在这里插入图片描述
但是服务器启动后,客户端也启动后,使用debug发送的数据,服务器竟然接收不到,这是因为netty采用的是多线程,而这里的断点主线程和发送数据的线程都停了,这里应该将下面的单选按钮选为Thread,就表示只是停主线程,但是发送数据的线程还是不会停止
在这里插入图片描述

EventLoopGroup

我们现在的代码是如下

new ServerBootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioServerSocketChannel.class)
    	.childHandler(...)
    	.bind(8080);

这里的group()方法里面只是传了一个参数,就相当于这个组里面有处理accept事件的EventLoop 也有处理read事件的EventLoop。我们可以划分的更细,这个方法里面可以传两个EventLoopGroup对象

public static void main(String[] args) {
    new ServerBootstrap()
            // 前面负责ServerSocketChannel的accept事件,后面的负责SocketChannel的read事件
            .group(new NioEventLoopGroup() , new NioEventLoopGroup())
        	.channel(NioServerSocketChannel.class)
    		.childHandler(...)
    		.bind(8080);
}

这里就想,group()方法里面的第一个EventLoopGroup要不要指定数量,其实是不用指定的。因为这里只有一个ServerSocketChannel,只会占用一个EventLoop。

进一步细分,我们往pipeline中添加了多个Handler,但如果其中一个Handler耗时较长,进而影响到了其他的Handler执行,我们可以进一步细分,在创建一个NioEventLoopGroup,让这个新的EventLoopGroup专门去处理耗时较长的操作。

@Slf4j
public class EventLoopGroupServerTest2 {
    public static void main(String[] args) {
        // 创建一个专门处理耗时较长Handler 的EventLoopGroup 
        // 因为不是处理io操作,这里可以使用另一个实现 DefaultEventLoopGroup
        EventLoopGroup eventLoopGroup = new DefaultEventLoopGroup();
        
        new ServerBootstrap()
                // 前面负责ServerSocketChannel的accept事件,后面的负责SocketChannel的read事件
                .group(new NioEventLoopGroup() , new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(。。。)
                .bind(8080);
    }
}

现在的问题就是,EventLoopGroup创建好了后,如何跟这个耗时较长的handler联系起来。其实我们在调用nioSocketChannel.pipeline().addLast() addlast()方法可以接收多个参数

@Slf4j
public class EventLoopGroupServerTest2 {
    public static void main(String[] args) {
        // 创建一个专门处理耗时较长Handler 的EventLoopGroup
        // 因为不是处理io操作,这里可以使用另一个实现 DefaultEventLoopGroup
        EventLoopGroup group = new DefaultEventLoopGroup();

        new ServerBootstrap()
                .group(new NioEventLoopGroup() , new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel socketChannel) throws Exception {
                        
                        // 使用NioEventLoopGroup来处理的handler
                        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                           public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                log.debug("接收客户端的消息为:" + byteBuf.toString(Charsets.UTF_8));
                                // 让消息传递给下一个handler
                                ctx.fireChannelRead(msg);
                            }
                        });
                        // 将这个handler交给我们自定义的另一个EventLoopGroup处理
                        // addLast()第一个参数就是我们自定义的另一个EventLoopGroup处理,
                        // 第二个参数是这个handler起一个名字
                        // 第三个参数就是handler具体要做的事情
                     socketChannel.pipeline().addLast(group, "handler1", new ChannelInboundHandlerAdapter(){
                            @Override
                           public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                log.debug("接收客户端的消息为:" + byteBuf.toString(Charsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

这里其实又有一个线程切换的知识点,因为一个EventLoop就相当于是一个线程,这里将pipeline其中的一个Handler给其他EventLoop去执行就有一个线程切换的问题,那么是这两个线程是如何切换的嘞?
关键代码io.netty.channel.AbstractChannelHandlerContext类的invokeChannelRead()方法
在这里插入图片描述


版权声明:本文为qq_44027353原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_44027353/article/details/121332382