Netty核心组件源码剖析 — Channel

前言

上一小节我们梳理了NioEventLoop,通过梳理我们知道,NioEventLoop本质上就是封装的执行线程,线程通过NioEventLoopGroup构造创建。NioEventLoop通过select()方法监听selectKey触发对应连接,读/写事件。并在每次的循环中执行一定的任务队列中的任务。而每次事件的触发都是基于Channel实现的。这里的Channel不是NIO中的Channel,而是我们Netty中封装了NIO Channel实现的Channel。这一小节我们将深入理解Channel的一些功能原理和具体的一些实现细节。

介绍

Channel是Netty抽象出来的对网络I/O进行读写的相关接口,与NIO中的Channel接口类似。Channel的主要功能有网络I/O的读写客户端发起连接主动关闭连接获取通信双方网络地址等。Channel接口下有一个重要的抽象类—AbstractChannel,一些公共的基础方法都在这个抽象类中实现,一些特定的功能可以通过各个不同的实现类去实现,最大限度的实现了接口的功能和接口的重用。

AbstractChannel融合了Netty的线程模型事件驱动模型。但由于网络I/O模型及各种协议种类比较多,除了TCP协议,Netty还支持很多其他的连接协议,并且每种协议都有传统阻塞I/O和NIO(非阻塞IO)的版本区别。不同阻塞类型的连接有不同的Channel类型与之对应,因此AbstractChannel并没有直接与网络I/O相关的直接操作。每种阻塞与非阻塞Channel在AbstractChannel上都会继续抽象一层,如AbstractNioChannel,既是Netty重新封装的Epoll SockeChannel实现,也是其他非阻塞I/O Channel的抽象层。

AbstractChannel 源码分析

AbstractChannel 抽象类包括以下几个重要属性:

  • EventLoop:每个Channel对应一条EventLoop线程。

  • DefaultChannelPipeline:一个Handler的容器,也可以将其理解为一个Handler链。Handler主要处理数据的编/解码和业务逻辑。

  • Unsafe:实现具体的连接与读/写数据,如网络的读/写、链路关闭、发起连接等。命名为Unsafe表示不对外提供使用,并非不安全。

下图是AbstractChannel的功能图,从图中可以看出,Unsafe属性的功能非常全面,并且AbstractChannel中有一个Unsafe抽象类—AbstractUnsafe。

AbstractUnSafe的大部分方法都采用了模版设计模式,具体细节由其子类完成。例如bind()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//io.netty.channel.AbstractChannel.AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();

if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}

// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}

boolean wasActive = isActive();
try {
// 模版设计模式,调用子类NioServerSocketChannel的doBind()方法。
doBind(localAddress);
} catch (Throwable t) {
//绑定失败回调
safeSetFailure(promise, t);
closeIfClosed();
return;
}
//从非活跃状态到活跃状态触发了active事件
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}

// 绑定成功回调通知
safeSetSuccess(promise);
}

AbstractNioChannel源码剖析

AbstractNioChannel也是一个抽象类,不过他在AbstractChannel的基础上有封装了一层属性和方法,AbstractChannel没有涉及NIO的任何属性和具体方法,包括AbstractUnsafe。AbstractNioChannel有以下三个重要属性:

1
2
3
private final SelectableChannel ch; // 真正用到的NIO Channel
protected final int readInterestOp; // 监听感兴趣的事件
volatile SelectionKey selectionKey; // 注册到Selector后获取的Key

SelectableChannel是java.nio.SocketChannel和java.nio.ServerSocketChannel公共的抽象类,同时它也是Java NIO Channel的接口实现。readInterestOp用于区分当前Channel监听的事件类型。selectionKey它是将Channel注册到Selector上后的返回值。从这些属性不难看出,在AbstractChannel中,已经将Netty的Channel和Java NIO的channel关联起来了。AbstractNioChannel的方法都很简洁,下面是一个很重要的方法doRegister()的源码剖析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//io.netty.channel.nio.AbstractNioChannel#doRegister
//doRegister()方法在AbstractUnsafe的io.netty.channel.AbstractChannel.AbstractUnsafe#register0被调用
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//通过javaChannel()方法获取具体的Nio Channel,把Channel注册到其EventLoop线程的Selector上,
// 注册返回后的 selectionKey,为其设置channel感兴趣的事件。
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
//调用select方法来清空selectionKey的缓存
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}

doRegister()方法在AbstractUnsafe的register0()方法中被调用。在AbstractNioChannel中有一个非常重要的子类—AbstractNioUnsafe,是AbstractUnsafe类的Nio实现,主要实现了connect()flush0()等方法。它还实现了NioUnsafe接口,实现了其finishConnect()forceFlush()ch()等方法,其中,forceFlush()flush0()最终调用的NioSocketChannel的doWrite()方法,来完成缓存数据写入Socket的工作。connect()finishConnect() 这两个方法只有在Netty客户端中才会用到。下面是对这两个方法的解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
//io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// 设置任务为不可取消状态,并确认Channel已经打开
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}

try {
//确保没有正在进行的连接
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
// 获取之前的状态
boolean wasActive = isActive();
// 在远程连接之前,会出现以下三种结果。
// 1、连接成功,返回true
// 2、暂时没有连接上,服务端没有返回ACK应答,连接结果不确定,返回false
// 3、连接失败,直接抛出I/O异常。
// 由于协议和IO模型的不同,连接方式也不一样,因此具体的实现方式由其子类来实现。
if (doConnect(remoteAddress, localAddress)) {
//连接成功后会触发ChannelActive事件
//最终将会NioSocketChannel中的selectionKey
//设置为SelectionKey.OP_READ
//用于监听网络读操作位
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;

//获取连接超时时间
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
//根据连接超时时间设置定时任务
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
if (connectPromise != null && !connectPromise.isDone()
&& connectPromise.tryFailure(new ConnectTimeoutException(
"connection timed out: " + remoteAddress))) {
//如果发现连接并没有完成,则关闭连接句柄,释放资源
//设置异常堆并发起取消注册操作
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}

//增加连接结果监听器
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 如果接收连接完成的通知,则判断是否被取消
// 如果被取消则关闭连接句柄,释放资源,发起取消注册操作
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
//关闭连接句柄,释放资源,发起取消注册操作。
//从多路复用器上移除
closeIfClosed();
}
}

finishConnect()方法解读分析:

finishConnect() 只是在连接完成后调用,用于设置一些状态操作位置,并不是结束连接的字面意思。。不是关闭连接不是关闭连接的意思。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#finishConnect
@Override
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
// 只有EventLoop线程才能调用finishConnect()方法
// 此方法在NioEventLoop的processSelectedKey()方法中调用
assert eventLoop().inEventLoop();

try {
boolean wasActive = isActive();
// 判断连接结果(由其子类完成)
// 通过SocketChannel的finishConnect()方法判断连接结果
// 连接成功返回true
// 连接失败抛出异常
// 链路被关闭,链路中断等异常也属于连接失败。
doFinishConnect();
// 负责将SocketChannel修改为监听读操作位
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
// 连接失败,关闭连接句柄,释放资源,发起取消注册操作。
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}

AbstractNioByteChannel源码剖析

AbstractNioChannel拥有NIO的Channel,具备NIO的注册、连接等功能。但IO的读写交给了其子类,Netty对IO的读写分配POJO对象与ByteBuf和FileRegion,因此在AbstractNioChannel的基础上继续抽象一层,分为AbstractNioMessageChannel与AbstractNioByteChannel。这一个部分我们详细深入AbstractNioByteChannel。它发送和读取的对象是ByteBuf与FileRegion类型。

首先flushTask为Task任务,主要负责刷新发送缓存链表中的数据,由于write()的数据没有直接写在Socket中,而是写在ChannelOutboundBuffer缓存中,所以当调用flush()方法的时,会把数据写入Socket中并向网络中发送。因此当缓存中的数据未发送完成时,需要将任务添加到EventLoop线程中,等待EventLoop线程的再次发送。

wirte 只是写入了缓存中,只有flush()才是写入了Socket发送。

doWrite()与doWriteInteral()方法在AbstractChannel的flush0()方法中被调用,主要功能是从ChannelOutboundBuffer缓存中获取待发送的数据,进行循环发送,发送的结果分为下面三种:

  1. 发送成功,跳出循环直接返回。
  2. 由于TCP缓存区已满,成功发送的字节数为0,跳出循环,并将写操作OP_WRITE事件添加到选择Key兴趣事件集中。
  3. 默认当写了16此数据还未发送完成时,把选择Key的OP_WRITE事件从兴趣事件集合中移除,并添加一个flushTask任务,先去执行其他任务,当监测到此任务时再发送。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
//io.netty.channel.nio.AbstractNioByteChannel#doWrite
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
//写请求自循环次数,默认为16次
int writeSpinCount = config().getWriteSpinCount();
do {
//获取当前Channel的缓存ChannelOutboundBuffer中的当前待刷新消息
Object msg = in.current();
//所有消息都发送成功了
if (msg == null) {
//清空 Channel 选择Key兴趣事件集中的OP_WRITE写事件
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
//直接返回,没必要再添加写任务
return;
}
//发送数据
writeSpinCount -= doWriteInternal(in, msg);
} while (writeSpinCount > 0);
// 当因缓冲区满了而发送失败时 doWriteInternal 返回 Integer.MAX_VALUE
// 此时 writeSpinCount < 0 为 true
// 当发送16次还未发送完,但每次都写成功时,writeSpinCount为0
incompleteWrite(writeSpinCount < 0);
}

//io.netty.channel.nio.AbstractNioByteChannel#doWriteInternal
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
//若可读字节数为0,则从缓存区中移除
if (!buf.isReadable()) {
in.remove();
return 0;
}
// 实际发送字节数据
final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
// 更新字节数据的发送进度
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
//若可读字节数为0,则从缓存区中移除
in.remove();
}
return 1;
}
} else if (msg instanceof FileRegion) {
//如果是文件FileRegion消息
FileRegion region = (FileRegion) msg;
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
//实际写操作
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount > 0) {
// 更新数据的发送速度
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {
//则从缓存中移除
in.remove();
}
return 1;
}
} else {
//不支持发送其他类型数据
// Should not reach here.
throw new Error();
}
return WRITE_STATUS_SNDBUF_FULL;
}

//io.netty.channel.nio.AbstractNioByteChannel#incompleteWrite
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
// 将OP_WRITE写操作事件添加到Channel的选择Key兴趣事件集中
setOpWrite();
} else {
// It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
// use our write quantum. In this case we no longer want to set the write OP because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the write OP if necessary.
//清楚Channel选择Key兴趣事件集中的OP_WRITE写操作事件
clearOpWrite();
// 将写操作任务添加到EventLoop线程上,以便后续继续发送
// Schedule flush again later so other tasks can be picked up in the meantime
eventLoop().execute(flushTask);
}
}

NioByteUnsafe的read()方法的实现思路大致分为以下3步:

  1. 获取Channel的配置对象、内存分配器RecvByteBufAllocator.Handler

  2. 进入for循环。循环体本身的作用:使用内存分配器获取数据容器ByteBuf,调用doReadBytes()方法将数据读取到容器中,如果本次循环没有读到数据或者数据链路已经关闭,则跳出循环。另外,当循环次数达到属性METADATAdefaultMaxMessagePerRead次数(默认为16次)时,也会跳出循环。由于TCP传输会产生粘包问题,因此每次读取都会触发channelRead事件,进而调用业务逻辑处理Handler。

  3. 跳出循环后,表示本次读取已经完成。调用allocHandlerreadComplete()方法,并读取记录,用于下次分配合理内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
@Override
public final void read() {
//获取pipeline通道配置、channel管道
final ChannelConfig config = config();
//socketChannel已经关闭
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// 获取内容分配器,默认为PooledByteBufAllocator
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
//清空上一次读取的字节数,每次读取时均重新计算
//字节Buf分配器,并计算字节buf分配器 Handler
allocHandle.reset(config);

ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 分配内存
byteBuf = allocHandle.allocate(allocator);
// 读取通道接收缓冲区的数据
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
// 若没有数据可以读,则释放内存
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
//当读到-1时,表示Channel通道已经关闭,没有必要再继续读。
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// 更新读取消息计数器
allocHandle.incMessagesRead(1);
readPending = false;
//通知通道处理器处理数据,触发Channel通道的fireChannelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
// 读取操作完毕
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

if (close) {
// 如果Socket通道关闭,则关闭读操作
closeOnRead(pipeline);
}
} catch (Throwable t) {
// 处理读异常
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
// 若读操作完毕,且没有配置自动读,则选择Key兴趣集中移除读操作事件。
removeReadOp();
}
}
}

AbstractNioMessageChannel 源码剖析

AbstractNioMessageChannel写入和读取的数据类型是Object,而不是字节流**。**在读数据时,AbstractNioMessageChannel数据不存在粘包问题,因此AbstractNioMessageChannnel在read()方法中先循环数据包,再触发channelRead事件。在写数据的时,AbstractNioMessageChannel数据逻辑简单。它把缓存outboundBuffer中的数据包依此写入Channel中。如果Channel中写满了,或循环写、默认写的次数为子类Channel属性METADATA中的defaultMaxMessagesPerRead次数,则在Channel的SelectionKey上设置OP_WRITE事件,随后推出,其后OP_WRITE事件处理逻辑和Byte字节流写逻辑一样。read()和doWrite()方法代码解读如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
@Override
public void read() {
assert eventLoop().inEventLoop();
// 获取Channel的配置对象
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 获取计算内存分配Handler
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 清空上次的记录
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 调用子类的doReadMessages()方法
// 读取数据包,并放入readBuf链表中
// 当成功读取时返回 1
int localRead = doReadMessages(readBuf);
// 已无数据,跳出循环
if (localRead == 0) {
break;
}
// 链路关闭,跳出循环
if (localRead < 0) {
closed = true;
break;
}
// 记录成功读取的次数
allocHandle.incMessagesRead(localRead);
// 默认不能超过16次
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size();
// 循环处理读取的数据包
for (int i = 0; i < size; i ++) {
readPending = false;
// 触发ChannelRead事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
//记录当前读取记录,以便下次分配合理内存
allocHandle.readComplete();
// 触发 readComplete 事件
pipeline.fireChannelReadComplete();

if (exception != null) {
// 处理异常情况下关闭Channel
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}

if (closed) {
inputShutdown = true;
// 处理Channel正常关闭
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
//读操作完毕,且没有配置自动读
if (!readPending && !config.isAutoRead()) {
// 移除读操作事件
removeReadOp();
}
}
}

//io.netty.channel.nio.AbstractNioMessageChannel#doWrite
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
final SelectionKey key = selectionKey();
// 获取Key的兴趣集
final int interestOps = key.interestOps();

int maxMessagesPerWrite = maxMessagesPerWrite();
// 当前校验最大写的次数是否大于0
while (maxMessagesPerWrite > 0)
Object msg = in.current();
if (msg == null) {
//如果当前数据没有数据需要发送,直接跳出循环
break;
}
try {
boolean done = false;
//获取配置中,循环写的最大次数
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
// 调用子方法进行循环写操作,成功返回true
if (doWriteMessage(msg, in)) {
done = true;
break;
}
}
// 若发送成功,则将其从缓存链表中移除,继续发送循环获取下一个数据
if (done) {
maxMessagesPerWrite--;
in.remove();
} else {
break;
}
} catch (Exception e) {
// 判断如果遇到异常是否要继续写
if (continueOnWriteError()) {
maxMessagesPerWrite--;
in.remove(e);
} else {
throw e;
}
}
}
if (in.isEmpty()) {
// Wrote all messages.
// 数据已全部发送发送完成,从兴趣集中移除 OP_WRITE 事件
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
} else {
// Did not write all messages.
// 如果数据还没写完,将OP_WRITE加入到兴趣集中
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
}

NioSocketChannel 源码剖析

前面分析Channel都是抽象类,NioSocketChannel是AbstractNioByteChannel的子类,也是io.netty.channel.socket.SocketChannel的实现类。Netty服务的每个连接都会生成一个NioSocketChannel对象。NioSocketChannel在AbstractNioByteChannel的基础上封装了NIO中的SocketChannel,实现了IO的读写连接操作,其核心功能如下。

  • SocketChannel在NioSocketChannel构造方法中由SelectorProvider.provider().openSocketChannel()创建,提供javaChannel()方法以获取SocketChannel。

  • 实现doReadByte()方法,从SocketChannel中读取数据。

  • 重写doWrite()方法、实现doWriteBytes()方法,将数据写入Socket中。

  • 实现doConnect()方法和客户端连接。

下图为NioSocketChannel 的核心功能脑图,注明了这些功能会在哪些地方会被调用,从图中可以看出,大部分方法都是被其辅助对象Unsafe调用。

I/O的读写的核心代码解读如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
//io.netty.channel.socket.nio.NioSocketChannel#doReadBytes
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
// 获取计算内存分配器Handle
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 设置尝试读取字节数为buf的可写字节数
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
// 从Channel中读取字节数并写入buf中,返回读取的字节数
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

//io.netty.channel.socket.nio.NioSocketChannel#doWriteBytes
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
// 获取buf的可读字节数
final int expectedWrittenBytes = buf.readableBytes();
// 把buf写入Socket缓存中,返回写入的字节数
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

//io.netty.channel.socket.nio.NioSocketChannel#doWrite
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
//获取SocketChannel
SocketChannel ch = javaChannel();
// 获取配置属性 writeSpinCount(循环写入的最大次数)
int writeSpinCount = config().getWriteSpinCount();
do {
// 缓存中数据为空,无数据可写
if (in.isEmpty()) {
// All written so clear OP_WRITE
// 清除写事件并直接返回
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
// 获取一次最大可写字节数
// Ensure the pending writes are made of ByteBufs only.
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
/**
* 缓存由多个Entry组成,每次写时都可能写多个entry
* 具体一次性该发送多少数据
* 由ByteBuffer数组的最大长度和一次最大可写字节数决定
*/
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();

// Always use nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// 非ByteBuffer 数据,交给父类实现
// We have something else beside ByteBuffers to write so fallback to normal writes.
writeSpinCount -= doWrite0(in);
break;
case 1: {
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
ByteBuffer buffer = nioBuffers[0];
// buf 可读的字节数
int attemptedBytes = buffer.remaining();
// 将buf发送到Socket缓存中
final int localWrittenBytes = ch.write(buffer);
// 如果发送失败
if (localWrittenBytes <= 0) {
// 将写事件添加到事件兴趣集中
incompleteWrite(true);
return;
}
/**
* 根据成功写入字节数和尝试写入字节数调整下次最大可写字节数
* 当两者相等时,若写入字节数 * 2 大于当前最大可写入字节数
* 则下次最大可写入字节数等于尝试写入字节数 * 2
* 并且当写入最大字节数 > 4096时,下次最大可写入字节数等于尝试写入字节数/2
*/
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
// 从缓存中移除写入字节数
in.removeBytes(localWrittenBytes);
// 循环写入次数 -1
--writeSpinCount;
break;
}
default: {
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
long attemptedBytes = in.nioBufferSize();
// 这里和上面的不同点是这里使用多通道写,而上面使用的单个buffer写
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
/**
* 未全发送完;
* 若 writeSpinCount < 0 说明Socket缓冲区已经满了,未发送成功。
* 若 writeSpinCount = 0 说明Netty数据量太大了16次循环未写完
*/
incompleteWrite(writeSpinCount < 0);
}

NioServerSocketChannel 源码剖析

NioServerSocektChannel是AbstractNioMessageChannel的子类,由于NioServerSocektChannel由服务端使用并且只负责监听Socket的接入,不关心IO读写,所以与NioServerChannel相比要简单很多,它封装了NIO中的ServerSocketChannel,并通过newSocket()方法打开ServerSocketChannel。它的多路复用器注册与NioSocketChannel的多路复用注册一样,由父类AbstractNioChannel实现。下面重点关注它是如何监听新加入的连接的(需要由doReadMessages()方法来完成)。具体代码解析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 调用serverSocketChannel.accept()监听新加入的连接
SocketChannel ch = SocketUtils.accept(javaChannel());

try {
if (ch != null) {
//每一个新连接都会构建一个NioSocketChannel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
// 如果连接出现异常则关闭
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}

鸟瞰Channel

我们梳理部分Channel的一些重要的实现类和一些重要的方法,我们很容易发现,Channel的主要方法都是围绕网络IO的读写,客户端发起连接、关闭连接等功能。AbstractChannel通过模版方法实现做一些基础的实现,子类通过继承的方式完成具体的实现,提高代码的复用率。

这几个实现类的特点:

  • AbstractChannel :基础父类,没有直接参与网络直接有关操作,但是提供了部分模版方法。并且提供了Unsafe内部类。

  • AbstractNioChannel:每种IO类型都会在AbstractChannel上在进行一层封装,针对每种IO类型的特点进行封装,例如AbstractNioChannel中提供了三个重要属性:SelectableChannelNIO真正用到的ChannelreadInterestOp监听感兴趣的事件selectionKey注册到Selector后获取的Key。

  • AbstractNioByteChannel:它是AbstractNioChannel的子类,但是并不是最终的实现类。它具备NIO注册、连接等功能。IO读写会交给子类来处理,在NioByteChannel中发送和读写的对象类型是ByteBuf和FileRegion。

  • NioSocketChannel:AbstractNioByteChannel的子类,也是我们最常使用到的Channel实现类。Netty服务的每次连接都会生成一个NioSocketChannel对象。并在AbstractNioByteChannel的基础上实现了IO的读写连接功能。

  • AbstractNioMessageChannel:同样也是AbstractNioChannel的子类,和AbstractNioByteChannel类似,但是它读写的对象是Object对象。因此它不会存在粘包的问题。

  • NioServerSocketChannel:AbstractNioMessageChannel的子类,是一个仅仅只处理新连接的Channel,在doReadMessages的实现类中,也是关于新连接的处理。创建连接、读写方法都会进行异常处理。

其中有一个特殊的点抽象类中具体的连接与读写都是使用Unsafe实现的。同样的子类中也会对Unsafe接口继承或是实现。Unsafe表示不对外提供功能,并非不安全。

总结

这一小节我们简单梳理了Netty的Channel,我们不难发现,Channel完成了对NIO的Channel的实现与拓展。我们从AbstractChannel开始进行梳理,AbstractChannel 实现了Channel接口,提供了一些类的基础模版实现和一些Channel中的基础字段,例如标识Channel唯一的ChannelId、实现具体操作连接,数据读写的Unsafe、Channel容器Pipeline等。随后向下梳理了AbstractNioChannel的源码分析,AbstractNioChannel也是一个抽象类。这个抽象类是NioChannel的抽象类,这个类继承拓展了AbstractChannel。这个类正对NioChannel进行了拓展和补充。随后我们依据读取数据类型的不同,分析梳理了AbstractNioByteChannel 和AbstractNioMessageChannel。前者使用byteBuf读写的是二进制数据,因此存在粘包问题,后者读写Object数据,不存在粘包问题。这一层我们分析了他们读写方法,他们的读写方法在大体结构上是相似的,但是在读取具体数据的实现细节上又有不同。在往下一层就是我们最常使用到的NioSocketChannel 和NioServerSocketChannel,其中NioSocketChannel中对读写连接操作都有具体的实现,NioServerSocketChannel中则是抛出UnsupportedOperationException(); NioServerSocketChannel则会用来监听Socket接入的Accept连接操作。 因此我们在日常Netty开发中,会使用NioServerSocketChannel通常配置处理连接的bossGroup使用,而NioSocketChannel则会在配置处理读写的workerGroup时使用。

学习资料