/**
 * Create a new instance.
 *
 * <p>Creates {@code nThreads} children via {@link #newChild(Executor, Object...)}, builds the chooser over
 * them and registers a listener that completes {@code terminationFuture} once every child has terminated.
 * If creating any child throws, every child created so far is shut down gracefully and awaited before the
 * failure propagates.
 *
 * @param nThreads the number of threads that will be used by this instance.
 * @param executor the Executor to use, or {@code null} if the default should be used.
 * @param chooserFactory the {@link EventExecutorChooserFactory} to use.
 * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
 * @throws IllegalStateException if {@code newChild} throws while creating a child event loop
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    checkPositive(nThreads, "nThreads");

    if (executor == null) {
        // No executor supplied: create a thread-per-task executor backed by the default thread factory.
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // One EventExecutor slot per requested thread.
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // Create the i-th child of the group; e.g. NioEventLoopGroup creates NioEventLoop instances here.
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // Creation failed: gracefully shut down the children created so far to release their resources.
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        // Wait until this child has actually terminated.
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // Build the thread chooser from the EventLoop instances created above.
    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            // Complete the group's termination future when the last child reports termination.
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    // Attach the termination listener to every EventLoop.
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
    // NOTE(review): the remainder of this constructor is not part of this excerpt.
// Excerpt from io.netty.channel.nio.NioEventLoop#openSelector (the surrounding code is not shown).
// Fall back to a plain SelectorTuple when maybeSelectorImplClass is not a Class (for example, it holds the
// Throwable from a failed lookup) or when it is not a supertype of the actual selector's class.
if (!(maybeSelectorImplClass instanceof Class) ||
    // ensure the current selector implementation is what we can instrument.
    !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
    if (maybeSelectorImplClass instanceof Throwable) {
        // Record why instrumentation is impossible, at trace level only.
        Throwable t = (Throwable) maybeSelectorImplClass;
        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
    }
    return new SelectorTuple(unwrappedSelector);
}

// Replace the selector's selectedKeys and publicSelectedKeys fields with a SelectedSelectionKeySet.
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
    @Override
    public Object run() {
        try {
            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

            // Java 9+: overwrite the fields through Unsafe when it is available.
            if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                // This allows us to also do this in Java9+ without any extra flags.
                long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                long publicSelectedKeysFieldOffset =
                        PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                    PlatformDependent.putObject(
                            unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                    PlatformDependent.putObject(
                            unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                    // null = no exception; this value ends up in maybeException.
                    return null;
                }
                // We could not retrieve the offset, lets try reflection as last-resort.
            }

            // Reflection path (below Java 9, no Unsafe, or no field offsets): make both fields accessible.
            // A failure cause is returned rather than thrown, so it becomes the value of maybeException.
            Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
            if (cause != null) {
                return cause;
            }
            cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
            if (cause != null) {
                return cause;
            }
            // NOTE(review): the rest of this method is not part of this excerpt.
// io.netty.channel.nio.NioEventLoop#run
/**
 * Main loop of this event loop. It returns only from the {@code finally} block, once
 * {@code isShuttingDown()} is true and {@code confirmShutdown()} returns {@code true}.
 *
 * <p>Each iteration performs three steps:
 * <ol>
 *   <li>Asks {@code selectStrategy} what to do. For SELECT (and BUSY_WAIT, which falls through to it), it
 *       blocks in {@link #select(long)} until the next scheduled-task deadline, unless tasks are already
 *       pending.</li>
 *   <li>Processes selected keys and runs queued tasks, splitting time according to {@code ioRatio}.</li>
 *   <li>Counts consecutive iterations in which neither I/O nor tasks ran ({@code selectCnt}) and lets
 *       {@code unexpectedSelectorWakeup} react to them.</li>
 * </ol>
 */
@Override
protected void run() {
    // Iterations since the last reset; reset whenever tasks ran or keys were selected.
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                // Compute this iteration's select strategy: CONTINUE, BUSY_WAIT or SELECT.
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    // Still runs the outer finally (shutdown check) before the next iteration.
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            // Poll for ready channels; this call may block.
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                // ioRatio is 100: no time balancing; process keys, then run every pending task.
                try {
                    if (strategy > 0) {
                        // A positive strategy means keys were selected; process them.
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                // Time the I/O work; the task budget is derived from it.
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    // Task budget = ioTime * (100 - ioRatio) / ioRatio, e.g. equal to ioTime when ioRatio is 50.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                // Nothing was selected: still run a minimal batch of queued tasks.
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                // Neither I/O nor tasks ran. unexpectedSelectorWakeup may rebuild the selector
                // (the epoll empty-spin workaround) and tells us to reset the counter.
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Error e) {
            throw e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally {
            // Always handle shutdown even if the loop processing threw an exception.
            // When shutting down: close everything and leave the loop once shutdown is confirmed.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Error e) {
                throw e;
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}
//io.netty.channel.nio.NioEventLoop#select privateintselect(long deadlineNanos)throws IOException { if (deadlineNanos == NONE) { return selector.select(); } // Timeout will only be 0 if deadline is within 5 microsecs long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); }
//java.nio.channels.Selector#select() /** * Selects a set of keys whose corresponding channels are ready for I/O * operations. * * <p> This method performs a blocking <a href="#selop">selection * operation</a>. It returns only after at least one channel is selected, * this selector's {@link #wakeup wakeup} method is invoked, or the current * thread is interrupted, whichever comes first. </p> * * @return The number of keys, possibly zero, * whose ready-operation sets were updated * * @throws IOException * If an I/O error occurs * * @throws ClosedSelectorException * If this selector is closed */ publicabstractintselect()throws IOException;
//java.nio.channels.Selector#selectNow /** * Selects a set of keys whose corresponding channels are ready for I/O * operations. * * <p> This method performs a non-blocking <a href="#selop">selection * operation</a>. If no channels have become selectable since the previous * selection operation then this method immediately returns zero. * * <p> Invoking this method clears the effect of any previous invocations * of the {@link #wakeup wakeup} method. </p> * * @return The number of keys, possibly zero, whose ready-operation sets * were updated by the selection operation * * @throws IOException * If an I/O error occurs * * @throws ClosedSelectorException * If this selector is closed */ publicabstractintselectNow()throws IOException;
// java.nio.channels.Selector#select(long) -- JDK declaration, excerpted for reference.
// Note: a timeout of 0 blocks indefinitely, so a caller that wants a non-blocking poll must use selectNow().
/**
 * Selects a set of keys whose corresponding channels are ready for I/O
 * operations.
 *
 * <p> This method performs a blocking <a href="#selop">selection
 * operation</a>. It returns only after at least one channel is selected,
 * this selector's {@link #wakeup wakeup} method is invoked, the current
 * thread is interrupted, or the given timeout period expires, whichever
 * comes first.
 *
 * <p> This method does not offer real-time guarantees: It schedules the
 * timeout as if by invoking the {@link Object#wait(long)} method. </p>
 *
 * @param timeout If positive, block for up to {@code timeout}
 *        milliseconds, more or less, while waiting for a
 *        channel to become ready; if zero, block indefinitely;
 *        must not be negative
 *
 * @return The number of keys, possibly zero,
 *         whose ready-operation sets were updated
 *
 * @throws IOException
 *         If an I/O error occurs
 *
 * @throws ClosedSelectorException
 *         If this selector is closed
 *
 * @throws IllegalArgumentException
 *         If the value of the timeout argument is negative
 */
public abstract int select(long timeout) throws IOException;
//io.netty.channel.nio.NioEventLoop#processSelectedKeysOptimized privatevoidprocessSelectedKeysOptimized(){ for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 // 将selectedKeys.keys[i]设置为null,并被JVM快速回收。 selectedKeys.keys[i] = null;
if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1);
selectAgain(); i = -1; } } }
//io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel) privatevoidprocessSelectedKey(SelectionKey k, AbstractNioChannel ch){ final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //如果当前selectionKey无效,直接关闭unsafe。 if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop == this) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); } return; }
try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. // 处理连接事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);
unsafe.finishConnect(); }
// 处理写事件 // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }
// 处理读事件 // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
//run方法中的runAllTask if (ioRatio == 100) { //如果I/O操作占比100% try { if (strategy > 0) { //当前需要处理的Key的数量>0,用来处理轮询到的SelectionKey. processSelectedKeys(); } } finally { // Ensure we always run tasks. // 执行所有任务。 ranTasks = runAllTasks(); } } elseif (strategy > 0) { //计算当前I/O占比时间 finallong ioStartTime = System.nanoTime(); try { //当前需要处理的Key的数量>0,用来处理轮询到的SelectionKey. processSelectedKeys(); } finally { // Ensure we always run tasks. //按照时间比例执行队列任务 finallong ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { //即使什么都没有执行也要执行最小时间单位的队列任务 ranTasks = runAllTasks(0); // This will run the minimum number of tasks }
// io.netty.channel.nio.NioEventLoop#setIoRatio /** * Sets the percentage of the desired amount of time spent for I/O in the event loop. Value range from 1-100. * The default value is {@code 50}, which means the event loop will try to spend the same amount of time for I/O * as for non-I/O tasks. The lower the number the more time can be spent on non-I/O tasks. If value set to * {@code 100}, this feature will be disabled and event loop will not attempt to balance I/O and non-I/O tasks. */ publicvoidsetIoRatio(int ioRatio){ if (ioRatio <= 0 || ioRatio > 100) { thrownew IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); } this.ioRatio = ioRatio; }
// io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks()
/**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
 *
 * <p>Asserts that it is called from the event loop. It repeatedly calls {@code fetchFromScheduledTaskQueue()}
 * and drains {@code taskQueue}, until the fetch reports that all scheduled tasks have been fetched.
 *
 * @return {@code true} if and only if at least one task was run
 */
protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        // Fetch scheduled tasks (presumably the due ones) into the queue, then run everything in taskQueue.
        fetchedAll = fetchFromScheduledTaskQueue();
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
    // NOTE(review): the rest of this method is not part of this excerpt.
/**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running
 * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
 *
 * <p>The elapsed time is checked only after every 64th task, so the budget can be exceeded by the tasks that
 * run between two checks. When {@code timeoutNanos <= 0} the deadline is 0. Assuming
 * {@code ScheduledFutureTask.nanoTime()} never returns a negative value (TODO confirm), the loop then stops
 * at the first check, after the 64th task, unless it ends earlier in code not shown in this excerpt.
 */
protected boolean runAllTasks(long timeoutNanos) {
    // Pull scheduled tasks into the task queue, then take the first task.
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        // No task to run: finish the round and report that nothing ran.
        afterRunningAllTasks();
        return false;
    }

    // Absolute deadline for running tasks; 0 when no positive budget was given.
    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        safeExecute(task);

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        // NOTE(review): the rest of this loop and method is not part of this excerpt.
// io.netty.channel.nio.NioEventLoop#unexpectedSelectorWakeup // returns true if selectCnt should be reset privatebooleanunexpectedSelectorWakeup(int selectCnt){ if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } returntrue; } if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); returntrue; } returnfalse; }
// io.netty.channel.nio.NioEventLoop#rebuildSelector /** * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work * around the infamous epoll 100% CPU bug. */ publicvoidrebuildSelector(){ if (!inEventLoop()) { execute(new Runnable() { @Override publicvoidrun(){ rebuildSelector0(); } }); return; } rebuildSelector0(); }
//io.netty.channel.nio.NioEventLoop#rebuildSelector0 privatevoidrebuildSelector0(){ final Selector oldSelector = selector; final SelectorTuple newSelectorTuple;
if (oldSelector == null) { return; }
try { // 重新打开一个新的Selector newSelectorTuple = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; }
// Register all channels to the new Selector. int nChannels = 0; // 循环将旧的selector上的attachment注册到新的selctor上 for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { //判断当前的Key是否有效 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; }
//在旧的Selector上触发的事件需要取消 int interestOps = key.interestOps(); key.cancel(); //把Channel重新注册到新的Selector上 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } }
try { // time to close the old selector as everything else is registered to the new one // 关闭旧Selector oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } }
if (logger.isInfoEnabled()) { logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); } }