What is Netty?
Netty is an asynchronous, event-driven network application framework for the JVM. Its goal is to make developing high-concurrency, high-throughput network programs as simple as writing ordinary business code. Internally, it combines a Reactor-style event loop model with thread pools and encapsulates complex details such as I/O multiplexing, buffer management, and protocol encoding/decoding.
Netty Core Structure
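- Channel: An abstraction over a socket connection, with multiple transport implementations such as NIO, Epoll, and KQueue
- EventLoop/EventLoopGroup: The event loop threads. Each Channel is bound to a single EventLoop, which processes its I/O events and queued tasks serially on one thread, so handler code largely avoids lock contention
- ChannelPipeline: A chain of responsibility made of handlers. Inbound events flow from head to tail, and outbound operations flow from tail to head
- ByteBuf: Netty's buffer abstraction, with pooled direct memory, zero-copy operations (slices and composite buffers), and reference counting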
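The "single thread plus task queue" idea behind EventLoop can be illustrated with plain JDK classes. The sketch below is only an analogy and not Netty's implementation: `SingleThreadLoopDemo` is a hypothetical class, and a single-thread executor stands in for the event loop. Several producer threads enqueue tasks, but only the loop thread ever touches the counter, so no lock or atomic type is needed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: imitates the EventLoop idea with JDK types only.
class SingleThreadLoopDemo {
    // Deliberately a plain int: only the loop thread reads or writes it.
    private int counter = 0;

    // One thread plus an internal FIFO task queue, similar in spirit to an EventLoop.
    private final ExecutorService loop = Executors.newSingleThreadExecutor();

    int run(int producers, int tasksPerProducer) throws Exception {
        CountDownLatch submitted = new CountDownLatch(producers);
        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                for (int i = 0; i < tasksPerProducer; i++) {
                    // Producers never modify the counter directly; they only enqueue work.
                    loop.execute(() -> counter++);
                }
                submitted.countDown();
            }).start();
        }
        submitted.await();
        // The read is also queued, so it runs after every increment task.
        int result = loop.submit(() -> counter).get();
        loop.shutdown();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new SingleThreadLoopDemo().run(4, 1000)); // prints 4000
    }
}
```

Because all mutations are serialized on one thread, the result is deterministic even though four threads submit work concurrently.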
Implementing Custom RPC Based on Netty
Basic Concepts
RPC (Remote Procedure Call) is an important way for services to communicate in distributed systems. It is mainly divided into two types:
- HTTP-based RESTful calls, such as Feign in Spring Cloud
- RPC over a long-lived TCP connection with a binary protocol, such as Dubbo (gRPC is usually grouped here as well, although it runs over HTTP/2)
Implementation Goals
Implement a simple RPC call mimicking the Dubbo framework:
- Consumer and producer agree on a unified interface and protocol
- Consumer remotely calls producer’s method
- Producer returns string response
Common Module Dependency
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.16.Final</version>
</dependency>
Interface Definition
package icu.wzk.nettyrpc.service;

public interface WzkUserService {
    String sayHello(String str);
}
Producer (Provider) Implementation
package icu.wzk.nettyrpc.service.impl;

import icu.wzk.nettyrpc.service.WzkUserService;
import icu.wzk.nettyrpc.hanlder.WzkUserServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class WzkUserServiceImpl implements WzkUserService {

    @Override
    public String sayHello(String str) {
        System.out.println("Calling function: " + str);
        return "Call successful: " + str;
    }

    // Starts a Netty server that exposes this service on hostName:port.
    public static void startServer(String hostName, int port) throws Exception {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap
                .group(eventLoopGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // Requests and responses are plain strings in this demo.
                        pipeline
                                .addLast(new StringDecoder())
                                .addLast(new StringEncoder())
                                .addLast(new WzkUserServerHandler());
                    }
                });
        // sync() waits for the bind to complete, so a failure such as
        // "address already in use" is thrown here instead of being ignored.
        bootstrap.bind(hostName, port).sync();
    }
}
Producer Handler
package icu.wzk.nettyrpc.hanlder;

import icu.wzk.nettyrpc.service.impl.WzkUserServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class WzkUserServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Agreed protocol: "WzkUserService#sayHello#<argument>"
        String request = msg.toString();
        if (request.startsWith("WzkUserService")) {
            // Everything after the last '#' is treated as the argument.
            String argument = request.substring(request.lastIndexOf("#") + 1);
            String result = new WzkUserServiceImpl().sayHello(argument);
            ctx.writeAndFlush(result);
        }
    }
}
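The string protocol the handler relies on (`WzkUserService#sayHello#<argument>`) can be examined on its own. The following is a minimal sketch that uses only the JDK. `RpcRequestParser` and `Request` are hypothetical names that do not appear in the original project. Unlike the handler above, which cuts at the last `#`, this sketch splits on the first two separators, so an argument that itself contains `#` is preserved.

```java
// Hypothetical helper: splits "Service#method#argument" into its three parts.
class RpcRequestParser {

    // Simple immutable holder for the parsed request.
    static final class Request {
        final String service;
        final String method;
        final String argument;

        Request(String service, String method, String argument) {
            this.service = service;
            this.method = method;
            this.argument = argument;
        }
    }

    static Request parse(String message) {
        int first = message.indexOf('#');
        int second = (first < 0) ? -1 : message.indexOf('#', first + 1);
        if (second < 0) {
            throw new IllegalArgumentException("Malformed request: " + message);
        }
        return new Request(
                message.substring(0, first),
                message.substring(first + 1, second),
                message.substring(second + 1)); // may itself contain '#'
    }

    public static void main(String[] args) {
        Request r = parse("WzkUserService#sayHello#hello#world");
        // prints: WzkUserService / sayHello / hello#world
        System.out.println(r.service + " / " + r.method + " / " + r.argument);
    }
}
```

With the service and method names extracted separately, a provider could in principle dispatch to more than one method, although the demo only ever calls `sayHello`.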
Start Server
package icu.wzk.nettyrpc;

import icu.wzk.nettyrpc.service.impl.WzkUserServiceImpl;

public class WzkNettyRPCServer {
    public static void main(String[] args) throws Exception {
        WzkUserServiceImpl.startServer("localhost", 9999);
    }
}
Consumer Implementation
Using JDK dynamic proxy for transparent calls:
package icu.wzk.nettyrpc;

import icu.wzk.nettyrpc.hanlder.WzkUserClientHandler;
import icu.wzk.nettyrpc.service.WzkUserService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WzkNettyRPCClient {

    private static final ExecutorService executor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static WzkUserClientHandler client;

    // Returns a proxy whose method calls are forwarded to the remote provider.
    public Object createProxy(final Class<?> serviceClass, final String providerName) {
        return Proxy.newProxyInstance(
                Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass}, (proxy, method, args) -> {
                    // Connect lazily on the first call.
                    if (null == client) {
                        initClient();
                    }
                    // Request format: "WzkUserService#sayHello#<argument>"
                    client.setParam(providerName + args[0]);
                    // The handler is a Callable; get() blocks until the reply arrives.
                    return executor.submit(client).get();
                }
        );
    }

    private static void initClient() throws Exception {
        client = new WzkUserClientHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                .group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline
                                .addLast(new StringDecoder())
                                .addLast(new StringEncoder())
                                .addLast(client);
                    }
                });
        bootstrap.connect("localhost", 9999).sync();
    }

    public static void main(String[] args) throws Exception {
        String providerName = "WzkUserService#sayHello#";
        WzkNettyRPCClient wzkNettyRPCClient = new WzkNettyRPCClient();
        WzkUserService wzkUserService =
                (WzkUserService) wzkNettyRPCClient.createProxy(WzkUserService.class, providerName);
        // Call the remote method once per second.
        while (true) {
            Thread.sleep(1000);
            System.out.println(wzkUserService.sayHello("hello???"));
        }
    }
}
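The dynamic proxy part of the client can be tried in isolation, without Netty or a running server. The sketch below is a simplified illustration and not the project's code: `ProxySketch`, `GreetingService`, and the `transport` function are hypothetical stand-ins, with `transport` playing the role of "send the request string and return the reply". It assumes, as the demo does, that the service method takes a single argument.

```java
import java.lang.reflect.Proxy;
import java.util.function.Function;

// Hypothetical stand-alone demo of the proxy technique used by createProxy.
class ProxySketch {

    // Stand-in for WzkUserService.
    interface GreetingService {
        String sayHello(String str);
    }

    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> serviceClass, Function<String, String> transport) {
        return (T) Proxy.newProxyInstance(
                serviceClass.getClassLoader(),
                new Class<?>[]{serviceClass},
                (proxy, method, args) -> {
                    // Methods without arguments pass null for args.
                    Object arg = (args == null || args.length == 0) ? "" : args[0];
                    // Encode the call as "Service#method#argument".
                    String request = serviceClass.getSimpleName()
                            + "#" + method.getName() + "#" + arg;
                    // In the real client this step is the network round trip.
                    return transport.apply(request);
                });
    }

    public static void main(String[] args) {
        GreetingService service =
                createProxy(GreetingService.class, request -> "echo:" + request);
        // prints: echo:GreetingService#sayHello#hi
        System.out.println(service.sayHello("hi"));
    }
}
```

The caller only sees the `GreetingService` interface; the encoding and transport are hidden behind the invocation handler, which is what makes the remote call look like a local one.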
Consumer Handler
package icu.wzk.nettyrpc.hanlder;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

public class WzkUserClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {

    private ChannelHandlerContext context;
    private String result;
    private String param;

    public void setParam(String param) {
        this.param = param;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        synchronized (this) {
            context = ctx;
            notifyAll();
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        synchronized (this) {
            result = msg.toString();
            notifyAll();
        }
    }

    @Override
    public Object call() throws Exception {
        synchronized (this) {
            // channelActive may not have run yet when the first call starts.
            while (context == null) {
                wait();
            }
            result = null;
            context.writeAndFlush(param);
            // Loop on the condition to guard against spurious wakeups.
            while (result == null) {
                wait();
            }
            return result;
        }
    }
}
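The handler's `call()` waits with no upper bound, so a lost reply would block the caller forever. One possible alternative, sketched here with JDK classes only, is to hand the reply over through a `CompletableFuture` and wait with a timeout. `PendingCall` is a hypothetical class that is not part of the original project; in a real handler, `complete` would be invoked from `channelRead` and `await` from the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: one in-flight request whose reply arrives on another thread.
class PendingCall {
    private final CompletableFuture<String> response = new CompletableFuture<>();

    // Would be called by the I/O thread when the reply is read.
    void complete(String msg) {
        response.complete(msg);
    }

    // Would be called by the caller thread after the request is written.
    // Throws TimeoutException if no reply arrives in time.
    String await(long timeoutMillis) throws Exception {
        return response.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        PendingCall answered = new PendingCall();
        // Simulate the I/O thread delivering a reply.
        new Thread(() -> answered.complete("Call successful: hello")).start();
        System.out.println(answered.await(1000)); // prints: Call successful: hello

        PendingCall lost = new PendingCall();
        try {
            lost.await(100); // nobody ever completes this call
        } catch (TimeoutException e) {
            System.out.println("timed out"); // prints: timed out
        }
    }
}
```

Creating one `PendingCall` per request also avoids sharing the `result` field between calls, which matters once several threads use the same connection.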
Key Implementation Points
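- Serialization: This demo sends plain strings. For structured arguments and return values, a binary format such as Hessian2 or Protobuf is recommended for better performance
- Connection Management: This demo holds a single connection. A production client needs a connection pool
- Timeout Handling: The demo's call() blocks until a reply arrives. Set a reasonable timeout for each call
- Load Balancing: When multiple service providers exist, the consumer needs a strategy for choosing among them
- Fault Tolerance: Retry on failure, circuit breaking, degradation, and so on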
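As an illustration of the load balancing point, the simplest strategy is round-robin selection over the known provider addresses. The sketch below is one possible approach and not part of the original project: `RoundRobinBalancer` is a hypothetical class, and the `host:port` strings are example values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector over a fixed list of provider addresses.
class RoundRobinBalancer {
    private final List<String> providers;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinBalancer(List<String> providers) {
        if (providers.isEmpty()) {
            throw new IllegalArgumentException("At least one provider is required");
        }
        // Defensive copy so later changes to the caller's list have no effect.
        this.providers = new ArrayList<>(providers);
    }

    // Thread-safe: each call advances the shared counter atomically.
    String select() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), providers.size());
        return providers.get(index);
    }

    public static void main(String[] args) {
        RoundRobinBalancer balancer =
                new RoundRobinBalancer(Arrays.asList("localhost:9999", "localhost:9998"));
        System.out.println(balancer.select()); // prints: localhost:9999
        System.out.println(balancer.select()); // prints: localhost:9998
        System.out.println(balancer.select()); // prints: localhost:9999
    }
}
```

A consumer would call `select()` before each request and connect to (or reuse a pooled connection for) the returned address. Weighted or least-active strategies follow the same shape with a different `select()` body.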