李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
14.NIO消息边界问题处理
Leefs
2022-06-03 PM
888℃
0条
[TOC] ### 一、消息边界问题的产生 #### 1.1 服务端代码 ```java import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; /** * Created by lilinchao * Date 2022/6/3 * Description 消息边界问题 服务端 */ @Slf4j public class Server { public static void main(String[] args) throws IOException { //1.创建selector,管理多个channel Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); //2. 建立channel和selector之间的联系(注册) SelectionKey sscKey = ssc.register(selector, 0, null); sscKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080)); while (true){ //3. selector.select()方法,没有事件就阻塞,有了事件发送了就恢复运行继续向下处理 selector.select(); //4. 处理事件,selectionKeys拿到所有发生的可读可写的事件 Iterator
iterator = selector.selectedKeys().iterator(); //多个key的时候,accept和read方法都会触发事件,所以要区分事件类型 while (iterator.hasNext()){ SelectionKey key = iterator.next(); //处理key的时候要从selectKeys中删除,否则会报错 iterator.remove(); //5.区分事件类型 if(key.isAcceptable()){ //拿到触发事件的channel ServerSocketChannel channel = (ServerSocketChannel)key.channel(); SocketChannel sc = channel.accept(); //设置为非阻塞 sc.configureBlocking(false); //scKey管sc的channel SelectionKey scKey = sc.register(selector, 0, null); //scKey关注读事件,也就是说客户端的通道关注可读事件 scKey.interestOps(SelectionKey.OP_READ); }else if(key.isReadable()){ //客户端关闭之后也会引发read事件,这时需要从key中remove掉,否则拿不到channel,报错 try { SocketChannel channel = (SocketChannel)key.channel(); //将缓冲区大小设置为4 ByteBuffer buffer1 = ByteBuffer.allocate(4); //客户端正常断开,read返回值是-1 int read = channel.read(buffer1); if(read == -1){ //正常断开 key.channel(); } buffer1.flip(); System.out.println(Charset.defaultCharset().decode(buffer1)); } catch (IOException e) { e.printStackTrace(); key.cancel();//客户端断开,需要将key取消(从selector的key集合中真正删除) } } } } } } ``` #### 1.2 客户端代码 ```java import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.SocketChannel; /** * Created by lilinchao * Date 2022/6/3 * Description 客户端 */ public class Client { public static void main(String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("localhost", 8080)); SocketAddress localAddress = sc.getLocalAddress(); System.out.println("waiting..."); } } ``` **运行程序** (1)运行服务端代码 (2)通过Debug模式运行客户端代码 (3)通过客户端向服务端发送如下请求 ```java sc.write(Charset.defaultCharset().encode("中国")); ``` **服务端输出结果** ![14.NIO消息边界问题处理01.jpg](https://lilinchao.com/usr/uploads/2022/06/2258012529.jpg) 从输出结果可以看到,**国**字出现了乱码。 **问题分析** 因为在服务端代码中设置的接收客户端数据的缓冲区大小是4个字节,在UTF-8编码中,一个汉字占三个字节,也就是服务端在接收客户端发送到的消息时,只接收到了中字的三个字节和国字的第一个字节就进行了打印输出,导致国字出现了半包问题,产生了乱码。 ### 二、消息边界问题分析 ![14.NIO消息边界问题处理02.png](https://lilinchao.com/usr/uploads/2022/06/720319548.png) **分析** + **时刻1**:ByteBufeer较小,但是发送过来的消息比较大,一次处理不完; + **时刻2**:ByteBufeer较大,消息比较小。会出现半包现象 + **时刻3**:ButeBuffer可以一次性接收客户端发送过来的多条消息。此时会出现黏包现象 **解决思路** (1)**固定消息长度**,数据包大小一样,服务器按预定长度读取,当发送的数据较少时,需要将数据进行填充,直到长度与消息规定长度一致。缺点是浪费带宽 (2)**按分隔符拆分**,缺点是效率低,需要一个一个字符去匹配分隔符 (3)**TLV 格式,即 Type 类型、Length 长度、Value 数据**(也就是在消息开头**用一些空间存放后面数据的长度**),如HTTP请求头中的Content-Type与**Content-Length**。类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量 ![14.NIO消息边界问题处理03.png](https://lilinchao.com/usr/uploads/2022/06/3010171246.png) ### 三、解决消息边界问题 本示例将按照第二种方式,**按分隔符拆分**来解决消息边界问题。 #### 3.1 附件与扩容 Channel的register方法还有**第三个参数**:`附件`,可以向其中放入一个Object类型的对象,该对象会与登记的Channel以及其对应的`SelectionKey`绑定,可以从`SelectionKey`获取到对应通道的附件 ```java public final SelectionKey register(Selector sel, int ops, Object att) ``` 可通过SelectionKey的**attachment()方法获得附件** ```java ByteBuffer buffer = (ByteBuffer) key.attachment(); ``` 需要在Accept事件发生后,将通道注册到Selector中时,**对每个通道添加一个ByteBuffer附件**,让每个通道发生读事件时都使用自己的通道,避免与其他通道发生冲突而导致问题 ```java // 设置为非阻塞模式,同时将连接的通道也注册到选择器中,同时设置附件 socketChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(16); // 添加通道对应的Buffer附件 socketChannel.register(selector, SelectionKey.OP_READ, buffer); ``` 当Channel中的数据大于缓冲区时,需要对缓冲区进行**扩容**操作。此代码中的扩容的判定方法:**Channel调用compact方法后,position与limit相等,说明缓冲区中的数据并未被读取(容量太小),此时创建新的缓冲区,其大小扩大为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用SelectionKey的attach方法将新的缓冲区作为新的附件放入SelectionKey中** ```java // 如果缓冲区太小,就进行扩容 if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2); // 将旧buffer中的内容放入新的buffer中 newBuffer.put(buffer); // 将新buffer作为附件放到key中 key.attach(newBuffer); } ``` #### 3.2 完整代码 + **需求** > 将服务端缓冲区大小设置成16,客户端向服务端发送数据21个字节的数据`0123456789abcdef3333\n` > > + `\n`为消息的分隔符,占一个字节大小 + **过程分析** ![14.NIO消息边界问题处理04.jpg](https://lilinchao.com/usr/uploads/2022/06/1694622757.jpg) + **服务端代码** ```java import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import static com.lilinchao.nio.bytebuffer_2.ByteBufferUtil.debugAll; /** * Created by lilinchao * Date 2022/6/3 * Description 服务端 */ @Slf4j public class MessageBorderServer { public static void main(String[] args) throws IOException { // 1. 创建 selector, 管理多个 channel Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // 2. 建立 selector 和 channel 的联系(注册) // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件 SelectionKey sscKey = ssc.register(selector, 0, null); // key 只关注 accept 事件 sscKey.interestOps(SelectionKey.OP_ACCEPT); log.debug("sscKey:{}", sscKey); ssc.bind(new InetSocketAddress(8080)); while (true) { // 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行 // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理 selector.select(); // 4. 处理事件, selectedKeys 内部包含了所有发生的事件 Iterator
iter = selector.selectedKeys().iterator(); // accept, read while (iter.hasNext()) { SelectionKey key = iter.next(); // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题 iter.remove(); log.debug("key: {}", key); // 5. 区分事件类型 if (key.isAcceptable()) { // 如果是 accept ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(16); // attachment // 将一个 byteBuffer 作为附件关联到 selectionKey 上 SelectionKey scKey = sc.register(selector, 0, buffer); scKey.interestOps(SelectionKey.OP_READ); log.debug("{}", sc); log.debug("scKey:{}", scKey); } else if (key.isReadable()) { // 如果是 read try { SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel // 获取 selectionKey 上关联的附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1 if(read == -1) { key.cancel(); } else { split(buffer); // 需要扩容 if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); buffer.flip(); newBuffer.put(buffer); // 0123456789abcdef3333\n key.attach(newBuffer); } } } catch (IOException e) { e.printStackTrace(); key.cancel(); // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key) } } } } } private static void split(ByteBuffer source) { source.flip(); for (int i = 0; i < source.limit(); i++) { // 找到一条完整消息 if (source.get(i) == '\n') { int length = i + 1 - source.position(); // 把这条完整消息存入新的 ByteBuffer ByteBuffer target = ByteBuffer.allocate(length); // 从 source 读,向 target 写 for (int j = 0; j < length; j++) { target.put(source.get()); } debugAll(target); } } source.compact(); // 0123456789abcdef position 16 limit 16 } } ``` + **客户端代码** ```java import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; /** * Created by lilinchao * Date 2022/6/3 * Description 1.0 */ public class MessageBorderClient { public static void main(String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("localhost", 8080)); sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n")); System.in.read(); } } ``` + **输出结果** ``` 11:50:04 [DEBUG] [main] c.l.n.b.MessageBorderServer - sscKey:sun.nio.ch.SelectionKeyImpl@7dc36524 11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - key: sun.nio.ch.SelectionKeyImpl@7dc36524 11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:51861] 11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - scKey:sun.nio.ch.SelectionKeyImpl@27f674d 11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - key: sun.nio.ch.SelectionKeyImpl@27f674d 11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - key: sun.nio.ch.SelectionKeyImpl@27f674d +--------+-------------------- all ------------------------+----------------+ position: [21], limit: [21] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 30 31 32 33 34 35 36 37 38 39 61 62 63 64 65 66 |0123456789abcdef| |00000010| 33 33 33 33 0a |3333. | +--------+-------------------------------------------------+----------------+ ``` ### 四、bytebuffer大小分配 * 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer * ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer * 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 [http://tutorials.jenkov.com/java-performance/resizable-array.html](http://tutorials.jenkov.com/java-performance/resizable-array.html) * 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗 *附参考文章:* *《黑马程序员Netty教程》*
标签:
Netty
,
NIO
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/2112.html
上一篇
13.Selector处理accept和read事件
下一篇
15.NIO Selector之处理write事件
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
FastDFS
Golang基础
链表
线程池
Filter
随笔
Sentinel
CentOS
Typora
Java编程思想
SQL练习题
Netty
Kafka
Spark SQL
Spark Core
Azkaban
稀疏数组
排序
RSA加解密
高并发
Nacos
Quartz
Zookeeper
Redis
Java
Spark
GET和POST
JavaWEB项目搭建
锁
Git
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞