netty处理器链初始化源码分析

   日期:2020-09-21     浏览:102    评论:0    
核心提示:先看处理器链初始化整体流程图配置处理器//使用配置构建器,配置需要绑定的处理器serverBootstrap.group(bossEventLoopGroup, workEventExecutors) .channel(NioServerSocketChannel.class) .localAddress(port) .childHandler(new ChannelInitializer<SocketChannel>() {

参考资料

Reactor单线程模型详解与实现
异步回调原理详解与实现
Reactor(主从)原理详解与实现
NioEventLoopGroup源码分析
netty服务端启动流程源码分析
netty服务端启动流程总结
netty处理客户端新连接源码分析

先看处理器链初始化整体流程图

配置处理器

//使用配置构建器,配置需要绑定的处理器
serverBootstrap.group(bossEventLoopGroup, workEventExecutors)
        .channel(NioServerSocketChannel.class)
        .localAddress(port)
        .childHandler(new ChannelInitializer<SocketChannel>() { 
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception { 
                socketChannel.pipeline().addLast(new MyHandler()).addLast(new MyHandler2());
            }
        });
//1.childHandler配置项是给worker的channel绑定的Handler(子通道所属)
//2.Handler配置项是给boss的channel绑定的Handler(父通道所属)

初始化处理器链

//ServerBootstrapAcceptor的channelRead方法
//这个是父通道处理器的处理逻辑,我们这直接分析子通道的处理器链,这也是我们比较关心的
public void channelRead(ChannelHandlerContext ctx, Object msg) { 
    final Channel child = (Channel) msg;

  	//ServerBootstrapAcceptor是我们ServerBootstra的静态内部类,直接引用childHandler
  	//新连接的通道直接把我们构建时候的ChannelInitializer对象添加到处理器链中
  	//同时看到,ChannelHandler是添加到pipeline中的
    child.pipeline().addLast(childHandler);
		
  	//省去不需要代码
}

//DefaultChannelPipeline的addLast方法
 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { 
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) { 
            checkMultiplicity(handler);

          	//将处理器包装为一个AbstractChannelHandlerContext,这种处理器包装了一些上下文信息在里面
          	//处理器链的所有处理器都是AbstractChannelHandlerContext类型
            newCtx = newContext(group, filterName(name, handler), handler);

          	//将新的handler添加到处理链中
            addLast0(newCtx);
          
          //省去不需要代码
           
        return this;
    }

//将新的处理器添加到一个双向链表中
private void addLast0(AbstractChannelHandlerContext newCtx) { 
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
   
//1.这里只是对处理器链做了一个简单的初始化,只是把ChannelInitializer加入到处理器链中
//2.ChannelInitializer可以理解为我们配置的handler的一个包装
//3.为什么不直接将我们所有自定义的处理器添加到处理器链中,而是将一个处理器包装暂时放入
// 处理器链中?

将自定义处理器循环放入处理器链中

//ServerBootstrapAcceptor的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) { 
    //省去不需要代码
    try { 
        //这里将完成一个新子通道所有初始化的动作,其实这里直接是做了异步去直接做这些事情的
        //处理器链的真正添加是在这个异步任务里面的
        childGroup.register(child).addListener(new ChannelFutureListener() { 
            @Override
            public void operationComplete(ChannelFuture future) throws Exception { 
                if (!future.isSuccess()) { 
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) { 
        forceClose(child, t);
    }
}

//AbstractUnsafe的register
 @Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) { 

  //省去不需要代码
  
  //使用异步任务去做所有初始化通道的事情,像循环添加所有处理器链这种动作,也很耗时
  //容易影响处理新连接的效率,不适合放在主线程中
  eventLoop.execute(new Runnable() { 
    @Override
    public void run() { 
      //循环添加所有自定义处理器,放入处理器链中
      register0(promise);
    }
  });
  //省去不需要代码
}

//AbstractUnsafe的register0
private void register0(ChannelPromise promise) { 
  //省去不需要代码 
  
  //循环添加所有自定义处理器,放入处理器链中
  pipeline.invokeHandlerAddedIfNeeded();
               
 }

//ChannelInitializer的initChannel
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { 
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {  // Guard against re-entrance.
            try { 
                //回调我们的内部类实现initChannel(SocketChannel socketChannel)方法
                initChannel((C) ctx.channel());
            } catch (Throwable cause) { 
                exceptionCaught(ctx, cause);
            } finally { 
               //添加完毕后移除之前处理器包装
                remove(ctx);
            }
            return true;
        }
        return false;
    }

//我们自定义启动类里面
//将包装处理里面的自定义处理器放入处理器链中
.childHandler(new ChannelInitializer<SocketChannel>() { 
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception { 
                //将每个处理器放入处理器链中
                socketChannel.pipeline().addLast(new MyHandler()).addLast(new MyHandler2());
            }
        });

//循环处理
 @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { 
        if (handlers == null) { 
            throw new NullPointerException("handlers");
        }

       //循环处理
        for (ChannelHandler h: handlers) { 
            if (h == null) { 
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

//删除处理链中的包装处理器
 private void remove(ChannelHandlerContext ctx) { 
        try { 
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) { 
                pipeline.remove(this);
            }
        } finally { 
            initMap.remove(ctx);
        }
    }
总结:现在知道为什么在这里正真添加我们的处理器了吧,注册一个channel,整体都是在异步任务的子线程中的
     这样不会阻塞或者影响连接处理

总结

  1. boss的线程依然是只是为了快速处理连接,具体逻辑到worker的线程中去做,这是ChannelInitializer存在的原因, boss和worker做了很好的边界处理
  2. 处理器扩展方式使用了双向链表,这样有利于程序的扩展,使用了经典的责任链模式
  3. 不影响主流程的业务尽量使用异步任务去做,尽量不要占用主线程资源
 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服