什么是 Netty?

Netty 是一个面向 JVM 的异步事件驱动网络开发框架,目标是让高并发、高吞吐的网络程序开发像写普通业务代码一样简单。它在内部采用 Reactor/Proactor + 线程池组合,将 I/O 多路复用、缓冲区管理、协议编解码等复杂细节彻底封装。

Netty 核心结构

  • Channel:套接字抽象,支持 NIO、Epoll、KQueue 等多种传输
  • EventLoop/Group:事件循环线程,单线程串行化 + 任务队列,天然避免锁竞争
  • ChannelPipeline:责任链,入站/出站双向流
  • ByteBuf:内存模型,直接内存池、零拷贝、引用计数

基于 Netty 实现自定义 RPC

基本概念

RPC(Remote Procedure Call)又称为远程过程调用,是分布式系统中服务间通信的重要方式。主要分为两种类型:

  1. 基于HTTP的RESTful形式 - 如 Spring Cloud 的 Feign
  2. 基于TCP协议的RPC远程调用 - 如 Dubbo、gRPC

实现目标

模仿 Dubbo 框架实现简单的 RPC 调用:

  1. 消费者和生产者约定统一接口和协议
  2. 消费者远程调用生产者的方法
  3. 生产者返回字符串响应

公共模块依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.16.Final</version>
</dependency>

接口定义

package icu.wzk.nettyrpc.service;

public interface WzkUserService {
    String sayHello(String str);
}

生产者(Provider)实现

package icu.wzk.nettyrpc.service.impl;

import icu.wzk.nettyrpc.service.WzkUserService;
import icu.wzk.nettyrpc.hanlder.WzkUserServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class WzkUserServiceImpl implements WzkUserService {

    @Override
    public String sayHello(String str) {
        System.out.println("调用函数: " + str);
        return "调用成功: " + str;
    }

    public static void startServer(String hostName, int port) throws Exception {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap
                .group(eventLoopGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline
                                .addLast(new StringDecoder())
                                .addLast(new StringEncoder())
                                .addLast(new WzkUserServerHandler());
                    }
                });
        bootstrap.bind(hostName, port);
    }
}

生产者 Handler

package icu.wzk.nettyrpc.hanlder;

import icu.wzk.nettyrpc.service.impl.WzkUserServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class WzkUserServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg.toString().startsWith("WzkUserService")) {
            String result = new WzkUserServiceImpl()
                    .sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }
}

启动服务端

package icu.wzk.nettyrpc;

import icu.wzk.nettyrpc.service.impl.WzkUserServiceImpl;

public class WzkNettyRPCServer {
    public static void main(String[] args) throws Exception {
        WzkUserServiceImpl.startServer("localhost", 9999);
    }
}

消费者(Consumer)实现

使用 JDK 动态代理实现透明调用:

package icu.wzk.nettyrpc;

import icu.wzk.nettyrpc.hanlder.WzkUserClientHandler;
import icu.wzk.nettyrpc.service.WzkUserService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WzkNettyRPCClient {

    private static ExecutorService executor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static WzkUserClientHandler client;

    public Object createProxy(final Class<?> serviceClass, final String providerName) {
        return Proxy.newProxyInstance(
                Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass}, (proxy, method, args) -> {
                    if (null == client) {
                        initClient();
                    }
                    client.setParam(providerName + args[0]);
                    return executor.submit(client).get();
                }
        );
    }

    private static void initClient() throws Exception {
        client = new WzkUserClientHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                .group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline
                                .addLast(new StringDecoder())
                                .addLast(new StringEncoder())
                                .addLast(client);
                    }
                });
        bootstrap.connect("localhost", 9999).sync();
    }

    public static void main(String[] args) throws Exception {
        String providerName = "WzkUserService#sayHello#";
        WzkNettyRPCClient wzkNettyRPCClient = new WzkNettyRPCClient();
        WzkUserService wzkUserService =
                (WzkUserService) wzkNettyRPCClient.createProxy(WzkUserService.class, providerName);
        while (true) {
            Thread.sleep(1000);
            System.out.println(wzkUserService.sayHello("hello???"));
        }
    }
}

消费者 Handler

package icu.wzk.nettyrpc.hanlder;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

public class WzkUserClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context;
    private String result;
    private String param;

    public void setParam(String param) {
        this.param = param;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        context = ctx;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        synchronized (this) {
            result = msg.toString();
            notify();
        }
    }

    @Override
    public Object call() throws Exception {
        synchronized (this) {
            context.writeAndFlush(param);
            wait();
            return result;
        }
    }
}

技术实现要点

  1. 序列化方案:建议使用 Hessian2 或 Protobuf 提高性能
  2. 连接管理:需要实现连接池机制
  3. 超时处理:设置合理的调用超时时间
  4. 负载均衡:多服务提供者时需实现负载均衡策略
  5. 容错机制:失败重试、熔断降级等