netty-第一个netty应用

简介

本章主要从三方面开始创建netty应用,主要覆盖 1.设置开发环境。2.写一个基于Echo client与server的程序。3.building并且测试你的程序

设置开发环境

基于unix通常需要:

  • jdk7 jre不能compile,请设置JAVA_HOME等相关环境变量,并将$JAVA_HOME/bin添加入环境变量
  • maven 创建管理的工具 设置MAVEN_HOME环境变量,并将$MAVEN_HOME/bin添加入环境变量
  • plain text edit 或者 integrated development environment -> IDE

netty client/server 简介

如图所示:echo server/client view

主要功能是实现 ,当echo client与server监理连接后,它将发送一个或多个信息给server,然后server将echo 每一条信息给client

这幅图展示了多个client与server的交互,client的数量理论上是可以被限制的,仅仅受系统资源的限制(或者jdk本身强制限制)

echo server
  • 要求

至少一个channelHandler 这个主要实现server端的业务处理逻辑

bootstrapping,开始代码配置server,最低限度的配置,绑定端口并且监听请求连接

channelHandlers 与 业务逻辑

在netty中,ChannelHandler主要用于接收以及响应事件的通知。在netty中,所有的数据处理逻辑都包含在在里面。在EchoServer中,server将回复进来的数据,一次我们需要实现接口 ChannleInboundHandler,相关方法定义了入站事务,我们的例子比较简单,因此只需要一些方法,因此最有效的是使用子类 ChannleINbouondHandlerAdapter 这个子类是接口提供个默认实现,主要有以下几个方法与我们有关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println(
"Server received: " + in.toString(CharsetUtil.UTF_8));
ctx.write(in);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
  • channelRead 有入站数据的时候会被调用
  • channelReadComplete 通知handler 在当前批次最后一条消息被调用
  • exceptionCaught 发生异常的时候被调用 ,每个channle 都有一个chain(管道),因此最好有一个handler实现该方法,否则,一旦有异常发生,会直到链尾才会被打印出来

channlehandlers
支持不同的事件类型格式,应用实现了ChannlHandlers可以实现一个事件整个生命周期的钩子,并且提供自己的业务逻辑。业务代码与网络代码解耦

bootstrapping 服务

bootstrap是配置服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class EchoServer {
private final int port;

public EchoServer(int port) {
this.port = port;
}

public static void main(String[] args)
throws Exception {
if (args.length != 1) {
System.err.println("Usage: " + EchoServer.class.getSimpleName() +
" <port>"
);
return;
}
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}

public void start() throws Exception {
final EchoServerHandler serverHandler = new EchoServerHandler(); //创建我们的处理handler
EventLoopGroup group = new NioEventLoopGroup(); //创建一个nio事件循环组
try {
ServerBootstrap b = new ServerBootstrap(); //创建一个bootstrap
b.group(group) //绑定事件循环组
.channel(NioServerSocketChannel.class) //指定处理的channle类型
.localAddress(new InetSocketAddress(port)) //绑定端口
.childHandler(new ChannelInitializer<SocketChannel>() { //初始化一个channle
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler); //添加我们的业务逻辑到处理链条中
}
});

ChannelFuture f = b.bind().sync(); //同步绑定
System.out.println(EchoServer.class.getName() +
" started and listening for connections on " + f.channel().localAddress());
f.channel().closeFuture().sync(); //阻塞到channle 同步关闭
} finally {
group.shutdownGracefully().sync(); //优雅的同步关闭
}
}
}

echo client
简介
  1. 连接server
  2. 发送一个或多个消息
  3. 对每个消息,等待接收到同样的消息
  4. 关闭连接

ChannelInboundHandler-> SimpleChannelInboundHandler
需要实现的方法

  • channelActive()—Called after the connection to the server is established,连接建立的时候
  • channelRead0()—Called when a message is received from the server 当接收到消息的时候
  • exceptionCaught()—Called if an exception is raised during processing 发生异常的时候
Bootstrap client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class EchoClient {
private final String host;
private final int port;

public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}

public void start()
throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class) //与server端不同的
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new EchoClientHandler()); //实现自己的业务逻辑
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}

public static void main(String[] args)
throws Exception {
if (args.length != 2) {
System.err.println("Usage: " + EchoClient.class.getSimpleName() +
" <host> <port>"
);
return;
}

final String host = args[0];
final int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}
}
echo client handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class EchoClientHandler
extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",
CharsetUtil.UTF_8));
}

@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
System.out.println(
"Client received: " + in.toString(CharsetUtil.UTF_8));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
  • SimpleChannelInboundHandler vs. ChannelInboundHandler

SimpleChannelInboundHandler 调用完channelRead0() 完成后,将会释放ByteBuf的内存,而server端则不能释放,因为发送出去,是在调用readComplete的时候,才会发送。

坚持原创技术分享,您的支持将鼓励我继续创作!