李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
08.Netty之Handler与Pipeline
Leefs
2022-06-10 PM
716℃
0条
[TOC] ### 一、概述 ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。 所有 ChannelHandler 被连成一串,就是 Pipeline + 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果; + 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工。 **示例** ![08.Netty之Handler与Pipeline01.png](https://lilinchao.com/usr/uploads/2022/06/159428948.png) 在这里,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。 ### 二、示例 + **服务端代码** ```java import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * @author lilinchao * @date 2022/6/10 * @description 服务端 **/ public class TestPiplineServer { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer
() { @Override protected void initChannel(NioSocketChannel ch) { //通过channel拿到pipeline ChannelPipeline pipeline = ch.pipeline(); //添加处理器 head -> h1 -> h2 -> h3 -> h4 -> h5 -> h6 -> tail pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(1); ctx.fireChannelRead(msg); // 1 } }); pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(2); // 将数据传递给下个 handler,如果不调用,调用链会断开或者调用 //super.channelRead(ctx, msg); ctx.fireChannelRead(msg); // 2 } }); pipeline.addLast("h3",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(3); // 必须要往客户端回写数据 出站的handler才会触发执行 // ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes())); ctx.channel().write(msg); // 3 } }); // 再添加几个出站的Handler pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(4); // super.write(ctx, msg, promise); ctx.write(msg, promise); // 4 } }); pipeline.addLast("h5",new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(5); ctx.write(msg, promise); // 5 } }); pipeline.addLast("h6",new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(6); ctx.write(msg, promise); // 6 } }); } }) .bind(8080); } } ``` + **客户端代码** ```java import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; /** * @author lilinchao * @date 2022/6/10 * @description 客户端 **/ public class HandlerClientDemo { public static void main(String[] args) { new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer
() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) .connect("127.0.0.1", 8080) .addListener((ChannelFutureListener) future -> { future.channel().writeAndFlush("hello,world"); }); } } ``` **运行结果** ``` 1 2 3 6 5 4 ``` **分析** 从输出结果可以看出 + ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的 + ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的 ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表 ![08.Netty之Handler与Pipeline02.png](https://lilinchao.com/usr/uploads/2022/06/731851505.png) * 入站处理器中,ctx.fireChannelRead(msg) 是 **调用下一个入站处理器** * 如果注释掉 1 处代码,则仅会打印 1 * 如果注释掉 2 处代码,则仅会打印 1 2 * 3 处的 ctx.channel().write(msg) 会 **从尾部开始触发** 后续出站处理器的执行 * 如果注释掉 3 处代码,则仅会打印 1 2 3 * 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会 **触发上一个出站处理器** * 如果注释掉 6 处代码,则仅会打印 1 2 3 6 **ctx.channel().write(msg) vs ctx.write(msg)** * 都是触发出站处理器的执行 * ctx.channel().write(msg) 从尾部开始查找出站处理器 * ctx.write(msg) 是从当前节点找上一个出站处理器 * 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了 * 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6... 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己 服务端 pipeline 触发的原始流程,下图中数字代表了处理步骤的先后次序 ![08.Netty之Handler与Pipeline03.png](https://lilinchao.com/usr/uploads/2022/06/3815837229.png) *附参考原文* *《黑马程序员Netty教程》*
标签:
Netty
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/2153.html
上一篇
07.Netty之Future与Promise
下一篇
09.Netty之ByteBuf介绍(一)
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
国产数据库改造
MyBatis
Thymeleaf
FastDFS
Jquery
Kafka
递归
机器学习
微服务
设计模式
SpringCloud
栈
Map
Filter
DataWarehouse
Spark SQL
MyBatis-Plus
Spark RDD
Netty
查找
RSA加解密
Git
并发编程
HDFS
JavaWeb
队列
Azkaban
Nacos
Jenkins
字符串
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞