李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
04.Netty源码分析之启动流程分析
Leefs
2022-06-24 PM
901℃
0条
[TOC] ### 一、NIO启动流程 Netty底层是NIO,从对NIO的组件封装开始进行分析。本次对Netty的分析主要是在源码中找到下方NIO的方法,来看看netty中对下面的代码是怎样进行处理的 ```java //1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector //可以同时监听多个channel上的读和写事件 Selector selector = Selector.open(); //2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config NioServerSocketChannel attachment = new NioServerSocketChannel(); //3 创建 NioServerSocketChannel 时,创建了 JDK 原生的 ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //设置为非阻塞模式 serverSocketChannel.configureBlocking(false); //4 启动 nio boss 线程执行接下来的操作 //5 注册(仅关联 selector 和 NioServerSocketChannel),0表示未关注事件 //selector一旦发生了事件,如何找到Nio相关的类去处理?答案是通过attachment,其将JDK原生的ssc和NioSsc联系起来。 SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment); //6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor //7 绑定端口,监听端口8080 serverSocketChannel.bind(new InetSocketAddress(8080)); //8 ssc关注可连接事件,触发 channel active 事件,selectionKey 关注 op_accept 事件 selectionKey.interestOps(SelectionKey.OP_ACCEPT); ``` ### 二、Netty启动流程 #### 2.1 Netty服务端的启动代码 ```java import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LoggingHandler; /** * @author lilinchao * @date 2022/6/24 * @description 服务端 **/ public class TestSourceServer { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer
() { @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new LoggingHandler()); } }).bind(8080); } } ``` 通过该代码跟进源码,对服务端启动过程进行分析 #### 2.2 入口bind 选择器Selector的创建是在`NioEventloopGroup`中完成的。 `NioServerSocketChannel`与`ServerSocketChannel`的创建,`ServerSocketChannel`注册到Selector中以及绑定操作都是由bind方法完成的。所以服务器启动的入口便是`io.netty.bootstrap.ServerBootstrap.bind`。 ```java public ChannelFuture bind(SocketAddress localAddress) { validate(); return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress")); } ``` > 关键代码doBind ```java private ChannelFuture doBind(final SocketAddress localAddress) { // 1. 执行初始化和注册 regFuture 会由 initAndRegister 设置其是否完成,从而回调 3.2 处代码 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } // 2. 因为是 initAndRegister 异步执行,需要分两种情况来看,调试时也需要通过 suspend 断点类型加以区分 // 2.1 如果已经完成 if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); // 3.1 立刻调用 doBind0 doBind0(regFuture, channel, localAddress, promise); return promise; } // 2.2 还没有完成 else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); // 3.2 回调 doBind0 regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // 处理异常... promise.setFailure(cause); } else { promise.registered(); // 3. 由注册线程去执行 doBind0 doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } } ``` 其中有两个重要的方法:`initAndRegister()`和`doBind0(regFuture,channel,localAddress,promise)` - **initAndRegister**主要负责NioServerSocketChannel和ServerSocketChannel的创建(主线程中完成)与ServerSocketChannel注册(NIO线程中完成)工作 `init`就对应原生NIO中的 : `ServerSocketChannel ssc = ServerSocketChannel.open();` `register`就对应原生NIO中的: `SelectionKey selectionKey = ssc.register(selector,0,nettySsc);` - **doBind0**则负责连接的创建工作,对应原生NIO中的 :`ssc.bind(new InetSocketAddress(8080,backlog));` > 关键代码 `io.netty.bootstrap.AbstractBootstrap#initAndRegister` ```java final ChannelFuture initAndRegister() { Channel channel = null; try { // 这里才是真正初始化了一个channel, 通过 ServerBootstrap 的通 道工厂反射创建一个 NioServerSocketChannel channel = channelFactory.newChannel(); // 1.1 初始化 - 做的事就是添加一个初始化器 ChannelInitializer init(channel); } catch (Throwable t) { // 处理异常... return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } // 1.2 注册 - 做的事就是将原生 channel 注册到 selector 上 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { // 处理异常... } return regFuture; } ``` > 关键代码 `io.netty.bootstrap.ServerBootstrap#init` ```java // 这里 channel 实际上是 NioServerSocketChannel void init(Channel channel) throws Exception { final Map
, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map
, Object> attrs = attrs0(); synchronized (attrs) { for (Entry
, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey
key = (AttributeKey
) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry
, Object>[] currentChildOptions; final Entry
, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } // 为 NioServerSocketChannel 添加初始化器 p.addLast(new ChannelInitializer
() { // register之后才调用该方法,可以在此添加断点通过debug的方式查看何时被调用 @Override public void initChannel(final Channel ch) throws Exception { // 创建handler并加入到pipeline中 final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // 初始化器的职责是将 ServerBootstrapAcceptor 加入至 NioServerSocketChannel ch.eventLoop().execute(new Runnable() { @Override public void run() { // 添加新的handler,在发生Accept事件后建立连接 pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } ``` **分析** 从源码中可以看出, init 的方法的核心作用在和 ChannelPipeline 相关。 从 NioServerSocketChannel **的初始化过程中**,我们知道,pipeline 是一个双向链表,并且,他本身就初始化了 head 和 tail,这里调用了他的 addLast 方法,也就是将整个 handler 插入到 tail 的前面,因为 tail 永远会在后面,需要做一些系统的固定工作。 #### 2.3 Register init执行完毕后,便执行`ChannelFuture regFuture = config().group().register(channel)`操作 该方法最终调用的是`promise.channel().unsafe().register(this, promise)`方法 ```java public final void register(EventLoop eventLoop, final ChannelPromise promise) { // 一些检查,略... // 获取EventLoop AbstractChannel.this.eventLoop = eventLoop; //用于判断当前线程是不是eventLoop线程中的线程,也就是判断当前线程是否为NIO线程 if (eventLoop.inEventLoop()) { register0(promise); } else { try { // 首次执行 execute 方法时,会启动 nio 线程,之后注册等操作在 nio 线程上执行 // 因为只有一个 NioServerSocketChannel 因此,也只会有一个 boss nio 线程 // 这行代码完成的事实是 main -> nio boss 线程的切换 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { // 日志记录... closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } ``` > `io.netty.channel.AbstractChannel.AbstractUnsafe#register0` ```java private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // 1.2.1 原生的 nio channel 绑定到 selector 上,注意此时没有注册 selector 关注事件,附件为 NioServerSocketChannel doRegister(); neverRegistered = false; registered = true; // 1.2.2 执行 NioServerSocketChannel 初始化器的 initChannel pipeline.invokeHandlerAddedIfNeeded(); // 回调 3.2 io.netty.bootstrap.AbstractBootstrap#doBind0 safeSetSuccess(promise); pipeline.fireChannelRegistered(); // 对应 server socket channel 还未绑定,isActive 为 false if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } ``` 后续`ChannelFuture regFuture = initAndRegister();`中的`regFuture`就是`safeSetSuccess`设置的`promise`,可以通过debug进行标识以及查看对象值来确定。 一般来说,真正执行操作的方法名前都会去加`do`,spring和Alibaba的源码中都是如此。 > 关键代码 `io.netty.channel.AbstractNioChannel#doRegister` ```java @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // javaChannel()即为ServerSocketChannel // eventLoop().unwrappedSelector()获取eventLoop中的Selector // this为NIOServerSocketChannel,作为附件 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true; } else { throw e; } } } } ``` **回调initChannel** ```java @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // 添加新任务,任务负责添加handler // 该handler负责发生Accepet事件后建立连接 ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } ``` **Register主要完成了以下三个操作** - 完成了主线程到NIO的**线程切换** - 通过`eventLoop.inEventLoop()`进行线程判断,判断当前线程是否为NIO线程 - 切换的方式为让eventLoop执行register的操作 - **register的操作在NIO线程中完成** - **调用doRegister方法** - **将ServerSocketChannel注册到EventLoop的Selector中** - 此时还未关注事件 - 添加NioServerSocketChannel附件 - 通过`invokeHandlerAddedIfNeeded`调用init中的`initChannel`方法 - initChannel方法主要创建了 两个handler - 一个handler负责设置配置 - 一个handler负责发生Accept事件后建立连接 #### 2.4 doBind0 **绑定端口** 在`doRegister`和`invokeHandlerAddedIfNeeded`操作中的完成后,会调用`safeSetSuccess(promise)`方法,向Promise中设置执行成功的结果。此时`doBind`方法中由`initAndRegister`返回的ChannelFuture对象regFuture便会由NIO线程异步执行doBind0绑定操作 ```java // initAndRegister为异步方法,会返回ChannelFuture对象 final ChannelFuture regFuture = initAndRegister(); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.registered(); // 如果没有异常,则执行绑定操作 doBind0(regFuture, channel, localAddress, promise); } } }); ``` **doBind0最底层调用的是ServerSocketChannel的bind方法** NioServerSocketChannel.doBind方法,通过该方法,绑定了对应的端口 ```java @SuppressJava6Requirement(reason = "Usage guarded by java version check") @Override protected void doBind(SocketAddress localAddress) throws Exception { // 判断java版本是不是大于7 if (PlatformDependent.javaVersion() >= 7) { // 调用ServerSocketChannel的bind方法,绑定端口 javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } } ``` **关注事件** 在绑定端口操作完成后,会判断各种所有初始化操作是否已经完成,若完成,则会添加ServerSocketChannel感兴趣的事件 ```java if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } ``` 最终在`AbstractNioChannel.doBeginRead`方法中,会添加ServerSocketChannel添加Accept事件 ```java @Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); // 如果ServerSocketChannel没有关注Accept事件 if ((interestOps & readInterestOp) == 0) { // 则让其关注Accepet事件 // readInterestOp 取值是 16 // 在 NioServerSocketChannel 创建时初始化 selectionKey.interestOps(interestOps | readInterestOp); } } ``` **注意**:此处设置interestOps时使用的方法,**避免覆盖关注的其他事件** - 首先获取Channel所有感兴趣的事件 ```java final int interestOps = selectionKey.interestOps(); ``` - 然后再设置其感兴趣的事件 ```java selectionKey.interestOps(interestOps | readInterestOp); ``` *附参考文章链接* *https://nyimac.gitee.io/2021/04/25/Netty%E5%9F%BA%E7%A1%80/* *https://blog.csdn.net/weixin_45296116/article/details/123227825*
标签:
Netty
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/2182.html
上一篇
03.Netty搭建RPC框架
下一篇
01.Golang介绍
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
MyBatisX
SpringCloudAlibaba
Nacos
前端
国产数据库改造
链表
Spark SQL
Hadoop
锁
Yarn
BurpSuite
随笔
Sentinel
Git
JavaWeb
LeetCode刷题
数学
Linux
Spark Streaming
Thymeleaf
查找
哈希表
Spark
JavaWEB项目搭建
Kibana
Flink
队列
Flume
Map
Livy
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞