Netty源码分析——EventLoopGroup建立

2020-03-22 16:05:24来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

Netty源码分析——EventLoopGroup建立

在上一篇中的简单代码中开头的两行代码是

1         EventLoopGroup boss = new NioEventLoopGroup(1);
2         EventLoopGroup worker = new NioEventLoopGroup();

服务端应用要创建首先要创建两个EventLoopGroup ,为什么是两个EventLoopGroup ,这就涉及netty的核心设计思想——主从Reactor模型。这里不展开说,内容比较多,可以自行学习。每个EventLoopGroup 可以包含一个或多个EventLoop,每一个EventLoop简单来说的就是一个死循环的线程,这个线程循环监听一个通道的所有事件,这个模式下面其他流程中也会反复提到。下面开始追代码。

说明一下,上面的两个构造函数一个有参数一个无参数,无参数的通过获取本机cpu的核数(注意是核数不是个数,例如我的电脑4个cpu,每个cpu都是双核的,所以无参的时候应该创建8个EventLoop)来决定创建线程的个数,有参数就是指定在这个EventLoopGroup 中创建具体个数的EventLoop。

  • 一步一步追NioEventLoopGroup的构造函数可以发现,代码中,如果指定nThreads 就是用指定的,例如第一个构造函数中那样,不然就是用DEFAULT_EVENT_LOOP_THREADS,DEFAULT_EVENT_LOOP_THREADS在本类的静态代码块中初始化
  protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));  //获取本机cup核数

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
  • 再追一步会看到,DefaultEventExecutorChooserFactory.INSTANCE使用默认的选择器工厂,在下一步中会提到,这个东东决定了在EventLoopGroup 执行next()时候,也就是切换到下一个EventLoop的时候,是以怎样的算法找到下一个EventLoop,默认就是按照EventLoop数组中的顺序查找下一个的
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
  • 再追下一步构造函数,真正干活的来了,注意!!!在主要代码会有注释
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());   //创建默认的线程执行器
            }
    
            children = new EventExecutor[nThreads];  //创建保存EventLoop的数组
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(executor, args);   //循环填充数组,在这里实例化每一个EventLoop
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            chooser = chooserFactory.newChooser(children);  //将创建好的EventLoop数组注册到选择器工厂
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    EventExecutor是一个接口,是EventLoop接口的子接口,当children[i] = newChild(executor, args)执行时,实际上执行的是NioEventLoopGroup中重写的,可以发现创建的NioEventLoop的实例,而NioEventLoop间接实现了EventLoop
        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
        }

    至此EventLoopGroup以及包含的多个EventLoop创建完成,这里只是在EventLoopGroup中创建了多个NioEventLoop,NioEventLoop并没有run起来,需要在注册通道后才能run起来监控通道的事件和执行task。后面章节还会介绍NioEventLoop类中的在其他方法。ok,这个部分就到这。

     

 


原文链接:https://www.cnblogs.com/chirsli/p/12539624.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:SpringCloud创建eureka server和eureka client

下一篇:springboot整合mybatis并设置多数据源