This is article 11 in the Big Data series. It introduces four classic strategies for implementing a multi-table JOIN in the MapReduce framework, along with their Java implementations.

Complete illustrated version: CSDN Original | Juejin

Four JOIN Strategies Overview

Strategy         | Applicable Scenario                    | Core Features
Reduce-Side Join | Both tables large                      | Most generic; high Shuffle overhead
Map-Side Join    | One table fits in memory               | No Shuffle; highest efficiency
Semi-Join        | Small table's key set can be broadcast | Pre-filters the large table, reducing wasted transfer
Bloom Join       | Ultra-large datasets                   | Probabilistic filtering; accepts a very low false-positive rate

Reduce-Side Join

Principle: The Mapper tags each record with a source identifier and uses the join key as the output key. The Shuffle phase gathers all records with the same key onto the same Reducer, which performs the actual join.

Features:

  • Works for datasets of any size
  • Relatively simple to implement
  • Heavy network traffic (all data goes through Shuffle)
  • The Reducer easily becomes a bottleneck (all join work happens reduce-side, and skewed keys concentrate load on single Reducers)

Business Scenario Example

Taking project data integration as an example, we need to join two tables:

  • Table 1: Project code + Project name
  • Table 2: Project code + Project type + Project source

Goal: output complete records containing all fields, equivalent to a SQL LEFT JOIN.

Core Data Class

// ProjectBean.java - implements Hadoop's Writable serialization interface
public class ProjectBean implements Writable {
    // Default to "" rather than null: DataOutput.writeUTF() throws a
    // NullPointerException on null, and each record only fills some fields
    private String projectCode = "";
    private String projectName = "";
    private String projectType = "";
    private String projectFrom = "";
    private String flag = "";  // identifies which source table the record came from

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(projectCode);
        out.writeUTF(projectName);
        out.writeUTF(projectType);
        out.writeUTF(projectFrom);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in exactly the order they were written
        this.projectCode = in.readUTF();
        this.projectName = in.readUTF();
        this.projectType = in.readUTF();
        this.projectFrom = in.readUTF();
        this.flag = in.readUTF();
    }

    // Getters and setters omitted for brevity
}

Mapper Implementation

public class ReducerJoinMapper
        extends Mapper<LongWritable, Text, Text, ProjectBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Distinguish the data source by the input split's file path
        String path = ((FileSplit) context.getInputSplit()).getPath().toString();
        String line = value.toString();
        String[] fields = line.split(",");

        ProjectBean bean = new ProjectBean();
        if (path.contains("table1")) {
            bean.setProjectCode(fields[0]);
            bean.setProjectName(fields[1]);
            bean.setFlag("1");
        } else {
            bean.setProjectCode(fields[0]);
            bean.setProjectType(fields[1]);
            bean.setProjectFrom(fields[2]);
            bean.setFlag("2");
        }
        context.write(new Text(bean.getProjectCode()), bean);
    }
}

Reducer Implementation

public class ReducerJoinReducer
        extends Reducer<Text, ProjectBean, Text, ProjectBean> {

    @Override
    protected void reduce(Text key, Iterable<ProjectBean> values, Context context)
            throws IOException, InterruptedException {
        List<ProjectBean> table1 = new ArrayList<>();
        List<ProjectBean> table2 = new ArrayList<>();

        // Hadoop reuses the same ProjectBean instance across iterations of
        // the values iterable, so each record must be deep-copied before
        // being stored (copyBean is a field-by-field copy helper)
        for (ProjectBean bean : values) {
            if ("1".equals(bean.getFlag())) {
                table1.add(copyBean(bean));
            } else {
                table2.add(copyBean(bean));
            }
        }

        // Merge matching records from the two tables (cartesian product).
        // For LEFT JOIN semantics, table1 records with no match in table2
        // are still emitted, with the table2 fields left empty.
        for (ProjectBean b1 : table1) {
            if (table2.isEmpty()) {
                ProjectBean result = new ProjectBean();
                result.setProjectCode(key.toString());
                result.setProjectName(b1.getProjectName());
                context.write(key, result);
                continue;
            }
            for (ProjectBean b2 : table2) {
                ProjectBean result = new ProjectBean();
                result.setProjectCode(key.toString());
                result.setProjectName(b1.getProjectName());
                result.setProjectType(b2.getProjectType());
                result.setProjectFrom(b2.getProjectFrom());
                context.write(key, result);
            }
        }
    }
}

Map-Side Join

Principle: Distribute the small table to all nodes via the DistributedCache. Each Mapper loads the small table into an in-memory HashMap during its setup() phase; while processing the large table, it looks each key up in the map to complete the join, skipping the Shuffle phase entirely.

Applicable condition: the small table fits entirely in a single machine's memory (typically under a few hundred MB).

// Configure distributed cache in Driver
job.addCacheFile(new URI("/hdfs/path/to/small_table.csv"));

// Load the small table in the Mapper's setup phase
private final Map<String, String> cacheMap = new HashMap<>();

@Override
protected void setup(Context context) throws IOException {
    URI[] cacheFiles = context.getCacheFiles();
    // Read the cached file and build the in-memory lookup map
    try (BufferedReader reader = new BufferedReader(
            new FileReader(new File(cacheFiles[0].getPath())))) {
        String line;
        while ((line = reader.readLine()) != null) {
            String[] fields = line.split(",");
            cacheMap.put(fields[0], fields[1]);  // e.g. project code -> project name
        }
    }
}
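The map() side of the join then reduces to a hash lookup per record. A minimal standalone sketch of that lookup in plain Java (no Hadoop classes; the table contents and field layout are made up for illustration, following the project tables above):

```java
import java.util.*;

public class MapSideJoinSketch {
    public static void main(String[] args) {
        // Small table (the cached side): project code -> project name
        Map<String, String> cacheMap = new HashMap<>();
        cacheMap.put("P001", "Bridge");
        cacheMap.put("P002", "Tunnel");

        // Large-table lines as they would arrive in map(): code,type,from
        List<String> bigTable = Arrays.asList("P001,infra,gov", "P003,it,private");

        List<String> joined = new ArrayList<>();
        for (String line : bigTable) {
            String[] fields = line.split(",");
            String name = cacheMap.get(fields[0]);   // O(1) lookup, no Shuffle
            if (name != null) {                      // emit only records with a match
                joined.add(fields[0] + "," + name + "," + fields[1] + "," + fields[2]);
            }
        }
        System.out.println(joined);
    }
}
```

The same loop body is what a real Mapper's map() method would run, writing each joined line to the Context instead of a list.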

Semi-Join

Principle: First extract the small table's set of join keys and ship it to the large table's Mappers. Each Mapper pre-filters the large table against this key set and emits only records that can possibly match, shrinking the data volume that enters the Shuffle.

It is an optimized variant of Reduce-Side Join, suitable when the small table itself is too large to cache in full but its distinct key set is small enough to broadcast.
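The pre-filtering step can be sketched in plain Java (no Hadoop classes; keys and records are hypothetical):

```java
import java.util.*;

public class SemiJoinFilterSketch {
    public static void main(String[] args) {
        // Key set extracted from the small table and broadcast to Mappers
        Set<String> joinKeys = new HashSet<>(Arrays.asList("P001", "P002"));

        // Large-table records: only those whose join key is in the set survive
        List<String> bigTable = Arrays.asList(
                "P001,infra,gov", "P003,it,private", "P002,road,gov");

        List<String> survivors = new ArrayList<>();
        for (String line : bigTable) {
            String key = line.split(",")[0];
            if (joinKeys.contains(key)) {  // filter before the Shuffle
                survivors.add(line);
            }
        }
        // Surviving records continue into a normal Reduce-Side Join
        System.out.println(survivors);
    }
}
```

Records like "P003,it,private" are dropped map-side and never consume Shuffle bandwidth.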

Bloom Join

Principle: Use a Bloom filter to compress the small table's keys into a compact probabilistic structure. Broadcast this tiny filter to all Mappers, which use it to pre-filter large-table records, achieving major data reduction at the cost of a very low false-positive rate.

It suits ultra-large datasets, and is the further optimization to apply when even the Semi-Join key set is too large to broadcast.
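Hadoop ships a ready-made org.apache.hadoop.util.bloom.BloomFilter, but the core idea fits in a few lines of plain Java. A toy sketch (bit-array size and hash scheme are chosen only for illustration, not production use):

```java
import java.util.BitSet;

public class BloomJoinSketch {
    static final int BITS = 1024;  // toy size; real filters size this from key count and target error
    static final int HASHES = 3;

    // Derive the i-th bit position from the key (illustrative hash mixing)
    static int position(String key, int i) {
        return Math.floorMod(key.hashCode() * (i * 31 + 17), BITS);
    }

    static void add(BitSet bits, String key) {
        for (int i = 0; i < HASHES; i++) bits.set(position(key, i));
    }

    static boolean mightContain(BitSet bits, String key) {
        for (int i = 0; i < HASHES; i++) {
            if (!bits.get(position(key, i))) return false;  // definitely absent
        }
        return true;  // possibly present (false positives allowed, no false negatives)
    }

    public static void main(String[] args) {
        BitSet filter = new BitSet(BITS);
        add(filter, "P001");
        add(filter, "P002");
        // A Mapper would drop every large-table record failing this test
        System.out.println(mightContain(filter, "P001"));  // true
        System.out.println(mightContain(filter, "ZZZZ"));  // false, barring a false positive
    }
}
```

Because the filter never yields false negatives, no genuinely matching record is ever dropped; false positives merely let a few non-matching records slip through to the Reducer, where the exact join discards them.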

Performance Comparison and Selection Guide

  • Both tables large → Reduce-Side Join; consider a secondary sort so the Reducer need not buffer one side fully in memory
  • One table small (fits in memory) → Map-Side Join, best performance
  • Small table's key set can be broadcast → Semi-Join pre-filter + Reduce-Side Join
  • Ultra-large scale → Bloom Join, accepting a very low false-positive rate

In real production environments, Hive's query optimizer can automatically convert eligible joins to Map Join (the equivalent of Map-Side Join); enable it with set hive.auto.convert.join=true.