异步调用
Dubbo 不仅提供了传统的同步阻塞调用方式,还支持高效的异步调用模式。
异步调用实现方式
服务定义
public interface GreetingService {
String sayHello(String name, int timeToWait);
}
消费者配置
<dubbo:reference id="greetingService" interface="com.example.GreetingService">
<dubbo:method name="sayHello" async="true" />
</dubbo:reference>
消费者调用示例
// 发起异步调用
greetingService.sayHello("world", 1000);
// 立即返回null,实际调用在后台执行
System.out.println("调用立即返回,继续执行其他操作...");
// 从RpcContext获取Future对象
Future<String> future = RpcContext.getContext().getFuture();
// 等待并获取结果
String result = future.get(1500, TimeUnit.MILLISECONDS);
注意事项
- 超时控制:Dubbo默认异步调用超时为1000ms
- Future获取:必须在发起调用的同一线程中获取Future对象
- 异常处理:异步调用需要通过Future检查执行状态和异常
线程池
已有线程池实现
1. 固定大小线程池(fix)
- 这是Dubbo默认采用的线程池实现方式
- 默认创建200个工作线程,没有等待队列
- 线程数量固定,不会动态增减
2. 缓存线程池(cache)
- 线程数量不固定,会根据需求动态创建
- 空闲线程会被回收(默认60秒后)
- 理论上可以无限扩展线程数量
配置示例
<dubbo:protocol name="dubbo" threadpool="fixed" threads="200"/>
自定义线程池方案
public class WzkWatchingThreadPool extends FixedThreadPool implements Runnable {
private static final double ALARM_PERCENT = 0.9;
private final Map<URL, ThreadPoolExecutor> THREADS_POOLS = new ConcurrentHashMap<>();
@Override
public Executor getExecutor(URL url) {
final Executor executor = super.getExecutor(url);
if (executor instanceof ThreadPoolExecutor) {
THREADS_POOLS.put(url, (ThreadPoolExecutor) executor);
}
return executor;
}
@Override
public void run() {
// 监控线程池使用情况
}
}
SPI 声明
文件 META-INF/dubbo/org.apache.dubbo.common.threadpool.ThreadPool
wzkWatching=icu.wzk.pool.WzkWatchingThreadPool
配置使用
dubbo.provider.threadpool=wzkWatching
最佳实践
- 为不同业务设置独立的线程池
- 根据业务特点选择合适的线程池类型
- 建立完善的监控告警体系
- 定期进行压力测试评估线程池容量