本文是大数据系列第 11 篇,介绍在 MapReduce 框架下实现多表 JOIN 的四种经典策略及其 Java 实现。

完整图文版(含截图):CSDN 原文 | 掘金

四种 JOIN 策略概览

策略适用场景核心特点
Reduce-Side Join两表均较大最通用,但 Shuffle 开销大
Map-Side Join一表可装入内存无 Shuffle,效率最高
Semi-Join小表键集合可广播预过滤大表,减少无效传输
Bloom Join超大规模数据集概率过滤,接受极低误判率

Reduce-Side Join

原理:Mapper 为每条记录打上来源标记,以 join key 作为输出键,Shuffle 阶段将相同 key 的记录汇聚到同一个 Reducer,由 Reducer 完成实际的连接操作。

特点

  • 适用于任意大小的数据集
  • 实现相对简单
  • 网络传输量大(所有数据都要经过 Shuffle)
  • Reducer 容易成为瓶颈(reduce端处理压力过大

业务场景示例

以项目数据整合为例,需要连接两张表:

  • 表1:项目编号 + 项目名称
  • 表2:项目编号 + 项目类型 + 项目来源

目标:输出包含所有字段的完整记录,等价于 SQL 的 LEFT JOIN

核心数据类

// ProjectBean.java - 实现 Writable 序列化接口
public class ProjectBean implements Writable {
    private String projectCode;
    private String projectName;
    private String projectType;
    private String projectFrom;
    private String flag;  // 标识数据来源表

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(projectCode);
        out.writeUTF(projectName);
        out.writeUTF(projectType);
        out.writeUTF(projectFrom);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.projectCode = in.readUTF();
        this.projectName = in.readUTF();
        this.projectType = in.readUTF();
        this.projectFrom = in.readUTF();
        this.flag = in.readUTF();
    }
}

Mapper 实现

public class ReducerJoinMapper
        extends Mapper<LongWritable, Text, Text, ProjectBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 通过输入分片路径区分数据来源
        String path = context.getInputSplit().toString();
        String line = value.toString();
        String[] fields = line.split(",");

        ProjectBean bean = new ProjectBean();
        if (path.contains("table1")) {
            bean.setProjectCode(fields[0]);
            bean.setProjectName(fields[1]);
            bean.setFlag("1");
        } else {
            bean.setProjectCode(fields[0]);
            bean.setProjectType(fields[1]);
            bean.setProjectFrom(fields[2]);
            bean.setFlag("2");
        }
        context.write(new Text(bean.getProjectCode()), bean);
    }
}

Reducer 实现

public class ReducerJoinReducer
        extends Reducer<Text, ProjectBean, Text, ProjectBean> {

    @Override
    protected void reduce(Text key, Iterable<ProjectBean> values, Context context)
            throws IOException, InterruptedException {
        List<ProjectBean> table1 = new ArrayList<>();
        List<ProjectBean> table2 = new ArrayList<>();

        for (ProjectBean bean : values) {
            if ("1".equals(bean.getFlag())) {
                table1.add(copyBean(bean));
            } else {
                table2.add(copyBean(bean));
            }
        }

        // 笛卡尔积合并两表匹配记录
        for (ProjectBean b1 : table1) {
            for (ProjectBean b2 : table2) {
                ProjectBean result = new ProjectBean();
                result.setProjectCode(key.toString());
                result.setProjectName(b1.getProjectName());
                result.setProjectType(b2.getProjectType());
                result.setProjectFrom(b2.getProjectFrom());
                context.write(key, result);
            }
        }
    }
}

Map-Side Join

原理:将小表通过 DistributedCache 分发到所有节点,每个 Mapper 在 setup() 阶段将小表加载到内存 HashMap,处理大表时直接查表完成连接,完全跳过 Shuffle 阶段。

适用条件:小表能完整装入单台机器内存(通常 < 几百 MB)。

// Driver 中配置分布式缓存
job.addCacheFile(new URI("/hdfs/path/to/small_table.csv"));

// Mapper setup 阶段加载小表
@Override
protected void setup(Context context) throws IOException {
    URI[] cacheFiles = context.getCacheFiles();
    // 读取缓存文件并构建 HashMap
    BufferedReader reader = new BufferedReader(
        new FileReader(new File(cacheFiles[0].getPath())));
    String line;
    while ((line = reader.readLine()) != null) {
        String[] fields = line.split(",");
        cacheMap.put(fields[0], fields[1]);
    }
}

Semi-Join

原理:先将小表的 join key 集合发送给大表的 Mapper,Mapper 根据 key 集合对大表进行预过滤,只输出有匹配可能的记录,减少进入 Shuffle 的数据量。

适合小表 key 集合不大但小表整体较大的场景,是 Reduce-Side Join 的优化变体。

Bloom Join

原理:使用 Bloom Filter 对小表 key 进行概率压缩,将极小的过滤器广播给所有 Mapper。Mapper 用 Bloom Filter 预筛选大表记录,以极低的误判率(false positive)换取显著的数据量削减。

适用于超大规模数据集,当 Semi-Join 的 key 集合本身都很大时的进一步优化手段。

性能对比与选型建议

  • 两表都大 → Reduce-Side Join,考虑二次排序减轻 Reducer 压力
  • 一表小(可放内存) → Map-Side Join,性能最优
  • 小表 key 可广播 → Semi-Join 预过滤 + Reduce-Side Join
  • 超大规模 → Bloom Join,接受极低误判率

在实际生产环境中,Hive 的查询优化器会自动选择 Map Join(对应 Map-Side Join),可通过 set hive.auto.convert.join=true 开启。