Netty Core Component Source Code Analysis: NioEventLoop

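Preface

Starting with this section we dig into the source code. Our analysis follows the book 《Netty源码剖析与应用》 and combines it with our own reading of the source, because only what you have seen for yourself is certain. In this section we walk through the source of the core component NioEventLoop to see what it actually does. This is my first time studying source code in this depth, and reading it was honestly a headache, but looking at Netty from the source level gave me a fresh understanding and I learned a lot. So let's analyze NioEventLoop together.

The Netty version analyzed here is 4.1.68.Final.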

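NioEventLoopGroup Source Analysis

Since we are analyzing NioEventLoop, we cannot avoid NioEventLoopGroup. As mentioned earlier, a NioEventLoop handles the I/O operations of Channels, and a NioEventLoopGroup is a collection of such event loops: one NioEventLoopGroup contains one or more NioEventLoops. In the earlier demo we also created the bootstrap helper class ServerBootstrap. It holds two NioEventLoopGroups, which together form the master-worker Reactor structure.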

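The NioEventLoopGroup class mainly does the following 3 things:

  1. Creates a given number of NioEventLoop instances and initializes them.

  2. Creates the chooser; when a thread is requested, the chooser picks one.

  3. Creates the thread factory and builds the thread executor.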

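Creation Process

NioEventLoopGroup's parent class is MultithreadEventLoopGroup, which in turn extends the abstract class MultithreadEventExecutorGroup. Initializing a NioEventLoopGroup calls the parent constructors. DEFAULT_EVENT_LOOP_THREADS determines how many NioEventLoop threads are created by default: the system property io.netty.eventLoopThreads is read first, and if it is not set the value falls back to twice the number of available CPU cores (with a minimum of 1). The constructor receives the thread count as a parameter: if the caller passes 0 (which is what the no-argument constructor does), the default is used; otherwise the passed value is used.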

private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}

/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
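The resolution order described above can be tried out in isolation. The following is a minimal sketch using only the JDK, not Netty code: the class EventLoopThreadsSketch and its method names are hypothetical, and the configured value is passed in as a parameter so the behavior is easy to check.

```java
// Hypothetical helper (not a Netty class) that mirrors the thread-count logic shown above.
final class EventLoopThreadsSketch {

    // configured: value of the io.netty.eventLoopThreads property, or null when it is not set.
    static int defaultThreads(Integer configured, int availableProcessors) {
        int value = configured != null ? configured : availableProcessors * 2;
        // Never go below one thread, as Math.max(1, ...) does in the static initializer.
        return Math.max(1, value);
    }

    // nThreads == 0 means "not specified", so the default applies.
    static int resolve(int nThreads, int defaultThreads) {
        return nThreads == 0 ? defaultThreads : nThreads;
    }

    public static void main(String[] args) {
        // Integer.getInteger returns null when the system property is missing.
        Integer configured = Integer.getInteger("io.netty.eventLoopThreads");
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(resolve(0, defaultThreads(configured, cores)));
    }
}
```

On a 4-core machine with no property set, this sketch resolves a requested count of 0 to 8 threads.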

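Building the thread group takes two steps: first, create an EventExecutor array of the requested size; second, initialize each element by calling the subclass's newChild() method. For extensibility, besides NioEventLoopGroup Netty also provides EpollEventLoopGroup, an epoll-based group implemented through JNI, as well as groups for other I/O multiplexing models, so newChild() is implemented by the concrete group subclass. The MultithreadEventExecutorGroup constructor and the newChild() method (as implemented by NioEventLoopGroup) are annotated below: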

/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
checkPositive(nThreads, "nThreads");

if (executor == null) {
// Create the thread executor and the thread factory
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

// Build the EventExecutor array, sized by the thread count
children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// Initialize each executor in the group; NioEventLoopGroup creates NioEventLoop instances
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// Initialization failed: shut down the executors created so far and release resources
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
// Wait until the executor has terminated.
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// Create the executor chooser from the EventLoop instances just created
chooser = chooserFactory.newChooser(children);

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};

// Add the termination listener to each EventLoop
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
// Create a read-only copy, used when iterating
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

//io.netty.channel.nio.NioEventLoopGroup#newChild
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
SelectorProvider selectorProvider = (SelectorProvider) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;

int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}

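In newChild(), NioEventLoop is constructed with 7 parameters:

  • The 1st parameter is the NioEventLoopGroup itself.

  • The 2nd parameter is the thread executor used to start the thread; it is invoked in SingleThreadEventExecutor's doStartThread() method.

  • The 3rd parameter is the provider of the NIO Selector.

  • The 4th parameter is the SelectStrategy, used mainly in NioEventLoop's run() method to control the select loop.

  • The 5th parameter is the handler invoked when a submitted non-I/O task is rejected.

  • The 6th and 7th parameters are two queue factories, for taskQueue and tailTaskQueue respectively.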

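ExecutorChooser Analysis

NioEventLoopGroup obtains a NioEventLoop through next(). The call ends up in the next() method of its parent MultithreadEventLoopGroup, which delegates to MultithreadEventExecutorGroup, which in turn asks its EventExecutorChooser. Which chooser implementation is used is decided in the MultithreadEventExecutorGroup constructor, following the strategy pattern.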

@Override
public EventLoop next() {
return (EventLoop) super.next();
}

@Override
public EventExecutor next() {
return chooser.next();
}

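During initialization the chooser is created by newChooser(), which picks a strategy based on whether the number of threads is a power of two. If it is, PowerOfTwoEventExecutorChooser is used, which computes the next executor with a bitwise AND; otherwise GenericEventExecutorChooser is used, which computes the next executor with a modulo operation. Of the two, PowerOfTwoEventExecutorChooser performs better.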

chooser = chooserFactory.newChooser(children);

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
// Compute the next executor with a bitwise AND
return new PowerOfTwoEventExecutorChooser(executors);
} else {
// Compute the next executor with a modulo operation
return new GenericEventExecutorChooser(executors);
}
}
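The two index computations can be sketched in isolation. This is a minimal sketch, not Netty's chooser classes: ChooserSketch and its method names are hypothetical, and the counter is passed in as a plain int, whereas a real chooser would draw it from an atomic counter that is incremented on every call.

```java
// Hypothetical illustration of the two round-robin index strategies.
final class ChooserSketch {

    // A power of two has exactly one bit set, so isolating the lowest set bit gives the value back.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Valid only when length is a power of two: length - 1 is then a mask of the low bits.
    static int indexPowerOfTwo(int counter, int length) {
        return counter & (length - 1);
    }

    // Works for any length; Math.abs keeps the index non-negative after the counter overflows.
    static int indexGeneric(int counter, int length) {
        return Math.abs(counter % length);
    }

    public static void main(String[] args) {
        for (int counter = 0; counter < 6; counter++) {
            System.out.println(indexPowerOfTwo(counter, 4) + " " + indexGeneric(counter, 3));
        }
    }
}
```

For a power-of-two length both methods walk the same round-robin sequence while the counter is non-negative; the AND variant simply avoids the division.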

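ThreadFactory Source Analysis

Netty wraps each NioEventLoop thread as a FastThreadLocalThread. Because a NioEventLoop manages its own thread state, each NioEventLoop needs a thread executor, and before the thread runs, io.netty.util.concurrent.DefaultThreadFactory wraps it as a FastThreadLocalThread. The source of the executor ThreadPerTaskExecutor and of DefaultThreadFactory's newThread() method is as follows: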

//io.netty.util.concurrent.MultithreadEventExecutorGroup#newDefaultThreadFactory
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}

//io.netty.util.concurrent.ThreadPerTaskExecutor#execute
@Override
public void execute(Runnable command) {
// Call the ThreadFactory's newThread() method to wrap the task in a thread, then start it.
threadFactory.newThread(command).start();
}

//io.netty.util.concurrent.DefaultThreadFactory#newThread(java.lang.Runnable)
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}

if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
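The executor-plus-factory pairing above can be reproduced with plain JDK types. The following is a minimal sketch under that assumption: ThreadPerTaskSketch, NamedThreadFactory and PerTaskExecutor are hypothetical names, and a plain Thread stands in for FastThreadLocalThread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only illustration of "one new thread per submitted task".
final class ThreadPerTaskSketch {

    // Names threads prefix + id and applies the daemon flag, loosely like the factory above.
    static final class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger nextId = new AtomicInteger();
        private final String prefix;
        private final boolean daemon;

        NamedThreadFactory(String prefix, boolean daemon) {
            this.prefix = prefix;
            this.daemon = daemon;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, prefix + nextId.incrementAndGet());
            t.setDaemon(daemon);
            return t;
        }
    }

    // Every execute() call creates and starts a fresh thread; nothing is pooled.
    static final class PerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;

        PerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
        }

        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }

    // Runs one task and reports the name of the thread it ran on.
    static String runOnceAndGetThreadName(String prefix) {
        Executor executor = new PerTaskExecutor(new NamedThreadFactory(prefix, true));
        CompletableFuture<String> name = new CompletableFuture<>();
        executor.execute(() -> name.complete(Thread.currentThread().getName()));
        return name.join();
    }

    public static void main(String[] args) {
        System.out.println(runOnceAndGetThreadName("sketch-"));
    }
}
```

Because an event loop submits its long-running loop to the executor only once, a thread-per-task executor ends up creating exactly one thread per event loop.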

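NioEventLoop Source Analysis

NioEventLoop's source is much more complex than NioEventLoopGroup's. Each NioEventLoop object, like the multiplexing Selector in NIO, has to manage thousands of connections, and all read/write events on those connections are initiated by it. NioEventLoop has the following 5 core functions:

  1. Open a Selector and initialize it.

  2. Register the ServerSocketChannel with the Selector.

  3. Handle the various I/O events, such as OP_ACCEPT, OP_CONNECT, OP_READ and OP_WRITE.

  4. Run scheduled tasks.

  5. Work around the JDK NIO empty-polling bug.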

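Most of these functions are delegated to other classes; NioEventLoop itself only handles accepting the data flow. This design lightens NioEventLoop's load and makes it easier to extend. The overall functionality of NioEventLoop is shown in the figure below:

In the figure above, the second layer shows NioEventLoop's 4 core methods. For each EventLoop thread, the actual registration of a connection with the Selector is delegated to Unsafe, so the register() method lives in the parent class SingleThreadEventLoop.

Opening the Selector

When a NioEventLoop is initialized, it opens a Selector through openSelector(). openSelector() is also called when the Selector is rebuilt (rebuildSelector(), through rebuildSelector0()).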

//io.netty.channel.nio.NioEventLoop#NioEventLoop    
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// Open the Selector
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

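In NIO, opening a Selector only requires calling Selector.open() or SelectorProvider's openSelector() method. Netty adds an optimization switch for the Selector: when the optimization is on, Netty loads the sun.nio.ch.SelectorImpl class through reflection and replaces the two HashSet fields selectedKeys and publicSelectedKeys in the sun.nio.ch.SelectorImpl object with its own optimized SelectedSelectionKeySet. Here selectedKeys is the set of ready keys, holding every SelectionKey whose operations are ready; publicSelectedKeys is the view of that set exposed to external callers, a wrapper around selectedKeys that allows keys to be removed but not added.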

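What exactly does SelectedSelectionKeySet optimize?

The main change is the data structure: an array replaces the HashSet, and add() and iterator() are overridden, which makes adding and iterating over the keys cheaper. The optimization is enabled by default; setting the system property io.netty.noKeySetOptimization to true disables it (this is the DISABLE_KEY_SET_OPTIMIZATION check in openSelector()).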
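To make the data-structure change concrete, here is a minimal sketch of an array-backed set in the spirit of that description. It is not Netty's SelectedSelectionKeySet: the class name ArrayBackedKeySet, the generic element type and the tiny initial capacity are assumptions chosen for illustration, and the sketch deliberately does not deduplicate, on the assumption that the producer adds each key at most once per select round.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical array-backed set: appends are O(1) amortized and iteration is a plain array walk.
final class ArrayBackedKeySet<T> extends AbstractSet<T> {
    Object[] keys = new Object[4]; // small initial capacity so that growth is easy to observe
    int size;

    @Override
    public boolean add(T o) {
        if (o == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1); // double the array when full
        }
        keys[size++] = o;
        return true;
    }

    // Lookups and removals are not supported in this sketch; the consumer only iterates and resets.
    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public boolean contains(Object o) {
        return false;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (T) keys[idx++];
            }
        };
    }

    // Clear the used slots so the referenced objects can be garbage-collected, then start over.
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }
}
```

Compared with a HashSet, adding an element involves no hashing and no node allocation, and the consumer can index the backing array directly.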

//io.netty.channel.nio.NioEventLoop#openSelector
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// Open the Selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// If the key-set optimization is disabled, return the Selector tuple as-is
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}

// Optimization enabled: load sun.nio.ch.SelectorImpl through reflection
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});

if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}

// Replace the Selector's selectedKeys and publicSelectedKeys with a SelectedSelectionKeySet
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

// JDK 9+: replace the fields through Unsafe
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);

if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}

// Otherwise (older JDK, or the field offsets were unavailable): replace the fields through reflection
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}

selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});

if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
// Assign selectedKeySet to the NioEventLoop field and return the Selector tuple
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

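About AccessController.doPrivileged

Code from different locations can be described by a CodeSource object that records its location and signing certificates. Depending on its CodeSource, code is granted different permissions. For example, all code shipped with the Java SDK has all permissions, code in an Applet has very restricted permissions, and user-written code can have custom permissions (through the SecurityManager). When a piece of code executes, its stack trace contains every method from main onward that has been called and has not yet returned. Such a call chain may well cross several CodeSources, and because the CodeSources differ, the code usually has different permission sets. Only when every CodeSource along the way has the required permission can the currently running code access a given resource.

The doPrivileged method is a supplement to this rule. It resembles a setuid program on Unix: the Unix login program has the setuid bit, so it runs with root privileges no matter which user invokes it. A method that calls doPrivileged ignores the permissions of the other methods in its stack trace and decides whether the resource may be accessed based only on the current method's permissions. In other words, a user can be allowed to access, only in a predefined way, a resource that they could not otherwise access. Both doPrivileged and the setuid bit call for care, for example performing only the necessary operations; otherwise they can cause security problems.

About setuid

setuid is a flag bit provided by Unix-like systems. It sets a process's effective user ID (euid) to the uid of the owner of the executable file or program (for example root). That is, once the setuid bit is set, when the file or program (collectively, the executable) is run, the operating system grants the process the file owner's permissions, because its euid is the file owner's uid.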

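Reading the run() Method

run() is the core method of the EventLoop. The EventLoop loops inside this method, blocking while it watches its Channels and then carrying out the follow-up work, such as processing the SelectionKeys that were selected and running queued tasks. The annotated code is as follows: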

//io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
// Compute the current select strategy: CONTINUE, BUSY_WAIT or SELECT
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
// Call select() to poll for ready Channels; this call blocks.
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}

selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
// ioRatio is 100: no balancing between I/O and tasks
try {
if (strategy > 0) {
// At least one key is ready: process the selected SelectionKeys.
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
// Run all queued tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
// Measure the time spent on I/O
final long ioStartTime = System.nanoTime();
try {
// At least one key is ready: process the selected SelectionKeys.
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// Run queued tasks for a time budget proportional to the I/O time
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
// Even when no I/O was processed, run the minimum number of queued tasks
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}

if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
// unexpectedSelectorWakeup(selectCnt) checks for the empty-polling bug and rebuilds the Selector if needed.
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
// If the EventLoop is shutting down, close all channels and exit the loop gracefully
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}

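Overall the method is fairly simple, and it can be divided into the following parts:

  • select(curDeadlineNanos) polls for ready Channels.

  • processSelectedKeys handles the selected SelectionKeys.

  • runAllTasks runs the queued tasks.

  • unexpectedSelectorWakeup(selectCnt) handles the empty-polling bug.

Next we analyze each of these core methods in turn.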

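select: Polling for Ready Keys

First, select(curDeadlineNanos), which polls for ready Channels. The method is simple: if the deadlineNanos passed in is NONE (no scheduled task is pending), it calls the blocking select() with no timeout. Otherwise it computes a blocking timeout; if that timeout is <= 0 it does a non-blocking selectNow(), otherwise it does a blocking select with that timeout.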

//io.netty.channel.nio.NioEventLoop#select
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return selector.select();
}
// Timeout will only be 0 if deadline is within 5 microsecs
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

//java.nio.channels.Selector#select()
/**
* Selects a set of keys whose corresponding channels are ready for I/O
* operations.
*
* <p> This method performs a blocking <a href="#selop">selection
* operation</a>. It returns only after at least one channel is selected,
* this selector's {@link #wakeup wakeup} method is invoked, or the current
* thread is interrupted, whichever comes first. </p>
*
* @return The number of keys, possibly zero,
* whose ready-operation sets were updated
*
* @throws IOException
* If an I/O error occurs
*
* @throws ClosedSelectorException
* If this selector is closed
*/
public abstract int select() throws IOException;


//java.nio.channels.Selector#selectNow
/**
* Selects a set of keys whose corresponding channels are ready for I/O
* operations.
*
* <p> This method performs a non-blocking <a href="#selop">selection
* operation</a>. If no channels have become selectable since the previous
* selection operation then this method immediately returns zero.
*
* <p> Invoking this method clears the effect of any previous invocations
* of the {@link #wakeup wakeup} method. </p>
*
* @return The number of keys, possibly zero, whose ready-operation sets
* were updated by the selection operation
*
* @throws IOException
* If an I/O error occurs
*
* @throws ClosedSelectorException
* If this selector is closed
*/
public abstract int selectNow() throws IOException;


//java.nio.channels.Selector#select(long)
/**
* Selects a set of keys whose corresponding channels are ready for I/O
* operations.
*
* <p> This method performs a blocking <a href="#selop">selection
* operation</a>. It returns only after at least one channel is selected,
* this selector's {@link #wakeup wakeup} method is invoked, the current
* thread is interrupted, or the given timeout period expires, whichever
* comes first.
*
* <p> This method does not offer real-time guarantees: It schedules the
* timeout as if by invoking the {@link Object#wait(long)} method. </p>
*
* @param timeout If positive, block for up to <tt>timeout</tt>
* milliseconds, more or less, while waiting for a
* channel to become ready; if zero, block indefinitely;
* must not be negative
*
* @return The number of keys, possibly zero,
* whose ready-operation sets were updated
*
* @throws IOException
* If an I/O error occurs
*
* @throws ClosedSelectorException
* If this selector is closed
*
* @throws IllegalArgumentException
* If the value of the timeout argument is negative
*/
public abstract int select(long timeout)
throws IOException;
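The timeout arithmetic in select(long deadlineNanos) is easy to misread, so here is a small worked sketch. It assumes a simplified model in which the time remaining until the deadline is passed in directly; SelectTimeoutSketch and timeoutMillis are hypothetical names, not Netty methods.

```java
// Hypothetical model of the nanosecond-to-millisecond rounding used before a timed select.
final class SelectTimeoutSketch {

    // remainingNanos: deadline minus the current time (may be negative if the deadline has passed).
    static long timeoutMillis(long remainingNanos) {
        // Adding 995000 ns before the integer division rounds up almost every remainder,
        // so the result is 0 only when the deadline is less than 5 microseconds away.
        long delayNanos = Math.max(0L, remainingNanos + 995000L);
        return delayNanos / 1000000L;
    }

    public static void main(String[] args) {
        System.out.println(timeoutMillis(4999L));    // deadline is closer than 5 microseconds
        System.out.println(timeoutMillis(5000L));    // exactly 5 microseconds away
        System.out.println(timeoutMillis(1005000L)); // a little over 1 millisecond away
    }
}
```

A result of 0 maps to selectNow(), which matters because Selector.select(0) would block indefinitely rather than return immediately.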

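From the code above we can see that a blocking select returns in the following 4 situations:

  • At least one key is ready.

  • The selector's wakeup() is called.

  • The current thread is interrupted.

  • The timeout expires.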
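The wakeup case can be observed with the JDK alone. The sketch below relies on the documented behavior that a wakeup() issued while no selection is in progress makes the next selection operation return immediately; WakeupSketch and selectAfterWakeup are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical demonstration: a blocking select() returns with 0 ready keys after wakeup().
final class WakeupSketch {

    static int selectAfterWakeup() {
        try (Selector selector = Selector.open()) {
            // No channel is registered, so without the wakeup this select() would block forever.
            selector.wakeup();
            return selector.select();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(selectAfterWakeup());
    }
}
```

A return value of 0 is therefore not an error in itself; the caller has to work out why select() returned.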

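There is of course one more case: the empty-polling bug, where select() returns even though none of these conditions holds.

processSelectedKeys: Handling Ready Keys

The second part, processSelectedKeys, handles the ready keys found by the first part: it takes out each SelectionKey and its attachment. There are two kinds of attachment: the first is AbstractNioChannel and the second is NioTask. The second kind is not used inside Netty, so we only analyze AbstractNioChannel. Depending on the key's ready event types, different methods of the AbstractNioChannel's unsafe() object are triggered; these are mainly I/O read and write operations. The source of those methods, including how the attachment is registered, will be covered in detail when we analyze Channel. The core code of processSelectedKeys is annotated below: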

//io.netty.channel.nio.NioEventLoop#processSelectedKeys
private void processSelectedKeys() {
// Check whether the optimized selectedKeys set is in use (non-null)
if (selectedKeys != null) {
// Optimized path
processSelectedKeysOptimized();
} else {
// Plain path
processSelectedKeysPlain(selector.selectedKeys());
}
}

//io.netty.channel.nio.NioEventLoop#processSelectedKeysOptimized
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
// Set selectedKeys.keys[i] to null so the key can be garbage-collected once the Channel is closed.
selectedKeys.keys[i] = null;

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
// Trigger the handling that matches the key's ready events.
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);

selectAgain();
i = -1;
}
}
}

//io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// If the SelectionKey is no longer valid, close the channel through unsafe (only if it is still registered to this EventLoop).
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}

try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// Handle the connect event
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

// Handle the write event
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}

// Handle the read (and accept) event
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
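The bit tests in processSelectedKey can be exercised without any channel. The following is a minimal sketch that maps a readyOps bitmask to the names of the unsafe methods invoked above, in the same order; ReadyOpsSketch and actionsFor are hypothetical names, and the returned strings are labels only.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the order of the checks: connect, then write, then read/accept.
final class ReadyOpsSketch {

    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is treated like a read, matching the spin-loop workaround in the code above.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_WRITE | SelectionKey.OP_READ));
    }
}
```

Because each check is an independent if rather than an else-if chain, a single key can trigger several actions in one pass.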

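runAllTasks: Running Queued Tasks

The third part, runAllTasks, runs the tasks in the taskQueue and the scheduled-task queue, such as heartbeat checks and asynchronous writes. NioEventLoop decides how long to run tasks based on ioRatio (the share of time given to I/O events relative to taskQueue execution). One thing puzzled me here: if the I/O share is 100%, shouldn't all the time go to I/O events, with none left for queued tasks? The Javadoc of setIoRatio explains it: when ioRatio is 100, the event loop no longer tries to balance I/O against queued tasks; it processes all ready I/O events and then runs all queued tasks.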

// The runAllTasks calls inside run()
if (ioRatio == 100) {
// ioRatio is 100: no balancing between I/O and tasks
try {
if (strategy > 0) {
// At least one key is ready: process the selected SelectionKeys.
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
// Run all queued tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
// Measure the time spent on I/O
final long ioStartTime = System.nanoTime();
try {
// At least one key is ready: process the selected SelectionKeys.
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// Run queued tasks for a time budget proportional to the I/O time
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
// Even when no I/O was processed, run the minimum number of queued tasks
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}


// io.netty.channel.nio.NioEventLoop#setIoRatio
/**
* Sets the percentage of the desired amount of time spent for I/O in the event loop. Value range from 1-100.
* The default value is {@code 50}, which means the event loop will try to spend the same amount of time for I/O
* as for non-I/O tasks. The lower the number the more time can be spent on non-I/O tasks. If value set to
* {@code 100}, this feature will be disabled and event loop will not attempt to balance I/O and non-I/O tasks.
*/
public void setIoRatio(int ioRatio) {
if (ioRatio <= 0 || ioRatio > 100) {
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
}
this.ioRatio = ioRatio;
}
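The expression ioTime * (100 - ioRatio) / ioRatio is easier to grasp with numbers. The sketch below isolates that formula; IoRatioSketch and taskBudgetNanos are hypothetical names, and the argument check is an addition of this sketch (ioRatio 100 is rejected here because run() handles that case on a separate branch).

```java
// Hypothetical helper isolating the task time budget computed in run().
final class IoRatioSketch {

    // Returns how many nanoseconds may be spent on queued tasks after ioTimeNanos of I/O work.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio < 100)");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000_000L; // 1 ms spent in processSelectedKeys
        System.out.println(taskBudgetNanos(ioTime, 50)); // equal split
        System.out.println(taskBudgetNanos(ioTime, 80)); // I/O favored, tasks get a quarter of the I/O time
        System.out.println(taskBudgetNanos(ioTime, 20)); // tasks favored, four times the I/O time
    }
}
```

With the default ioRatio of 50, tasks get the same amount of time that the I/O processing just took.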

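Because one NioEventLoop thread manages many Channels, those Channels can generate a large number of tasks, and running all of them could keep I/O events from being handled in time. So after every 64 tasks the loop checks whether the time budget for tasks has been used up, and if it has, the remaining tasks are left for a later iteration. The annotated code is as follows: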

//io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks()
/**
* Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
*
* @return {@code true} if and only if at least one task was run
*/
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;

do {
// Move the due scheduled tasks into taskQueue, then run everything in taskQueue
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
// Wrap-up work
afterRunningAllTasks();
return ranAtLeastOne;
}

/**
* Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running
* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
*/
protected boolean runAllTasks(long timeoutNanos) {
// Move the due scheduled tasks into taskQueue, then poll the first task
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
// No task available: return right away
afterRunningAllTasks();
return false;
}

// Compute the deadline for running tasks
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);

runTasks ++;

// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
// After every 64 tasks, check whether the deadline has passed
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}

// Leave the loop when no task is left
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}

// Wrap-up work
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
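The "check the clock only every 64 tasks" loop can be reproduced with a plain queue. This is a minimal sketch under simplifying assumptions: TimedTaskRunnerSketch is a hypothetical class, there is no scheduled-task queue, and time is measured relative to class initialization so that it is never negative.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical JDK-only version of a deadline-bounded task loop.
final class TimedTaskRunnerSketch {
    private static final long START = System.nanoTime();

    // Monotonic time relative to class initialization, so the value is always >= 0.
    static long nanoTime() {
        return System.nanoTime() - START;
    }

    // Runs queued tasks until the queue is empty or the deadline is seen to have passed.
    // Returns the number of tasks that were run.
    static int runTasks(Queue<Runnable> queue, long timeoutNanos) {
        Runnable task = queue.poll();
        if (task == null) {
            return 0;
        }
        // A timeout of 0 yields deadline 0, which is already in the past at the first check.
        final long deadline = timeoutNanos > 0 ? nanoTime() + timeoutNanos : 0;
        int ran = 0;
        for (;;) {
            task.run();
            ran++;
            // (ran & 0x3F) == 0 holds for 64, 128, 192, ...: the clock is read once per 64 tasks.
            if ((ran & 0x3F) == 0 && nanoTime() >= deadline) {
                break;
            }
            task = queue.poll();
            if (task == null) {
                break;
            }
        }
        return ran;
    }

    // Fills a queue with no-op tasks and runs it under the given timeout.
    static int demo(int taskCount, long timeoutNanos) {
        Queue<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < taskCount; i++) {
            queue.add(() -> { });
        }
        return runTasks(queue, timeoutNanos);
    }

    public static void main(String[] args) {
        System.out.println(demo(200, 0L));
    }
}
```

With a timeout of 0 the loop still runs up to 64 tasks before its first clock check, which is what "the minimum number of tasks" amounts to in runAllTasks(0).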

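Handling the Empty-Polling Bug

The last part handles the empty-polling bug. Netty works around it by rebuilding the Selector, and detects it by counting consecutive premature returns: selectCnt is incremented on every loop iteration and reset to 0 whenever the iteration processed I/O events or ran queued tasks, so it only keeps growing while select() returns without any work being done. When that count reaches the threshold, the Selector is rebuilt.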

// selectorAutoRebuildThreshold: the rebuild threshold, read from a system property, 512 by default
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}

SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;


// io.netty.channel.nio.NioEventLoop#unexpectedSelectorWakeup
// returns true if selectCnt should be reset
private boolean unexpectedSelectorWakeup(int selectCnt) {
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
return true;
}
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
return true;
}
return false;
}
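The counting logic can be simulated without a real Selector. The sketch below models only the counter: SpinDetectorSketch and its members are hypothetical names, a rebuild is represented by incrementing a counter, and the thread-interrupt branch is left out.

```java
// Hypothetical model of the consecutive-empty-poll counter kept by the run() loop.
final class SpinDetectorSketch {
    private final int threshold;
    private int selectCnt;
    private int rebuilds;

    SpinDetectorSketch(int threshold) {
        this.threshold = threshold;
    }

    // Called once per loop iteration; didWork is true when I/O was processed or tasks were run.
    void onLoopIteration(boolean didWork) {
        selectCnt++;
        if (didWork) {
            selectCnt = 0; // productive iteration: the streak of premature returns is broken
        } else if (threshold > 0 && selectCnt >= threshold) {
            rebuilds++;    // stands in for rebuilding the Selector
            selectCnt = 0;
        }
    }

    int rebuilds() {
        return rebuilds;
    }

    int selectCnt() {
        return selectCnt;
    }

    public static void main(String[] args) {
        SpinDetectorSketch detector = new SpinDetectorSketch(512);
        for (int i = 0; i < 512; i++) {
            detector.onLoopIteration(false); // 512 empty polls in a row
        }
        System.out.println(detector.rebuilds());
    }
}
```

A single productive iteration resets the streak, so only an uninterrupted run of empty polls triggers a rebuild.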

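Summary of the run() Method

Back to NioEventLoop's run() method: it first calls select(curDeadlineNanos) to poll for ready Channels, then calls processSelectedKeys() to handle I/O events, and finally calls runAllTasks() to process the task queue. If the iteration did no work, unexpectedSelectorWakeup(selectCnt) checks for the empty-polling bug, and once the number of consecutive empty polls is greater than or equal to the threshold (512 by default) the selector is rebuilt. That completes the walkthrough of run().

Rebuilding the Selector and Registering Channels

From the analysis of unexpectedSelectorWakeup() we saw that Netty rebuilds the Selector once the empty-poll count is greater than or equal to the threshold (512 by default). The rebuild opens a new Selector, re-registers each channel from the old Selector on it with the same interest ops and attachment, and then closes the old Selector. The code is as follows: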

// io.netty.channel.nio.NioEventLoop#rebuildSelector
/**
* Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
* around the infamous epoll 100% CPU bug.
*/
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}

//io.netty.channel.nio.NioEventLoop#rebuildSelector0
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;

if (oldSelector == null) {
return;
}

try {
// Open a new Selector
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}

// Register all channels to the new Selector.
int nChannels = 0;
// Re-register each channel of the old Selector, with its attachment, on the new Selector
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
// Skip keys that are invalid or whose channel is already registered with the new Selector
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}

// Remember the interest ops, then cancel the key on the old Selector
int interestOps = key.interestOps();
key.cancel();
// Register the Channel with the new Selector
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}

selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;

try {
// time to close the old selector as everything else is registered to the new one
// Close the old Selector
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}

if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}
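The migration step itself uses only standard NIO calls, so it can be tried in isolation. The following is a minimal sketch, not Netty's implementation: SelectorMigrationSketch is a hypothetical class, a Pipe source channel stands in for a socket channel, and error handling is reduced to wrapping IOException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical JDK-only demonstration of moving registrations from one Selector to another.
final class SelectorMigrationSketch {

    // Re-registers every valid key of oldSelector on a fresh Selector and closes the old one.
    static Selector migrate(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue; // nothing to move, or the channel is already on the new Selector
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel(); // the old registration is dropped for good when oldSelector is closed
            key.channel().register(newSelector, interestOps, attachment);
        }
        oldSelector.close();
        return newSelector;
    }

    // Registers one channel, migrates it, and checks that ops and attachment survived the move.
    static boolean demo() {
        try {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            Selector oldSelector = Selector.open();
            Object attachment = "channel-state";
            pipe.source().register(oldSelector, SelectionKey.OP_READ, attachment);

            Selector newSelector = migrate(oldSelector);
            SelectionKey newKey = pipe.source().keyFor(newSelector);
            boolean ok = !oldSelector.isOpen()
                    && newSelector.keys().size() == 1
                    && newKey != null
                    && newKey.interestOps() == SelectionKey.OP_READ
                    && newKey.attachment() == attachment;

            newSelector.close();
            pipe.source().close();
            pipe.sink().close();
            return ok;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the attachment is what lets the new key find its owner again, which is why rebuildSelector0() above carries it over and updates the channel's selectionKey field.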

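The register() method is called in two places: first, before the port is bound, the NioServerSocketChannel must be registered with the Boss thread's Selector; second, when a NioEventLoop detects an incoming connection, the SocketChannel is wrapped as a NioSocketChannel and registered with a Worker thread. Both paths end up calling the register method of the channel's helper object unsafe, which runs the template method register() of the parent class AbstractUnsafe.

Summary

In this section we analyzed the NioEventLoop source in two parts, NioEventLoopGroup and NioEventLoop, where one NioEventLoopGroup contains one or more NioEventLoops. The NioEventLoopGroup source is fairly simple: it creates and initializes the NioEventLoops, creates the chooser that provides the strategy for picking a thread, and creates the thread factory and the thread executor. On creation it loops over the specified or default thread count to create the NioEventLoops, calling newChild() for each; that method is simple too: after preparing a few required parameters it constructs a NioEventLoop. The chooser is also simple, with two strategies depending on the number of EventLoops: a bitwise AND when the count is a power of two, and a modulo otherwise. Then the ThreadFactory wraps each newly created thread as a FastThreadLocalThread.

We then dissected NioEventLoop, whose source is considerably more complex than NioEventLoopGroup's. First we looked at how it opens a Selector; the key-set optimization is applied by default and can be switched off with io.netty.noKeySetOptimization. Then we walked through the core run() method, whose logic consists of select polling for ready keys, processSelectedKeys handling the ready keys, and runAllTasks running the queued tasks. At the end of each iteration run() checks for the empty-polling bug, and if the number of empty polls reaches the threshold it rebuilds the Selector and re-registers the channels. In the next section we will analyze Channel.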

Learning Resources