EventLoop
EventLoop本质上就是一个Selector+一个单线程执行器。里面的run方法处理channel上源源不断的io事件
EventLoop它继承两个类
- 第一个是继承netty自己的OrderedEventExecutor接口
- 第二个是继承java.util.concurrent.ScheduledExecutorService接口
我们一般是不会直接用EventLoop 一般是使用EventLoopGroup。
因为我们肯定是需要多个EventLoop,也就相当于是需要多个线程来处理各种事件,而EventLoop本质又是一个单线程执行器。
channel一般会调用EventLoopGroup中的register()方法来绑定其中一个EventLoop,之后这个channel的io事件都又该EventLoop来处理。
创建EventLoopGroup
public static void main(String[] args) {
// 创建EventLoopGroup,它是一个接口,常用的使用就是NioEventLoopGroup
// 它可以处理io事件,普通任务,定时任务
// 还有另一个实现 DefaultEventLoop 它可以处理普通任务,定时任务。适用于一些没有io事件发生的场景
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
}
一般情况我们都没有往构造方法里面写参数,可以写一个int表示要创建几个EventLoop也就是创建几个线程。那默认会创建几个嘞?
在源码中的第一层,这里默认写了0,然后再一直点下去。
public NioEventLoopGroup() {
this(0);
}
然后到了父类MultithreadEventLoopGroup类
public abstract class MultithreadEventLoopGroup ... {
// 这里最后是创建了cpu核数的两倍 NettyRuntime.availableProcessors()就是cpu核数
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
// 如果为0就采用一个默认的静态常量的值DEFAULT_EVENT_LOOP_THREADS
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
}
可以发现,如果我们指定了EventLoop的数量那就以我们指定的为准,如果没有指定那就是采用的cou核数*2
执行普通任务和定时任务
package com.hs.nettyPrimary;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* @author hs
* @date 2021/07/17
*/
@Slf4j
public class EventLoopGroupTest {
public static void main(String[] args) {
// 创建EventLoopGroup,它是一个接口,常用的使用就是NioEventLoopGroup
// 它可以处理io事件,普通任务,定时任务
// 还有另一个实现 DefaultEventLoop 它可以处理普通任务,定时任务。适用于一些没有io事件发生的场景
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
// 获取下一个EventLoop对象
eventLoopGroup.next();
// 执行普通任务,因为它还有线程池的方法,就可以使用submit(Runnable run)方法来执行一个普通任务
// 执行普通任务的好处是可以执行一个异步处理
eventLoopGroup.next().execute(() -> {
log.debug("执行普通任务");
});
// 执行定时任务 scheduleAtFixedRate(任务对象,初始延迟时间 , 间隔时间 , 时间单位)
// 下面的含义是,刚开始等待1秒执行,然后接下来每隔两秒执行一次
eventLoopGroup.next().scheduleAtFixedRate(() -> {
log.debug("执行定时任务");
} , 1 , 2 , TimeUnit.SECONDS);
}
}
处理io事件
服务器的代码
@Slf4j
public class EventLoopGroupServerTest2 {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel socketChannel) throws Exception {
// 这里就没有定义处理ByteBuf和字符串转换的Handler,仅仅定义一个,现在自己进行转换
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.debug("接收客户端的消息为:" + byteBuf.toString(Charsets.UTF_8));
}
});
}
})
.bind(8080);
}
}
客户端的代码基本上没什么变化
但是服务器启动后,客户端也启动后,使用debug发送的数据,服务器竟然接收不到,这是因为netty采用的是多线程,而这里的断点主线程和发送数据的线程都停了,这里应该将下面的单选按钮选为Thread,就表示只是停主线程,但是发送数据的线程还是不会停止
EventLoopGroup
我们现在的代码是如下
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(...)
.bind(8080);
这里的group()方法里面只是传了一个参数,就相当于这个组里面有处理accept事件的EventLoop 也有处理read事件的EventLoop。我们可以划分的更细,这个方法里面可以传两个EventLoopGroup对象
public static void main(String[] args) {
new ServerBootstrap()
// 前面负责ServerSocketChannel的accept事件,后面的负责SocketChannel的read事件
.group(new NioEventLoopGroup() , new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(...)
.bind(8080);
}
这里就想,group()方法里面的第一个EventLoopGroup要不要指定数量,其实是不用指定的。因为这里只有一个ServerSocketChannel,只会占用一个EventLoop。
进一步细分,我们往pipeline中添加了多个Handler,但如果其中一个Handler耗时较长,进而影响到了其他的Handler执行,我们可以进一步细分,在创建一个NioEventLoopGroup,让这个新的EventLoopGroup专门去处理耗时较长的操作。
@Slf4j
public class EventLoopGroupServerTest2 {
public static void main(String[] args) {
// 创建一个专门处理耗时较长Handler 的EventLoopGroup
// 因为不是处理io操作,这里可以使用另一个实现 DefaultEventLoopGroup
EventLoopGroup eventLoopGroup = new DefaultEventLoopGroup();
new ServerBootstrap()
// 前面负责ServerSocketChannel的accept事件,后面的负责SocketChannel的read事件
.group(new NioEventLoopGroup() , new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(。。。)
.bind(8080);
}
}
现在的问题就是,EventLoopGroup创建好了后,如何跟这个耗时较长的handler联系起来。其实我们在调用nioSocketChannel.pipeline().addLast()
addlast()方法可以接收多个参数
@Slf4j
public class EventLoopGroupServerTest2 {
public static void main(String[] args) {
// 创建一个专门处理耗时较长Handler 的EventLoopGroup
// 因为不是处理io操作,这里可以使用另一个实现 DefaultEventLoopGroup
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup() , new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel socketChannel) throws Exception {
// 使用NioEventLoopGroup来处理的handler
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.debug("接收客户端的消息为:" + byteBuf.toString(Charsets.UTF_8));
// 让消息传递给下一个handler
ctx.fireChannelRead(msg);
}
});
// 将这个handler交给我们自定义的另一个EventLoopGroup处理
// addLast()第一个参数就是我们自定义的另一个EventLoopGroup处理,
// 第二个参数是这个handler起一个名字
// 第三个参数就是handler具体要做的事情
socketChannel.pipeline().addLast(group, "handler1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.debug("接收客户端的消息为:" + byteBuf.toString(Charsets.UTF_8));
}
});
}
})
.bind(8080);
}
}
这里其实又有一个线程切换的知识点,因为一个EventLoop就相当于是一个线程,这里将pipeline其中的一个Handler给其他EventLoop去执行就有一个线程切换的问题,那么是这两个线程是如何切换的嘞?
关键代码io.netty.channel.AbstractChannelHandlerContext
类的invokeChannelRead()
方法