该系列已分别介绍了服务端、客户端的启动流程、网络读事件处理流程,本文将重点剖析Netty是如何封装NIO的写事件。
温馨提示:本文虽然是源码分析,但强烈建议精读,因为根据源码阐述其背后的设计哲学,也用黑体进行了标注,请特别留意。
在阅读本篇文章之前,请稍微思考如下几个问题:
- 写事件需要先注册才能往通道中写入数据?
- 什么时候需要向通道注册写事件呢?
- 业务线程池执行业务逻辑后,是如何通过IO线程将数据写入到网络中的呢?
- Netty中如何针对写限流
1、写事件概述
写事件,顾名思义,就是将数据写入网络,通过网络传输给接收端,通常我们知道业务都会在专属的业务线程池中执行,那数据是如何通过IO线程写入网络中的呢?
正如上图中的线程模型,业务线程池是如何将数据通过IO线程写入网络中的呢?
接下来我们将带着问题,尝试看Netty是如何封装NIO的写事件。
2、写事件处理流程分析
在Netty中,写事件的处理入口为NioEventLoop的processSelectedKey方法:
根据调用链,最终调用AbstractChannel的内部类AbstractUnsafe的flush0方法。
该方法有三个实现要点:
- 获取写缓存队列,如果写缓存队列为空,则跳过本次写事件。每一个通道独享一个写缓存队列,写事件触发执行的动作就是要将写缓存中的数据写入到网络中。
- 如果通道处于未激活状态,需要清理写缓存区
- 通过调用doWrite方法将写缓存中的数据写入网络通道中。
接下来将详细介绍上述关键步骤。
2.1 NIO写事件的优雅封装
首先,我们将写缓存区当成一个黑盒,先重点看一下doWrite方法的实现,窥探一下Netty是如何基于NIO来处理网络写入的。
Step1:如果写缓存区中没有可写的数据,取消注册写事件。我们来看一下取消写事件的经典实现技巧:
首先判断一下注册键是否有效,然后通过为位运算,取消写事件。
思考题:问题来了,取消写事件,从系统层面就无法继续触发写操作了,那后续如何触发写事件呢?
写事件的处理要考虑如下几个问题:
- 本次写缓存区中数据是否写完?
- 如果底层网络Socket缓存区积压,导致写缓冲区未写完如何处理?
- 如果网络缓存区数据特别大又如何处理?
Netty给出的解决方案如下:
- 通过底层NIO的SocketChannel的write方法将数据写入到Socket缓存区,如果返回值为0,表示Socket缓冲区已满,需要暂停写入,具体做法,注册写事件,等待下次继续写入。
- 如果写缓存区的数据全部处理完毕,可取消注册写事件,避免毫无意义的写事件就绪。
- 如果写缓存区中数据很大,为了避免单个通道对其他通道的影响,默认设置单次写事件最多调用底层NIO SocketChannel的write方法次数,默认为16。
写事件的核心处理要点就介绍到这里了。
2.2 通道写缓冲区详解
在Netty中调用通道的write方法并不会立即将数据写入到底层网络Socket中,而是写入到“写缓存区”,为应用级别的缓存区,即ChannelOutboundBuffer,这是Netty实现写操作最重要的一个数据结构。
2.2.1 类图
ChannelOutboundBuffer 的核心类图如下:
核心属性与方法简介:
- FastThreadLocal<ByteBuffer[]> NIO_BUFFERS
可以看出是线程本地变量ThreadLocal的优化版本,存储一个一个ByteBuffer数组。 - Channel channel
该写缓存区所属的通道,每一个Channel独享一个写缓冲区。 - Entry flushedEntry
表示第一个被刷新的Entry,在写入时,从该Entry开始写。 - Entry unflushedEntry
在链表中第一个未刷新的节点(未刷新链表中第一个节点)。 - Entry tailEntry
在链表中尾部的节点。 - int flushed
待写入的entry个数,这个数据代表在执行一次真正的flush(flush0),将会有多少个entry中的内容会被写入到通道。
接下来我们按照写事件对待写入缓存区方法调用的顺序来讲解一下该方法的核心实现逻辑。
2.2.2 size方法详解
public int size() {
return flushed;
}
返回本次写缓存区可期望刷新的消息个数(Entry)。在NioSocketChannel的doWriter方法中,如果isEmpty返回true,直接结束本次写入操作,更加准确的是结束本次flush操作。
flushed该字段代表的当时待写入的Entry,如果为0,表示没有待flush的Entry,但不代表ChannelOutboundBuffer中没有Entry存在,比如调用Channel.writer方法,会往ChannelOutboundBuffer增加Entry,但在没有调用addFlush方法之前,ChannelOutboundBuffer中的flushed 字段的值不会增加。
2.2.3 addMessage方法详解
向写缓存中添加消息,方法本身的实现非常简单,因为ChannelOutboundBuffer其内部数据结构为一个链表,这是一个往链表中添加消息的过程,这里的关键点是该方法的调用入口为Channel的write方法,即调用通道的write方法只是将数据写入到写缓存,并不会触发真正的往网络中写消息。
该方法会调用incrementPendingOutboundBytes,我们简单看一下该方法的实现细节:
该方法蕴含了Netty一个非常重要的机制,写操作限流,高低水位线机制。
当缓存区中存储的数据超过了设置的高水位线(阔值),则会设置为不可写,并向通道传播写状态变更事件。
2.2.4 addMessage方法详解
该方法并没有真正的执行刷新动作,而是计算可刷写的Entry个数,一次刷新动作,会将unfluedEntry开始,一直扫描到tailEntry。
同样这里和Netty的写限流有关,将数据刷写后,会减少缓存区中的大小,如果低于设置的低水位线,会将缓存区恢复到可写状态。
该方法的调用入口为下图:
即在调用通道的flush方法时会先计算本次看刷写到Socket缓存区中的数据,然后执行flush0方法执行真正的网络写,该方法在第一部分中已详细介绍。
本文就介绍到这里了,文章开头的问题大家是否有了自己的理解了呢?欢迎私信或留言与我交流互动,让我们在交流中共同进步。
一键三连是对我最大的支持与肯定
为了让大家在实战中学习Netty,笔者将RocketMQ的网络模块单独抽取成一个框架,可直接用于项目开发中,代码已上传github
Netty项目实战代码仓库
推荐阅读:
让天下没有难学的Netty4专栏
见字如面,我是威哥,一个从普通二本院校毕业,从未曾接触分布式、微服务、高并发到通过技术分享实现职场蜕变,成长为RocketMQ社区优秀布道师、大厂资深架构师,出版《RocketMQ技术内幕》一书,在CSDN中记录了我的成长历程,欢迎大家关注,私信,一起交流进步。
分享笔者一个硬核的RocketMQ电子书:
获取方式:微信搜索【中间件兴趣圈】,回复RMQPDF即可获取。