What is Netty?
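Netty is an asynchronous, event-driven network application framework for the JVM. Its goal is to make high-concurrency, high-throughput network programs about as easy to write as ordinary business code. Internally it combines the Reactor pattern with thread pools (event loop groups) and encapsulates the hard parts: I/O multiplexing, buffer management, and protocol encoding and decoding.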
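Netty core components
- Channel: an abstraction over a socket, with several transports available (NIO, Epoll, KQueue)
- EventLoop/EventLoopGroup: event loop threads; each loop is a single thread with a task queue, so work for a channel runs serially and lock contention is largely avoided
- ChannelPipeline: a chain of responsibility of handlers, with inbound and outbound events flowing through it in opposite directions
- ByteBuf: the buffer abstraction, with pooled direct memory, zero-copy operations, and reference counting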
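The "single thread plus task queue" idea behind an event loop can be illustrated without Netty at all. The following is a minimal sketch using only the JDK; `MiniEventLoop` is a hypothetical name, and a single-thread `ExecutorService` stands in for a real `EventLoop`. Because only the loop thread touches the counter, it needs no lock and no atomic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an event loop: one thread draining a task queue.
class MiniEventLoop {
    private final ExecutorService loop = Executors.newSingleThreadExecutor();
    // Plain int on purpose: it is read and written only on the loop thread.
    private int handled;

    // Any thread may submit; the task itself always runs on the loop thread.
    void execute(Runnable task) {
        loop.execute(() -> {
            task.run();
            handled++;
        });
    }

    // Reads the counter on the loop thread, after every task queued before it.
    int handledCount() throws Exception {
        return loop.submit(() -> handled).get();
    }

    void shutdown() {
        loop.shutdown();
    }

    // Submits tasks from several producer threads and returns how many were handled.
    static int runDemo(int producers, int perProducer) throws Exception {
        MiniEventLoop eventLoop = new MiniEventLoop();
        CountDownLatch submitted = new CountDownLatch(producers);
        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    eventLoop.execute(() -> { });
                }
                submitted.countDown();
            }).start();
        }
        submitted.await();
        int count = eventLoop.handledCount();
        eventLoop.shutdown();
        return count;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("handled = " + runDemo(4, 1000)); // prints "handled = 4000"
    }
}
```

The producers contend only on the queue; the state owned by the loop is never shared, which is the property the EventLoop bullet describes.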
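Implementing a custom RPC on top of Netty
Basic concepts
RPC (Remote Procedure Call) is a key way for services in a distributed system to communicate. Implementations fall into two broad types:
- RESTful calls over HTTP, e.g. Feign in Spring Cloud
- RPC over TCP, e.g. Dubbo (gRPC is usually counted here as well, although it runs over HTTP/2 rather than a raw TCP protocol)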
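Goal
Build a simple RPC call modeled on the Dubbo framework:
- The consumer and the provider agree on a shared interface and protocol
- The consumer remotely invokes a method on the provider
- The provider returns a string response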
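The agreed protocol in this demo is a plain string of the form `WzkUserService#sayHello#<argument>`. As a rough sketch of what "agreeing on a protocol" means, the request can be modeled as a small value type with `encode` and `decode`. `RpcRequest` is a hypothetical helper, not part of the code in this post, and it uses only the JDK.

```java
// Hypothetical value type for the demo's "Service#method#argument" request string.
final class RpcRequest {
    final String service;
    final String method;
    final String argument;

    RpcRequest(String service, String method, String argument) {
        this.service = service;
        this.method = method;
        this.argument = argument;
    }

    // Builds the wire string the consumer sends.
    String encode() {
        return service + "#" + method + "#" + argument;
    }

    // Parses a wire string on the provider side.
    static RpcRequest decode(String wire) {
        // A limit of 3 keeps any '#' inside the argument intact.
        String[] parts = wire.split("#", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("malformed request: " + wire);
        }
        return new RpcRequest(parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        String wire = new RpcRequest("WzkUserService", "sayHello", "hello???").encode();
        System.out.println(wire); // prints "WzkUserService#sayHello#hello???"
        System.out.println(decode(wire).argument); // prints "hello???"
    }
}
```

One design difference worth noting: splitting with a limit preserves `#` characters inside the argument, whereas taking everything after the last `#` would truncate such an argument.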
Common module dependency
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.16.Final</version>
</dependency>
Interface definition
package icu.wzk.nettyrpc.service;

public interface WzkUserService {
    String sayHello(String str);
}
Provider implementation
package icu.wzk.nettyrpc.service.impl;

import icu.wzk.nettyrpc.service.WzkUserService;
import icu.wzk.nettyrpc.hanlder.WzkUserServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class WzkUserServiceImpl implements WzkUserService {

    @Override
    public String sayHello(String str) {
        System.out.println("Function called: " + str);
        return "Call succeeded: " + str;
    }

    // Starts a Netty server that exposes this service over a plain string protocol.
    public static void startServer(String hostName, int port) throws Exception {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap
                .group(eventLoopGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // No frame decoder: acceptable for this demo's short messages only.
                        pipeline
                                .addLast(new StringDecoder())
                                .addLast(new StringEncoder())
                                .addLast(new WzkUserServerHandler());
                    }
                });
        // sync() waits for the bind to finish, so a failure such as a busy port is thrown here.
        bootstrap.bind(hostName, port).sync();
    }
}
Provider handler
package icu.wzk.nettyrpc.hanlder;

import icu.wzk.nettyrpc.service.impl.WzkUserServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class WzkUserServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // StringDecoder has already turned the incoming bytes into a String.
        String request = msg.toString();
        // Requests look like "WzkUserService#sayHello#<argument>"; anything else is ignored.
        if (request.startsWith("WzkUserService")) {
            String argument = request.substring(request.lastIndexOf("#") + 1);
            String result = new WzkUserServiceImpl().sayHello(argument);
            ctx.writeAndFlush(result);
        }
    }
}
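The handler above hardcodes one service and one method. As a possible generalization, a provider could look up the target by service name and invoke the method reflectively. The sketch below uses only the JDK; `ServiceRegistry` and `EchoService` are hypothetical names introduced for illustration, and the method is assumed to take a single `String` argument, as `sayHello` does.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service used only by this sketch.
class EchoService {
    public String sayHello(String str) {
        return "hello, " + str;
    }
}

// Hypothetical registry: maps a service name from the wire to an implementation.
class ServiceRegistry {
    private final Map<String, Object> services = new ConcurrentHashMap<>();

    void register(String name, Object implementation) {
        services.put(name, implementation);
    }

    // Dispatches a "Service#method#argument" request to the registered implementation.
    String dispatch(String wire) throws Exception {
        String[] parts = wire.split("#", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("malformed request: " + wire);
        }
        Object target = services.get(parts[0]);
        if (target == null) {
            throw new IllegalArgumentException("unknown service: " + parts[0]);
        }
        // Assumption: every exposed method takes exactly one String parameter.
        Method method = target.getClass().getMethod(parts[1], String.class);
        return String.valueOf(method.invoke(target, parts[2]));
    }

    public static void main(String[] args) throws Exception {
        ServiceRegistry registry = new ServiceRegistry();
        registry.register("EchoService", new EchoService());
        System.out.println(registry.dispatch("EchoService#sayHello#hi")); // prints "hello, hi"
    }
}
```

Invoking whatever method name arrives on the wire is only reasonable in a demo; a real provider would restrict calls to the methods of the published interface.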
Starting the server
package icu.wzk.nettyrpc;

import icu.wzk.nettyrpc.service.impl.WzkUserServiceImpl;

public class WzkNettyRPCServer {
    public static void main(String[] args) throws Exception {
        WzkUserServiceImpl.startServer("localhost", 9999);
    }
}
Consumer implementation
A JDK dynamic proxy makes the remote call look like a local one:
package icu.wzk.nettyrpc;

import icu.wzk.nettyrpc.hanlder.WzkUserClientHandler;
import icu.wzk.nettyrpc.service.WzkUserService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WzkNettyRPCClient {

    // Worker pool that runs the handler's call(); the proxy still blocks on get() for the result.
    private static ExecutorService executor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static WzkUserClientHandler client;

    public Object createProxy(final Class<?> serviceClass, final String providerName) {
        return Proxy.newProxyInstance(
                Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass},
                (proxy, method, args) -> {
                    // Connect lazily on the first call.
                    if (null == client) {
                        initClient();
                    }
                    // Request format: "<providerName><argument>", e.g. "WzkUserService#sayHello#hello???".
                    client.setParam(providerName + args[0]);
                    // call() sends the request and blocks until the response arrives.
                    return executor.submit(client).get();
                }
        );
    }

    private static void initClient() throws Exception {
        client = new WzkUserClientHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                .group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline
                                .addLast(new StringDecoder())
                                .addLast(new StringEncoder())
                                .addLast(client);
                    }
                });
        // Wait for the connection attempt to finish before the first request is sent.
        bootstrap.connect("localhost", 9999).sync();
    }

    public static void main(String[] args) throws Exception {
        String providerName = "WzkUserService#sayHello#";
        WzkNettyRPCClient wzkNettyRPCClient = new WzkNettyRPCClient();
        WzkUserService wzkUserService =
                (WzkUserService) wzkNettyRPCClient.createProxy(WzkUserService.class, providerName);
        // Call the remote method once per second, forever.
        while (true) {
            Thread.sleep(1000);
            System.out.println(wzkUserService.sayHello("hello???"));
        }
    }
}
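To see the proxy mechanism in isolation, the following sketch replaces the Netty connection with an in-memory function, so it runs with the JDK alone. `HelloService`, `ProxyDemo`, and the `transport` parameter are hypothetical names for illustration; the request string follows the same `Service#method#argument` shape as the client above.

```java
import java.lang.reflect.Proxy;
import java.util.function.Function;

// Hypothetical service interface used only by this sketch.
interface HelloService {
    String sayHello(String str);
}

class ProxyDemo {
    // Creates a proxy that turns each interface call into a request string
    // and hands it to `transport`, which stands in for the network round trip.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> serviceClass, Function<String, String> transport) {
        return (T) Proxy.newProxyInstance(
                serviceClass.getClassLoader(),
                new Class<?>[]{serviceClass},
                (proxy, method, args) -> {
                    // toString/hashCode/equals are answered locally, not sent over the wire.
                    if (method.getDeclaringClass() == Object.class) {
                        switch (method.getName()) {
                            case "toString":
                                return serviceClass.getSimpleName() + "Proxy";
                            case "hashCode":
                                return System.identityHashCode(proxy);
                            default:
                                return proxy == args[0]; // equals
                        }
                    }
                    String request = serviceClass.getSimpleName()
                            + "#" + method.getName() + "#" + args[0];
                    return transport.apply(request);
                });
    }

    public static void main(String[] args) {
        // The "remote side" here simply echoes the request it received.
        HelloService service = createProxy(HelloService.class, request -> "echo:" + request);
        System.out.println(service.sayHello("hi")); // prints "echo:HelloService#sayHello#hi"
    }
}
```

The caller only ever sees `HelloService`; swapping the echo function for a real network transport would not change the calling code, which is what "transparent" means here.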
Consumer handler
package icu.wzk.nettyrpc.hanlder;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.Callable;

public class WzkUserClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {

    private ChannelHandlerContext context;
    private String result;
    private String param;

    public void setParam(String param) {
        this.param = param;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        synchronized (this) {
            context = ctx;
            // Wake a caller that started before the channel became active.
            notifyAll();
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        synchronized (this) {
            result = msg.toString();
            notifyAll();
        }
    }

    @Override
    public Object call() throws Exception {
        synchronized (this) {
            // The connect future may complete before channelActive has run.
            while (context == null) {
                wait();
            }
            // Clear any previous response so the loop below waits for a fresh one.
            result = null;
            context.writeAndFlush(param);
            // Loop to guard against spurious wakeups.
            while (result == null) {
                wait();
            }
            return result;
        }
    }
}
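The wait/notify handshake in this handler blocks forever if the provider never answers. One possible alternative, sketched here with the JDK only, is to represent the outstanding request as a `CompletableFuture` and wait on it with a timeout. `PendingCall` is a hypothetical class; in a real handler, `onResponse` would be called from `channelRead`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical holder for a single in-flight request.
class PendingCall {
    private volatile CompletableFuture<String> pending;

    // Caller side: register the future before the request is written.
    CompletableFuture<String> begin() {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending = future;
        return future;
    }

    // I/O side: complete the current future when a response arrives.
    void onResponse(String msg) {
        CompletableFuture<String> future = pending;
        if (future != null) {
            future.complete(msg);
        }
    }

    // Waits for the response, giving up after timeoutMillis.
    static String await(CompletableFuture<String> future, long timeoutMillis) throws Exception {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Mark the call as abandoned so a late response is ignored.
            future.cancel(false);
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        PendingCall call = new PendingCall();

        CompletableFuture<String> answered = call.begin();
        // Simulate the I/O thread delivering a response.
        new Thread(() -> call.onResponse("response")).start();
        System.out.println(await(answered, 1000));

        CompletableFuture<String> lost = call.begin();
        try {
            await(lost, 50);
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

Like the handler above, this sketch supports only one request at a time; matching several concurrent requests to their responses would need a request id in the protocol.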
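Key implementation points
- Serialization: consider Hessian2 or Protobuf to improve performance
- Connection management: a connection pool is needed
- Timeouts: set a reasonable timeout for each call
- Load balancing: with multiple providers, a load-balancing strategy is required
- Fault tolerance: retry on failure, circuit breaking, degradation, and so on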
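As a rough illustration of the load-balancing and retry points, the sketch below picks providers in round-robin order and moves on to the next one when a call fails. It uses only the JDK. `RoundRobinInvoker` is a hypothetical class, the second address is made up for the demo, and the `call` function stands in for a real network request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical client-side helper: round-robin selection plus simple retry.
class RoundRobinInvoker {
    private final List<String> providers;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinInvoker(List<String> providers) {
        if (providers.isEmpty()) {
            throw new IllegalArgumentException("no providers");
        }
        this.providers = new ArrayList<>(providers);
    }

    // Returns the next provider address; floorMod keeps the index valid after overflow.
    String select() {
        int index = Math.floorMod(next.getAndIncrement(), providers.size());
        return providers.get(index);
    }

    // Tries up to maxAttempts providers in turn and returns the first successful result.
    <R> R invoke(Function<String, R> call, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String provider = select();
            try {
                return call.apply(provider);
            } catch (RuntimeException e) {
                last = e; // remember the failure and fall through to the next provider
            }
        }
        throw new IllegalStateException("all " + maxAttempts + " attempts failed", last);
    }

    public static void main(String[] args) {
        RoundRobinInvoker invoker =
                new RoundRobinInvoker(Arrays.asList("localhost:9999", "localhost:9998"));
        // Simulated call: the first provider is down, the second one answers.
        String result = invoker.invoke(provider -> {
            if (provider.endsWith("9999")) {
                throw new IllegalStateException("connection refused: " + provider);
            }
            return "ok from " + provider;
        }, 2);
        System.out.println(result); // prints "ok from localhost:9998"
    }
}
```

Blind retry is only safe for idempotent calls; circuit breaking and degradation would sit on top of a loop like this and are not shown.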