Netty流程剖析 — I/O就绪操作源码剖析

前言

前面一篇我们梳理了Netty服务的启动流程,这一小节,我们开始分析I/O就绪操作,I/O就绪操作大致可以划分为三个大的部分,第一个部分是OP_ACCEPT操作,这个部分我们接着上一小节继续分析,当服务启动之后,客户端向服务端发起连接,服务端是怎么如果处理连接,并且设置监听OP_READ。第二个部分也就是OP_READ操作的上半部分,随着客户端通过建立的连接通道发送数据到服务端,服务端又是怎么接受数据并进行层层处理。第三个部分是OP_READ的下半部分,也就是最后服务端通过write方法将响应的消息发送到客户端。完成这几个部分的代码剖析也就基本完成了,也就基本完成了Netty的NIO热点代码分析。在阅读这几个部分的源码,我们还会看到前面我们剖析的几个核心组件。在上一小节,我们加深了对eventLoop和channel的理解,这一小节我们看看channel和bytebuf之间配合进行数据读写。

I/O就绪事件处理之OP_ACCEPT

通过前面的学习,深入剖析了Netty的启动过程,以及Netty采用辅助类ServerBootstrap启动,通过eventloop线程,依次开启initAndRegister开启Selector,并将ServerSocketChannel注册到Selector上,绑定端口监听,最后设置监听OP_ACCEPT事件。

OP_ACCEPT处理就绪概述

在这个部分的开始我们先看OP_ACCEPT事件处理的时序图。

从上面这个时序图我们前面启动逻辑很相似,通过调用pipeline中的方法来实现的。首先是通过processSelectedKey解析出这是一个OP_ACCEPT事件,调用unsafe.read方法,真正调用的是pipeline.channelRead方法,channelRead是inboundHandler的方法。在调用到serverBootstrapAcceptor时候,将作为消息msg传递的NioSocketChannel绑定到childGroup上eventLoop上并完成channelRegister和channelActive方法的调用。NioSocketChannel在read方法中创建NioSocketChannel监听OP_READ操作。

主要操作分为下面三步:

  1. 当eventLoop中的多路复用器Selector轮询到就绪的SelectionKey时,判断Key的readOps类型是否是OP_ACCEPT,如果是,那么Key的attachment就是NioServerSocketChannel,先获取SelectionKey的attachment对象,再触发此对象的辅助类Unsafe的实现类NioMessageUnsafe的read()方法进行处理。

  2. 在NioMessageUnsafe的read()方法中会执行doReadMessages。真正调用的是AbstractNioMessageChannel的子类NioServerSocketChanneldoReadMessage()方法。此方法最终调用ServerSocketChannelaccept()方法,以获取接入的SocketChannel。在获取到SocketChannel后,构建NioSocketChannel,并把构造好的NioSocketChannel作为消息msg传送给Handler(ServerBootstrapAcceptor),触发pipeline的fireChannelRead,进而触发read事件。最后会调用Handler的channelRead()方法。

  3. 在ServerBootstrapAcceptor的channelRead()方法中,把NioSocketChannel注册到Worker线程上,同时绑定channel的handler链。

OP_ACCEPT事件处理源码剖析

上面的部分我们简单的梳理了OP_ACCEPT事件的处理流程,但是这个过程如果不结合源码一起分析的话还是有一种云里雾里的感觉。所以我们结合源码深入分析OP_ACCEPT事件处理的逻辑,从代码层面深入剖析netty如何完成上面的三步操作。

熟悉的pocessSelectedKeys

在nioEventLoop的源码分析中,我们提到如果有当Selector轮训到就绪的SelectionKey时,会从nioEventLoop的run方法中的select(curDeadlineNanos)返回,返回的值是当前就绪key的个数,随后执行调用processSelectedKeys()方法,处理就绪的SelectionKey,我们的源码剖析也就从这里开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
//io.netty.channel.nio.NioEventLoop#processSelectedKeys
private void processSelectedKeys() {
//判断优化后的selectedKeys是否为空
if (selectedKeys != null) {
// 优化处理
processSelectedKeysOptimized();
} else {
// 原始处理
processSelectedKeysPlain(selector.selectedKeys());
}
}

//io.netty.channel.nio.NioEventLoop#processSelectedKeysOptimized
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
// 将selectedKeys.keys[i]设置为null,并被JVM快速回收。
selectedKeys.keys[i] = null;

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
//根据Key的就绪事件触发对应的事件方法。
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);

selectAgain();
i = -1;
}
}
}

//io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//如果当前selectionKey无效,直接关闭unsafe。
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}

try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// 处理连接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

// 处理写事件
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}

// 处理读事件和ACCEPT事件
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

上面这一段代码的剖析,我们在NioEventLoop中分析过,这里就简单提一下。我们要关注OP_ACCEPT的就绪事件,所以在processSelectedKey(SelectionKey k, AbstractNioChannel ch)方法中的分支判断中会调用unsafe.read()方法。

AbstractNioMessageChannel中read方法的实现

这里有一个问题,是关于此时方法入参AbstractNioChannel ch的具体实现类,abstractNioChannel有两个子抽象类,一个是AbstractNioMessageChannel另一个是AbstractNioByteChannel,AbstractNioMessageChannel是NioServerSocketChannel的父类主要处理连接操作,AbstractNioByteChannel是NioSocketChannel的父类主要处理数据的读写。我们当前关注OP_ACCEPT的就绪事件,所以 方法传递进来的AbstractNioChannelAbstractNioMessageChannel实例,所以unsafe.read()方法的具体方法就是AbstractNioMessageChannel#unsafe的read方法,也就是下面这段代码(这段代码我们在核心组件—Channel中也有详细的分析)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
// 当前的实例对象是NioServerSocketChannel,也就是当前eventLoop还是bossGroup的eventLoop
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 从channel中读出消息,具体实现将accpect的channel包成NioSocketChannel
int localRead = doReadMessages(readBuf);
// 无数据直接跳出循环
if (localRead == 0) {
break;
}
// 链路关闭跳出循环
if (localRead < 0) {
closed = true;
break;
}
// 记录成功读的次数
allocHandle.incMessagesRead(localRead);
// 默认读次数不超过16次
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size();
// 循环处理数据
for (int i = 0; i < size; i ++) {
readPending = false;
// 将创建的nioSockChannel作为消息(msg)调用NioServerSocketChannel#pipeline的fireChannelRead方法。
pipeline.fireChannelRead(readBuf.get(i));
}
// 读完成,清空readBuffer
readBuf.clear();
allocHandle.readComplete();
// 调用pipeline的readComplete方法。
pipeline.fireChannelReadComplete();

if (exception != null) {
// 处理异常关闭
closed = closeOnReadError(exception);
// 调用pipeline的exceptionCaught方法
pipeline.fireExceptionCaught(exception);
}

if (closed) {
// 正常关闭。
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}

//io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 从acceptor中获取socketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());

try {
if (ch != null) {
// 将socketChannel包装成NioSocketChannel并放入到buf中
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);

try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}

return 0;
}

以上的部分源码是unsafe.read()方法的具体调用,这里需要注意的第一个点是channel具体实现类是AbstractNioMessageChannel的子类,即NioServerSocketChannel,这和我们OP_READ就绪事件处理的实现类不是同一个类。这段方法的主要逻辑是将accept的channel包装成NioSocketChannel,并且作为参数调用ServerNioSocketChannel的pipelineHander的channelRead方法。也就是ServerChannelhandler即在服务启动时添加LoggingHandlerServerBootstrapAcceptor加入到NioSocketChannelPipeline中。

NioSocketChannel的注册

pipeline.fireChannelRead()是inbound的方法,所以fireChannelRead方法会从沿着handler链从前往后依次调用handler的ChannelRead方法,同时NioSocketChannel将作为入参传递。以下是LoggingHandlerServerBootstrapAcceptor的channelRead方法的源码剖析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//io.netty.handler.logging.LoggingHandler#channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (logger.isEnabled(internalLevel)) {
// 将msg即nioSocketChannel对象日志输出
logger.log(internalLevel, format(ctx, "READ", msg));
}
// 调用下一个handler的channelRead方法,即ServerBootstrapAcceptor#channelRead
ctx.fireChannelRead(msg);
}

//io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这个方法里面就有一些关键的东西了
final Channel child = (Channel) msg;
// 将我们在ServerBootstrap创建阶段childHandler加入到nioSocketChannel的pipeline中。
child.pipeline().addLast(childHandler);

setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);

try {
// 将childChannel注册到childGroup也就是workerEventLoopGroup上
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

通过ServerBootstrapAcceptor#read方法调用,netty完成当前nioSocketChannel的初始化并将NioSocketChannel注册到workGroup上。这个过程可以理解为NioSocketChannel也就是我们实际执行读写的channel的“initAndRegister”。整个注册register的过程和前面NioServerSocketChannel的过程是类似的,复用的也是同一套register0代码。接下来,我们从NioSocketChannel的角度再来分析下这段代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

// 此时的eventLoop则是workerGroup的eventLoop
AbstractChannel.this.eventLoop = eventLoop;

// 当前调用的线程是bossGroup的线程,因此这里走false分支
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 启动workerGroup线程继续完车register0的逻辑
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

//io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 将channel注册到eventLoop的selector上设置selectionKey
doRegister();
neverRegistered = false;
registered = true;

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 将childHandler加入到nioSocketChannel中
pipeline.invokeHandlerAddedIfNeeded();

// promise设置成功
safeSetSuccess(promise);
// 调用pipelineChannelHandler的channelRegister方法。
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
// 当前chennal已经被激活
if (isActive()) {
if (firstRegistration) {
调用pipelineChannelHandler的channelActive方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

这一段的逻辑大体上可以分为四个部分分别是将channel注册到selector上初始化pipeline(将channelHandler加入到pipeline中)调用pipelineChanelHandler的channelRegister方法调用pipelineChannelHandler的channelActive方法。这段逻辑和服务启动时注册即NioServerSocketChannel调用register的逻辑共用同一段代码。但是在服务启动阶段,channel并没有被激活而在此时的调用中,channel已经被激活所以直接调用channelActive,开启对OP_READ事件的监听。

OP_ACCEPT就绪操作逻辑小结

这个部分是我自己的一些看法和见解,也是我在看源码之后对OP_ACCEPT就绪操作的一些新的认识。在没有深入看OP_ACCEPT部分的源码之前,对于OP_ACCEPT就绪操作不是很理解,Netty的主从Reactor结构的确是可以很大程度上提高处理的速度,但是代码层面到底做了些什么操作是不清楚的。我一直以为在每次进行读写的时候都会初始化childChannel,也就是NioSocketChannel。因为每次debug的时候都会执行serveBootstrap中childHandler中的初始化逻辑。但是现在看来这些理解都是错误的。OP_ACCEPT是BossGroup的eventLoop来操作的,在这个过程中处理的逻辑很简单就是拿到socketChannel包装成NioSocketChannel,然后通过ServerBootstrapAcceptor将channel注册到workerGroup的eventLoop上的selector上监听OP_READ就绪事件。也就是后续的读写操作都会直接和workerGroup的eventLoop进行交互,完成后续操作。现在在看下面这个图是不是更清楚了。

拿一个餐厅服务员招待顾客举例子,一个食客来到一家餐厅就餐,一开始会有一个餐厅经理老王在外面叫号,当老王叫到你的号之后,他会给你安排好座位和餐具等,并把你交给另外一名服务员小二,告诉你后续的点餐上餐投诉反馈等需求你直接和服务员小二沟通就好了。

这个过程其实就是Netty的OP_ACCEPT的操作过程。其中餐厅经理老王就是bossGroup线程组中的eventLoop,当连接来了之后,他会初始化nioSocketChannel,当前socket注册到workerGroup上,并监听OP_READ。后续的读写操作,直接由WorkerGroup的eventLoop线程进行处理,而bossGroup的eventLoop则继续处理下一个链接,这样极大的提高了netty处理连接的处理速度。

一个小细节- interestOps与attachment

在前面的代码分析过程中,我有一个点很迷惑,在channel注册到Selector上时候,设置的ops也就是interestOps是0,不论是netty服务启动时NioServerSocketChannel的register调用,还是NioSocketChannel的register的调用传入的interestOps都是0,也就是下面这段代码。这是不是不对啊。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}

后来我发现,在调用register方法还有传了一个att也就是this作为入参,也就是我们包装的abstractNioChannel。当有操作就绪的时候,代码判断的不是注册时的interestOps,我们注册的时候本来传入的就都是0,也无法判断。最后在开启读操作中,设置当前selectionKey的interestOps的值和NioServerSocketChannel中的readInterestOp一致。这样也就开启了读。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//io.netty.channel.nio.AbstractNioChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;

final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}

我还有一个疑惑的点就是在processKey的过程中,是如何确定对应的channel的呢?也就是这个方法签名中processSelectedKey(SelectionKey k, AbstractNioChannel ch)的ch是怎么传递过来的?通过源码我看到是通过attachment的方式实现,在register阶段,将netty包装的abstractNioChannel作为attachment进行注册,也就是这个入参中的thisregister(eventLoop().unwrappedSelector(), 0, this)这样在下次命中当前selectionKey就可以直接拿到对应的channel了。Selector的结构如下图:

I/O就绪事件处之OP_READ(—)

前面我们梳理了就绪事件OP_ACCEPT操作,了解了bossGroup线程组中NioEventLoop线程处理Socket链路接入的整个过程。在分析完链路接入部分的逻辑,我们来分析I/O事件读写的逻辑。当然读写的逻辑也是可以分开的,从workerGroup的eventLoop处理processSelectedKeys的OP_READ事件开始,到我们自定义逻辑的inBoundHandler的channelRread方法结束作为我们OP_READ的第一部分。从EchoServerHandler中的channelRead方法开始到将数据从server端把数据发送出去作为第二个部分。我们先来探究第一个部分,一起来看看workerGroup线程组NioEventLoop线程是如何读取Socket链路传过来的数据。

OP_READ事件读操作概述

和前面OP_ACCEPT就绪事件的剖析一样,先来看看读数据这部分操作的时序图。

这个时序图和OP_ACCEPT的非常相似。因为都是基于Netty的模型的pipeline的处理器链结构。通过fireChannelXXX的方式去调用下一个Handler,每个handler处理的对应的一些逻辑。其中在整个处理过程中传递方法参数msg是从channel中读到的数据。在读写操作时,我们都会加上一些自定义的编解码器。在Netty中,我们解码需要继承MessageToByteEncoder,然后调用decode方法执行不同的子类方法来实现具体的编解码逻辑。在下面的源码剖析部分,我们也会重点看看这部分的源码。

OP_READ事件读操作源码剖析

这个部分我们开始分析OP_READ事件读部分的源码。从上面的processSelectedKeysh中发现,OP_READOP_ACCEPT都会走到一段代码逻辑,即unsafe.read()方法,但是OP_ACCEPT和OP_READ操作的具体实例是不一样的。OP_READ的unsafe.read()的操作实例AbstractNioByteChannel。所以这个部分的源码剖析也就从这里开始。

unsafe.read()的另一副面孔AbstractNioByteChannel

abstractNioByteChannel之前我们剖析过,他的父类AbstractNioChannel拥有channel的注册、连接等功能,但把数据读写功能交给了其子类,AbstractNioMessageChannel前面我们讨论过,主要处理OP_ACCEPT连接事件处理其处理的msg是NioSocketChannel。我们这里要讨论的AbstractNioByteChannel是处理传输的实际数据,这里要处理的msg则是byteBuf。以下则是源码的深入分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
@Override
public final void read() {
//获取pipeline通道配置、channel管道
final ChannelConfig config = config();
//socketChannel已经关闭
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// 获取内容分配器,默认为PooledByteBufAllocator
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
//清空上一次读取的字节数,每次读取时均重新计算
//字节Buf分配器,并计算字节buf分配器 Handler
allocHandle.reset(config);

ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 分配内存
byteBuf = allocHandle.allocate(allocator);
// 读取通道接收缓冲区的数据
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
// 若没有数据可以读,则释放内存
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
//当读到-1时,表示Channel通道已经关闭,没有必要再继续读。
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// 更新读取消息计数器
allocHandle.incMessagesRead(1);
readPending = false;
//通知通道处理器处理数据,触发Channel通道的fireChannelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
// 读取操作完毕
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

if (close) {
// 如果Socket通道关闭,则关闭读操作
closeOnRead(pipeline);
}
} catch (Throwable t) {
// 处理读异常
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
// 若读操作完毕,且没有配置自动读,则选择Key兴趣集中移除读操作事件。
removeReadOp();
}
}
}

这段逻辑和AbstractNioMessageChannel中的read方法不同,这个实例中处理的是读写数据。大体的逻辑就分配内存读取数据调用pipelineChannel中handler的channelRead方法。在这几个操作循环结束后调用pipeline中channelReadComplete方法。所以我们接下来深入fireChannelRead方法,看看我们经常用到的编解码操作是如何被处理的。

ByteToMessageDecoder 解码操作

如果要使用编解码操作,我们需要继承ByteToMessageDecoderEchoDecoder,并实现其decode方法。因此我们在EchoServer中childHandler的ChannelInitializer中pipeline的添加我们创建的实现类EchoDecoder。也就是下面这个代码片段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
// 添加编码器
p.addLast(new EchoEncoder());
// 添加解码器
p.addLast(new EchoDecoder());
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});

我们接着上面AbstractNioByteChannel中的fireChannelRead逻辑继续往下走。依照我们前面配置的逻辑,nioSocketChannel的pipeline中应该包括EchoDecoderLoggingHandlerEchoServerHandler这几个业务伤的Handler,当然handler调用链还包括头节点headContext和尾节点tailContext。按照前面我们分析fireChannelXXX的逻辑来看,channelRead方法是inboundHandler的方法。所以从前往后执行调用链中inboundHandler的channelRead方法(fireChannelXXX方法的调用链执行逻辑,前面已经分析过了,这里就不深入了🥱)。因此以下是EchoDecoder的父类也就是ByteToMessageDecoder的readChannel方法的源码剖析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//io.netty.handler.codec.ByteToMessageDecoder#channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
// 解码后的消息列表
CodecOutputList out = CodecOutputList.newInstance();
try {
first = cumulation == null;
// 判断是否是第一次解码
// 如果是只需把data强转赋给字节容器即可。
// 否则把msg写入cumulation中
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
// 从cumulation字节中解码出消息。
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
// 当字节容器不为空且不可读时,需要释放。
// 并置null,直接回收,将下次解码认为是第一次。
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++numReads >= discardAfterReads) {
// 如果读取的字节数大于或等于discardAfterReads
// 则设置读取字节数为0
// 并移除字节容器中一部读取过的字节
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}

int size = out.size();
// fireChannelRead 属性在channelReadComplete() 方法中被调用。
firedChannelRead |= out.insertSinceRecycled();
// 执行下一个handler
fireChannelRead(ctx, out, size);
} finally {
//回收解码消息集合。
out.recycle();
}
}
} else {
// 非ByteBuf消息,此解码器不进行解码
ctx.fireChannelRead(msg);
}
}

// io.netty.handler.codec.ByteToMessageDecoder#callDecode
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
// 循环编解码
while (in.isReadable()) {
// 判断是否已经有可用消息
final int outSize = out.size();

if (outSize > 0) {
// 当前存在可用消息,调用fireChannelRead触发下一个handler处理这些消息
fireChannelRead(ctx, out, outSize);
out.clear();

// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
// 如果当前上下文已经被移除了,则不能继续操作
if (ctx.isRemoved()) {
break;
}
}
// 获取字节容器的可读字节数
int oldInputLength = in.readableBytes();
// 把消息容byteBuf in读取到对象out中,也就是在这里调用我们的子类方法。
decodeRemovalReentryProtection(ctx, in, out);

// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (out.isEmpty()) {
// 如果当前out中没有数据,说明解码失败,无需继续
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}

if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
// 如果只是进行一次解码,直接退出。
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}

// io.netty.handler.codec.ByteToMessageDecoder#decodeRemovalReentryProtection
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
// 由实现子类完成解码。
decode(ctx, in, out);
} finally {
// 判断channel的处理器是否正在移除。
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
fireChannelRead(ctx, out, out.size());
out.clear();
handlerRemoved(ctx);
}
}
}

我们通过上面的分析,很容易发现。编解码也是一个channelRead的方法实现,只是Netty提供了一个ByteToMessageDecoder的模版方法实现,把真正的解码过程流程实现类的decode方法。在完成编解码之后,再依次调用后续的处理器的channelRead方法(将拆到的业务数据传递给后续的handler)。这段代码整体逻辑如下:

  1. channelRead() 方法首先会判断msg是否为ByteBuf类型,只有在是的情况下才会进行解码这就是为什么StringDecoder等MessageToMessageCodec解码器放在ByteToMessageDecoder子类之后的原因,这时的msg一般都是堆外直接内存DirectByteBuf,因为采用堆外直接内存在传输时可以少一次复制。然后判断是否为第一次解码,若是,则直接把msg赋值给cumulation(cumulation是读取半包的容器),如不是,则需要把msg写入cumulation中,写入之前要判断是否需要扩容。

  2. 新读取到的数据写入cumulation之后,调用callDecode()方法。在callDecode()方法中会不断的调用子类的decode()方法,直到当前cumulation无法继续解码。其中,无法继续解码分为两种情况。第一种是无可以读的字节,第二种是经历过decode()方法后,可读字节数没有任何变化。

  3. 执行完callDecode()方法后,进入finally代码块进行收尾工作。若cumulation不为空,且不可读的时,需要把cumulation释放掉并赋空值,若连续16次(discardAfterReads的默认值)字节容器cumulation中仍然有未被业务拆包器读取的数据,则需要进行一次压缩:将有效数据段整体移动到容器的首部,同时用一个成员变量fireChannelRead来标识本次读取数据是否拆到了一个业务数据包,并触发fireChannelRead事件,将拆到的业务数据包传递给后续的handler,最后把out放回对象池中。

OP_READ事件读操作源码逻辑小结

优秀的框架总会不断的强化一个其逻辑模型的特性,在netty的源码探索中,几乎所有的业务处理操作都离不开pipeline中handler调用链的处理。同样在OP_READ事件读操作中,也是通过调用pipeline中handler调用链的形式实现的。当前OP_READ的事件被触发时,调用prcessSelectionKeys中的unsafe.read()方法。但是这次和OP_ACCEPT事件不同。这次的具体的实现类是AbstratNioByteChannel,其侧重的是数据的读写。将数据读取到byteBuf之后,将byteBuf作fireChannelRead方法的入参msg调用pipeline的handler调用链。在实际的开发过程中,我们通常会自定义编解码器,在读这个阶段,需要对消息进行解码。通过继承ByteToMessageDecoder并实现具体decode方法来实现。ByteToMessageDecoder也是一个inboundHandler,它也是拓展channelRead方法来进行逻辑的处理。它的主要工作是将byteBuf的数据转换成对应的List<Object>对象(byteBuf -> Object)。因此如果是后续对数据进一步编码的逻辑(Object -> xxx)应该加在ByteToMessageDecoder的后面,可以减少一次堆外直接内存的复制。执行完当前的Handler之后,通过ctx.fireChannelRead()继续调用后续的Handler即LoggingHandlerEchoServerHandler。在读操作完成后,再调用pipeline.fireChannelReadComplete()它也是InboundHandler方法。他们的调用逻辑和前fireChannelActive、fireChannelRegister等inboundHandler中的方法调用逻辑一致,因此就不重复分析了。

I/O就绪事件处之OP_READ(二)

前面的部分我们梳理了Netty读写数据操作中读的部分,接下来,我们来详细梳理下Netty写数据的处理逻辑。在《Netty源码剖析与应用》这本书中,这部分的剖析之前提出了几个有意思的问题,我们可以一起来找找答案。

  • Netty在写操作之前调用了那些Handler,并且这次的调用链和其他outboundHandler方法调用链的调用逻辑有什么区别?

  • 在业务Handler中,若开启了额外的业务线程,那么在Netty内部是如何把业务的结果数据经过IO线程发送出去的呢?

  • 为了提高网络的吞吐量,在调用write时,数据并没有直接被写到Socket中,而是被写到了Netty的缓冲区(channelOutboudBuffer)中,在并发很高的情况下,在对方接收数据比较慢时,Netty的写缓冲区如何防止内存溢出,防止出现大量内存无法释放的情况?

OP_READ事件写操作概述

写操作和前面的读操作register操作的整理逻辑是一样的,都是基于pipeline中handler调用链实现的。同样的这里的大体逻辑也是一样的。也是通过调用outbound的write方法将数据写入到channelOutboundBuffer中,然后调用flush方法将数据发送出去。同样的,这里我们先看调用的时序图。

![](/Users/daiwei/Library/Application Support/typora-user-images/image-20220226150743301.png)

从上面的时序图,不难看出Netty的write操作也是基于pipeline调用实现的,和读操作对应在发送数据之前需要将数据进行编码。因为是outboundHandler的方法,在HeadContext上会将数据写入到缓存中,并在最后调用flush方法将数据通过socket发送回去。

OP_READ事件写操作源码剖析

在write的过程中要先调用编码方法,对写出的数据进行自定义协议的编码操作。在前面我们分析读操作时提到ByteToMessageDecoder解码器的逻辑要放在其他MessageToMessageCodec最前面,因为Netty的msg一般使用的是堆外直接内存DirectByteBuf。同样的MessageToByteEnCoder编码器也需要放在Handler调用链的最前面。write方法是outBound的方法,handler的调用链会从后向前调用,放在handler的最前面则会被最后调用。在下面源码剖析部分也会详细剖析编码的逻辑。在这个部分还有一个细节,就是write方法并不会直接将数据发送出去,而是会将数据暂存到channelOutboundBuffer中,只有当调用flush方法时才会将数据写入到socket中发送出去。那channelOutboundBuffer中的缓存数据维护的实现细节,以及buffer内存背压通知的实现又是怎样的?源码中都会有答案。

write方法不会直接将数据发送出去,会先将数据暂存在channelOutboundBuffer中,只有调用flush方法才会将数据写入socket发送,真的调用了flush就一定会被直接发送出去么?🤔

熟悉的pipeline调用链

在OP_READ操作读的部分剖析中,我们走到了EchoServerHandlerchannelRead方法。我们从这里开始继续我们源码剖析。(p.s.:这一段可能和Netty的源码不太一致,这一段因为内容需要我进行了调整,直接将数据write回客户端,便于我们后续的调试。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// io.netty.example.echo.EchoServerHandler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 将数据会客户端
ctx.write(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// 写操作完调用flush操作发送数据。
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}

//io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
private void write(Object msg, boolean flush, ChannelPromise promise) {
// ctx.write()实际调用的方法。
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}

// 找到下一个实现wirte方法的outboudHandler
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
// 记录当前对象访问位置(咱也不知道这是为啥)
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
// 如果当前要直接flush,调用invokeWriteAndFlush直接写入socket
next.invokeWriteAndFlush(m, promise);
} else {
// 否则写入channelOutBoundBuffer中
next.invokeWrite(m, promise);
}
} else {
// 如果当前线程不是eventLoop线程,创建写任务task,并将task丢入到eventLoop的task队列中执行。
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
// We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
// 提交任务失败,任务取消。
task.cancel();
}
}
}

这里需要注意的是,我们ctx.write(msg)方法是channelOutboundHandler的中的方法。并且此时pipeline中有以下几个handler,(headContext)EchoEncoderEchoDecoderLoggingHandlerEchoServerHandler(tailContext)。同时(headContext)EchoEncoderLoggingHandlerEchoServerHandler(tailContext)这几个handler是channelOutboundHandler(如下图)。

EchoServerHandler中调用ctx.write()方法,实际执行的是AbstractChannelHandlerContext#write(Object, boolean, ChannelPromise)方法,在这个方法中,通过findContextOutbound方法寻找下一个outBound方法。按照我们前面对fireChannelXXX方法的分析,往后依次执行的handler应该是LoggingHandlerEchoEncoder(headContext)。所以这里的逻辑和前面我们剖析的其他的pipelineChannelHandler方法不一样,它并没有从headContext开始从前往后执行,也不是从tailContext开始从后往前执行,而是从中间的handler开始按照outboundHandler的规则依次向前执行。同样的ctx.flush()也是这个逻辑。

在上面源码中的AbstractChannelHandlerContext#write方法,有提到一个WriteTask。代码中会判断当前执行线程是否是eventLoop线程,如果不是创建一个writeTask并丢到eventLoop队列中执行。看了这一段源码是不是我们在一小节提到的两个问题,你的心中是否已经有了答案呢?

  • Netty在写操作之前调用了那些Handler,并且这次的调用链和其他outboundHandler方法调用链的调用逻辑有什么区别?
  • 在业务Handler中,若开启了额外的业务线程,那么在Netty内部是如何把业务的结果数据经过IO线程发送出去的呢?

我们接着上面pipeline调用链继续往下执行,按照我们前面的分析,下一个outboundHandler是LoggingHandler。这段逻辑很简单,输出一段日志信息即可。

1
2
3
4
5
6
7
8
//io.netty.handler.logging.LoggingHandler#write
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "WRITE", msg));
}
ctx.write(msg, promise);
}

下一个outboundHandler是EchoEncoderHandler这个handler中主要是编码操作,我们放在下面一个小节深入梳理。在编码的EchoEncoderHandler之后,就是我们的最后一个outboundHandler即HeadContext。下面是HeadContext的write方法的源码剖析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//io.netty.channel.DefaultChannelPipeline.HeadContext#write
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}

//io.netty.channel.AbstractChannel.AbstractUnsafe#write
@Override
public final void write(Object msg, ChannelPromise promise) {
// 校验是否是eventLoop方法
assertEventLoop();

// 获取当前的outboundBuffer
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
try {
// 如果当前的outboundBuffer为null的话,直接调用释放当前byteBuf
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
} finally {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
// 设置promise写失败
safeSetFailure(promise,
newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
}
return;
}

int size;
try {
// 一个消息转换的方法,但是这个方法直接返回了msg
msg = filterOutboundMessage(msg);
// 计算msg的大小
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
try {
ReferenceCountUtil.release(msg);
} finally {
safeSetFailure(promise, t);
}
return;
}

// 将msg加入到channelOutboundBuffer中。
outboundBuffer.addMessage(msg, size, promise);
}

上面这部分是基于pipeline调用链对write方法进行剖析。当然写操作不仅仅只write方法,数据被写入到channelOutboudBuffer中,调用flush方法才会将数据写入到socket中发送出去。在前面对NioByteUnsafe#read方法的剖析中,完成fireChannelRead()的方法调用后会调用fireChannelReadComplete()方法,按照我们pipeline调用链的inboundHandler的调用逻辑,最终会调用echoServerHandler#readComplete方法,即执行ctx.flush()方法。前面我们提到flush的调用逻辑和write逻辑是一致的,flush是outboundHandler那按照outboundHandler的调用逻辑,会从后向前依次调用outboudHandler的flush方法,也就是LoggingHandlerHeadContext这几个Handler的flush方法,以下是flush操作的源码剖析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
//io.netty.handler.logging.LoggingHandler#flush
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
//loggingHandler的flush方法。
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "FLUSH"));
}
// 调用下一个包含flush的outboundHandler
ctx.flush();
}

//io.netty.channel.DefaultChannelPipeline.HeadContext#flush
@Override
public void flush(ChannelHandlerContext ctx) {
// headContext中调用unsafe#flush()
unsafe.flush();
}

//io.netty.channel.AbstractChannel.AbstractUnsafe#flush
@Override
public final void flush() {
// 校验当前执行线程是否是eventLoop线程。
assertEventLoop();

ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// 将channelOutboundBuffer中的写入的数据加入到flush的队列中
outboundBuffer.addFlush();
// 将flush队列中的entry数据写入到socket中。
flush0();
}

//io.netty.channel.ChannelOutboundBuffer#addFlush
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
// See https://github.com/netty/netty/issues/2577
// 移动链表中entry的操作,表明某个之前的数据需要被处理。
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
// 统计要flush的entry的个数。
flushed ++;
// 设置这些即将被flush的entry状态为uncancelable(不可取消状态)。
if (!entry.promise.setUncancellable()) {
// 如果当前entry已经取消了,释放entry的内存,并且减小channelOutboundBuffer大小并触发后续事件。
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);

// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}

//io.netty.channel.AbstractChannel.AbstractUnsafe#flush0
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}

final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}

inFlush0 = true;

// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
// 如果当前通道已经关闭了,异常处理即可。
try {
// Check if we need to generate the exception at all.
if (!outboundBuffer.isEmpty()) {
if (isOpen()) {
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
}
}
} finally {
inFlush0 = false;
}
return;
}

try {
// 将channelOutboundBuffer的数据写入socket中。
doWrite(outboundBuffer);
} catch (Throwable t) {
handleWriteError(t);
} finally {
inFlush0 = false;
}
}

//io.netty.channel.socket.nio.NioSocketChannel#doWrite
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
// All written so clear OP_WRITE
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}

// Ensure the pending writes are made of ByteBufs only.
// 确保挂起的写入数据仅由byteBuf组成。
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
// 为entry创建byteBuffer数组
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();

// Always use nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// 没有enrty数据需要处理。
// We have something else beside ByteBuffers to write so fallback to normal writes.
writeSpinCount -= doWrite0(in);
break;
case 1: {
// 只有一个entry数据需要处理。
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// 取出第一个bytebufer
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
// 写入没有完成,后续以eventLoop的task任务形式执行。
incompleteWrite(true);
return;
}
// 调整自适应写,批量写数据时,如果尝试写的都写进去了,接下来会尝试写更多
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
// 移除已经写入的entry,并更新bytebuf的读索引。
in.removeBytes(localWrittenBytes);
// 减少连续写次数,默认16次。
--writeSpinCount;
break;
}
default: {
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
// 多个entry数据需要处理。
long attemptedBytes = in.nioBufferSize();
// 将数据写入socket
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
// 写入没有完成,后续以eventLoop的task任务形式执行。
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
// 调整自适应写,批量写数据时,如果尝试写的都写进去了,接下来会尝试写更多
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
// 移除已经写入的entry,并更新bytebuf的读索引。
in.removeBytes(localWrittenBytes);
// 减少连续写次数,默认16次。
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
// 因为连续写入16次,不能保证所有数据都会写完,如果存在后续任务以eventLoop的task任务形式执行。
incompleteWrite(writeSpinCount < 0);
}

//io.netty.channel.nio.AbstractNioByteChannel#incompleteWrite
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
// 当前写socket时返回值小于写入长度甚至返回0,表示有数据积压,此时应该注册write事件,
// 待所有数据写后没有数据可写时,取消write事件并注册read事件
setOpWrite();
} else {
// It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
// use our write quantum. In this case we no longer want to set the write OP because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the write OP if necessary.
// 清除op_write事件
clearOpWrite();

// Schedule flush again later so other tasks can be picked up in the meantime
eventLoop().execute(flushTask);
}
}

以上就是flush逻辑的分析。通过这个部分的分析我们可以容易发现,flush的逻辑也是基于pipeline调用链实现的,并且write方法只会把数据写入到channelOutboundBuffer中,并不会直接写入到socket中,只有后续调用flush方法,才会将数据写入到socket中。在后续的写入socket的操作中,netty并没有简单的写入而是有多个参数来“感知”当前写入的“情况”(socketChannel能写就多写一些,写不了就等等)。当然在这个部分,还有一些有意思的内容没深入,比如entryopWritewriteSpinCount等。这些内容我们会在下面的内容中深入分析。

MessageToByteEncoder编码操作

在前面我们分析基于pipeline调用链调用write方法过程中,write操作调用我们实现的编码EchoEncoder,对数据进行编码。EchoEncoder继承于MessageToByteEcoder,我们这个部分的源码剖析也就从这里开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
// 校验当前msg是否应该被处理。
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
// 强制类型转换
I cast = (I) msg;
// 分配ByteBuf,preferDirect 是否要使用直接内存
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 调用子类的编码方法
encode(ctx, cast, buf);
} finally {
// 数据已经写入到bytebuf中,释放msg对象
ReferenceCountUtil.release(cast);
}

//如果当前byteBuf可读
if (buf.isReadable()) {
// 调用下一个outboundHandler的write方法。
ctx.write(buf, promise);
} else {
// 释放bytebuf
buf.release();
//写一个空的bytebuf,继续调用下一个outboundHandler的write方法。
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
// 如果当前msg不应该被处理,直接调用下一个outboundHandler的write方法。
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
// 全部执行完成释放bytebuf
if (buf != null) {
buf.release();
}
}
}

//io.netty.example.echo.EchoEncoder
public class EchoEncoder extends MessageToByteEncoder<Object> {
/**
* Encode a message into a {@link ByteBuf}. This method will be called for each written message that can be handled
* by this encoder.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to
* @param msg the message to encode
* @param out the {@link ByteBuf} into which the encoded message will be written
* @throws Exception is thrown if an error occurs
*/
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
// 自定义编码方法很简单,先发送数据长度,再发送数据byte数据/
out.writeInt(msg.toString().length());
out.writeBytes(msg.toString().getBytes(CharsetUtil.UTF_8));
}
}

这个编码操作相较于解码操作则简单了不少,在进行编码操作前只是简单的判断当前msg是否应该处理然后创建用于写入msg的bytebuf随后调用encode方法即我们自定义编码器实现encoder方法对msg进行自定义编码将结果写入到bytebuf中。

ChannelOutboundBuffer结构剖析

前面我们提到了write方法并不会直接把数据写到socketChannel中,而是会写到channelOutboundBuffer中。只有调用flush方法才会将数据从channelOutboundBuffer中写入到socketChannel中。当然在OP_READ读操作剖析这部分开始时,我们还提了一个问题“为了提高网络的吞吐量,在调用write时,数据并没有直接被写到Socket中,而是被写到了Netty的缓冲区(channelOutboudBuffer)中,在并发很高的情况下,在对方接收数据比较慢时,Netty的写缓冲区如何防止内存溢出,防止出现大量内存无法释放的情况?”。在解答这个问题之前,先来了解一下Netty的缓冲区ChannelOutboundBuffer。它也是一个链表结构,链表的每个节点都是一个entry,entry中有消息的内容、next指针等,其中有5个非常重要的属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
//
// The Entry that is the first in the linked-list structure that was flushed
// 链表中被刷新的第一个元素,此元素准备第一个写入Socket
private Entry flushedEntry;
// The Entry which is the first unflushed in the linked-list structure
//链表中的第一个未刷新的元素
// 当调用addMessage()方法后,从原链表tailEntry到Entry(现链表的tailEntry)节点
// 都是未被刷新的数据
private Entry unflushedEntry;
// The Entry which represents the tail of the buffer
// 链表末尾节点。
private Entry tailEntry;
// The number of flushed entries that are not written yet
// 表示已经刷新但还没写到socket中的entry数量
private int flushed;
//每新增一个Entry,其大小要加上Entry实例的大小(96B)和真实数据的大小。
@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;

在深入ChannelOutboundBuffer的数据结构之前,我们先addMessage()方法,也就是AbstractChannel#write通过这个方法,将要发送的数据写入到ChannelOutboundBuffer中。换言之这个方法是将待发送数据插入到缓存链表的方法,以下是这个方法的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
* the message was written.
* 每调用一次addMessage()方法,并把数据加在链表的最后。
*/
//io.netty.channel.ChannelOutboundBuffer#addMessage
public void addMessage(Object msg, int size, ChannelPromise promise) {
//把msg消息数据包装成Entry对象
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
// 若链表为空,则尾节点为当前节点
if (tailEntry == null) {
flushedEntry = null;
} else {
// 当链表不为空时,把新的Entry对象添加到链表尾
Entry tail = tailEntry;
tail.next = entry;
}
// 设置最后当前节点为尾节点。
tailEntry = entry;
// 如果当前unflushedEntry链表为空,
// 则表明调用addFlush()方法将链表中之前的元素都已经全部加入了需要发送的节点,
// 否则链表为空
if (unflushedEntry == null) {
unflushedEntry = entry;
}

// 修改通道缓存总数据的大小,若缓存总数据大小超过了最高水位,
// 则会触发fireChannelWriteabilityChanged事件,进入背压。
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}

//io.netty.channel.ChannelOutboundBuffer#incrementPendingOutboundBytes(long, boolean)
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}

//当前缓存总数据大小
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// 如果大于设置的writeBuffer的高水位,设置不可写状态
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}

flushedEntryunflushedEntrytailEntry将链表划分为刷新和待刷新的数据链表。调用addMessage()的链表示意图如下:

在调用addMessage()方法之后,采用CAS方式增加待发送节点的字节数,此时如果待发送的字节数大于通道写buf的最高阈值writeBufferHighWaterMark,则更新通道状态为不可写,同时会触发channelWriteabilityChanged事件防止buf溢出。调用addMessage()方法将数据添加到channelOutboundBuffer中,我们都知道此时数据并没有发送出去,只有调用了flush方法数据才会被发送出去。通过前面的源码剖析我们发现,调用AbstractUnsafe#flush0将数据写入到socket中之前,会先调用outboundBuffer.addFlush()修改buffer中链表数据的状态。具体的修改逻辑代码解读如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
* and so you will be able to handle them.
* 将前面所有的消息标识为flushed。
*/
//io.netty.channel.ChannelOutboundBuffer#addFlush
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
//
// 获取当前未flush的第一个entry
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
// 如果当前flushedEntry指针没有指向任何数据,指向未flush的第一个entry
flushedEntry = entry;
}
// 从unflushedEntry开始循环设置,将这些全部设置不可取消(uncancel)状态,全部加入到flushed链表中。
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// 设置失败是因为当前entry已经是取消状态的了
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
// 返回当前entry的size并调用下面方法进行释放对应的大小,
// 如果缓存总数据的大小小于低水位,触发fireChannelWritabilityChanged事件
decrementPendingOutboundBytes(pending, false, true);
}
// 继续下一个entry
entry = entry.next;
} while (entry != null);

// All flushed so reset unflushedEntry
//因为所有的entry都被加入到flushed链表中,所以设置unflushedEntry设置为null。
// 下一次添加数据时,unflushedEntry为最先添加的entry。
unflushedEntry = null;
}
}

在调用完addFlush()方法之后,channelOutboundBuffer的链表结构如下图:

在addFlush()之后,调用doWrite(ChannelOutboundBuffer in)将数据写入到socket中。但是这里需要注意一个细节,ChannelOutboundBuffer并不会直接被写入到socket中。而是会转化为nio的ByteBuffer数组,然后写入到socket中。在doWrite()方法中调用ChannelOutboundBuffer#nioBuffers(int, long)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
//io.netty.channel.ChannelOutboundBuffer#nioBuffers(int, long)
// 在发送数据时需要把ChannelOutboundBuffer中的msg转换成ByteBuffer
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
assert maxCount > 0;
assert maxBytes > 0;
long nioBufferSize = 0;
int nioBufferCount = 0;
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
// 从线程本地缓存中获取ByteBuffer数组
ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
// 从准备第一个写入Socket的元素开始
Entry entry = flushedEntry;
//循环遍历entry,entry必须为准备写入Socket的元素且为非取消状态
while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
if (!entry.cancelled) {
// 获取entry节点中实际发送的数据
ByteBuf buf = (ByteBuf) entry.msg;
final int readerIndex = buf.readerIndex();
// 获取可发送字节数
final int readableBytes = buf.writerIndex() - readerIndex;
// 若可发送字节数 > 0 时,否则跳过
if (readableBytes > 0) {
// 累计发送字节不能大于maxBytes
if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
// If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
// we stop populate the ByteBuffer array. This is done for 2 reasons:
// 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
// and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
// on the architecture and kernel but to be safe we also enforce the limit here.
// 2. There is no sense in putting more data in the array than is likely to be accepted by the
// OS.
//
// See also:
// - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
// - https://linux.die.net//man/2/writev
break;
}
// 累计发送字节数
nioBufferSize += readableBytes;
// 获取entry中byteBuffer的个数
int count = entry.count;
if (count == -1) {
//noinspection ConstantValueVariableUse
entry.count = count = buf.nioBufferCount();
}
// 需要多少个byteBuffer
int neededSpace = min(maxCount, nioBufferCount + count);
// nioBuffers长度不够,需要扩容。
if (neededSpace > nioBuffers.length) {
nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
NIO_BUFFERS.set(threadLocalMap, nioBuffers);
}
// 如果ByteBuffer的个数为1,则直接获取ByteBuffer并放入nioBuffers数组中。
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount++] = nioBuf;
} else {
// 如果有多个循环获取ByteBuffer放入nioBuffers数组中。
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
}
// 不能超过最大个数限制
if (nioBufferCount >= maxCount) {
break;
}
}
}
// 获取下一个节点
entry = entry.next;
}
this.nioBufferCount = nioBufferCount;
this.nioBufferSize = nioBufferSize;

return nioBuffers;
}

通过nioBuffers()方法获取到需要发送的ByteBuffer数组,然后通过SocketChannel写到网络中,并返回写成功了多少个字节,此时ChannelOutboudBuffer需要把这些字节从链表中移除同时需要把刚刚生成的ByteBuffer数组也一起移除,下面继续看removeByte()与remove()方法的解读:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/**
* Removes the fully written entries and update the reader index of the partially written entry.
* This operation assumes all messages in this buffer is {@link ByteBuf}.
*/
//io.netty.channel.ChannelOutboundBuffer#removeBytes
public void removeBytes(long writtenBytes) {
for (;;) {
// 与nioBuffers()方法一样,从准备写入socket的节点开始获取此节点的buf数据
Object msg = current();
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}

final ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
// 获取buf可发送字节数
final int readableBytes = buf.writerIndex() - readerIndex;

//如果当前节点字节数小于或等于已发送的字节数,则直接删除整个节点,更新进度progress(writtenBytes);
// remove()也就是将entry从链表中移除。
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
progress(readableBytes);
writtenBytes -= readableBytes;
}
remove();
} else { // readableBytes > writtenBytes
// 若当前节点还有一部分未发送,则缩小当前节点的可发送字节长度。
if (writtenBytes != 0) {
// 修改readerIndex并更新进度
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
// 由于每次在发送时,都需要从线程本地缓存中取出byteBuffer数组,
// 并且每次拿到的数组都应该是干净的(无数据),因此这里清空它。
clearNioBuffers();
}

//io.netty.channel.ChannelOutboundBuffer#clearNioBuffers
private void clearNioBuffers() {
int count = nioBufferCount;
if (count > 0) {
nioBufferCount = 0;
// 填入null对象
Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
}
}

//io.netty.channel.ChannelOutboundBuffer#remove()
public boolean remove() {
// 与nioBuffers()、removeBytes() 方法一样。
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;

ChannelPromise promise = e.promise;
int size = e.pendingSize;
// 从链表中移除此entry,同时将flushedEntry指针指向下一个节点
removeEntry(e);

if (!e.cancelled) {
// entry在uncancel的状态下执行到了这里。
// only release message, notify and decrement if it was not canceled before.
// entry数据已经写入到socket发送出去了,释放内存空间。
ReferenceCountUtil.safeRelease(msg);
// 通知处理成功
safeSuccess(promise);
// 减少当前outbundBuffer中数据对应的大小。
decrementPendingOutboundBytes(size, false, true);
}

// recycle the entry
// 回收entry对象并放回对象池。
e.recycle();

return true;
}

//io.netty.channel.ChannelOutboundBuffer#removeEntry
private void removeEntry(Entry e) {
if (-- flushed == 0) {
// 若最后的节点也被移除了,则所有指针都是null
// processed everything
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
flushedEntry = e.next;
}
}

所以通过上面源码的分析,在entry数据被写入到socket中,会被移除出链表,移除的方式也非常简单,flushedEntry指针指向后一个entry即可,即如下图所示:

到这里对channelOutboundBuffer的分析梳理也就结束了,channelOutboundBuffer会将msg封装成一个个entry,并组成一个链表结构,通过三个指针来标识出刷新和待刷新的部分,调用addMessage()方法可以将msg加入链表也就是write最后的操作。调用flush方法,将链表中的所有元素全部由unflushed状态转变为flushed,然后调用doWrite方法将flushed的entry写入到socket中,发送完成后移除对应的entry释放空间。

另一个小细节 — Netty背压处理与自适应写

在看源码的过程中发现,我还看到了两个有意思的小细节。在看部分之前,我们先思考前面我们提到的那个问题“为了提高网络的吞吐量,在调用write时,数据并没有直接被写到Socket中,而是被写到了Netty的缓冲区(channelOutboudBuffer)中,在并发很高的情况下,在对方接收数据比较慢时,Netty的写缓冲区如何防止内存溢出,防止出现大量内存无法释放的情况?”buffer空间不是无限的,如果写入socket的发送速率>写入buffer的速率那么内存一定会溢出。换个角度思考问题,如果当达到了某个阈值提醒生产方停止往buffer中写入数据是不是就可以了呢?这就是背压机制。

背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。

简单来说,背压就是一种流控策略。

我们其实在前面的梳理中有接触过背压,也就是我们TCP的流控策略。通过当前滑动窗口大小来控制发送速率,来处理发送端的发送速率和接收端处理速率不匹配的问题。

那Netty背压是怎么处理的呢?换句话说netty是怎么通上游来降低写入速率的呢?通过方法incrementPendingOutboundBytes(long, boolean)和方法decrementPendingOutboundBytes(long, boolean, boolean)来配合触发对应的fireChannelWritabilityChanged事件来通知上游pipeline中的handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// 这个方法在将msg加入channelOutboundBuffer之后调用
//io.netty.channel.ChannelOutboundBuffer#incrementPendingOutboundBytes(long, boolean)
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}

// CAS加上并获取当前的channelOutboundBufffer的大小
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// 如果当前buffer的size 大于 channel配置的高水位
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
// 触发channel不可写
setUnwritable(invokeLater);
}
}

//io.netty.channel.ChannelOutboundBuffer#setUnwritable
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
// CAS设置不可写状态为1, 如果设置成功对应的channel的abstractChannel#isWritable为false
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0) {
// 如果本次操作将值由0设置成1,调用fireChannelWritabilityChanged方法。
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}

// 当数据写入socket之后调用这个方法,释放channelOutboundBuffer的内存
//io.netty.channel.ChannelOutboundBuffer#decrementPendingOutboundBytes(long, boolean, boolean)
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}

// CAS减去size并获取当前channelOutboundBuffer的大小
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
// 如果当前设置notifyWritability == true 并且当前channelOutboundBuffer的大小 < channel配置的低水位
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
// 设置channel可写
setWritable(invokeLater);
}
}

// 设置当前通道可写
//io.netty.channel.ChannelOutboundBuffer#setWritable
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
// CAS设置当前不可写状态为0,如果设置成功对应的channel的abstractChannel#isWritable为true
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
// 如果本次操作将值从1设置成0,调用fireChannelWritabilityChanged方法。
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}

// 触发pipeline的channelWriteabilityChanged事件
//io.netty.channel.ChannelOutboundBuffer#fireChannelWritabilityChanged
private void fireChannelWritabilityChanged(boolean invokeLater) {
final ChannelPipeline pipeline = channel.pipeline();
// 是否要稍后调用
if (invokeLater) {
// 如果稍后调用,包装一个task丢入eventLoop的task队列中
Runnable task = fireChannelWritabilityChangedTask;
if (task == null) {
fireChannelWritabilityChangedTask = task = new Runnable() {
@Override
public void run() {
pipeline.fireChannelWritabilityChanged();
}
};
}
channel.eventLoop().execute(task);
} else {
// 否则直接调用触发方法。
pipeline.fireChannelWritabilityChanged();
}
}

在看这段逻辑的时候,我发现我们平时写代码中并没有对背压事件进行特别的处理,如果有需要调整发送速率或是做出响应的动作,重写inboundHandler的channelWritabilityChanged方法即可,同时对应channel的abstractChannel#isWritable为true。通过方法incrementPendingOutboundBytes(long, boolean)decrementPendingOutboundBytes(long, boolean, boolean)来可以写入buffer的速率和写入socket发送速率不匹配的问题。那么如何才能保证我们当前的发送速率达到当前的最佳速率呢?也就是netty是如何自动调整发送数据的速率,让channel尽可能多的发送数据的呢?这里不得不提到Netty的自适应写。我们先看下面这些代码片段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// 获取最大可写入字节数
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);


private volatile int maxBytesPerGatheringWrite = Integer.MAX_VALUE;
//io.netty.channel.socket.nio.NioSocketChannel.NioSocketChannelConfig#NioSocketChannelConfig
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
super(channel, javaSocket);
// 计算最大可写字节数
calculateMaxBytesPerGatheringWrite();
}

//io.netty.channel.socket.nio.NioSocketChannel.NioSocketChannelConfig#calculateMaxBytesPerGatheringWrite
private void calculateMaxBytesPerGatheringWrite() {
// Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
// 最大可写字节数设置为sendBufferSize * 2,乘以2可以留出一些额外的空间,以防操作系统处理写数据的速度比我们提供的快
int newSendBufferSize = getSendBufferSize() << 1;
if (newSendBufferSize > 0) {
// 设置最大可写字节数。
setMaxBytesPerGatheringWrite(newSendBufferSize);
}
}

//io.netty.channel.socket.nio.NioSocketChannel#doWrite中的代码片段
default: {
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
// 根据实际写入的数据量来调整最大可写字节数
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}

// io.netty.channel.socket.nio.NioSocketChannel#adjustMaxBytesPerGatheringWrite
private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
// By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
// SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
// make a best effort to adjust as OS behavior changes.
// 判断是否所有字节都写入socket中发送
if (attempted == written) {
//如果本次写入所有字节数都写入socket中
if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
// 写入字节数 * 2 大于设置的可以写入的最大字节数,则更新可写入最大字节数为写入字节数 * 2
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
}
} else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
// 本次写入并不是所有字节都写入socket中
// 那么如果尝试写入的字节数比最大可写入字节的的最小阈值(可以理解为最小的写入最大值上限),并且写入的值小于尝试写入字节数/2
// 将最大可写入字节数 / 2,按照写入时的网络情况,设置的最大可写入字节数仍然大于实际可写入字节数,
// 即使降低最大可写字节数仍然可以充分利用网络资源。
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
}
}

通过当前socket写入情况,Netty动态调整当前可写入的最大字节数,充分利用网络资源发送尽可能多数据

写到这里,我突然发现这个场景很像我们小时候做过的一道题的背景,一个水池一个管子进水一个管子出水。需要怎么做去保证这个池子的水在不溢出的情况下,将水通过池子高效的转移。

调控进水的管子,不能因为进水速度太快从而导致池子水溢出。同时又要尽可能加快池子排出水的速度,不让水长时间停留在池子里。

对进水管的调控策略就是背压机制,而对于出水管的策略就是自适应写。

这里还有一个细节的点要注意。前面我们提到一个问题“调用flush方法之后,数据一定会被立刻发送出去么?”答案是否定的。从下面的代码片段我们可以发现,doWrite方法中循环写入socket中的逻辑最多执行writeSpinCount次(writeSpinCount的默认值是16)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//io.netty.channel.socket.nio.NioSocketChannel#doWrite
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
... write to socekt logical ...
--writeSpinCount;
... write to socekt logical ...
} while (writeSpinCount > 0);

// 未完成的写操作
incompleteWrite(writeSpinCount < 0);
}

//io.netty.channel.nio.AbstractNioByteChannel#incompleteWrite
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
// 设置OP_WRITE事件。
setOpWrite();
} else {
// It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
// use our write quantum. In this case we no longer want to set the write OP because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the write OP if necessary.
// 清空OP_WRITE事件标识
clearOpWrite();

// Schedule flush again later so other tasks can be picked up in the meantime
// 使用eventLoop的task方式执行后续写操作。
eventLoop().execute(flushTask);
}
}

如果循环16次之后,依旧没有完成所有的写操作,那么此时writeSpinCount == 0即调用incompleteWrite(false);清空OP_WRITE事件标识并将flushTask放入eventLoop的TaskQueue继续执行。这部分数据并不会第一时间写出去,增加服务响应时延。

当看到这里我又想到一个问题,什么场景下才会设置NIO的SelectionKey监听OP_WRITE事件呢?

OP_WRITE代表socket buffer可写,当写入socket时返回值小于写入的字节长度甚至为0的时候,代表socket buffer中有数据积压,这个时候应该取消监听read时间,注册write事件,待所有数据写入完毕,取消write事件监听,重新注册read事件监听。

OP_WRITE事件只要buffer没有写满就可以一直触发可写事件,所以直接注册OP_WRITE而不取消会导致CPU跑满。在没有数据需要写的时候就应该及时取消OP_WRITE事件监听。

OP_READ事件写操作源码逻辑小结

这个部分我们接上OP_READ读操作的逻辑继续分析OP_READ写操作。将数据从server端写出去需要调用write和flush这两个方法,他们都是outboundHandler的方法。从调用过程的逻辑角度来看,他们都是从方法发起调用的handler(echoServerHandler)开始沿着pipeline中的channelHandler从后向前发起调用。在wirte方法调用过程中,我们自定义的MessageToByteEncoder编码器的write方法会被调用,这里的逻辑就是简单的模版方法最终会调用子类实现的encode方法,将Object对象通过自定义编码器转变成byteBuf字节码数据。当write方法调用到最后一个write方法的handler(HeadContext)进行简单的校验后,将msg转变成entry加入到channelOutBoundBuffer中。channelOutboundBuffer中是一个链表的结构,通过unflushEntry,flushedEntry和tailEntry这三个指针标识出未被flush和被flush的entry。write操作会将msg加入到unflushedEntry链表中,调用flush方法之后会把所有unflushEntry中的元素加入到flushedEntry中。当这些数据被写入到socket中之后,再将调用removeEntry方法将这些空间全部释放掉即可。在写数据的细节上,为了防止channelOutboundBuffer溢出,netty设计了简单的背压机制。为了保证socket通道能充分利用带宽资源,netty设计了自适应写操作。这些都是一些很有意思的点。

总结

到这里我们完成了Netty流程源码的剖析,原本是准备一个小节完成启动流程和读写操作的源码剖析的。在自己看源码的过程中真的收获了很多的东西,在之前我是很抗拒看源码的,通过这次的流程源码分析真的很喜欢看源码,点亮了源码阅读的技能树,加深了自己对Netty更加深层次的理解。同时还学到了很多优秀的设计,在今后的业务开发中也多了一个思考的纬度。上一个小节我们详细剖析了Netty服务是怎么启动的,这个小节我们深入剖析,Netty服务是如何处理读写请求的。Netty的抽象真的非常的优秀,随着代码的调用,就像是一支笔一样不断的在描绘着主从Reactor的设计模型,不断加深我对主从Reactor模型的理解。下一个小节我们继续我们对Netty的剖析,看看Netty优秀的内存管理策略~

学习资料