客户端发送16进制给服务端,并行实现socket通道活动状态和断开重新连接的功能, 监听接口是否存在数据,如果存在socket客户端发送给socket服务端的实现 随着物联网的发展,随之出现了各种传感器监测数据的实时发送,需要和netty服务器通讯,netty和传感器之间需要保持长连接(换句话说,netty和gateway之间都会主动给对方发送消息)
netty 问题总结
当服务因为心跳反应断开时,在 userEventTriggered 方法中关闭一次后, 会执行 channelInactive 接口,也就是说,如果进行心跳检测,
但是客户端主动关闭后,只会调用channelInactive 的接口
在netty 中,主要涉及到几个内容
服务端 (姑且认为只有一个)
客户端 (可以有多个)
channel 通道 (双向通道 ) 是入站或者出站的载体。 服务器端的通道和客户端的通道并不是一个通道
channelpipeline 对输入输出的管理
下面这两个相当于过滤器或者拦截器之类的。 许多类都是从这上面扩展而来
ChannelInboundHandlerAdapter 输入的过滤
ChannelOutboundHandlerAdapter 输出的过滤
主要理解的是通道,一个客户端连接一个服务器,产生一个通道,所有的输入输出都是对通道的操作。
public class NettyClient {
private String host;
private int port;
/** 存放客户端bootstrap对象 */
private Bootstrap bootstrap;
/** 客户端 */
private EventLoopGroup group;
/** 存放客户端channel对象 */
private Channel channel;
public static void main(String[] args) throws Exception {
//NettyClient nettyClient = new NettyClient(“222.91.99.38”, 5100);
NettyClient nettyClient = new NettyClient(“localhost”, 5100);
nettyClient.connect();
}
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
init();
}
private void init() {
//客户端需要一个事件循环组
group = new NioEventLoopGroup();
//创建客户端启动对象
// bootstrap 可重用, 只需在NettyClient实例化的时候初始化即可.
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)/// 设置nio双向通道
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入处理器
//服务端设定IdleStateHandler心跳检测每五秒进行一次读检测,如果五秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法
//ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
//客户端 设定IdleStateHandler心跳检测每四秒进行一次写检测,如果四秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每四秒向服务端发送一次消息;
ch.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
/*readerIdleTime:读空闲超时时间
writerIdleTime:写空闲超时时间
allIdleTime:读和写都空闲的超时时间*/
ch.pipeline().addLast(new NettyClientHandler(NettyClient.this));
}
});
}
//服务端
/* ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
//客户端
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);*/
/**
* 连接到服务端
* @throws InterruptedException 中断异常
*/
public void doConnect() throws InterruptedException {
log.info(“Start connecting server”);
if (channel != null && channel.isActive()) {
return;
}
bootstrap.connect(host, port).addListener(new ChannelListener(host,port)).sync().channel();
}
/**
* 重新连接
* @param serverIp 连接的IP
*
*/
public void reConnect() {
try {
log.info(“Start reconnect to server.” + host + “:” + port);
if (channel != null && channel.isOpen()) {
channel.close();
}
bootstrap.connect(new InetSocketAddress(host, port))
.addListener(new ChannelListener(host,port)).sync().channel();
} catch (Exception e) {
log.info(“ReConnect to server failure.server=” + host + “:” + port + “:” + e.getMessage());
}
}
public void close() {
if (channel != null && channel.isOpen()) {
channel.close();
}
bootstrap.clone();
group.shutdownGracefully();
}
public void connect() throws Exception {
System.out.println(“netty client start。。”);
//启动客户端去连接服务器端
ChannelFuture cf = bootstrap.connect(host, port);
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
//重连交给后端线程执行
future.channel().eventLoop().schedule(() -> {
System.err.println(“重连服务端…”);
try {
connect();
} catch (Exception e) {
e.printStackTrace();
}
}, 20, TimeUnit.SECONDS);
} else {
System.out.println(“服务端连接成功…”);
}
}
});
//对通道关闭进行监听
cf.channel().closeFuture().sync();
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Autowired
private NettyClient nettyClient;
public NettyClientHandler(NettyClient nettyClient) {
this.nettyClient = nettyClient;
}
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer(“HelloServer”.getBytes(CharsetUtil.UTF_8));
System.err.println(“当客户端连接服务器完成就会触发该方法。。。”);
String str=”48 47 12 00 87 00 16 09 02 11 10 26 21 30 30 30 30 30 30 30 30 30 30 60 57 08 B5 0D 71 8E 74 94 5E 30 29 15 D1 9A D4 AA C2 FD 1E 53 E9 3A E1 CA F5 CF A2 6A 6A 74 2B 8F 87 B6 DF A1 5B CA 05 03 1F 3F 86 CF 8A 0C 05 85 7D 94 65 DE 12 4A A5 E3 EF 46 CD DE C8 13 5F 4C 17 2C 08 41 8F 31 99 44 55 AA B5 A5 A8 25 2A 8D 97 FC 22 76 11 50 02 67 05 30 D0 1D 5B 51 5A A1 11″;
byte[] bytes = hexStringToByteArray(str);
ByteBuf bufx = Unpooled.copiedBuffer(bytes);
//UnpooledHeapByteBuf.class
ctx.writeAndFlush(bufx);
Channel channel = ctx.channel();
log.info(“—map—“+channel);
ChannelMap.addChannel(“typeid”,ctx.channel());
log.info(“客户端连接成功: client address :{}”+ channel.remoteAddress());
}
public byte[] hexStringToByteArray(String hexString) {
hexString = hexString.replaceAll(” “, “”);
int len = hexString.length();
byte[] bytes = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
// 两位一组,表示一个字节,把这样表示的16进制字符串,还原成一个字节
bytes[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4) + Character
.digit(hexString.charAt(i + 1), 16));
}
return bytes;
}
//当通道有读取事件时会触发,即服务端发送数据给客户端
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(“–收到服务端的消息–“+msg.toString());
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
// 复制内容到字节数组bytes
buf.readBytes(bytes);
String bytesToHexString = bytesToHexString(bytes);
String retx =Arrays.toString(bytes);
List<String> splitToList_r080928_val = Splitter.on(“,”).splitToList(retx);
List<String> result= new ArrayList<>();
splitToList_r080928_val.forEach(x->{
result.add(x);
});
System.out.println(“–收到服务端的消息bytesToHexString–“+bytesToHexString);
System.out.println(“–收到服务端的消息–“+retx);
//System.out.println(“收到服务端的消息:” + buf.toString(CharsetUtil.UTF_8));
System.out.println(“服务端的地址: ” + ctx.channel().remoteAddress());
// 保存当前连接
//在服务器端EchoServerHandler中的ChannelRead中保存当前的连接
ChannelMap.addChannel(bytesToHexString,ctx.channel());
System.out.println(“服务端的地址 map: ” + ChannelMap.getChannelHashMap());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
System.out.println(“channelReadComplete: channel读数据完成”);
super.channelReadComplete(ctx);
}
/**
* 将接收到的数据转换为16进制
* @param bytes
* @return
*/
public static String bytesToHexString(byte[] bytes) {
StringBuilder sb = new StringBuilder();
List<String> hexList=new ArrayList<String>();
for (int i = 0; i < bytes.length; i++) {
String hex = Integer.toHexString(0xFF & bytes[i]);
if (hex.length() == 1) {
hex=”0″+hex;
hexList.add(hex);
sb.append(‘0’);
}else{
hexList.add(hex);
}
sb.append(hex);
}
String join = Joiner.on(“,”).skipNulls().join(hexList);
return join;
}
//客户端与服务端断开连接时调用
// channel 处于不活动状态时调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.err.println(“运行中断开重连。。。”);
nettyClient.connect();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/*
* readerIdleTime:读空闲超时时间
writerIdleTime:写空闲超时时间
allIdleTime:读和写都空闲的超时时间
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
System.err.println(“发送心跳,保持长连接。。。”);
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)) {
System.out.println(“READER_IDLE”);
} else if (event.state().equals(IdleState.WRITER_IDLE)) {
/**发送心跳,保持长连接*/
String s = “ping$_”;
ctx.channel().writeAndFlush(s);
System.out.println(“心跳发送成功!”);
} else if (event.state().equals(IdleState.ALL_IDLE)) {
System.out.println(“ALL_IDLE”);
}
}
super.userEventTriggered(ctx, evt);
}