Netty 是一个基于 Java NIO 的异步、事件驱动的网络应用框架,旨在简化 TCP/UDP 等协议服务器和客户端的开发。它封装了底层的 NIO 细节,提供了易用的 API,广泛应用于高性能网络通信场景,如 Dubbo、RocketMQ、Elasticsearch 等中间件。
核心组件 Channel 表示一个网络连接的抽象,如 SocketChannel。它是数据读写的通道,支持异步非阻塞操作。
EventLoop & EventLoopGroup EventLoop 是处理 I/O 操作的核心,绑定到一个线程,负责处理 Channel 的所有事件。EventLoopGroup 是 EventLoop 的集合,用于管理多个 EventLoop,实现多线程处理。
ChannelHandler & ChannelPipeline ChannelHandler 是处理入站和出站数据的处理器,ChannelPipeline 是 ChannelHandler 的链式结构,定义了数据处理的流程。
Bootstrap & ServerBootstrap 用于初始化客户端和服务器的辅助类,配置 Channel、EventLoopGroup、Handler 等。
ByteBuf Netty 自定义的字节缓冲区,替代了 JDK 的 ByteBuffer,提供更高效的内存管理和读写操作。

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

// Allocate a buffer with an initial capacity of 256 bytes.
ByteBuf buffer = Unpooled.buffer(256);

byte[] data = new byte[]{1, 2, 3};
// wrappedBuffer shares the backing array (zero-copy).
ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(data);
// copiedBuffer makes an independent copy of the data.
ByteBuf copiedBuffer = Unpooled.copiedBuffer(data);
Echo服务器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class EchoServer { private final int port; public EchoServer (int port) { this .port = port; } public void start () throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel (SocketChannel ch) { ch.pipeline().addLast(new EchoServerHandler()); } }); ChannelFuture f = b.bind(port).sync(); System.out.println("Echo server started on port " + port); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main (String[] args) throws Exception { new EchoServer(8080 ).start(); } } class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { ctx.write(msg); } @Override public void channelReadComplete (ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
Http服务器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.http.*;public class HttpHelloWorldServer { private final int port; public HttpHelloWorldServer (int port) { this .port = port; } public void start () throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel (Channel ch) { ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new HttpObjectAggregator(65536 )); ch.pipeline().addLast(new SimpleChannelInboundHandler<FullHttpRequest>() { @Override protected void channelRead0 (ChannelHandlerContext ctx, FullHttpRequest req) { FullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.OK, ctx.alloc().buffer().writeBytes("Hello, World!" .getBytes()) ); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain" ); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); ctx.writeAndFlush(response); } }); } }); ChannelFuture f = b.bind(port).sync(); System.out.println("HTTP server started on port " + port); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main (String[] args) throws Exception { new HttpHelloWorldServer(8080 ).start(); } }
WebSocket服务器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.http.*;import io.netty.handler.codec.http.websocketx.*;import io.netty.handler.stream.ChunkedWriteHandler;public class WebSocketServer { private final int port; public WebSocketServer (int port) { this .port = port; } public void start () throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel (Channel ch) { ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new HttpObjectAggregator(65536 )); ch.pipeline().addLast(new ChunkedWriteHandler()); ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws" )); ch.pipeline().addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() { @Override protected void channelRead0 (ChannelHandlerContext ctx, TextWebSocketFrame msg) { System.out.println("Received: " + msg.text()); ctx.channel().writeAndFlush(new TextWebSocketFrame("Echo: " + msg.text())); } }); } }); ChannelFuture f = b.bind(port).sync(); System.out.println("WebSocket server started on port " + port); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main (String[] args) throws Exception { new WebSocketServer(8080 ).start(); } }