Netty简介

  • Netty是一个异步事件驱动和网络应用程序框架,用于快速开发可维护的高性能服务器和客户端。
  • Netty是一个NIO客户机->服务器框架,它支持快速、简单地开发网络应用程序,如服务器和客户机。它大大简化了网络编程,如TCP和UDP套接字服务器。
  • “快速和简单”并不意味着生成的应用程序将受到可维护性或性能问题的影响。Netty经过精心设计,并积累了许多协议(ftp、smtp、http)的实施经验,以及各种二进制和基于文本的遗留协议。因此Netty成功地找到一种方法,在不妥协的情况下实现了易于开发、性能、稳定性和灵活性。

Dubbo、ZK、RocketMQ、ElasticSearch、Spring5(对HTTP协议的实现)、GRpc、Spark等大型开源项目都在使用Netty作为底层通讯框架。

Netty核心概念

  1. Channel 管道,其是对Socket的封装,其包含了一组API,大大简化了直接与Socket进行操作的复杂性;
  2. EventLoopGroup 是一个EventLoop池,包含很多的EventLoop;
    Netty为每个Channel分配了一个EventLoop,用于处理用户连接请求、对用户请求的处理等所有事件。EventLoop本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个Channel的所有IO事件。
    一个Channel一旦与一个EventLoop相绑定,那么在Channel的整个生命周期内是不能改变的。一个EventLoop可以与多个Channel相绑定。即Channel与EventLoop关系是n:1,而EventLoop与线程关系是1:1;
  3. ServerBootStrap 用于配置整个Netty代码,将各个组件关联起来。服务端使用的是ServerBootStrap,而客户端使用的则是BootStrap;
  4. ChannelHandler和ChannelPipeline ChannelHandler是对Channel中数据的处理器,这些处理器可以是系统自身定义好的编解码器,也可以是用户自定义的。这些处理器会被统一添加到一个ChannelPipeline对象中,然后按照添加顺序对Channel中的数据进行依次处理;
  5. ChannelFuture Netty中所有I/O操作都是异步的,即操作不会立即得到返回结果,所以Netty中定义了一个ChannelFuture对象作为这个异步操作的“代言人”,表示异步操作本身。如果想获取到该异步操作的返回值,可以通过该异步操作对象的addListener()方法为该异步操作添加监听器,为其注册回调(结果出来立马执行)。
    Netty的异步编程模型都是建立在Future与回调概念上的;

Netty执行流程

在这里插入图片描述

小编用的Netty版本是4.1.29.Final。pom引用netty-all即可。

	<dependency>
		<groupId>io.netty</groupId>
		<artifactId>netty-all</artifactId>
	</dependency>

代码示例

编写程序:客户端连接上服务端后立马向服务端发送一个数据,服务端在接收到数据后会立马向客户端回复一个数据,客户端每收到服务端一个数据后便会再向服务端发送一个数据,如此反复。

  1. 服务端启动类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:08 下午
 * @modified By:
 */
public class SomeServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup parentGroup = new NioEventLoopGroup();
        NioEventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // StringDecoder:字符串解码器,将channel中的ByteBuf数据解码成string
                            pipeline.addLast(new StringDecoder());
                            // StringEncoder:字符串编码器,将string编码为为将要发送到channel中的ByteBuf
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new SomeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器已启动");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
  1. 自定义服务端处理器
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:20 下午
 * @modified By:
 */
public class SomeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 将来自于客户端数据显示在服务端控制台
        System.out.println(ctx.channel().remoteAddress() + "," + msg);

        // 向客户端发送数据
        ctx.channel().writeAndFlush("from server:" + UUID.randomUUID().toString());
        TimeUnit.MILLISECONDS.sleep(500);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
  1. 客户端启动类
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:28 下午
 * @modified By:
 */
public class SomeClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new SomeClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
  1. 自定义客户端处理器
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:34 下午
 * @modified By:
 */
public class SomeClientHandler extends SimpleChannelInboundHandler<String> {

    // msg消息类型与类中泛型类型一致
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "," + msg);
        ctx.channel().writeAndFlush("from client:" + LocalDateTime.now());
        TimeUnit.MILLISECONDS.sleep(500);
    }

    // 当channel被激活会触发该方法执行(没有该方法服务端等客户端发消息,客户端等服务端发消息)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().writeAndFlush("from client: begin talking");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

TCP拆包与粘包

Netty是基于TCP协议的网络通信,存在拆包与粘包情况。拆包与粘包同时发生在数据的发送方与接收方。
发送方通过网络每发送一批二进制数据包,那么这次所发送的数据包就称为一帧,即Frame。在进行TCP网络传输时,TCP协议会将用户真正要发送的数据根据当前缓存的实际情况对其进行拆分或重组,变为用于网络传输的Frame。在Netty中就是将ByteBuf中的数据拆分或重组为二进制的Frame,而接收方则需要将接收到的Frame中的数据进行重组或拆分,重新恢复成发送方发送时的ByteBuf数据。

  • 发送方发送的ByteBuf较大,在传输之前会被TCP底层拆分为多个Frame进行发送,这个过程称为发送拆包。接收方需要将这些Frame进行合并,这个全并过程称为接收方粘包。
  • 发送方发送的ByteBuf较小,无法形成一个Frame,此时TCP底层将很多的这样小的ByteBuf合并为一个Frame进行传输,这个合并过程称为发送方的粘包。接收方在接收到Frame后需要进行拆包,拆分他多个原来小的ByteBuf,这个拆分的过程称为接收方拆包。
  • 当一个Frame无法放入整数倍个ByteBuf时,最后一个ByteBuf会发生拆包,这个ByteBuf一部分放入到一个Frame中,另一部分被放入另一个Frame中,这个过程就是发送方拆包。但是对于这些ByteBuf放入到一个Frame的过程,就是发送方粘包。但对于这些ByteBuf放入到一个Frame的过程,就是发送方粘包。当接收方在接收到两个Frame后,对于第一个Frame最后一部分与第二个Frame的最前部分进行合并,这个合并过程就是接收方粘包。但在将Frame中的各个ByteBuf拆分出来的过程,就是接收方拆包。

发送方拆包代码示例

  1. 服务端启动类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:08 下午
 * @modified By:
 */
public class SomeServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup parentGroup = new NioEventLoopGroup();
        NioEventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // StringDecoder:字符串解码器,将channel中的ByteBuf数据解码成string
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new SomeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器已启动");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
  1. 自定义服务端处理器
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:20 下午
 * @modified By:
 */
public class SomeServerHandler extends SimpleChannelInboundHandler<String> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("server接收到的第【" + ++count + "】个数据包:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 客户端启动类
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:28 下午
 * @modified By:
 */
public class SomeClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new SomeClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
  1. 自定义客户端处理器
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:34 下午
 * @modified By:
 */
public class SomeClientHandler extends ChannelInboundHandlerAdapter {

    private String message = "此处省略很长很长字符串。。。";

    // 当channel被激活会触发该方法执行
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        byte[] bytes = message.getBytes();
        ByteBuf buffer;
        // 发送两次
        for (int i = 0; i < 2; i++) {
            // 申请缓存空间
            buffer = Unpooled.buffer(bytes.length);
            // 将数据写入到缓存中
            buffer.writeBytes(bytes);
            // 将缓存中数据写入到channel
            ctx.writeAndFlush(buffer);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

发送方粘包代码示例

  1. 服务端启动类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:08 下午
 * @modified By:
 */
public class SomeServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup parentGroup = new NioEventLoopGroup();
        NioEventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // StringDecoder:字符串解码器,将channel中的ByteBuf数据解码成string
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new SomeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器已启动");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
  1. 自定义服务端处理器
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:20 下午
 * @modified By:
 */
public class SomeServerHandler extends SimpleChannelInboundHandler<String> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("server接收到的第【" + ++count + "】个数据包:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 客户端启动类
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:28 下午
 * @modified By:
 */
public class SomeClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new SomeClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
  1. 自定义客户端处理器
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:34 下午
 * @modified By:
 */
public class SomeClientHandler extends ChannelInboundHandlerAdapter {

    private String message = "hello world ";

    // 当channel被激活会触发该方法执行
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        byte[] bytes = message.getBytes();
        ByteBuf buffer;
        for (int i = 0; i < 100; i++) {
            // 申请缓存空间
            buffer = Unpooled.buffer(bytes.length);
            // 将数据写入到缓存中
            buffer.writeBytes(bytes);
            // 将缓存中数据写入到channel
            ctx.writeAndFlush(buffer);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

接收方粘包与拆包

为了解决接收方接收到数据的混乱性,接收方也可以对接收到Frame包进行粘包与拆包。Netty中已经定义好了很多接收方粘包拆包解决方案,我们可能直接使用,小编介绍几种解决方案。

接收方粘包拆包实际工作是编解码。这个解码基本思想:发送方在发送数据中添加一个分隔标记,并告诉接收方该标记是什么,这样在接收方接收到Frame后会根据事先约定好的分隔标记进行拆分与合并,产生相应的ByteBuf数据。这个拆分或合并过程称为接收方的拆包与粘包。

LineBasedFrameDecoder

基于行的帧解码器,即会按照行分隔符对数据进行拆包粘包,解码出ByteBuf。

  1. 修改服务端启动类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:08 下午
 * @modified By:
 */
public class SomeServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup parentGroup = new NioEventLoopGroup();
        NioEventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 5120=5k,发送方数据不能大于5k
                            pipeline.addLast(new LineBasedFrameDecoder(5120));
                            // StringDecoder:字符串解码器,将channel中的ByteBuf数据解码成string
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new SomeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器已启动");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
  1. 修改客户端处理器
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:34 下午
 * @modified By:
 */
public class SomeClientHandler extends ChannelInboundHandlerAdapter {

    private String message = "字符串"+
            System.getProperty("line.separator"); // 添加行分隔符

    // 当channel被激活会触发该方法执行
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        byte[] bytes = message.getBytes();
        ByteBuf buffer;
        for (int i = 0; i < 2; i++) {
            // 申请缓存空间
            buffer = Unpooled.buffer(bytes.length);
            // 将数据写入到缓存中
            buffer.writeBytes(bytes);
            // 将缓存中数据写入到channel
            ctx.writeAndFlush(buffer);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

DelimiterBasedFrameDecoder

基于分隔符的帧解码器,即会按照指定分隔符对数据进行拆包粘包,解码出ByteBuf。(这里指定分隔符:###===###)

  1. 修改服务端启动类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:08 下午
 * @modified By:
 */
public class SomeServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup parentGroup = new NioEventLoopGroup();
        NioEventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new DelimiterBasedFrameDecoder(6144, Unpooled.copiedBuffer("###===###".getBytes())));
                            // StringDecoder:字符串解码器,将channel中的ByteBuf数据解码成string
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new SomeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器已启动");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
  1. 修改客户端处理器
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:34 下午
 * @modified By:
 */
public class SomeClientHandler extends ChannelInboundHandlerAdapter {

    private String message = "字符串###===###字符串###===###字符串字符串";

    // 当channel被激活会触发该方法执行
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        byte[] bytes = message.getBytes();
        ByteBuf buffer;
        for (int i = 0; i < 2; i++) {
            // 申请缓存空间
            buffer = Unpooled.buffer(bytes.length);
            // 将数据写入到缓存中
            buffer.writeBytes(bytes);
            // 将缓存中数据写入到channel
            ctx.writeAndFlush(buffer);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

FixedLengthFrameDecoder

固定长度帧解码器,即会按照指定的长度对Frame中的数据进行拆粘包。

  1. 修改服务端启动类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:08 下午
 * @modified By:
 */
public class SomeServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup parentGroup = new NioEventLoopGroup();
        NioEventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new FixedLengthFrameDecoder(11));
                            // StringDecoder:字符串解码器,将channel中的ByteBuf数据解码成string
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new SomeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器已启动");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}

LengthFieldBasedFrameDecoder

基于长度域的帧解码器,用于对LengthFieldPrepender编码器编码后的数据进行解码的。所以要清楚LengthFieldPrepender编码器的编码原理。

  • maxFrameLength要解码的Frame的最大长度。
  • lengthFieldOffset长度域的偏移量。
  • lengthFieldLength长度域的长度。
  • lengthAdjustment要添加到长度域中的补偿值,长度矫正值。
  • initialBytesToStrip从解码帧中要剥去的前面字节。
  1. 服务端启动类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:08 下午
 * @modified By:
 */
public class SomeServer {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup parentGroup = new NioEventLoopGroup();
        NioEventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
                            pipeline.addLast(new LengthFieldPrepender(4,true));
                            // StringDecoder:字符串解码器,将channel中的ByteBuf数据解码成string
                            pipeline.addLast(new StringDecoder());
                            // StringEncoder:字符串编码器,将string编码为为将要发送到channel中的ByteBuf
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new SomeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器已启动");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
  1. 自定义服务端处理器
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:20 下午
 * @modified By:
 */
public class SomeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 将来自于客户端数据显示在服务端控制台
        System.out.println(ctx.channel().remoteAddress() + "," + msg);

        // 向客户端发送数据
        ctx.channel().writeAndFlush("from server:" + UUID.randomUUID().toString());
        TimeUnit.MILLISECONDS.sleep(500);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
  1. 客户端启动类
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:28 下午
 * @modified By:
 */
public class SomeClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new LengthFieldPrepender(4));
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 4));
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new SomeClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
  1. 自定义客户端处理器
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

/**
 * @author: LailaiMonkey
 * @description:
 * @date:Created in 2022/4/10 2:34 下午
 * @modified By:
 */
public class SomeClientHandler extends SimpleChannelInboundHandler<String> {

    // msg消息类型与类中泛型类型一致
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "," + msg);
        ctx.channel().writeAndFlush("from client:" + LocalDateTime.now());
        TimeUnit.MILLISECONDS.sleep(500);
    }

    // 当channel被激活会触发该方法执行
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().writeAndFlush("from client: begin talking");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

版权声明:本文为h273979586原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/h273979586/article/details/124541280