李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
16.NIO之多线程优化
Leefs
2022-06-04 PM
755℃
0条
[TOC] ### 前言 之前说到的服务端程序都是在一个线程上进行的,这个线程不仅负责连接客户端发来的请求,同时还要处理读写事件,这样效率还是不够高。如今电脑都是多核处理器,这意味着可以同时进行多个线程,所以服务端应该充分利用这一点。 ### 一、概述 服务端线程可以建立多个线程,将这些线程分成两组: + 单线程配一个选择器(Boss),**专门处理 accept 事件** + 创建 cpu 核心数的线程(Worker),**每个线程配一个选择器,轮流处理 read 事件** **关系图** ![16.NIO之多线程优化01.jpg](https://lilinchao.com/usr/uploads/2022/06/1394560525.jpg) **说明** + Boss线程只负责Accept事件,Worker线程负责客户端与服务端之间的读写问题,他们都各自维护一个Selector负责监听通道的事件。 + 当Boss线程检测到有客户端的连接请求,就会把这个连接返回的`SocketChannel`注册到某一个Worker线程上。 + 当有读写事件发生时,其中一个Worker线程就会检测到事件,就会在该线程中进行处理,这样的设计做到了功能在线程上的分离。 ### 二、实现思路 + 创建**一个**负责处理Accept事件的Boss线程,与**多个**负责处理Read事件的Worker线程; + **Boss线程**执行的操作 - 接受并处理Accepet事件,当Accept事件发生后,调用Worker的register(SocketChannel socket)方法,让Worker去处理Read事件,其中需要**根据标识index去判断将任务分配给哪个Worker** ``` // 创建固定数量的Worker Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; // 用于负载均衡的原子整数 AtomicInteger index = new AtomicInteger(0); // 负载均衡,轮询分配Worker workers[index.getAndIncrement()% workers.length].register(socket); ``` - `register(SocketChannel socket)`方法会**通过同步队列完成Boss线程与Worker线程之间的通信**,让`SocketChannel`的注册任务被Worker线程执行。添加任务后需要调用`selector.wakeup()`来唤醒被阻塞的Selector ```java public void register(final SocketChannel socket) throws IOException { // 只启动一次 if (!started) { // 初始化操作 } // 向同步队列中添加SocketChannel的注册事件 // 在Worker线程中执行注册事件 queue.add(new Runnable() { @Override public void run() { try { socket.register(selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } }); // 唤醒被阻塞的Selector selector.wakeup(); } ``` + **Worker线程执行**的操作 - **从同步队列中获取注册任务,并处理Read事件** ### 三、代码实现 + **服务端代码** ```java import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import static com.lilinchao.nio.bytebuffer_2.ByteBufferUtil.debugAll; /** * Created by lilinchao * Date 2022/6/4 * Description 多线程优化 -- 服务端 */ @Slf4j public class MultiThreadServer { public static void main(String[] args) throws IOException { Thread.currentThread().setName("boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // 负责轮询Accept事件的Selector Selector boss = Selector.open(); SelectionKey bossKey = ssc.register(boss, 0, null); bossKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080)); //创建固定数量的worker = core 数 Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; for (int i=0;i
iterator = boss.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); if(key.isAcceptable()){ SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.debug("connected:{}",sc.getRemoteAddress()); // 2. 关联 selector (静态内部类可以访问到selector) log.debug("before register:{}",sc.getRemoteAddress()); // 负载均衡,轮询分配Worker workers[index.getAndIncrement() % workers.length].register(sc); log.debug("after register:{}",sc.getRemoteAddress()); } } } } static class Worker implements Runnable{ private Thread thread; private Selector selector; private String name; private volatile boolean start = false; //还未初始化 /** * 同步队列,用于Boss线程与Worker线程之间的通信 */ private ConcurrentLinkedQueue
queue = new ConcurrentLinkedQueue<>(); public Worker(String name) { this.name = name; } //初始化线程和Selector public void register(SocketChannel sc) throws IOException { //只启动一次 if(!this.start){ this.thread = new Thread(this,name); this.selector = Selector.open(); this.thread.start(); this.start = true; } //向队列添加任务,但这个任务并没有立刻执行 queue.add(() -> { try { sc.register(selector,SelectionKey.OP_READ,null); } catch (ClosedChannelException e) { e.printStackTrace(); } }); selector.wakeup(); //唤醒select方法 } @Override public void run() { while (true){ try { selector.select(); //阻塞 // 通过同步队列获得任务并运行 Runnable task = queue.poll(); if(task != null){ task.run(); //获得任务,执行注册 } Iterator
iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); // Worker只负责Read事件 if(key.isReadable()){ ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); log.debug("read...{}",channel.getRemoteAddress()); channel.read(buffer); buffer.flip(); debugAll(buffer); } } } catch (IOException e) { e.printStackTrace(); } } } } } ``` + **客户端代码** ```java import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; /** * Created by lilinchao * Date 2022/6/3 * Description 客户端 */ public class TestClient { public static void main(String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("localhost", 8080)); sc.write(Charset.defaultCharset().encode("0123456789abcdef")); System.in.read(); } } ``` + **运行结果** ``` 13:03:57 [DEBUG] [boss] c.l.n.t.MultiThreadServer - connected:/127.0.0.1:52622 13:03:57 [DEBUG] [boss] c.l.n.t.MultiThreadServer - before register:/127.0.0.1:52622 13:03:57 [DEBUG] [boss] c.l.n.t.MultiThreadServer - after register:/127.0.0.1:52622 13:03:57 [DEBUG] [worker-0] c.l.n.t.MultiThreadServer - read.../127.0.0.1:52622 +--------+-------------------- all ------------------------+----------------+ position: [0], limit: [16] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 30 31 32 33 34 35 36 37 38 39 61 62 63 64 65 66 |0123456789abcdef| +--------+-------------------------------------------------+----------------+ ``` 在运行时,可以同时运行多个客户端程序,查看服务端的输出效果。 #### 问题:如何拿到 cpu 个数 > * `Runtime.getRuntime().availableProcessors()` 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数 > * 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启
标签:
Netty
,
NIO
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/2115.html
上一篇
15.NIO Selector之处理write事件
下一篇
17.NIO之IO模型
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Beego
线程池
DataX
Spark Core
Spark RDD
Python
Linux
数据结构
Spark SQL
排序
HDFS
Zookeeper
Spark
前端
设计模式
Flink
MyBatisX
机器学习
栈
JVM
Elastisearch
Yarn
SpringCloudAlibaba
持有对象
Typora
Shiro
Java阻塞队列
Kafka
Java工具类
国产数据库改造
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞