参考资料
Reactor单线程模型详解与实现
异步回调原理详解与实现
Reactor(主从)原理详解与实现
NioEventLoopGroup源码分析
netty服务端启动流程源码分析
netty服务端启动流程总结
netty处理客户端新连接源码分析
先看处理器链初始化整体流程图
配置处理器
//使用配置构建器,配置需要绑定的处理器
serverBootstrap.group(bossEventLoopGroup, workEventExecutors)
.channel(NioServerSocketChannel.class)
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new MyHandler()).addLast(new MyHandler2());
}
});
//1.childHandler配置项是给worker的channel绑定的Handler(子通道所属)
//2.Handler配置项是给boss的channel绑定的Handler(父通道所属)
初始化处理器链
//ServerBootstrapAcceptor的channelRead方法
//这个是父通道处理器的处理逻辑,我们这直接分析子通道的处理器链,这也是我们比较关心的
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//ServerBootstrapAcceptor是我们ServerBootstra的静态内部类,直接引用childHandler
//新连接的通道直接把我们构建时候的ChannelInitializer对象添加到处理器链中
//同时看到,ChannelHandler是添加到pipeline中的
child.pipeline().addLast(childHandler);
//省去不需要代码
}
//DefaultChannelPipeline的addLast方法
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
//将处理器包装为一个AbstractChannelHandlerContext,这种处理器包装了一些上下文信息在里面
//处理器链的所有处理器都是AbstractChannelHandlerContext类型
newCtx = newContext(group, filterName(name, handler), handler);
//将新的handler添加到处理链中
addLast0(newCtx);
//省去不需要代码
return this;
}
//将新的处理器添加到一个双向链表中
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
//1.这里只是对处理器链做了一个简单的初始化,只是把ChannelInitializer加入到处理器链中
//2.ChannelInitializer可以理解为我们配置的handler的一个包装
//3.为什么不直接将我们所有自定义的处理器添加到处理器链中,而是将一个处理器包装暂时放入
// 处理器链中?
将自定义处理器循环放入处理器链中
//ServerBootstrapAcceptor的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//省去不需要代码
try {
//这里将完成一个新子通道所有初始化的动作,其实这里直接是做了异步去直接做这些事情的
//处理器链的真正添加是在这个异步任务里面的
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
//AbstractUnsafe的register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//省去不需要代码
//使用异步任务去做所有初始化通道的事情,像循环添加所有处理器链这种动作,也很耗时
//容易影响处理新连接的效率,不适合放在主线程中
eventLoop.execute(new Runnable() {
@Override
public void run() {
//循环添加所有自定义处理器,放入处理器链中
register0(promise);
}
});
//省去不需要代码
}
//AbstractUnsafe的register0
private void register0(ChannelPromise promise) {
//省去不需要代码
//循环添加所有自定义处理器,放入处理器链中
pipeline.invokeHandlerAddedIfNeeded();
}
//ChannelInitializer的initChannel
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
//回调我们的内部类实现initChannel(SocketChannel socketChannel)方法
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
//添加完毕后移除之前处理器包装
remove(ctx);
}
return true;
}
return false;
}
//我们自定义启动类里面
//将包装处理里面的自定义处理器放入处理器链中
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//将每个处理器放入处理器链中
socketChannel.pipeline().addLast(new MyHandler()).addLast(new MyHandler2());
}
});
//循环处理
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
//循环处理
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
//删除处理链中的包装处理器
private void remove(ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
}
}
总结:现在知道为什么在这里正真添加我们的处理器了吧,注册一个channel,整体都是在异步任务的子线程中的
这样不会阻塞或者影响连接处理
总结
- boss的线程依然是只是为了快速处理连接,具体逻辑到worker的线程中去做,这是ChannelInitializer存在的原因, boss和worker做了很好的边界处理
- 处理器扩展方式使用了双向链表,这样有利于程序的扩展,使用了经典的责任链模式
- 不影响主流程的业务尽量使用异步任务去做,尽量不要占用主线程资源