netty ChannelHandler and ChannelPipeline

channelHandler 家族

Channel的生命周期

Channel接口定义了一个简单但是有力的状态模型,与相关的ChannelInboundHandler API,下面四种状态:

  • ChannelUnregistered 当channel创建的时候,但是还没有注册到eventLoop
  • ChannelRegistered Channel注册到EventLoop
  • ChannelActive 当Channel是活跃的时候(连接到远程),当前状态就是可以接受以及发送数据
  • ChannelInactive 当Channel不在连接到远程

正常的Channel的生命周期,当这些响应的状态发生后,会产生响应的事件(event),而这些事件将会在ChannlePipeline中从头至尾经过各个ChannelHandler

1
ChannelRegistered->ChannelActive->ChannelInactive->ChannelUnregistered

ChannleHandler的生命周期

ChannelHandler的生命周期发生在add、remove、发生异常的时候

  • handlerAdded 当ChannelHandler添加到ChannelPipeline
  • handlerRemoved 当ChannelHandler从 ChannelPipeline 移除
  • exceptionCaught 在处理的过程中发生异常

ChannelInboundHandler—处理所有的入站数据以及所有类型的状态改变

ChannelOutboundHandler—处理所有出站的数据以及对所有的操作进行拦截

ChannelInboundHandler接口

netty-6

当使用ChannelInboundHandler覆盖channelRead()方法时,记得要主动释放池化的 ByteBuf实例,使用ReferenceCountUtil.release()这个方法来释放

1
2
3
4
5
6
7
8
9
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {

ReferenceCountUtil.release(msg);
}
}
Discards received message

当然你也可以使用SimpleChannelInboundHandler,这个不需要你主动释放,它会自动释放

1
2
3
4
5
6
7
8
9
10
@Sharable
public class SimpleDiscardHandler
Extends SimpleChannel- InboundHandler
extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
// No need to do anything special
}
}

ChannelOutboundHandler接口

出站操作以及数据处理,通常入参包括Channel, ChannelPipeline, and ChannelHandlerContext
netty-6

netty-6

CHANNELPROMISE VS. CHANNELFUTURE
大多数的出站Channel方法使用ChannelPromise作为参数,当操作完成时候被通知,ChannelPromise是ChannelFuture的子类,他定义了一系列方法,像setSuccess() 或者setFailure(),这样可以使得ChannelFuture不可变

ChannelHandler adapters

netty-6

由图可知,adapters实现了ChannelHandler了以及进站、出站的默认实现

资源管理

netty默认提供了资源管理类class Resource- LeakDetector,使用这个类可以诊断程序中是否有内存泄漏(主要是ByteBuf)

如果有泄漏检测到,可以看到如下日志

1
2
3
4
5
LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable
advanced leak reporting to find out where the leak occurred. To enable
advanced leak reporting, specify the JVM option
'-Dio.netty.leakDetectionLevel=ADVANCED' or call
ResourceLeakDetector.setLevel().

泄漏等级
netty-6

ChannelPipeline 接口

每一个新的channel都有一个ChannelPipeline

一个event会出现在入站ChannelInboundHandler或者出站ChannelOutboundHandler中,随后它将向下一个hnadler转发,通过ChannelHandlerContext实现

channelPipeline总是从头到尾的,如果时间类型不符合某个handler,那么会自动跳过该handler

修改ChannelPipeline

这是相关的方法
netty-6

netty-6

1
2
3
4
5
6
7
8
9
ChannelPipeline pipeline = ..;
FirstHandler firstHandler = new FirstHandler();
pipeline.addLast("handler1", firstHandler);
pipeline.addFirst("handler2", new SecondHandler());
pipeline.addLast("handler3", new ThirdHandler());
...
pipeline.remove("handler3");
pipeline.remove(firstHandler);
pipeline.replace("handler2", "handler4", new FourthHandler());

ChannelPipeline执行是不能阻塞的,如果因为以前的旧代码兼容,我们可以使用自定义的EventExecutorGroup来添加到ChannelPipeline链上

发出时间 firing events

当ChannelPipeline 发出的有关ChannelInboundHandlers事件的相关API
netty-6

当ChannelPipeline 发出的有关ChannelOutboundHandlers事件的相关API
netty-6

ChannelPipeline本章总结

  • 一个ChannelPipeline 持有ChannelHandlers 与Channel协调
  • 一个ChannelPipeline 可以动态删减ChannelHandlers
  • ChannelPipeline拥有富足的API,用于入站、出站的所有事件响应

ChannelHandlerContext 接口

ChannelHandlerContext是管理ChannelHandler与它的ChannelPipeline

ChannelHandlerContext有很多的方法、主要用于操作Channel、ChannelPipeline,但是不同的是,他的方法将用于整个pipe链,同样的方法被ChannelHandlerContext调用,它将开始在当前的ChannelHandler,并将调动pipe链中下一个ChannelHandler处理该事件

使用ChannelHandlerContext

ChannelHandlerContext API
netty-6
netty-6

ChannelHandlerContext、ChannelPipeline、Channel的关系
netty-6

只在感兴趣的事件后执行,在channelHandler中获取ChannelHandlerContext,然后执行读写,即可
netty-6

ChannelHandler与ChannelHandlerContext的高级使用

使用@Sharable注解我们的handler,可以使得handler在多个Pipeline中使用

ChannelHandler要保证线程安全,因此不该带有状态

ChannelHandlerContext是线程安全的

1
2
3
4
5
6
7
8
9
10
//保存ChannelHandlerContext的引用
public class WriteHandler extends ChannelHandlerAdapter {
private ChannelHandlerContext ctx;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
public void send(String msg) {
ctx.writeAndFlush(msg);
} }

异常处理

处理入站异常

您要覆盖ChannelInboundHandler以下方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void exceptionCaught(
ChannelHandlerContext ctx, Throwable cause) throws Exception



//例如
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
} }

通常我们实现的ChannelInboundHandler总是处于链条的最后,因此我们总是要确保处理异常!,如果没有处理,netty会打印相关日志

处理出站异常

通常出站操作会返回ChannelFuture、或者ChannelPromise

以下为两种处理方法

1
2
ChannelPromise setSuccess();
ChannelPromise setFailure(Throwable cause);
1
2
3
4
5
6
7
8
9
ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
} }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
} }
}); }
}
坚持原创技术分享,您的支持将鼓励我继续创作!