本文是大数据系列第 11 篇,介绍在 MapReduce 框架下实现多表 JOIN 的四种经典策略及其 Java 实现。
四种 JOIN 策略概览
| 策略 | 适用场景 | 核心特点 |
|---|---|---|
| Reduce-Side Join | 两表均较大 | 最通用,但 Shuffle 开销大 |
| Map-Side Join | 一表可装入内存 | 无 Shuffle,效率最高 |
| Semi-Join | 小表键集合可广播 | 预过滤大表,减少无效传输 |
| Bloom Join | 超大规模数据集 | 概率过滤,接受极低误判率 |
Reduce-Side Join
原理:Mapper 为每条记录打上来源标记,以 join key 作为输出键,Shuffle 阶段将相同 key 的记录汇聚到同一个 Reducer,由 Reducer 完成实际的连接操作。
特点:
- 适用于任意大小的数据集
- 实现相对简单
- 网络传输量大(所有数据都要经过 Shuffle)
- Reducer 容易成为瓶颈(
reduce端处理压力过大)
业务场景示例
以项目数据整合为例,需要连接两张表:
- 表1:项目编号 + 项目名称
- 表2:项目编号 + 项目类型 + 项目来源
目标:输出包含所有字段的完整记录,等价于 SQL 的 LEFT JOIN。
核心数据类
// ProjectBean.java - 实现 Writable 序列化接口
public class ProjectBean implements Writable {
private String projectCode;
private String projectName;
private String projectType;
private String projectFrom;
private String flag; // 标识数据来源表
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(projectCode);
out.writeUTF(projectName);
out.writeUTF(projectType);
out.writeUTF(projectFrom);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
this.projectCode = in.readUTF();
this.projectName = in.readUTF();
this.projectType = in.readUTF();
this.projectFrom = in.readUTF();
this.flag = in.readUTF();
}
}
Mapper 实现
public class ReducerJoinMapper
extends Mapper<LongWritable, Text, Text, ProjectBean> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 通过输入分片路径区分数据来源
String path = context.getInputSplit().toString();
String line = value.toString();
String[] fields = line.split(",");
ProjectBean bean = new ProjectBean();
if (path.contains("table1")) {
bean.setProjectCode(fields[0]);
bean.setProjectName(fields[1]);
bean.setFlag("1");
} else {
bean.setProjectCode(fields[0]);
bean.setProjectType(fields[1]);
bean.setProjectFrom(fields[2]);
bean.setFlag("2");
}
context.write(new Text(bean.getProjectCode()), bean);
}
}
Reducer 实现
public class ReducerJoinReducer
extends Reducer<Text, ProjectBean, Text, ProjectBean> {
@Override
protected void reduce(Text key, Iterable<ProjectBean> values, Context context)
throws IOException, InterruptedException {
List<ProjectBean> table1 = new ArrayList<>();
List<ProjectBean> table2 = new ArrayList<>();
for (ProjectBean bean : values) {
if ("1".equals(bean.getFlag())) {
table1.add(copyBean(bean));
} else {
table2.add(copyBean(bean));
}
}
// 笛卡尔积合并两表匹配记录
for (ProjectBean b1 : table1) {
for (ProjectBean b2 : table2) {
ProjectBean result = new ProjectBean();
result.setProjectCode(key.toString());
result.setProjectName(b1.getProjectName());
result.setProjectType(b2.getProjectType());
result.setProjectFrom(b2.getProjectFrom());
context.write(key, result);
}
}
}
}
Map-Side Join
原理:将小表通过 DistributedCache 分发到所有节点,每个 Mapper 在 setup() 阶段将小表加载到内存 HashMap,处理大表时直接查表完成连接,完全跳过 Shuffle 阶段。
适用条件:小表能完整装入单台机器内存(通常 < 几百 MB)。
// Driver 中配置分布式缓存
job.addCacheFile(new URI("/hdfs/path/to/small_table.csv"));
// Mapper setup 阶段加载小表
@Override
protected void setup(Context context) throws IOException {
URI[] cacheFiles = context.getCacheFiles();
// 读取缓存文件并构建 HashMap
BufferedReader reader = new BufferedReader(
new FileReader(new File(cacheFiles[0].getPath())));
String line;
while ((line = reader.readLine()) != null) {
String[] fields = line.split(",");
cacheMap.put(fields[0], fields[1]);
}
}
Semi-Join
原理:先将小表的 join key 集合发送给大表的 Mapper,Mapper 根据 key 集合对大表进行预过滤,只输出有匹配可能的记录,减少进入 Shuffle 的数据量。
适合小表 key 集合不大但小表整体较大的场景,是 Reduce-Side Join 的优化变体。
Bloom Join
原理:使用 Bloom Filter 对小表 key 进行概率压缩,将极小的过滤器广播给所有 Mapper。Mapper 用 Bloom Filter 预筛选大表记录,以极低的误判率(false positive)换取显著的数据量削减。
适用于超大规模数据集,当 Semi-Join 的 key 集合本身都很大时的进一步优化手段。
性能对比与选型建议
- 两表都大 → Reduce-Side Join,考虑二次排序减轻 Reducer 压力
- 一表小(可放内存) → Map-Side Join,性能最优
- 小表 key 可广播 → Semi-Join 预过滤 + Reduce-Side Join
- 超大规模 → Bloom Join,接受极低误判率
在实际生产环境中,Hive 的查询优化器会自动选择 Map Join(对应 Map-Side Join),可通过 set hive.auto.convert.join=true 开启。