Netty流程剖析 — 服务启动源码剖析

前言

前面几个小节,我们梳理了几个核心组建,深入的分析了这几个组建的工作原理和一些实现细节,但是细看局部总是难以获得一个全局的视角,很难把它们真正的串起来。在梳理完这几个组件之后,我最想知道的就是启动的时候和读写操作的时候,这几个组件是怎么配合工作。虽然我也看过一切流程图,和一些简单的总结,但是自己没有深入过源码总感觉缺了点什么。我相信只有真正分析调试过源码才能真正理解,各个组件之间是怎么协作的,各个组件之间的设计要点是什么了。带着这些疑惑,我深入了源码也收获颇丰。这个部分,我们将对Netty的启动过程进行深入的分析,看看Netty是如何进行注册、绑定和监听。

NIO服务器的基本设计思路

我们知道Netty也是一套高性能网络框架,Netty底层有对各种IO模型的支持,包括BIO、NIO和后来的AIO(只是Netty对于AIO的尝试是一个悲伤的故事),最终Netty选择基于NIO模型来实现。在深入Netty的启动流程之前,我们先眼熟几个NIO中比较重要的类/组件:

  • Selector:多路复用器,是NIO的一个核心组件,在BIO中是没有的,主要用于监听NIO Channel的各种事件,并检查一个或者多个通道是否处于可读/写状态。在实现单条线程管理多条链路时候,传统的BIO管理多条链路是通过多线程上下文切换来实现的,而NIO有了Selector之后,一个Selector只需要用一个线程就可以轮询处理多个链路通道。

  • ServerSocketChannel:与普通BIO中的ServerSocket一样,主要用来监听新加入的TCP连接通道,而且其启动方式与ServerSocket的启动方式也非常类似,只需要在开启端口监听之前,把ServerSocketChannel注册到Selector上,并设置OP_ACCEPT事件即可

  • SocketChannel:与普通BIO中的Socket一样,是连接到TCP网络Socket上的Channel。它的创建方式有两种:一种方式与ServerSocketChannel的创建方式类似,打开一个SocketChannel,这种方式主要用于客户端主动连接器另一种方式是当有客户端连接器ServerSocketChannel上时,会创建一个SocketChannel。

NIO服务设计设计主要分为三步:第一步,服务器启动与端口监听;第二步,ServerSocketChannel 处理新接入链路;第三步,SocketChannel 读/写数据。

Server端服务启动源码分析

Netty服务的过程和上面的NIO服务器的启动的核心部分并没有多大的区别,同样是要创建Selector,不过Netty会使用额外的线程去创建,也需要打开ServerSocketChannel,只不过采用了NIOServerSocketChannel 来进行包装实现,同样也需要把ServerSocketChannel注册到Selector上,这一步Netty也是使用NioEventLoop来实现,并返回ChannelPromise来异步通知是否注册成功。Netty的启动分三步走,register(注册)bind(绑定)active(激活)。在后续的源码分析中,我们也将整个启动流程拆分成这三个阶段来帮助源码的分析和理解,以及看看启动是如何把我们前面的几个模块串接起来的。在开始分析启动流程之前,我们先再次过一下我们熟悉的服务端启动代码和Server启动类ServerBootstrap

鸟瞰全局与ServerBootstrap

这里我们分析的启动类是EchoServer,这是一个io.netty.example中的一个example,因为它足够简单,也方便我们对启动类信息分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//io.netty.example.echo.EchoServer
public final class EchoServer {

static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}

// 核心配置代码从这里开始
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

从上面的代码我们不难看出,我们梳理过的一些核心组件都是通过ServerBootstrap组合在一起,其中group()方法关联主从EventLoop,channel()指定bossGroup中的负责监听的channel。option()方法可以指定一些参数,handler()则是启动过程中的一些处理事件。后面的childHandler()则是具读写事件的channel已经channelPipline处理链的一些配置,最后一个重要的方法是bind()在上面所有的配置完成之后,bind()方法将拉起整个Netty服务。下面我们来深入ServerBootstrap的细节。

1
2
3
4
5
6
7
8
9
10
//用于保存 childChannel 中的一些配置
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
// 用于保存 childChannel 中的一些属性
private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
// 一个包裹当前ServerBootstrap的包装对象。
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
// 主从reactor模型中的 workerEventLoopGroup
private volatile EventLoopGroup childGroup;
// childHandler 也就是上面配置过程中的 ChannelInitializer
private volatile ChannelHandler childHandler;

在ServerBootstrap中还有一个内部类对象ServerBootstrapAcceptor,这个对象在启动过程终会被加入到BossGroup的channel的Pipline中,作为Pipeline的最后一个处理器用于监听连接事件,它继承了ChannelInboundHandlerAdapter是一个InBoundhandler。这个类可以理解发挥着承上启下的功能,从他的名字Acceptor也不难看出它是用于监听OP_ACCEPT连接事件,当Accept事件发生后会触发channelRead方法,将childChannel注册到 childGroup (即我们创建 workerGroup)进行具体的读写操作。这个会在Netty就绪事件处理的部分进行详细分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
//io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;

ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;

// Task which is scheduled to re-enable auto-read.
// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
// not be able to load the class because of the file limit it already reached.
//
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);

try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: {}", child, t);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
}

ServerBootstrap 中还有一个需要我们注意的重要的方法init(),这个方法在后续整个启动流程的注册和启动部分会被调用。这个方法整体来说非常简单,需要注意的点是后半部分,将配置的handler加入到ServerSocketChannel 的 pipleline的这部分代码,这部分代码和我们构建ServerBootstrap时候写childHandler初始化的代码类似。这里在ServerSocketChannel中加入一个channel初始化的Handler即ChannelInitializer,在这个初始化过程中,又将我们构造ServerBootstrap时添加的LoggingHandler加入pipeline。最后包装创建的Accptor(ServerBootstrapAcceptor)对象加入到Pipeline中。但是加入的方式有一些特殊,并不是直接由当前执行线程直接加入,而是包装成一个EventLoop的task,等待后续的BossGroup的EventLoop进行执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//io.netty.bootstrap.ServerBootstrap#init
@Override
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

// 将我们配置LoggerHandler加入到NioServerSocketChannel的Pipeline中,
// 并最后加入ServerBootstrapAcceptor
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

看完上面的一些逻辑后,我们对于ServerBootstrap也有了一个大致的了解,接下来我们将从bind方法开始,具体看看Netty的启动是怎么样的,看看Netty是如何一步步的完成registerbindactive这几步操作的。在分析源码之前先提出两个两个问题,我们可以带着这两个问题去看源码;

NioServerSocketChannel的Handle管道DefaultChannelPipeline是如何添加Handler并触发各种事件的?

在服务启动的过程中,register、bind、active操作是通过EventLoopTask的形式执行的,那他们是如何保证有序的?

接下来我们就来从ServerBootstrapbind()方法开始分析Netty的启动流程源码。

initAndRegister(初始化与注册)部分剖析

这个部分我们要关注的主体方法initAndRegister(),这个方法会返回一个ChannelFuture(一个包着Channel的Future,因为这个channel在bind部分需要用)。在这个大方法中我们将完成通过反射创建在配置中预先设置的channel的NioServerSocketChannel实例对象,再调用SelectableChannelregister()方法注册到NioEventLoop的Selector上,并返回对应的selectionKey。完成注册之后,再调用DefaultChannelPipeline的callHandlerAddedForAllHandlers()方法,将ServerBootstrapAcceptor和一些自定义的handler加入到NioServerSocketChannel的pipleline。

这里要注意⚠️,这里的pipleline还只是NioServerSocketChannel的pipleline。

完成了这一步的操作之后,设置初始化完成和注册操作的主体就已经完成了,接下来将future设置为成功,并执行future中设置的监听器,也就是拉起我们的下一步bind操作。bind操作则是加入到eventLoop的下一个task。在完成bind方法的链接之后,我们的eventLoop线程(是的,这个时候已经不是主线程在操作了,而是eventLoop线程了)将沿着我们的NioServerSocketChannel的pipleline挨个执行InboundHandler的channelRegistered()方法。当这个方法执行完成之后,我们的initAndRegister部分就执行结束了。所以通过上面的简单介绍,我们对initAndRegister有一个大致的印象了,主要有一下几个部分。

不用着急,下面我们会深入源码一步步走一遍。

深入bind()方法

以下我们从bind方法开始这一部分的源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//io.netty.bootstrap.AbstractBootstrap#bind(int)
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}

//io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

// 以上都是两个bind重载方法,以下进行到真正的bind方法。
//io.netty.bootstrap.AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
//前面提到的initAndRegister方法,初始化channel并完成handle的注册操作
// 返回一个包着channel的future对象。
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

// 如果主线执行到这里异步执行的initAndRegister已经执行完成了,直接调用doBind0()方法,
// 否则使用channelFuture添加监听器,在完成之后执行doBind0()方法。
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();

doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

上面这部分我们从bind()方法入手,再经过多次调用重载方法之后,我们看到这个初始化与注册阶段的核心方法调用initAndRegister。initAndRegister是一个异步方法,返回一个ChannelFuture对象。在这个调用完成之后,拉起doBind0()方法执行端口绑定操作。以下是initAndRegister()方法的源码剖析。

initAndRegister()方法剖析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// 初始化注册
//io.netty.bootstrap.AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 通过channelFactory创建一个channel,这里channelFactory,初始化ServerBootstrap调用channel()时创建的,
// channelFactory.newChannel()构建出channel()方法入参的实例,即NioServerSocketChannel实例。
channel = channelFactory.newChannel();

// 初始化channel,这个初始化上面我们分析过,这里就不深入了。
// 里面的大致逻辑是,将ServerBootstrap的handler传入的handler对象加入到channel的pipleline,
// 并创建一个eventloop的任务将 ServerBootstrapAcceptor 加入到pipleline最后。
init(channel);
} catch (Throwable t) {
// 异常处理
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}

// channel注册并返回 ChannelFuture
// 这里调用io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
// 如果发生了异常关闭channel
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.

return regFuture;
}

//io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
@Override
public ChannelFuture register(Channel channel) {
// 方法重载调用 io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
return register(new DefaultChannelPromise(channel, this));
}

//io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 最终执行的注册方法为channel的unsafe类中的注册方法。
// 也就是io.netty.channel.AbstractChannel.AbstractUnsafe#register
promise.channel().unsafe().register(this, promise);
return promise;
}

//io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
// 如果已经注册过了
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
// 如果当前线程是EventLoop,直接调用注册方法。
register0(promise);
} else {
// 将注册动作加入到eventLoop的Task中
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

// 注册主方法,这个方法里面完成了注册操作的主要逻辑
//io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 将channel注册到selector上,并设置类局部变量selectionKey
doRegister();
neverRegistered = false;
registered = true;

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.

// 下面这个方法将handler加入到pipeline中。这里是NioServerChannel的pipeline
pipeline.invokeHandlerAddedIfNeeded();
// 将异步的promise设置成功,并调用promise中添加的Listener监听器。
safeSetSuccess(promise);
// 调用依次调用pipleline中inBoundhandler的channelRegistered()方法。
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.

// 如果channel已经被打开,并且已经绑定端口,完成下面的操作,否则方法结束。
// 这个方法在我们后面的源码代码分析中会经常见到。
if (isActive()) {
if (firstRegistration) {
// 第一次注册要依次调用触发
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 如果开启了自动读,直接开始监听accept事件。
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

// NioChannel的注册方法,将channel注册到selector上,并返回selectionKey。
//io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}

这个部分我们深入initAndRegister源码,这部分的源码逻辑也像方法名 initAndRegister,完成了serverBootstrap的init操作,随后就是register 方法,也就是随后将channel注册到selector上。(这个register 方法也包了好几层,最后是在AbstractUnsafe中实现了这个register方法,没错又是unsafe方法。)在注册完成后,调用invokeHandlerAddedIfNeeded将handler加入到channel的pipeline中,然后调用safeSetSuccess(promise)设promise操作成功,调用promise中设置的listener方法。在将handler加入到pipeline之后,调用pipeline.fireChannelRegistered()顺着pipeline调用inboundHandler的channelRegistered(),最后调用isActive()校验下是否channel是否已经开启并且已经绑定端口,正常情况下到这里都是没有isActive的结果都是false的。接下来,我们深入看看那invokeHandlerAddedIfNeededsafeSetSuccess(promise)pipeline.fireChannelRegistered()深入剖析下这些方法底层的实现细节。

invokeHandlerAddedIfNeeded() 方法剖析

在initAndRegister这部分分析的开始我们提到了两个问题,其中第一个问题是“NioServerSocketChannel的Handle管道DefaultChannelPipeline是如何添加Handler并触发各种事件的?”,而invokeHandlerAddedIfNeeded()就是将ServerBootstrap中初始化的handler加入到Channel的Pipeline中。接下来我们一起来看看吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
//io.netty.channel.DefaultChannelPipeline#invokeHandlerAddedIfNeeded
final void invokeHandlerAddedIfNeeded() {
// 简单的校当前是否是eventloop线程,执行到这,当前一定是eventloop线程。
assert channel.eventLoop().inEventLoop();
// 是否是第一次注册。
if (firstRegistration) {
firstRegistration = false;
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
// 调用真正实现添加handler到pipeline的方法。
callHandlerAddedForAllHandlers();
}
}

//io.netty.channel.DefaultChannelPipeline#callHandlerAddedForAllHandlers
private void callHandlerAddedForAllHandlers() {
// PendingHandlerCallback 这个类的功能类似于一个触发器,本质上是一个链表结构。
// 链表中的元素会在callHandlerCallbackLater()中被添加,
// 而方法 callHandlerCallbackLater()会在pipeline.addLast()中被调用。
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
// 这里锁操作,确保只能被注册一次。
assert !registered;

// This Channel itself was registered.
// 将 register 设置为true,这个会影响后面代码的逻辑,在后面的initAndRegister部分会多次出现。
registered = true;

// 将类局部变量赋值给方法变量,方便GC回收。
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}

// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
// the EventLoop.
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
// 循环执行依次执行链表中每个callBackTask的execute方法。
task.execute();
task = task.next;
}
}

//io.netty.channel.DefaultChannelPipeline#callHandlerCallbackLater
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;

// 创建得到一个PendingHandlerAddedTask,这个是DefaultChannelPipeline的一个内部类。
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
//因为PendingHandlerCallback这个对象本身是一个链表结构,这里的操作是设置头
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
// 循环找到链表的末尾元素并加入到最后。
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}

// 这个类是DefaultChannelPipeline的一个内部类。
// io.netty.channel.DefaultChannelPipeline.PendingHandlerAddedTask
private final class PendingHandlerAddedTask extends PendingHandlerCallback {

PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}

@Override
public void run() {
callHandlerAdded0(ctx);
}

// 从这个方法就不难看出,这只是一个handler添加的触发器,作为PendingHandlerCallback链表结构的头元素,
//拉起整个pipleline中handler的添加。
@Override
void execute() {
EventExecutor executor = ctx.executor();
// 这里的代码结构就不深入了,本质上还是使用eventLoop去调用callHandlerAdded0()方法
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
atomicRemoveFromHandlerList(ctx);
ctx.setRemoved();
}
}
}
}

//io.netty.channel.DefaultChannelPipeline#callHandlerAdded0
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
// 这个方法也只是包了一层, 真正处理逻辑的是ctx.callHandlerAdded(),这个方法。
// 进到方法里面看handler调用自己的handlerAdded方法。
try {
ctx.callHandlerAdded();
} catch (Throwable t) {
// 如果发生了异常,执行handler的remove操作。
boolean removed = false;
try {
atomicRemoveFromHandlerList(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}

//设置异常
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}

//io.netty.channel.AbstractChannelHandlerContext#callHandlerAdded
final void callHandlerAdded() throws Exception {
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.

// 调用添加handler方法。
if (setAddComplete()) {
handler().handlerAdded(this);
}
}

上面这半段逻辑是invokeHandlerAddedIfNeeded()方法的前半段。这个部分我们看到在方法内部调用了callHandlerAddedForAllHandlers方法,这个方法是拉起channel的pipeline中handlerAdded的主要方法,这个方法里面我们重点关注了一个类PendingHandlerCallback,这个类本质上是一个链表结构的类,在第一次调用pipeline.addLast()的时候会包装一个handler进去作为链表的头,并将add的handler添加到链表的最后。并且每次调用pipeline.addLast()如果此时还没完成注册,就会在这个链表的最后添加handler。回到我们的callHandlerAddedForAllHandlers方法,在拿到了PendingHandlerCallback之后,我们顺着链表依次执行callHandlerAdded0方法即ChannelHandler调用自己的handlerAdded()方法。

理解这一段需要结合源码反复品。

那么我的问题来,我们前面提到了PendingHandlerCallback是一个链表结构,我们会依次沿着这个链表初始化ChannelHandler,那么第一次执行到这里的handler(),具体是哪个对象呢?换一个角度问这个问题,这个链表结构中,第一个被加入的handler是什么呢,第一次调用pipeline.addLast()被加的handler是哪个实例呢?其实这个实例是ChannelInitializer,channel初始化的handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// 第一次添加到pipeline中是在我们一开始bind方法中,
// 调用的serverBootstrap里面调用的init方法。
//io.netty.bootstrap.ServerBootstrap#init
@Override
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

// 都看到这了,我们顺便来看看pipeline.addLast()里面的源码
//io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
// 原来addLast方法可以一次添加多个ChannelHandler
return addLast(null, handlers);
}

//再继续往下,addLast方法还需要一个executor,上面调用这个方法的时候直接给null。
//io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, io.netty.channel.ChannelHandler...)
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
ObjectUtil.checkNotNull(handlers, "handlers");
// 循环调用将所有的handler
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}

return this;
}

// 真正执行addLast()操作的方法。
//io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
//创建一个新的DefaultChannelHandlerContext,设置eventLoopGroup和handler对象。
newCtx = newContext(group, filterName(name, handler), handler);

// 将newContext添加到pipeline的链表的最后。
addLast0(newCtx);

// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
// 如果当前,还没有注册完成,也就是还没有调用PendingHandlerCallback,
// 也就是还没有调用PendingHandlerCallback触发handler的handlerAdd方法。
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

// 如果线程不是eventLoop线程将其添加到eventLoop的Task队列执行。
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 执行到这里的条件是 当前是eventloop线程,并且当前已经完成了注册,直接添加并调用handlerAdd方法即可。
callHandlerAdded0(newCtx);
return this;
}

所以通过上面代码的分析,我们不难得出,PendingHandlerCallback这个handlerAdd方法调用链上的第一个处理器就是我们在ServerBootstrap的init()方法中的ChannelInitializer内部类,弄清楚了这个问题,接下来我们顺着handler().handlerAdded(this);方法继续往下走。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// io.netty.channel.ChannelInitializer#handlerAdded
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
// 调用initChannel也就是我们在ServerBootstrap中init构造的内部类的initChannel方法。
if (initChannel(ctx)) {

// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}

//io.netty.channel.ChannelInitializer#initChannel(io.netty.channel.ChannelHandlerContext)
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// 调用执行initChannel 传入channel, 即p.addLast()内部类的initChannel方法。
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
// ChannelInitializer 初始化Handler执行完成之后,
// 将当前这个handler从pipeline中移出。
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}


p.addLast(new ChannelInitializer<Channel>() {

// 在initChannel方法的逻辑中,我们先将ServerBootstrap的初始化的Handler加入到pipeline中,
// 并且最后将 ServerBootstrapAcceptor 加入到pipeline的最后,并且是以eventLoop的task的形式执行。
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// 获取到ServerBootstrap的handler,即 LoggingHandler
ChannelHandler handler = config.handler();
if (handler != null) {
//LoggerHandler加入到pipeline的最后,
//此时register为true,会直接触发handlerAdded方法。
pipeline.addLast(handler);
}

// 使用eventloop的方式将ServerBootstrapAcceptor添加到task队列的最后。
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});

ChannelInitializer的初始化方法中,在初始化过程中,我们创建LoggingHandler实例也被加入到pipeline中,我们前面分析的pipeline.addLast()方法,会判断register来决定是立刻执行HandlerAdded()方法,还是包装成PendingHandlerCallback加入到链表中。在这个场景中,register为true,会直接执行HandlerAdded(),即AbstractChannelHandlerContext#callHandlerAdded()方法。

1
2
3
4
5
6
7
8
9
10
//io.netty.channel.AbstractChannelHandlerContext#callHandlerAdded
final void callHandlerAdded() throws Exception {
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.

// 调用添加handler方法。
if (setAddComplete()) {
handler().handlerAdded(this);
}
}

而此时的handler()则是LoggingHandler的实例。LoggingHandler并没有实现handlerAdded()方法,因此实际执行的是io.netty.channel.ChannelHandlerAdapter#handlerAdded()

1
2
3
4
5
6
7
8
/**
* Do nothing by default, sub-classes may override this method.
*/
//io.netty.channel.ChannelHandlerAdapter#handlerAdded
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP
}

随着最后下面这段逻辑将ChannelInitializer从pipeline中移出去。invokeHandlerAddedIfNeeded()的逻辑也就结束了。当然最后还会通过 eventLoop的task的方式将ServerBootstrapAcceptor加入到pipeline中。

1
2
3
4
5
6
7
8
9
10
11
12
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});

safeSetSuccess 与 fireChannelRegistered

完成了对方法invokeHandlerAddedIfNeeded()的深入剖析之后,我们继续回到register0()方法中。在完成了pipeline.invokeHandlerAddedIfNeeded();的调用,Netty将调用safeSetSuccess(promise);将异步结果 promise 设置为成功,并且调用后续的fireChannelRegistered()方法,触发pipeline中的handler的channelRegistered()方法。最后还会判断当前是否已经激活,如果激活直接执行channelActive方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 将channel注册到selector上,并设置类局部变量selectionKey
doRegister();
neverRegistered = false;
registered = true;

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.

// 下面这个方法将handler加入到pipeline中。这里是NioServerChannel的pipeline
pipeline.invokeHandlerAddedIfNeeded();
// 将异步的promise设置成功,并调用promise中添加的Listener监听器。
safeSetSuccess(promise);
// 调用依次调用pipleline中inBoundhandler的channelRegistered()方法。
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.

// 如果channel已经被打开,并且已经绑定端口,完成下面的操作,否则方法结束。
// 这个方法在我们后面的源码代码分析中会经常见到。
if (isActive()) {
if (firstRegistration) {
// 第一次注册要依次调用触发
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 如果开启了自动读,直接开始监听accept事件。
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

首先我们先深入safeSetSuccess(promise);方法, 这个方法里面其实很简单CAS的方式设置futurePromise成功,并notify设置好的Listener监听器。下面是safeSetSuccess(promise);的源码剖析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
//io.netty.channel.AbstractChannel.AbstractUnsafe#safeSetSuccess
protected final void safeSetSuccess(ChannelPromise promise) {
// 这里需要重点注意的方法是promise.trySuccess()
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}

//io.netty.channel.DefaultChannelPromise#trySuccess
@Override
public boolean trySuccess() {
// 调用重载方法
return trySuccess(null);
}

@Override
public boolean trySuccess(V result) {
return setSuccess0(result);
}

private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}

//io.netty.util.concurrent.DefaultPromise#setValue0
private boolean setValue0(Object objResult) {
// CAS操作设置Updater
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// checkNotifyWaiters 检查校验,是否有线程wait,如果存在调用 notifyAll,
// 并且返回 listerners != null
if (checkNotifyWaiters()) {
// 如果当前存在Listener,调用notifyListeners,notify他们。
notifyListeners();
}
return true;
}
return false;
}


//io.netty.util.concurrent.DefaultPromise#notifyListeners
private void notifyListeners() {
EventExecutor executor = executor();
// 判断是否在 eventLoop中,如果在eventloop中直接执行。
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
// 记录当前调用栈深度,如果操作最大的栈深度直接返回。
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
// 栈深度 +1
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
//调用notifyListeners。
notifyListenersNow();
} finally {
// 方法执行完成后,将调用栈深度设置回原来的值。
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}

safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}

//io.netty.util.concurrent.DefaultPromise#notifyListenersNow
private void notifyListenersNow() {
Object listeners;
// 加锁将当前需要当前需要notify的监听器取出来,而不是我们一直准备去处理监听器。
// 所以这一段的逻辑就是使用一个方法的局部变量listeners将类局部变量this.listeners
// 接出来,并将类的this.listeners设置为null。
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
// 循环listener执行。
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
// 执行完成之后,在将notifyingListeners 加锁并设置为false。
synchronized (this) {
if (this.listeners == null) {
// Nothing can throw from within this method, so setting notifyingListeners back to false does not
// need to be in a finally block.
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}

// DefaultFutureListeners listener的包装类,里面包了一个listener的list。
//io.netty.util.concurrent.DefaultPromise#notifyListeners0
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
// 循环调用notifyListener0即可。
notifyListener0(this, a[i]);
}
}

// notifyListener主方法,也就是调用listener中的operationComplete方法。
//io.netty.util.concurrent.DefaultPromise#notifyListener0
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
// 调用operationComplete方法。
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}

在这里完成Listener的回调,其实就是我们前面分析的拉起后续端口bind操作。也就是下面这段代码。如果注册的过程中没有出现异常,doBind0(regFuture, channel, localAddress, promise);即将bind任务加入到eventLoop的task队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
regFuture.addListener(new ChannelFutureListener() {
// listener监听器执行逻辑
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
// 将bind任务添加到task任务队列中。
doBind0(regFuture, channel, localAddress, promise);
}
}
});

在核心注册方法register0()完成safeSetSuccess(promise);的调用之后,会执行pipeline.fireChannelRegistered()方法,这个方法的执行的操作和这个方法的名字一样,依次执行Channel的pipeline中的InboundHandler的ChannelRegistered() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
//io.netty.channel.DefaultChannelPipeline#fireChannelRegistered
// 这是DefaultChannelPipeline的fireChannelRegistered方法。
@Override
public final ChannelPipeline fireChannelRegistered() {
// 使用组合的方式调用HandlerContext的invokeChannelRegistered方法,这里的head
// 是pipeline的handle链的头节点。
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}

//io.netty.channel.AbstractChannelHandlerContext#invokeChannelRegistered(io.netty.channel.AbstractChannelHandlerContext)
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 判断当前线程是否是eventLoop线程
next.invokeChannelRegistered();
} else {
// 如果不再当前线程使用eventLoop执行。
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}

//io.netty.channel.AbstractChannelHandlerContext#invokeChannelRegistered()
private void invokeChannelRegistered() {
// 判断当前handler是否调用过
if (invokeHandler()) {
// 如果没有调用过,执行channelRegistered调用
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
// 调用下一个ChannelRegister
fireChannelRegistered();
}
}

//io.netty.channel.AbstractChannelHandlerContext#fireChannelRegistered
@Override
public ChannelHandlerContext fireChannelRegistered() {
//MASK_CHANNEL_REGISTERED这是一个标志性掩码,表示某个方法一定会调用,
// MASK_CHANNEL_REGISTERED 表示REGISTERED方法的handler会被调用。
// 找到合适的handler之后,执行invokeChannelRegistered(ctx)调用。
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}

//顺着pipeline的channelHandler的调用链通过mask掩码找到合适的Handler
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}

通过上面这部分代码,我们就实现了顺着pipeline的channelHandler的调用链,依次执行合适的handler方法即这里的MASK_CHANNEL_REGISTERED所指代的channelRegistered()方法。到此fireChannelRegistered的代码执行逻辑原理我们也就分析完了。我们回到register0方法,简单看下主方法体的最后一段。如果此时的channel已经激活并且是第一次注册,需要调用pipeline.fireChannelActive(),顺着handler调用链依次执行channelActive(ctx)方法,其原理和pipeline.fireChannelRegistered()方法一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
if (isActive()) {
if (firstRegistration) {
// 第一次注册要依次调用触发
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 如果开启了自动读,直接开始监听accept事件。
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}

到这里register0的分析完成,我们initAndRegister注册和初始化部分的源码也算是剖析完成了。按照顺序我们应该继续分析启动流程的下一个部分bind端口绑定部分,但是我想再中间插入一个小菜,也是我在看源码中感觉设计精妙的点,这个点就是eventloop。之前我对eventloop的理解只是简单处理连接handler逻辑的线程池结构的组件。但是在启动过程中,我又发现Netty又充分利用eventloop的task队列进行Server的拉起。在开始bind绑定部分之前,我们先一起看看eventloop在这承上启下过程中发挥的作用。

EventLoop启动与Task任务队列

EventLoop在前面的我们有详细的剖析,但是没有实际的结合执行代码来分析。这个部分我们将结合启动过程中,eventloop通过task队列将启动过程中各个部分解耦拆分的过程,来深入理解eventloop在整个启动过程中所发挥的作用。同时深入我们对eventLoop这个模块的理解。我们回到代码本身来从头看看eventLoop是什么时候参与到Netty服务的启动的。

eventLoop的启动

我们在整个服务中多次提到了executor.inEventLoop()校验,即判断当前线程是否是eventLoop线程。按照我们代码的执行顺序,第一是出现在调用register0方法时候,由main线程触发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

AbstractChannel.this.eventLoop = eventLoop;

// 启动过程中第一次出现eventLoop.inEventLoop()校验。
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 在启动过程中,由main线程执行,因此走到这个分支。
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

因为这里是main线程触发,所以这里会通过调用eventLoop.execute(runnable)方法启动已经完成初始化的eventLoop。以下是详细的源码剖析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
//io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable)
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}

//io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable, boolean)
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
// 将任务加入到eventloop的taskQueue中。
addTask(task);
// 判断当前是否eventLoop线程,如果不是启动线程
if (!inEventLoop) {
startThread();
//是否已经是关闭状态。
if (isShutdown()) {
boolean reject = false;
try {
// 将当前任务从当前队列移除
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
// 移除成功,抛eventLoop terminate 的异常警告。
if (reject) {
reject();
}
}
}

if (!addTaskWakesUp && immediate) {
// 判断当前eventloop是否是活跃状态,并且当前任务是否需要立刻执行。
// 如果是,唤醒当前eventloop(丢入一个空任务)
wakeup(inEventLoop);
}
}

// io.netty.util.concurrent.SingleThreadEventExecutor#startThread
private void startThread() {
if (state == ST_NOT_STARTED) {
// 校验当前状态为未启动状态,并通过CAS的方式将状态设置为ST_STARTED。
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 真正执行启动线程逻辑
doStartThread();
success = true;
} finally {
// 如果启动失败,将状态通过CAS的方式设置ST_NOT_STARTED
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}

//io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
private void doStartThread() {
assert thread == null;

// 异步执行启动任务。
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
// 在这里启动eventloop线程,这个方法的里面是一个死循环。
SingleThreadEventExecutor.this.run();
// 执行到这里说明是正常的关闭,而非异常退出。
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 以下的代码是更新状态为ST_SHUTDOWN,ST_TERMINATED并最后清理资源。
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}

// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}

try {
// Run all remaining tasks and shutdown hooks. At this point the event loop
// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
// graceful shutdown with quietPeriod.
for (;;) {
if (confirmShutdown()) {
break;
}
}

// Now we want to make sure no more tasks can be added from this point. This is
// achieved by switching the state. Any new tasks beyond this point will be rejected.
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}

// We have the final set of tasks in the queue now, no more can be added, run all remaining.
// No need to loop here, this is the final pass.
confirmShutdown();
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();

STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}

通过这部分代码的执行,我们启动了一个初始化完成的eventloop,我们不难发现,eventLoop的启动并不是在addTask之前来调用特定的方法实现。而是整合到addTask的过程中,先将task加入队列中,然后判断当前线程是不是eventLoop线程,如果不是则启动eventLoop,否则跳过启动。最后再由后续的eventLoop的pollTask来执行task。这一点设计很巧妙。

task队列任务的执行

后面的逻辑就是我们前面剖析的NioEventLoop中的run方法剖析了,这里我们再简单的过一下,我们这次着重关注task队列任务的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
//io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}

selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
// 下面这段代码可以看到无论ioRatio的值为多少都会执行runAllTasks();方法
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}

if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}

这段逻辑我们就不重复分析了,这段逻辑大致可以分为select轮询就绪KeyprocessSelectedKeys处理就绪keyrunAllTasks处理队列中的任务处理空轮询bug这几个部分,其中无论ioRatio的值为多少runAllTasks都会执行。这就保证了我们处于队列中的任务都会被执行。以下是runAllTasks()方法的详细剖析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
//执行队列中所有的task
// io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks()
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;

do {
// 从定时任务队列中取出所有满足条件的任务,并将它们加入到taskQueue中。
fetchedAll = fetchFromScheduledTaskQueue();
// 执行所有task队列中的任务
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
//执行taskTail队列中的任务。
afterRunningAllTasks();
return ranAtLeastOne;
}

// 一个简单的“近实时”的延时队列的实现。
//io.netty.util.concurrent.SingleThreadEventExecutor#fetchFromScheduledTaskQueue
private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
// 循环从cheduledTask取出满足条件的task任务。
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return true;
}
// 将task加入到任务队列taskQueue中。
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
// 如果队列满了加入失败,先将其放回scheduledTaskQueue等task队列消费完成后再次加入
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}

//io.netty.util.concurrent.AbstractScheduledEventExecutor#pollScheduledTask(long)
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();

ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
return null;
}
scheduledTaskQueue.remove();
scheduledTask.setConsumed();
return scheduledTask;
}

//io.netty.channel.SingleThreadEventLoop#afterRunningAllTasks
@Override
protected void afterRunningAllTasks() {
// 执行tailTaskQueue中的所有task。
runAllTasksFrom(tailTasks);
}

run()方法中除了runAllTasks()方法还有一个重载方法runAllTasks(long timeoutNanos)这个方法则是执行时间长度为 timeoutNanos 纳秒的runAllTask方法。实现的方法也很简单,每执行64个task检查下时间是否超时,如果没有超时继续执行,否则退出方法。如果runAllTasks(0)则是执行最小单位的runAllTask方法(即64个task)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)
protected boolean runAllTasks(long timeoutNanos) {
// 从schedule任务队列中取出任务加入到taskQueue中
fetchFromScheduledTaskQueue();
// 取出第一个任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}

// 计算得出这批次执行的deadline
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// 执行runnable任务即执行run方法。
safeExecute(task);

runTasks ++;

// 因为nanoTime()方法是一个开销较大的方法,所以这里满64个任务才校验一次。
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
// 执行满64个校验当前是否超过deadline
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
//如果是,退出循环执行后续逻辑。
break;
}
}
// 否则取出下一个任务继续循环执行。
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}

//执行taskTail队列中的任务。
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}

承上启下的task队列

以上就是eventLoop启动与task队列的源码剖析,而结合我们前面的initAndRegister部分的源码分析,我们知道我们将registeraddServerBootstrapAcceptorbind操作放入了task队列中。也就是下面这些代码片段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// register
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

// addServerBootstrapAcceptor
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});

在initAndRegister的部分,taskQueue中有以下的几个任务,并且这几个任务之间是有明确的先后顺序的。其中register部分是main线程加进去的,其余的都是由eventLoop线程加入到taskQueue中。

那么main线程是在什么时候退出的呢?main线程在执行完bind(PORT)方法之后,就阻塞在下面这段EchoServer中的这段f.channel().closeFuture().sync();代码上。后续的bindactive等操作都将以eventLoop的task的方式执行。

bind(端口绑定)和active操作源码剖析

上面我们分析了整个启动的流程是通过evetLoop的Task的方式执行,而registeraddAcceptorbind等操作都是作为一个个task任务加入到taskQueue中,最后在eventLoop的run方法中调用runAllTasks方法执行这些task任务。在前面的initAndRegister的解析中,我们分析到register方法返回了一个promise,我们在promise添加了一个Listener(监听器),监听器加入的是bind端口绑定的逻辑,并且这段逻辑是通过eventLoop的Task方式执行的。

bind端口绑定操作源码剖析

接下来,我们继续分析bind端口绑定操作的源码。整个流程前面pipeline.channelRegistered()是类似的,只不过channelRegistered()是inboundHandler里面的方法,而bind()是outBoundHandler里的方法。以下是bind部分的源码剖析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
//io.netty.bootstrap.AbstractBootstrap#doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
//通过eventLoop的task任务方式异步执行。
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 注册成功执行。
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

//io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// 调用pipeline的bind方法,其实绑定端口方法是在channelHandler上的。
return pipeline.bind(localAddress, promise);
}

//io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// bind方法是从pipeline的handler链的尾开始向头执行的。
// 这里准确的描述应该是outBoundHandler都是从尾向头执行的,bind方法是outBound的方法。
return tail.bind(localAddress, promise);
}

//io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}

// 找到outboundHandler中包含bind方法的handler下一个handler即LoggingHandler
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
//调用bind方法
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}

//io.netty.channel.AbstractChannelHandlerContext#invokeBind
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
//调用绑定方法,此时的handler是LoggingHandler。
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}

//io.netty.handler.logging.LoggingHandler#bind
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "BIND", localAddress));
}
// 继续调用前一个handler的bind方法
ctx.bind(localAddress, promise);
}

// 我们又回到了bind方法,找到前一个handler并调用
//io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}

// 找到outboundHandler中包含bind方法的handler下一个handler即HeadContext头Handler
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
//调用bind方法
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}

//io.netty.channel.DefaultChannelPipeline.HeadContext#bind
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
// 调用unsafe的bind方法,
unsafe.bind(localAddress, promise);
}

其实,我们的bind操作和channelRegistered操作基本是类似的,只不过bind方法是outBoundHandler中的方法,outBoundHandler的方法不再想inBoundHandler里的方法一样从头到尾节点依次执行handler的方法,outBoundHandler执行顺序反过来,从尾节点开始向前遍历到头节点依次执行对应的handler方法。所以上面这段代码,从调用pipeline.bind(localAddress, promise);方法开始,从后向前依次调用outboundHandler的bind方法。以上的代码逻辑就是这个调用过程,方法调用最后的bind方法就是头节点即HeadContext中的额bind方法,这个方法里面的bind操作是调用unsafe方法,也就是我们的channel与端口的绑定,以及后续讲active操作添加到eventLoop的Task队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//io.netty.channel.AbstractChannel.AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();

// 确保当前promise不可取消并且当前channel还没有open
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}

// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}

boolean wasActive = isActive();
try {
// 绑定操作
doBind(localAddress);
} catch (Throwable t) {
// 如果捕捉到异常,promise设置失败
safeSetFailure(promise, t);
closeIfClosed();
return;
}

if (!wasActive && isActive()) {
// 如果当前调用active操作,将pipeline.fireChannelActive();加入到eventLoop的task队列中
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}

// promise设置调用成功
safeSetSuccess(promise);
}

//io.netty.channel.socket.nio.NioServerSocketChannel#doBind
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
// channel与端口绑定
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

active部分方法调用源码剖析

active部分并不是需要激活什么,本质上是调用pipelineChannelHandler的channelActive(ctx)方法,channelActivechannelRegistered一样都是inboundHandler里面的方法,因此调用过程和channelRegistered的过程一致。在前面的bind端口绑定部分的解析,我们看到bind方法内通过调用invokeLater方法,将pipeline.fireChannelActive();加入到eventLoop的task队列中。因此我们从pipeline.fireChannelActive();方法开始继续我们的源码剖析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// io.netty.channel.DefaultChannelPipeline#fireChannelActive
@Override
public final ChannelPipeline fireChannelActive() {
// 对比bind方法,这里我们可以发现inbound方法都是从head开始执行的。
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}

// io.netty.channel.AbstractChannelHandlerContext#invokeChannelActive(io.netty.channel.AbstractChannelHandlerContext)
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 直接执行invokeChannelActive方法。
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}

// 这个方法和前面的invokeBind和invokeChannelRegistered方法类似。
// io.netty.channel.AbstractChannelHandlerContext#invokeChannelActive()
private void invokeChannelActive() {
if (invokeHandler()) {
try {
// 调用目标handler的channelActive,这里的对象是HeadContext也就是headHandler
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelActive();
}
}

// io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
@Override
public void channelActive(ChannelHandlerContext ctx) {
//触发调用下一个handler的channelActive的方法。
ctx.fireChannelActive();
// 开启监听连接事件
readIfIsAutoRead();
}

// 找寻下一个channelActive的inboundHandler
//io.netty.channel.AbstractChannelHandlerContext#fireChannelActive
@Override
public ChannelHandlerContext fireChannelActive() {
invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
return this;
}


// io.netty.channel.AbstractChannelHandlerContext#invokeChannelActive()
private void invokeChannelActive() {
if (invokeHandler()) {
try {
// 此时的Handler对象是LoggingHandler对象
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelActive();
}
}

//io.netty.handler.logging.LoggingHandler#channelActive
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "ACTIVE"));
}
ctx.fireChannelActive();
}

结合上面分析的channelActive、bind和channelRegistered方法的调用,大体上都是相同的。我们再次结合复习下pipelineChannelHandler中的调用过程。

上面的图简单的表述了这个调用的代码实现过程,本质上channelActivebindchannelRegistered的调用过程本质上都是一样的, 唯一不同的点就是channelActive和channelRegistered是inboundHandler的方法,所以这两个方法的调用是从头向后依次向后执行即从headContext向tailContext执行。而bind方法是outboundHandler的方法,所以是执行顺序是反过来的,也就是从tailContext向前执行,即下面这张图的逻辑。

在完成了pipelineChannelHandler的channleActive的调用之后,Netty将开始启动过程的最后一步,对ACCEPT时间的监听。即调用readIfIsAutoRead();设置interestOpsOP_ACCEPT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
@Override
public void channelActive(ChannelHandlerContext ctx) {
//触发调用下一个handler的channelActive的方法。
ctx.fireChannelActive();
// 开启监听连接事件
readIfIsAutoRead();
}

//io.netty.channel.DefaultChannelPipeline.HeadContext#readIfIsAutoRead
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
// 开启读
channel.read();
}
}

//io.netty.channel.AbstractChannel#read
@Override
public Channel read() {
// 调用pipeline的read方法
pipeline.read();
return this;
}

//io.netty.channel.DefaultChannelPipeline#read
@Override
public final ChannelPipeline read() {
// read方法是outBoundHandler的方法, 从尾向头执行,因此从tail开始。
tail.read();
return this;
}

//io.netty.channel.DefaultChannelPipeline.HeadContext#read
@Override
public void read(ChannelHandlerContext ctx) {
// headContext的read方法是调用链中最后一个执行方法
unsafe.beginRead();
}

//io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead
@Override
public final void beginRead() {
assertEventLoop();

try {
// 实际执行的read方法。
doBeginRead();
} catch (final Exception e) {
// 如果出现异常,使用eventLoop的task方式执行
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}

// io.netty.channel.nio.AbstractNioChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;
// 当前Channel的实例是NioServerSocketChannel,
//所以这里selectionKey.interestOps() 为 SelectionKey.OP_ACCEPT
final int interestOps = selectionKey.interestOps();
// readInterestOp的值为0, 0&n = 0
if ((interestOps & readInterestOp) == 0) {
// 0|n = n, 设置interestOps为SelectionKey.OP_ACCEPT,即设置监听事件为Accept
selectionKey.interestOps(interestOps | readInterestOp);
}
}

// OP_ACCEPT 的值为 16
public static final int OP_ACCEPT = 1 << 4;

完成上面代码的调用,Netty的服务也基本上启动完成即完成了服务的初始化,完成了服务基础初始化,将channelHandler加入到pipeline中,并调用handler的channelRegistered,bind,channelActive和read方法等。通过这些方法的调用,完成了channel的注册,端口的绑定,channelActive的调用和最后对Channel的Accpet事件的监听,整个过程中都依赖EventLoop的Task进行任务的调度执行。在上面这个调用过程中,我发现一个有意思的代码设计即ctx.executeMask的设计逻辑,这也是我们熟悉的二进制标识的应用。以下是这个部分的详细代码剖析。

一个小细节 ctx.executeMask

在看源码过程中,我翻了一些资料和一些博客,在之前的pipelineChannelHandler的版本中,都有标识当前handler是inBoundHandler还是outBoundHandler,但是在我看的这版本源码中并有有关的变量,那它是怎么标识,当前handler是inBound还是outBoundHandler的呢?在一条调用链上,当前handler是否包含了当前的方法呢?答案就是executeMask。但是我debug源码的时候,executeMask的值只是一个普通的常数啊,随着对源码的深入,我发现executMask使用二进制的方式标识重载方法,这又是一个使用二进制位标识状态的应用。我们一起深入源码看看,它是怎么玩的吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//io.netty.channel.AbstractChannelHandlerContext#skipContext
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
// Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT

// 因为每个executionMask上标识了当前handler可以执行的方法,通过和mask比较的结果
// 即可知道当前handler是否是合适的可以执行的handler
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
// We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
// everything to preserve ordering.
//
// See https://github.com/netty/netty/issues/10067
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}

以上是在寻找下个合适的Handler时,使用executeMask进行handler的过滤,二进制的方式进行判断效率很高。下面这段代码则是构建ChannelContextHandler时对executeMask进行赋值的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// io.netty.channel.AbstractChannelHandlerContext#AbstractChannelHandlerContext
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.executionMask = mask(handlerClass);
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}


/**
* Return the {@code executionMask}.
*/
//io.netty.channel.ChannelHandlerMask#mask
static int mask(Class<? extends ChannelHandler> clazz) {
// Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
// lookup in the future.
// 加一层缓存和生成executeMask的逻辑
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}

//io.netty.channel.ChannelHandlerMask#mask0
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
// 使用反射来判断当前类中是否包含某个方法,如果包含则标识对应的标识位。
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;

if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}

if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;

if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}

if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}

return mask;
}

// io.netty.channel.ChannelHandlerMask#isSkippable
private static boolean isSkippable(
final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
Method m;
try {
m = handlerType.getMethod(methodName, paramTypes);
} catch (NoSuchMethodException e) {
if (logger.isDebugEnabled()) {
logger.debug(
"Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
}
return false;
}
return m.isAnnotationPresent(Skip.class);
}
});
}

从上面的源码代码里面可以看到,netty也是用了比较”笨”的方式去生成了executeMask,生成的主要性能开销集中在启动阶段并且添加了一个简单的缓存,所以整体的性能还是很可观的。使用二进制标识多个状态的示例还是很多的例如对象头中的对象状态,有赞权限系统的权限标识等。这里的executeMask也是一个优秀的实现案例。

总结

这一小节,我们深入剖析了Netty的启动源码,我们从NIO服务的基本设计思路入手,分析了组成服务的一些基本要素。随后我们从ServerBootstrap开始,在创建ServerBootstrap阶段配置一些如eventLoop、handler等核心的参数。在最后的bind方法即绑定端口的方法开始真正的服务启动操作,启动的过程主要分为 initAndRegister初始化、bind端口绑定和active操作。其中,initAndRegister主要包括服务初始化、将handler加入到pipeline中、调用pipelineChannelHander的channelRegistered。bind端口绑定即将channel绑定相应的端口,最后就是active操作,active操作包括调用pipelineChannelHandler的channelActive方法和调用read方法完成OP_ACCEPT方法的监听。这些操作都是通过eventLoop的task方式实现。

在分析源码之前我们提出了下面两个问题,我并没有直接给出答案。看到这里,你是否已经有了答案呢?

NioServerSocketChannel的Handle管道DefaultChannelPipeline是如何添加Handler并触发各种事件的?

在服务启动的过程中,register、bind、active操作是通过EventLoopTask的形式执行的,那他们是如何保证有序的?

本来这一小节准备是准备I/O就绪操作一起梳理掉,但是这一小节洋洋洒洒已经写了这么多🤣,而且源码这种需要结合代码反复的理解才能消化吃透。因此我就准备把I/O就绪处理放到下一小节,下一小节,我们将开始对连接监听读写操作进行深入剖析。

学习资料