李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
02.Netty进阶之粘包与半包解决方案
Leefs
2022-06-14 PM
789℃
0条
[TOC] ### 一、解决方案 1. **短链接**:发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界,缺点效率太低 2. **定长解码器**:每一条消息采用固定长度,缺点浪费空间 3. **行解码器**:每一条消息采用分隔符,例如 \n,缺点需要转义 4. **LTC解码器**:每一条消息分为 head 和 body,head 中包含 body 的长 ### 二、示例 #### 2.1 短链接 **客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开**。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以**短链接无法解决半包现象** + **思路** > 服务端:一次接收最大缓冲区大小为16字节大小的数据 > > 客户端:一次发送的字节数大于16字节,一次数据发送完成后断开于服务端的连接,循环发送十次消息 + **服务端代码** ```java import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; /** * @author lilinchao * @date 2022/6/14 * @description 短链接 服务端 **/ @Slf4j public class Server01 { void start(){ NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); //调整 netty 的接收缓冲区(byteBuf) //AdaptiveRecvByteBufAllocator:参数1:最小容量 参数2:初始容量 参数3:最大容量 serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16)); serverBootstrap.group(boss,worker); serverBootstrap.childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error",e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } public static void main(String[] args) { new Server01().start(); } } ``` + **客户端代码** ```java import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; /** * @author lilinchao * @date 2022/6/14 * @description 短链接 客户端 * 发送完成一次连接断开一次 **/ @Slf4j public class Client01 { public static void main(String[] args) { //发送十次消息 for (int i = 0; i < 10; i++) { send(); } System.out.println("finish"); } private static void send(){ NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //会在连接channel建立成功后,触发active事件 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = ctx.alloc().buffer(16); buf.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); ctx.writeAndFlush(buf); //发送一次消息就直接断开连接 ctx.channel().close(); } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { log.error("client error",e); } finally { worker.shutdownGracefully(); } } } ``` **运行结果** ``` 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] REGISTERED 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] ACTIVE 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] READ: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| +--------+-------------------------------------------------+----------------+ 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] READ COMPLETE 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] READ: 2B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 10 11 |.. | +--------+-------------------------------------------------+----------------+ 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] READ COMPLETE 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 - R:/127.0.0.1:49486] READ COMPLETE 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 ! R:/127.0.0.1:49486] INACTIVE 21:18:08 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xe3341a31, L:/127.0.0.1:8080 ! R:/127.0.0.1:49486] UNREGISTERED ``` 只复制一次,客户端从连接到发送完消息以后断开的过程。 从结果可以很清晰的看到,没有发生粘包现象,但是半包问题未得到解决。 #### 2.2 定长解码器 客户端于服务器**约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度**。若发送数据长度不足则需要**补齐**至该长度。 服务器接收数据时,**将接收到的数据按照约定的最大长度进行拆分**,即使发送过程中产生了粘包,也可以通过**定长解码器**将数据正确地进行拆分。 `Netty`中提供了一个`FixedLengthFrameDecoder`(固定长度解析器),是一个特殊的handler,专门用来进行解码。 + **思路** > 客户端:给每个发送的数据封装成定长的长度(多余的使用分隔符`_`,统一规定)最后统一通过一个ByteBuf发送出去;(会产生粘包) > > 服务端:通过`FixedLengthFrameDecoder`进行固定长度解析,每次解析到定长的Bytebuf进行处理。 + **服务端代码** ```java import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class Server2 { void start() { NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); // 调整系统的接收缓冲区(滑动窗口) // serverBootstrap.option(ChannelOption.SO_RCVBUF, 10); // 调整 netty 的接收缓冲区(byteBuf) serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16)); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { //对数据进行定长解码 ch.pipeline().addLast(new FixedLengthFrameDecoder(10)); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error", e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } public static void main(String[] args) { new Server2().start(); } } ``` + **客户端代码** ```java import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import java.util.Arrays; import java.util.Random; @Slf4j public class Client2 { public static void main(String[] args) { send(); System.out.println("finish"); } //将数据填充到十个字节,不够的部分使用'_' public static byte[] fill10Bytes(char c, int len) { byte[] bytes = new byte[10]; Arrays.fill(bytes, (byte) '_'); for (int i = 0; i < len; i++) { bytes[i] = (byte) c; } System.out.println(new String(bytes)); return bytes; } private static void send() { NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 会在连接 channel 建立成功后,会触发 active 事件 @Override public void channelActive(ChannelHandlerContext ctx) { ByteBuf buf = ctx.alloc().buffer(); char c = '0'; Random r = new Random(); for (int i = 0; i < 10; i++) { byte[] bytes = fill10Bytes(c, r.nextInt(10) + 1); c++; buf.writeBytes(bytes); } ctx.writeAndFlush(buf); } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error", e); } finally { worker.shutdownGracefully(); } } } ``` 缺点是,数据包的大小不好把握 * 长度定的太大,浪费 * 长度定的太小,对某些数据包又显得不够 #### 2.3 行解码器 行解码器的是**通过分隔符对数据进行拆分**来解决粘包半包问题的 **在Netty中提供了两个解码器:** + **LineBasedFrameDecoder**:指定以换行符作为分隔符。\n或者\r\n,使用它的时候,会有一个最大长度限制,若是超过了最大长度还没有找到换行符就会抛出一个异常。 + **DelimiterBasedFrameDecoder**:可以自定义符号来作为分隔符,在构造方法中有最大长度和一个Bytebuf类型的分隔符。 + **服务端代码** ```java import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class Server3 { void start() { NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); // 调整系统的接收缓冲区(滑动窗口) // serverBootstrap.option(ChannelOption.SO_RCVBUF, 10); // 调整 netty 的接收缓冲区(byteBuf) serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16)); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { //指定以换行符作为分隔符,限制最大字节长度为1024 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error", e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } public static void main(String[] args) { new Server3().start(); } } ``` + **客户端代码** ```java import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import java.util.Random; @Slf4j public class Client3 { public static void main(String[] args) { send(); System.out.println("finish"); } public static StringBuilder makeString(char c, int len) { StringBuilder sb = new StringBuilder(len + 2); for (int i = 0; i < len; i++) { sb.append(c); } sb.append("\n"); return sb; } private static void send() { NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 会在连接 channel 建立成功后,会触发 active 事件 @Override public void channelActive(ChannelHandlerContext ctx) { ByteBuf buf = ctx.alloc().buffer(); char c = '0'; Random r = new Random(); for (int i = 0; i < 10; i++) { StringBuilder sb = makeString(c, r.nextInt(256) + 1); c++; buf.writeBytes(sb.toString().getBytes()); } ctx.writeAndFlush(buf); } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error", e); } finally { worker.shutdownGracefully(); } } } ``` 缺点,处理字符数据比较合适,但如果内容本身包含了分隔符(字节数据常常会有此情况),那么就会解析错误。 同时,效率比较低,需要一个一个字节去匹配消息的边界。 *附参考文章链接:* *https://nyimac.gitee.io/2021/04/25/Netty%E5%9F%BA%E7%A1%80*
标签:
Netty
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/2165.html
上一篇
01.Netty进阶之粘包与半包现象分析
下一篇
03.Netty进阶之长度域解码器
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Spark Core
Azkaban
字符串
MySQL
ClickHouse
JavaWeb
数据结构
递归
随笔
LeetCode刷题
gorm
栈
Ubuntu
Spark RDD
Hive
Java编程思想
正则表达式
Java工具类
Java
Netty
FastDFS
前端
SQL练习题
人工智能
JavaSE
散列
二叉树
Linux
排序
Docker
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞