有了之前的基础之后,我们从netty官网的示例(略做修改),来开始netty之旅。我们实现一个支持hello world版的netty程序。
首先我们创建一个主类:侦听 http端口,启动服务
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
/**
* An HTTP server that sends back the content of the received HTTP request
* in a pretty plaintext form.
*/
public final class HttpHelloWorldServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8443" : "8080"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(10);//源码内部默认取的是 2*n, N=CPU数量 , 此处注意生产应该以压测结果为准。
try {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);//标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpHelloWorldServerInitializer(sslCtx));
System.out.println("服务启动成功,访问地址:" + (SSL ? "https" : "http") + "://127.0.0.1:" + PORT + '/');
Channel ch = b.bind(PORT).sync().channel();
ch.closeFuture().sync();
/* 单个netty是可以侦听多个端口的,一个端口一条线程,如果需要侦听多个端口,如下所示:
List<Integer> ports = Arrays.asList(8080, 8081);
Collection<Channel> channels = new ArrayList<>(ports.size());
for (int port : ports) {
Channel serverChannel = b.bind(port).sync().channel();
channels.add(serverChannel);
}
for (Channel ch : channels) {
ch.closeFuture().sync();
} */
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
HttpHelloWorldServerInitializer类中我们添加work线程的handler
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
//private static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
//p.addLast(group, "handler", new HttpHelloWorldServerHandler2());//业务线程独立的线程池
public HttpHelloWorldServerInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
/*注意此处的超时时间不是客户端TCP连接的超时时间,而是服务器处理的时间,如果超时,那么就会触发handler里面的exceptionCaught */
p.addLast(new ReadTimeoutHandler( 10));//服务器端设置超时时间,单位:秒
p.addLast(new WriteTimeoutHandler(10));//服务器端设置超时时间,单位:秒
p.addLast(new HttpServerCodec());//对http通信数据进行编解码
p.addLast(new HttpHelloWorldServerHandler()); //业务handler
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
HttpHelloWorldServerHandler ,work 线程池里面的handler不能放耗时比较大的业务逻辑,否则会导致netty工作现成阻塞,所以我们再启动一个单例的异步业务线程池,处理业务逻辑。
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.WriteTimeoutException;
public class HttpHelloWorldServerHandler extends ChannelInboundHandlerAdapter {
private static final byte[] CONTENT = {'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd'};
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
System.out.println("IO线程处理完毕:" + Thread.currentThread().getThreadGroup()+":"+Thread.currentThread().getName());
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
BusinessThreadUtil.doBusiness(ctx, msg, CONTENT);//handle中,可以使用异步的线程池,处理业务。防止handler卡住,导致netty并发性能不佳
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if(cause instanceof ReadTimeoutException||cause instanceof WriteTimeoutException) {
System.out.println("超时了:" + cause.toString());
}
ctx.close();//直接关闭channel
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
业务逻辑:BusinessThreadUtil
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
public class BusinessThreadUtil {
private static final ExecutorService executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000));//CPU核数4-10倍
public static void doBusiness(ChannelHandlerContext ctx, Object msg, byte[] content) {
//异步线程池处理
executor.submit( () -> {
if (msg instanceof HttpRequest) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread.currentThread().setName("buessness-thread");
System.out.println(Thread.currentThread().getId());
HttpRequest req = (HttpRequest) msg;
boolean keepAlive = HttpUtil.isKeepAlive(req);
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(content));
response.headers().set("Content-Type", "text/plain");
response.headers().setInt("Content-Length", response.content().readableBytes());
if (!keepAlive) {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set("Connection", "keep-alive");
ctx.writeAndFlush(response);
}
}
});
}
}
HttpHelloWorldServerInitializer类中我们添加work线程的handler
HttpHelloWorldServerHandler ,work 线程池里面的handler不能放耗时比较大的业务逻辑,否则会导致netty工作现成阻塞,所以我们再启动一个单例的异步业务线程池,处理业务逻辑。
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.WriteTimeoutException;
public class HttpHelloWorldServerHandler extends ChannelInboundHandlerAdapter {
private static final byte[] CONTENT = {'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd'};
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
System.out.println("IO线程处理完毕:" + Thread.currentThread().getThreadGroup()+":"+Thread.currentThread().getName());
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
BusinessThreadUtil.doBusiness(ctx, msg, CONTENT);//handle中,可以使用异步的线程池,处理业务。防止handler卡住,导致netty并发性能不佳
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if(cause instanceof ReadTimeoutException||cause instanceof WriteTimeoutException) {
System.out.println("超时了:" + cause.toString());
}
ctx.close();//直接关闭channel
}
}
业务逻辑:BusinessThreadUtil
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
public class BusinessThreadUtil {
private static final ExecutorService executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000));//CPU核数4-10倍
public static void doBusiness(ChannelHandlerContext ctx, Object msg, byte[] content) {
//异步线程池处理
executor.submit( () -> {
if (msg instanceof HttpRequest) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread.currentThread().setName("buessness-thread");
System.out.println(Thread.currentThread().getId());
HttpRequest req = (HttpRequest) msg;
boolean keepAlive = HttpUtil.isKeepAlive(req);
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(content));
response.headers().set("Content-Type", "text/plain");
response.headers().setInt("Content-Length", response.content().readableBytes());
if (!keepAlive) {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set("Connection", "keep-alive");
ctx.writeAndFlush(response);
}
}
});
}
}
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END