//io.netty.channel.AbstractChannel.AbstractUnsafe#bind @Override publicfinalvoidbind(final SocketAddress localAddress, final ChannelPromise promise){ assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) { return; }
// See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); }
//io.netty.channel.nio.AbstractNioChannel#doRegister //doRegister()方法在AbstractUnsafe的io.netty.channel.AbstractChannel.AbstractUnsafe#register0被调用 @Override protectedvoiddoRegister()throws Exception { boolean selected = false; for (;;) { try { //通过javaChannel()方法获取具体的Nio Channel,把Channel注册到其EventLoop线程的Selector上, // 注册返回后的 selectionKey,为其设置channel感兴趣的事件。 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. //调用select方法来清空selectionKey的缓存 eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
// io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#finishConnect @Override publicfinalvoidfinishConnect(){ // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. // 只有EventLoop线程才能调用finishConnect()方法 // 此方法在NioEventLoop的processSelectedKey()方法中调用 asserteventLoop().inEventLoop();
try { boolean wasActive = isActive(); // 判断连接结果(由其子类完成) // 通过SocketChannel的finishConnect()方法判断连接结果 // 连接成功返回true // 连接失败抛出异常 // 链路被关闭,链路中断等异常也属于连接失败。 doFinishConnect(); // 负责将SocketChannel修改为监听读操作位 fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { // 连接失败,关闭连接句柄,释放资源,发起取消注册操作。 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } }
//io.netty.channel.nio.AbstractNioByteChannel#doWrite @Override protectedvoiddoWrite(ChannelOutboundBuffer in)throws Exception { //写请求自循环次数,默认为16次 int writeSpinCount = config().getWriteSpinCount(); do { //获取当前Channel的缓存ChannelOutboundBuffer中的当前待刷新消息 Object msg = in.current(); //所有消息都发送成功了 if (msg == null) { //清空 Channel 选择Key兴趣事件集中的OP_WRITE写事件 // Wrote all messages. clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. //直接返回,没必要再添加写任务 return; } //发送数据 writeSpinCount -= doWriteInternal(in, msg); } while (writeSpinCount > 0); // 当因缓冲区满了而发送失败时 doWriteInternal 返回 Integer.MAX_VALUE // 此时 writeSpinCount < 0 为 true // 当发送16次还未发送完,但每次都写成功时,writeSpinCount为0 incompleteWrite(writeSpinCount < 0); }
//io.netty.channel.nio.AbstractNioByteChannel#doWriteInternal privateintdoWriteInternal(ChannelOutboundBuffer in, Object msg)throws Exception { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; //若可读字节数为0,则从缓存区中移除 if (!buf.isReadable()) { in.remove(); return0; } // 实际发送字节数据 finalint localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount > 0) { // 更新字节数据的发送进度 in.progress(localFlushedAmount); if (!buf.isReadable()) { //若可读字节数为0,则从缓存区中移除 in.remove(); } return1; } } elseif (msg instanceof FileRegion) { //如果是文件FileRegion消息 FileRegion region = (FileRegion) msg; if (region.transferred() >= region.count()) { in.remove(); return0; } //实际写操作 long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount > 0) { // 更新数据的发送速度 in.progress(localFlushedAmount); if (region.transferred() >= region.count()) { //则从缓存中移除 in.remove(); } return1; } } else { //不支持发送其他类型数据 // Should not reach here. thrownew Error(); } return WRITE_STATUS_SNDBUF_FULL; }
//io.netty.channel.nio.AbstractNioByteChannel#incompleteWrite protectedfinalvoidincompleteWrite(boolean setOpWrite){ // Did not write completely. if (setOpWrite) { // 将OP_WRITE写操作事件添加到Channel的选择Key兴趣事件集中 setOpWrite(); } else { // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then // use our write quantum. In this case we no longer want to set the write OP because the socket is still // writable (as far as we know). We will find out next time we attempt to write if the socket is writable // and set the write OP if necessary. //清楚Channel选择Key兴趣事件集中的OP_WRITE写操作事件 clearOpWrite(); // 将写操作任务添加到EventLoop线程上,以便后续继续发送 // Schedule flush again later so other tasks can be picked up in the meantime eventLoop().execute(flushTask); } }
//io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read @Override publicfinalvoidread(){ //获取pipeline通道配置、channel管道 final ChannelConfig config = config(); //socketChannel已经关闭 if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); // 获取内容分配器,默认为PooledByteBufAllocator final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); //清空上一次读取的字节数,每次读取时均重新计算 //字节Buf分配器,并计算字节buf分配器 Handler allocHandle.reset(config);
ByteBuf byteBuf = null; boolean close = false; try { do { // 分配内存 byteBuf = allocHandle.allocate(allocator); // 读取通道接收缓冲区的数据 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. // 若没有数据可以读,则释放内存 byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { //当读到-1时,表示Channel通道已经关闭,没有必要再继续读。 // There is nothing left to read as we received an EOF. readPending = false; } break; } // 更新读取消息计数器 allocHandle.incMessagesRead(1); readPending = false; //通知通道处理器处理数据,触发Channel通道的fireChannelRead事件 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); // 读取操作完毕 allocHandle.readComplete(); pipeline.fireChannelReadComplete();
if (close) { // 如果Socket通道关闭,则关闭读操作 closeOnRead(pipeline); } } catch (Throwable t) { // 处理读异常 handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { // 若读操作完毕,且没有配置自动读,则选择Key兴趣集中移除读操作事件。 removeReadOp(); } } }
if (closed) { inputShutdown = true; // 处理Channel正常关闭 if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 //读操作完毕,且没有配置自动读 if (!readPending && !config.isAutoRead()) { // 移除读操作事件 removeReadOp(); } } }
//io.netty.channel.socket.nio.NioSocketChannel#doWrite @Override protectedvoiddoWrite(ChannelOutboundBuffer in)throws Exception { //获取SocketChannel SocketChannel ch = javaChannel(); // 获取配置属性 writeSpinCount(循环写入的最大次数) int writeSpinCount = config().getWriteSpinCount(); do { // 缓存中数据为空,无数据可写 if (in.isEmpty()) { // All written so clear OP_WRITE // 清除写事件并直接返回 clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } // 获取一次最大可写字节数 // Ensure the pending writes are made of ByteBufs only. int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); /** * 缓存由多个Entry组成,每次写时都可能写多个entry * 具体一次性该发送多少数据 * 由ByteBuffer数组的最大长度和一次最大可写字节数决定 */ ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); int nioBufferCnt = in.nioBufferCount();
// Always use nioBuffers() to workaround data-corruption. // See https://github.com/netty/netty/issues/2761 switch (nioBufferCnt) { case0: // 非ByteBuffer 数据,交给父类实现 // We have something else beside ByteBuffers to write so fallback to normal writes. writeSpinCount -= doWrite0(in); break; case1: { // Only one ByteBuf so use non-gathering write // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. ByteBuffer buffer = nioBuffers[0]; // buf 可读的字节数 int attemptedBytes = buffer.remaining(); // 将buf发送到Socket缓存中 finalint localWrittenBytes = ch.write(buffer); // 如果发送失败 if (localWrittenBytes <= 0) { // 将写事件添加到事件兴趣集中 incompleteWrite(true); return; } /** * 根据成功写入字节数和尝试写入字节数调整下次最大可写字节数 * 当两者相等时,若写入字节数 * 2 大于当前最大可写入字节数 * 则下次最大可写入字节数等于尝试写入字节数 * 2 * 并且当写入最大字节数 > 4096时,下次最大可写入字节数等于尝试写入字节数/2 */ adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); // 从缓存中移除写入字节数 in.removeBytes(localWrittenBytes); // 循环写入次数 -1 --writeSpinCount; break; } default: { // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. // We limit the max amount to int above so cast is safe long attemptedBytes = in.nioBufferSize(); // 这里和上面的不同点是这里使用多通道写,而上面使用的单个buffer写 finallong localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above. adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } } } while (writeSpinCount > 0); /** * 未全发送完; * 若 writeSpinCount < 0 说明Socket缓冲区已经满了,未发送成功。 * 若 writeSpinCount = 0 说明Netty数据量太大了16次循环未写完 */ incompleteWrite(writeSpinCount < 0); }