Netty实战 — 基础Demo与API网关实战

前言

上一小节我们总结了Reactor模型,Netty基于主从Reactor模型设计了自己的模型结构,使得Netty在不同的场景下都有非常亮眼的性能和可用性表现。进入到这一小节,我又想到了那句话”Talk is cheap, show me code”。虽然上一小节我们简单写了一个“快速开始”,但是这对于要达到快速使用Netty进行网络开发的目的还远远不够。在一小节中我们会通过一系列的Demo示例熟悉Netty的开发,从代码中感受他们的特性。最后我们会通过一个简单的聊天加深我们对Netty各个核心组件的理解。最后我们动手设计实践一个API网关,认识网关的功能和结构,同时也进入提升我们使用Netty进行网络开发的熟练度,同时为后续深入源码打下基础。这一小节代码比较多,不要害怕耐心看完并付诸实践,一定会收获满满。

Demo示例

在这一部分中,我们使用Netty编写几个我们常见的服务类型,其中包括HttpServer和WebSocket服务。我们还会一起编写Netty的idel检测的Demo,它常用于进行服务间心跳包的开发,还会简单实践我们上一小节说到的inboundHandleroutboundHandler,感受他们在代码中的特性。为后续的使用Netty进行API网关的开发实战基础。

Http服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* Created by Daiwei on 2021/9/25
*/
public class NettyHttpServer {

public static void main(String[] args) {
int port = 8001;
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 4096)
.option(ChannelOption.SO_RCVBUF, 32 * 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
// .handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(1024 * 1024))
.addLast(new HttpHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("server started and listening port "+ port);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

/**
* Http的ChannelHandler
*/
class HttpHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest request = (FullHttpRequest) msg;
// System.out.println(request.uri());
sndHttpResp(ctx.channel(), "{\"status\": 1, \"message\":\"hello netty\"}", request);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel().close();
}

/**
* 返回客户端响应
* @param channel 当前连接channel
* @param msg 响应消息
* @param request http请求
*/
private void sndHttpResp(Channel channel, String msg, FullHttpRequest request) {
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(msg.getBytes(StandardCharsets.UTF_8)));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
if (!HttpUtil.isKeepAlive(request)) {
channel.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
channel.writeAndFlush(response);
}
}
}

这是一个用Netty实现的HttpServer的实例。在这个例子里,我们可以看到代码分成了两个部分,上面部分是Netty的Server,它的大体的结构和之前的结构都是一致的。在Netty服务的部分,我们要着重关注ChildHandler。在上一小节我们知道ChildHandler是最后执行业务逻辑的地方,对于我们HttpServer来说,它和其他的TCPServer并没有什么本质的区别,应用层以下都是一样的。只有在编解码的时候要注意Http协议的格式,所以这里我们看到Netty通过HttpServerCodecHttpObjectAggregator 的方式对HttpServer的适配。所以从这里我们不难可以拓展思考,我们如果需要自定义协议的话是否可以从这里下手呢?下面的部分是HttpHandler的实现,也是我们处理业务逻辑的地方,这里我们简单的给前端重新返回一个HttpResponse就可以了。

在这个demo中在每次启动完成服务后的第一次调用都会出现一个内存泄漏的异常

io.netty.util.ResourceLeakDetector reportTracedLeak

严重: LEAK: ByteBuf.release() was not called before it’s garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.

服务一切正常,并且这里的编解码其也都是使用Netty提供的,所以这里为什么会出现这样问题,恐怕只有深入源码才能探其究竟吧🙈

对比我们使用Socket编写的简单的Http服务,Netty的性能有非常明显的提升。以下是使用wrk压测的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
-- socket 
daiwei@daiweideMacBook-Pro ~ % wrk -t10 -c10 -d5s --latency http://127.0.0.1:8081
Running 5s test @ http://127.0.0.1:8081
10 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 5.05ms 23.84ms 225.27ms 95.38%
Req/Sec 2.43k 719.68 6.96k 87.96%
Latency Distribution
50% 171.00us
75% 194.00us
90% 266.00us
99% 147.61ms
119295 requests in 5.06s, 10.69MB read
Socket errors: connect 0, read 118887, write 404, timeout 0
Requests/sec: 23560.89
Transfer/sec: 2.11MB

-- Netty
daiwei@daiweideMacBook-Pro ~ % wrk -t10 -c10 -d5s --latency http://127.0.0.1:8001
Running 5s test @ http://127.0.0.1:8001
10 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.49ms 10.13ms 124.27ms 97.88%
Req/Sec 11.07k 4.14k 16.54k 73.95%
Latency Distribution
50% 71.00us
75% 95.00us
90% 228.00us
99% 63.43ms
550359 requests in 5.10s, 69.81MB read
Requests/sec: 107857.46
Transfer/sec: 13.68MB

WebSocket服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* Created by Daiwei on 2021/9/25
*/
public class NettyWebsocketServer {

public static void main(String[] args) {
int port = 8002;
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(1024 * 1024))
.addLast(new WebSocketServerProtocolHandler("/websocket"))
.addLast(new MyWebSocketHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(port).sync();
System.out.println("netty websocket server started and listening at port " + port);
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

/**
* 业务处理的ChannelHandler
*/
class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("receive message ["+ msg.text() +"] from web ");
ctx.channel().writeAndFlush(new TextWebSocketFrame("[" + DateFormatUtil.nowStr() + "]: 服务端收到消息[" + msg.text() + "]"));

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel().close();
}
}

上面这段代码是Netty实现WebSocket的服务端部分,整体结构上和我们前面构建的HttpServer是一致的,不同的地方在于ChildChannel中的handler是不一样的。相较于HttpServer,WebSocket 多了个处理器WebSocketServerProtocolHandler,这个handler从名字就能看出来是一个用于WebSocket协议的handler,构造这个handler需要传入一个字符串,就是我们Socket服务所监听的地址,最后加上我们自己的业务处理器。我们实现的业务处理handler和前面的HttpHandler有一点点不同,这次我们继承的是SimpleChannelInboundHandler,这个类也是channelInboundHandlerAdapter的子类,这个子类特殊的地方在于提供了一个泛型,即为read方法中的message的类型。如果在read方法中进行类型转换也是一样的。当然Websocket光有服务端是远远不够的,我们还需要页面的配合使用。以下是页面的HTML代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<script>
var socket;
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:8002/websocket");
socket.onmessage = function (ev) {
var rt = document.getElementById('respText');
rt.value = rt.value + "\n" + ev.data;
}

socket.onopen = function (ev) {
var rt = document.getElementById('respText');
rt.value = "连接开启了";
}

socket.onclose = function (ev) {
var rt = document.getElementById('respText');
rt.value = "连接关闭了";
}
} else {
alter("当浏览器不支持 webSocket");
}

function send(msg) {
if (!window.socket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(msg);
}
}
</script>
<body>
<form>
<textarea name="message" style="height: 300px; width: 300px;"></textarea>
<input type="button" value="发生消息" onclick="send(this.form.message.value)"/>
<textarea id="respText" style="height: 300px; width: 300px;"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('respText').value = ''"/>
</form>
</body>
</html>

启动后端服务并使用浏览器打开上面的HTML代码,我们可以得到如下的页面,当我们看到连接开启即表明当前websocket连接已经建立。

在左侧的文本框中输入文字并点击发送数据,我们的Netty服务端即可收到消息,并且添加一些内容将消息返回给前端,同时在后端输出结构。

1
2
3
console 输出:
netty websocket server started and listening at port 8002
receive message [不积跬步,无以至千里。] from web

Idel(空闲)检测

在网络连接中处于idel(空闲)状态是非常常见的,但是我们经常需要依据idel状态进行一些简单的开发,例如分布式服务中的心跳探活。服务端和客户端在一定时间没有进行任何的读写请求,就可以认为两个服务之间的连接是空闲的。使用Netty进行idel(空闲)检测是非常方便的。看下面的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/**
* Created by Daiwei on 2021/9/21
*/
public class NettyServer {

public static void main(String[] args) {
int port = 8801;
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 512)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(2, 5, 8, TimeUnit.SECONDS))
.addLast(new HelloHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("服务启动完成, 监听端口 http://127.0.0.1:" + port);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}

/**
* helloHandler 处理器
*/
class HelloHandler extends ChannelInboundHandlerAdapter {

/**
* 从channel中读数据。执行一些业务操作,或者加入一些hook触发后续业务。
* 同时可以通过channel发送数据
* @param ctx 上下文
* @param msg 从channel中读到的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("message["+ byteBuf.toString(StandardCharsets.UTF_8) +"] received!and " +
"message is from [" + ctx.channel().remoteAddress().toString() + "]");
ByteBuf toClientMsg = Unpooled.copiedBuffer( "hello!".getBytes(StandardCharsets.UTF_8));
ctx.channel().writeAndFlush(toClientMsg);
}

/**
* 事件触发方法
* @param ctx 上下文
* @param evt 事件
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
IdleState state = event.state();
System.out.println(state.toString());
if (IdleState.ALL_IDLE.equals(state)) {
System.out.println("all idle and close channel");
ctx.channel().close();
}
}
}

/**
* 在处理过程中捕捉到了异常执行这个方法,输出异常,关闭资源
* @param ctx 上下文
* @param cause 异常信息
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
super.exceptionCaught(ctx, cause);
}
}

这个例子是Netty入门那小节快速开始的例子改过来的,这里修改了两个地方。第一个地方是在childHandler 中添加了new IdleStateHandler(2, 5, 8, TimeUnit.SECONDS),这是连接空闲检测的Handler,其中入参2表示当读空闲超过2秒触发一次读空闲的idel事件,5表示写空闲超过5秒触发一次写空闲的idel事件,8表示读和写同时空闲超过8秒触发一次读写空闲事件。事件触发方法写位于IdleStateHandler后面的HelloHandler的中,即上面这段代码的userEventTriggered方法中。在这段代码中,如果是IdelStatEvent(连接空闲事件)输出他们的事件类型,并最后判断如果是读写空闲则关闭空闲连接。

1
2
3
4
5
6
7
8
9
10
console 输出~

服务启动完成, 监听端口 http://127.0.0.1:8801
message[hello server!] received!and message is from [/127.0.0.1:49529]
READER_IDLE
READER_IDLE
WRITER_IDLE
READER_IDLE
ALL_IDLE
all idle and close channel

自定义编解码

我们前面介绍了ChannelPipeline提供了一个容器,在这个容器内包含很多的ChannelHandler,这些ChannelHandler提供了一个API调用链。ChannelPipeline管理这沿着链入站和出站的事件流动。下面是Netty ChannelPipline的Doc文档说明,这个图很形象的描述了ChannelPipline的作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
                                             I/O Request
via Channel or
ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+

入站事件自下而上的入站程序即inBoundHandler处理,出站事件则有出站程序outBoundHandler处理,在Netty中每个Channel都有且仅有一个ChannelPipeline与之对应。在实际开发中,我们的inBound和outBound就像是一个“大的切面包裹着”实际的业务处理过程。我们可以通过增加handler的方式增加功能,就像前面一个Demo我们把IdleStateHandler加入到ChannelPipeline中添加一个idel检测功能一样。当然这个包裹在实际业务前后的主要还是编解码操作,就像我们前面编写Http的服务端和WebSocket的服务端一样。他们都在ChannelPipleline中添加了HttpServerCodecWebSocketServerProtocolHandler来支持Http服务或WebSocket服务。同理,我们也可以编写我们自己的编解码器实现自定义协议的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* 编码器
* Created by Daiwei on 2021/4/12
*/
@Slf4j
public class NettyEncoder extends MessageToByteEncoder<Object> {

private final Class<?> clazz;

private final RpcSerializer serializer;

public NettyEncoder(Class<?> clazz, RpcSerializer serializer) {
this.clazz = clazz;
this.serializer = serializer;
}

// TODO: 2021/4/12 这里可以开发一个可拓展的协议
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if (clazz.isInstance(msg)) {
try {
byte[] bytes = this.serializer.serialize(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
} catch (Exception e) {
log.error("server catch exception!", e);
e.printStackTrace();
}
}
}
}


/**
* 解码器
* Created by Daiwei on 2021/4/12
*/
@Slf4j
public class NettyDecoder extends ByteToMessageDecoder {

private final Class<?> clazz;

private final RpcSerializer serializer;

public NettyDecoder(Class<?> clazz, RpcSerializer serializer) {
this.clazz = clazz;
this.serializer = serializer;
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int len = in.readInt();
if (len < 0) {
ctx.close();
}
if (in.readableBytes() < len) {
in.resetReaderIndex();
return;
}
byte[] bytes = new byte[len];
in.readBytes(bytes);
try {
Object msg = serializer.deserialize(bytes, clazz);
out.add(msg);
} catch (Exception e) {
log.error("server catch exception!", e);
e.printStackTrace();
}
}
}

这是我之前写的两个编解码器,通过构造方法传入序列化实现实例,实现RCP的编解码逻辑。只要在两个服务之间加入我们自定义的编解码器,就可以实现我们自定义在数据传输过程中的序列化协议,提高数据传输的安全性和可靠性,这也就是我们自定义的协议。

我们自定义协议还是要解决一个问题就是TCP的粘包、半包和半粘包的问题。

什么是TC粘包问题

TCP沾包就是指发送方发送的若干包数据到达接收方时沾成了一包,从接收缓冲区来看,后一包数据的头会紧紧接着前一包数据的尾,出现沾包的原因是多方面的,可能来自发送方,也可能来自接收方。

造成TCP粘包的原因

  • 发送方原因,TCP默认使用Nagle算法,而Nagle算法主要做两件事:

    • 只有上一个分组得到确认,才会发送下一组。
    • 收集多个小分组,在一个确认到来时一起发送。

因此Nagle算法造成发送方可能会出现粘包问题。

  • 接收方原因,TCP接收的数据包时,并不会马上交到应用层进行处理,或者说是应用层不会立即处理。实际上,TCP将接收到的数据包保存在接收缓存里,然后应用程序主动从缓存读取收到的分组。这样,如果TCP接收数据到到缓存的速度,多个包就会被缓存,应用程序就有可能读取到多个首尾相接粘到一起的包。

如何处理粘包问题

  • 发送方,对于发送方造成的粘包问题,可以通过关闭Nagle算法来解决,使用TCP_NODELAY选项来关闭算法。
  • 接收方,接收方没有办法来处理粘包现象,只能交给应用层来处理。

应用层读取数据的时候,按照数据的长度一批批读取数据从而解决粘包问题。同时在发送数据的时候,也需要将数据的长度加入到传输的数据中,方便读取的时候切割数据包。Netty通过添加编解码器的方式来很好的解决这个问题。

⚠️添加编解码器的时候一定要注意要把它们加到处理逻辑前面,才能实现“业务处理器前后被包裹”的效果。

这里还有一个点简单的啰嗦一下,就是InBoundHandler和outBoundHandler,这些Handler都是ChildHandler,也都是ChannelPipeline里面的Handler。ChannelPipeline中提供了双向的Handler链头尾引用分别是headtail然后入站事件从head开始从前向后遍历执行inBoundHandler,而出站事件从tail开始从后向前遍历执行outBoundhandler。

因为两种类型的Handler是在一个调用链上,所以在遍历过程中需要判断是否是当前需要的执行的handler,Netty通过一下非常巧妙的方式实现。我一开始以为是通过一个字段标明,结果并不是,是通过每个handler实例中的executionMask的数值进行复杂的位运算来实现区分,真的很巧妙。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 一些常量用于计算
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;

// 循环找到当前需要执行的InboundHandler
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}

// 是否要跳过
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
// Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
// We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
// everything to preserve ordering.
//
// See https://github.com/netty/netty/issues/10067
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}

API网关实战

在前面的梳理的内容中我们介绍过网关,它是一个三层的转发设备,往往是一个路由器、三层交换机或是防火墙。它是一个网段的入口和出口,连接这多个网段,如果我们想要访问不在当前网段的服务,数据包就要通过网关的路由转发。那我们今天要介绍的这个网关和那个网关又有一些不一样。我们这个网关是和服务集群有关,在以前的单机时代,我们只需要在服务前面加一个Nginx做一个反向代理,将请求均匀的分配到几个相同的实例上即可。但是在为服务架构中,不同的服务之间服务调用已经变得非常复杂,如果直接把服务接口暴露给客户端势必会有各种各样的问题。因此我们提供一个特定的微服务,所有的请求通过这个服务再次分派转发给目标的微服务接口。而这样特定的微服务就是我们的API网关。API网关是微服务架构中的基础组件,位于接入层之下和业务服务层之上。

网关与微服务集群

如果我们将上面的介绍的网关应用到实际的服务的拓扑图中,就可以得类似于下面的架构图。我们可以为每一个服务实例提供一个网关(Gateway),也可以为一组服务配置一个,甚至可以为整个服务集群配置一个接入的网关(Gateway),这样整体系统的复杂度就变得简单起来,集中重复的简单功能也可以放在网关中实现。

上面的图展示了一个多层网关的架构,其中一个总的网关接入所有的流量,并分发给不同的子系统,还有二级网关用于做各个子系统的接入。通过网关,我们可以把分布式架构组成一个星型架构,由网关对服务的请求进行路由和分发。

主要功能

作为一个系统模块的入口和出口,如果要实现对接口服务的管理应该实现一下的一些功能。

  • 请求路由,网关作为网络的出入口一定要具有请求路由的功能。对于调用端来说,并不知道真正调用的服务地址,一类服务的地址统一交给网关处理。

  • 服务注册功能,为了能够代理后面的服务,并把请求路由到正确的位置上,网关应该具备服务注册功能,也就是后端的实例可以将服务注册到网关上。一般来说,注册服务也就是注册提供服务的API接口,网关再通过策略从这些注册的接口中选择一个进行路由调用。

  • 负载均衡,一个网关可以注册多个服务实例,那么按照什么样的路由策略进行路由请求就是一个需要解决的问题。简单一点的可以直接是轮询或随机。复杂一点的可以设计权重随机等负载策略。

  • 弹力设计,网关还可以把弹力设计中的那些异步、重试、幂等、流控、熔断、监控等都实现进去。这样,可以像Service Mesh那样,让应用服务只关心自己的业务逻辑而不用考虑控制逻辑(控制面)。

  • 安全方面,SSL加密及证书管理、Session验证、授权、数据校验,以及黑白名单等。错误处理越高处理位置越靠前越好,所以,网关可以做到一个全站的接入组件来对后端的服务进行保护。当然,网关还可以做更多更有趣的事,比如:灰度发布、API聚合、API编排。

上面的这些点,简单归纳下可以总结为以下的四个功能点:

  1. 请求接入,作为所有API接口的服务请求的接入点。

  2. 聚合业务,作为所有后端服务的聚合点。

  3. 中介策略,实现安全、验证、路由、过滤、流控等策略。

  4. 统一管理,对所有API服务和策略进行统一管理。

设计重点

网关是我们服务的入口和出口,所有的流量都会经过网关,因此高性能和高可用性很自然而然的成为我们的设计指标。网关的设计应该像许多服务一样,应该可以随着硬件水平的提升,提升整体的服务能力,也就是具有强大的水平拓展能力。我们时常还需要添加一些自己的业务逻辑在网关上,例如常见的一些登录、鉴权、限流等操作。

  • 高性能,技术选型设计上,网关不应该也不能成为整个系统的系统瓶颈。对于高性能,最好是使用高性能的编程语言来实现。如C,C++、GO或Java。网关对后端的请求,以及对前端的请求的服务一定要使用异步阻塞的I/O来确保后端延迟不会导致应用程序中出现性能问题。

  • 高可用,所有的流量和调用都会经过网关,所以网关必须成为一个高可用的组件,它的稳定之间关系到所有服务的稳定,如果一片服务的网关出现故障,将会造成这一片服务的不可用。

如果要做到高可用可以从以下几个方向拓展。

  • 集群化,服务集群,降低单个服务负载,同时提升服务容错性。

  • 服务化,网关还需要做到在不间断的情况下修改配置,调整自己配置后,可以快速重启。或是说通过某些方式可以动态修改配置信息。

  • 持续化,当某个服务下线后,可以保证流量不会进入到当前的正在下线的服务上。而服务上线后,服务可以快速地分摊整体的负载。

  • 高扩展,因为网关需要承接所有的业务流量和请求,,所以一定会或多或少有一些业务逻辑。而这些业务逻辑是多变和不确定的,因此我们的服务还需要可拓展,并且能进行二次开发。

网关对比

网关依据不同的功能和关注点也有不同的分类,分为流量网关和业务网关。流量网关顾名思义就是控制流量进入集群的网关,对于一个服务承载的流量是非常大的,但并不是每一个请求都是有效的请求。这个时候我们就需要将无效的请求挡在外面,降低整体服务的压力。而业务网关,顾名思义和业务贴的比较近,是微服务架构中的核心基础设施,主要做一些基础功能的扩展,比如请求转发,协议适配,熔断限流等。他们之间的关系如下图:

功能对比

业务网关的功能更多和应用业务相关来提供更好的服务,而流量关注的更多的是流量负载压力保证服务稳定安全。流量网关通常包括下面一些功能:

  • 全局性流控。

  • 日志统计。

  • 防止SQL注入。

  • 防止Web攻击。

  • 屏蔽工具扫描。

  • 黑白名单。

  • 证书/加解密处理。

流量通常只有一些全局性的API管理策略,从上面的主要功能也不难发现这一点。流量网关总体功能上类似防火墙。Kong就是典型的流量网关。可以看到下面也都是一些流量控制策略,包括一些日志记录,安全、监控等功能。

业务网关通常包含下面的一些功能:

  • 服务级别流量控制。

  • 服务降级与熔断。

  • 路由与负载均衡、灰度策略。

  • 服务过滤、聚合与发现。

  • 权限验证与参数校验。

  • 多级缓存策略。

业务网关相较于流量网关会更贴近业务,从上面的常包括的功能也能看出来,业务网关更多是和服务相关,有很多应用层需要考虑的事情,就可以依托于业务网关,例如线程模型、协议适配、熔断限流和服务编排等。下面这个就是一个网业务网关的结构。

主流网关

目前常见的开源网关大致按照语言可以分为以下几类:

  • Nginx+lua:OpenResty、Kong、Orange、Abtesting gateway等

  • Java:Zuul/Zuul2、Spring Cloud Gateway、Kaazing KWG、gravitee、shenyu(soul)等。

  • Go:Janus、fagongzi、Grpc-gateway

  • Dotnet:Ocelot

  • NodeJS:Express Gateway、Micro Gateway

目前按照使用数量、成熟度等指标来划分。目前主流的有4个:

  • OpenResty

  • Kong

  • Zuul/Zuul2

  • Spring Cloud Gateway

这里我们就不深入梳理了,感兴趣的同学可以找对应项目的社区和文档继续深入学习。

编码实战

前面我们介绍了关网关的一些知识,这一部分我们将使用Netty实现一个简单的API网关。这个网关的功能包括基础的过滤,路由等功能。整体的功能结构如下图。

首先我们创建一个基于Netty的HTTP Server,这个部分在前面的Demo中已经写过很多个了,这里就不过多赘述了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* netty 网关服务
* Created by Daiwei on 2021/1/25
*/
public class NettyGateWayServer {

private static final int DEFAULT_PORT = 8888;

private final int port;

private final List<String> proxyServers;

public NettyGateWayServer(Integer port, List<String> proxyServers) {
this.port = port != null ? port : DEFAULT_PORT;
if (proxyServers == null || proxyServers.size() == 0) {
throw new RuntimeException("gateway need at least one server instance!");
}
this.proxyServers = proxyServers;
}

public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workGroup = new NioEventLoopGroup(4);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_RCVBUF, 32 * 1024)
.option(EpollChannelOption.SO_REUSEPORT, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_KEEPALIVE, true);
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new MyHttpInitializer(this.proxyServers));

ChannelFuture future = serverBootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}

@Override
public String toString() {
return "NettyGateWayServer{" +
"port=" + port +
", proxyServers=" + proxyServers +
'}';
}
}

在前面的Demo中,我们是ChildHandler的实现直接作为内部类的方式直接放在NettyServer中,但是这么我们单独放在一个类文件中,这两种写法没有本质的区别。在这里的实现如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* childHandler 实现
* Created by Daiwei on 2021/1/25
*/
public class MyHttpInitializer extends ChannelInitializer<SocketChannel> {

private final HttpRouterHandler httpRouterHandler;

public MyHttpInitializer(List<String> proxyServers) {
this.httpRouterHandler = new HttpRouterHandler(proxyServers);
}

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
HttpFilterChain chain = HttpFilterChain.createDefault().addFilter(new PathCheckFilter())
.addFilter(new HeaderAdderFilter());
socketChannel.pipeline().addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(1024 * 1024))
.addLast(new HttpFilterHandler(chain)).addLast(httpRouterHandler);
}
}

在这个实现中,提供了一个构造器需要传入一个List<String>即实例的地址列表。然后使用这个地址List创建出一个RouterHandler。通过这个RouterHandler实现不同的请求路由策略。这是一个HttpServer的实现,最后添加了我们两个自定义的Handler,一个是过滤器Handler,另外一个是路由Handler。在下面的initChannel我们还创建了HttpFilterChain即一个过滤链,在这个过滤链中加入我们自定义的过滤器实例。其中过滤链和过滤器实现如下,都非常简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Created by Daiwei on 2021/1/29
*/
public class HttpFilterChain {

private final List<HttpRequestFilter> filters;

private HttpFilterChain() {
this.filters = new ArrayList<>();
}

public static HttpFilterChain createDefault() {
return new HttpFilterChain();
}

public HttpFilterChain addFilter(HttpRequestFilter filter) {
this.filters.add(filter);
return this;
}

public List<HttpRequestFilter> getFilters() {
return this.filters;
}

}
/**
* Created by Daiwei on 2021/1/29
*/
public interface HttpRequestFilter {

boolean filter(FullHttpRequest request, ChannelHandlerContext ctx);
}
/**
* Created by Daiwei on 2021/1/29
*/
public class PathCheckFilter implements HttpRequestFilter{

@Override
public boolean filter(FullHttpRequest request, ChannelHandlerContext ctx) {
String uri = request.uri();
return uri.contains("hello");
}
}

因为fitler方法会传入FullHttpRequest,我们往往可以在这里完成一些鉴权或是往头中加入一些业务需要的数据,例如下面这个实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Created by Daiwei on 2021/1/29
*/
public class HeaderAdderFilter implements HttpRequestFilter {

@Override
public boolean filter(FullHttpRequest request, ChannelHandlerContext ctx) {
request.headers().set("name", "daiwei");
request.headers().set("age", 25);
request.headers().set("addr", "shanghai");
return true;
}
}

ChannelPipline中我们加入了两个自定义的Handler,其中第一个过滤器handler,第二个是路由handler。这两个handler实现了对请求的过滤和转发。下面的代码是这两个handler的实现代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* 过滤器处理器
* Created by Daiwei on 2021/1/25
*/
public class HttpFilterHandler extends ChannelInboundHandlerAdapter {

private final HttpFilterChain filterChain;

public HttpFilterHandler(HttpFilterChain filterChain) {
this.filterChain = filterChain;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
try {
if (doFilter(fullHttpRequest, ctx)) {
ctx.fireChannelRead(msg);
} else {
DefaultFullHttpResponse unAuthResp = HttpNettyHelper.genUnAuthResp();
if (!HttpUtil.isKeepAlive(fullHttpRequest)) {
ctx.writeAndFlush(unAuthResp).addListener(ChannelFutureListener.CLOSE);
} else {
unAuthResp.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderNames.KEEP_ALIVE);
ctx.writeAndFlush(unAuthResp);
}
}
} finally {
ReferenceCountUtil.release(msg);
}
}


@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

/**
* 执行过滤链
* @param request
* @return
*/
private boolean doFilter(FullHttpRequest request, ChannelHandlerContext ctx) {
for (HttpRequestFilter filter : this.filterChain.getFilters()) {
if (!filter.filter(request, ctx)) {
return false;
}
}
return true;
}

}

这一段的逻辑也非常简单,channelRead方法读取请求,然后调用过滤链doFilter,如果过滤不通过,直接返回一个校验不通过的响应,如果过滤通过直接拉起调用下一个channelPipline的下一个inboundHandler,即负责路由的handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/**
* 路由处理器
* Created by Daiwei on 2021/1/25
*/
@ChannelHandler.Sharable
public class HttpRouterHandler extends ChannelInboundHandlerAdapter {

private final HttpRouter router;

private final CloseableHttpAsyncClient httpClient;

private final ThreadPoolExecutor executorService;

private static final int SUCCESS_CODE = 200;

private static final int NUM_OF_CORE = Runtime.getRuntime().availableProcessors();

private static final long KEEP_ALIVE = 8L;

private static final int QUEUE_SIZE = 1024;

public HttpRouterHandler(List<String> proxyServers) {


IOReactorConfig ioConfig = IOReactorConfig.custom()
.setConnectTimeout(1000)
.setSoTimeout(1000)
.setIoThreadCount(NUM_OF_CORE)
.setRcvBufSize(32 * 1024)
.build();

httpClient = HttpAsyncClients.custom().setMaxConnTotal(40)
.setMaxConnPerRoute(8).setDefaultIOReactorConfig(ioConfig)
.setKeepAliveStrategy(((httpResponse, httpContext) -> 6000)).build();

httpClient.start();

this.executorService = new ThreadPoolExecutor(NUM_OF_CORE, NUM_OF_CORE, KEEP_ALIVE, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_SIZE), new ThreadPoolExecutor.CallerRunsPolicy());
this.router = new RoundRobinRouter(proxyServers);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String host = this.router.route();
FullHttpRequest request = (FullHttpRequest) msg;
String url = host + request.uri();
this.executorService.submit(() -> executeAndWrite(request, ctx, url));
}


@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

/**
* 调用并返回
* @param url
*/
private void executeAndWrite(FullHttpRequest request, ChannelHandlerContext ctx, String url) {
final HttpGet httpGet = new HttpGet(url);
httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
this.httpClient.execute(httpGet, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse httpResponse) {
try {
writeResp(httpResponse, request, ctx);
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void failed(Exception e) {
httpGet.abort();
FullHttpResponse resp = HttpNettyHelper.genFailedResp();
ctx.writeAndFlush(resp);
e.printStackTrace();
}

@Override
public void cancelled() {
httpGet.abort();
}
});
}

private void writeResp(HttpResponse response, FullHttpRequest request, ChannelHandlerContext ctx) {
FullHttpResponse resp = null;
try {
if (response.getStatusLine().getStatusCode() == SUCCESS_CODE) {
byte[] bytes = EntityUtils.toByteArray(response.getEntity());
resp = HttpNettyHelper.genBaseResp(bytes);
} else {
resp = HttpNettyHelper.genFailedResp();
}
} catch (IOException e) {
e.printStackTrace();
resp = HttpNettyHelper.genFailedResp();
} finally {
if (request != null) {
if (!HttpUtil.isKeepAlive(request)) {
ctx.write(resp).addListener(ChannelFutureListener.CLOSE);
} else {
ctx.write(resp);
}
}
ctx.flush();
}
}

}

这个类构造方法里创建一个httpClient用来进行转发操作,并且创建了一个路由器。在读取请求的方法channelRead中,使用路由器路由出下一次的转发的地址。并且进行请求的转发操作。当都到回复请求后,再将响应返回给调用方。整个流程下来,一次请求的过滤和转发就完成了。注意这里的路由规则是RoundRobinRouter即轮询的路由规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Created by Daiwei on 2021/1/29
*/
public class RoundRobinRouter extends HttpRouter {

private final AtomicInteger cnt = new AtomicInteger();

public RoundRobinRouter(List<String> proxyServers) {
super(proxyServers);
}

@Override
public String route() {
return this.proxyServers.get(cnt.getAndIncrement() % this.proxyServers.size());
}
}

我还顺带实现了随机的路由规则。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Created by Daiwei on 2021/1/29
*/
public class RandomRouter extends HttpRouter {

private final Random rdn;

public RandomRouter(List<String> proxyServers) {
super(proxyServers);
this.rdn = new Random();
}

@Override
public String route() {
return this.proxyServers.get(this.rdn.nextInt(proxyServers.size()));
}
}

在我们实现的这个路由handler中有一个细节,之前我们提到当一个连接连接上后,Netty都会为这个请求创建一个ChannelPipline。那么这就意味着,每当一个连接连上都会初始化pipline中的handler。对于HttpFilterHandler来说创建开销并不大,但是对于HttpRouterHandler就完全不一样了,在路由handler中,不仅仅需要创建一个HttpClient还需要赋值一个路由(Router),如果每一个请求都要创建这么多东西,那性能的开销就非常的不乐观了。同时我们我们轮询的策略是通过一个AtomicInteger不断的去自增实现的,那么如果每次请求进来都创建一个新的AtomicInteger计数器,这轮询的策略就是有问题的。因此如果我们要现实轮询且避免每次连接创建一个HttpClient,我们可以将它们通过构造方法传入到RouterHandler中,也可以让这个RouterHandler在不同的连接中进行共享。共享的方法很简单,只要在Handler上添加注解@ChannelHandler.Sharable既可以让当前Handler实现在不同的ChannelPipline中共享。当然我建议使用传入的方式来实现,这种方式可以避免不同连接之间资源的共享,减少资源的竞争。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Created by Daiwei on 2021/1/29
*/
public class NettyServerApplication {

public static void main(String[] args) {
int port = 8888;
List<String> serverList = Arrays.asList("http://127.0.0.1:8801", "http://127.0.0.1:8802", "http://127.0.0.1:8803");
NettyGateWayServer server = new NettyGateWayServer(port, serverList);
System.out.println("my gateway is listening at http:127.0.0.1:"+ port + " for " + server.toString());
try {
server.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}

最后这是一个整个项目的main函数,这里的ServerList是写死的,当然我们可以接入注册中心,来实现可用服务列表的动态拉取。以上就是整个简单的API网关的设计与编码。

总结

这一小节我们接着上一小节的Demo,继续深入编写了一些Demo实例,熟悉掌握使用Netty进行网络编程,从代码层面感受Netty的特性。Demo示例中,我们梳理使用Netty编写Http服务、WebSocket服务,都是我们常用用到的Server类型。随后我们通过Idel检测和自定义编解码器的例子,熟悉我们实际开发过程中的一些使用实例。编写完成这些代码示例后,接下来我们开始我们的网关实战,我们先是介绍了网关,已经网关在微服务中扮演的角色。随后我们按照网关的不同类型,将网关分为流量网关和业务网关,以及他们的设计重点和功能侧重点。最后我们动手实战编写一个简单的API网关,分析实现了一个API网关的一些基础功能。在接下来的梳理中,我们将深入Netty的核心组件的源码进行分析,一定非常有意思,加油😏

学习资料