异步调用

Dubbo 不仅提供了传统的同步阻塞调用方式,还支持高效的异步调用模式。

异步调用实现方式

服务定义

public interface GreetingService {
    String sayHello(String name, int timeToWait);
}

消费者配置

<dubbo:reference id="greetingService" interface="com.example.GreetingService">
    <dubbo:method name="sayHello" async="true" />
</dubbo:reference>

消费者调用示例

// 发起异步调用
greetingService.sayHello("world", 1000);

// 立即返回null,实际调用在后台执行
System.out.println("调用立即返回,继续执行其他操作...");

// 从RpcContext获取Future对象
Future<String> future = RpcContext.getContext().getFuture();

// 等待并获取结果
String result = future.get(1500, TimeUnit.MILLISECONDS);

注意事项

  1. 超时控制:Dubbo默认异步调用超时为1000ms
  2. Future获取:必须在发起调用的同一线程中获取Future对象
  3. 异常处理:异步调用需要通过Future检查执行状态和异常

线程池

已有线程池实现

1. 固定大小线程池(fix)

  • 这是Dubbo默认采用的线程池实现方式
  • 默认创建200个工作线程,没有等待队列
  • 线程数量固定,不会动态增减

2. 缓存线程池(cache)

  • 线程数量不固定,会根据需求动态创建
  • 空闲线程会被回收(默认60秒后)
  • 理论上可以无限扩展线程数量

配置示例

<dubbo:protocol name="dubbo" threadpool="fixed" threads="200"/>

自定义线程池方案

public class WzkWatchingThreadPool extends FixedThreadPool implements Runnable {
    private static final double ALARM_PERCENT = 0.9;
    private final Map<URL, ThreadPoolExecutor> THREADS_POOLS = new ConcurrentHashMap<>();

    @Override
    public Executor getExecutor(URL url) {
        final Executor executor = super.getExecutor(url);
        if (executor instanceof ThreadPoolExecutor) {
            THREADS_POOLS.put(url, (ThreadPoolExecutor) executor);
        }
        return executor;
    }

    @Override
    public void run() {
        // 监控线程池使用情况
    }
}

SPI 声明

文件 META-INF/dubbo/org.apache.dubbo.common.threadpool.ThreadPool

wzkWatching=icu.wzk.pool.WzkWatchingThreadPool

配置使用

dubbo.provider.threadpool=wzkWatching

最佳实践

  • 为不同业务设置独立的线程池
  • 根据业务特点选择合适的线程池类型
  • 建立完善的监控告警体系
  • 定期进行压力测试评估线程池容量