This is article 11 in the Big Data series. It introduces four classic strategies for implementing multi-table JOINs in the MapReduce framework, along with their Java implementations.
Complete illustrated version: CSDN Original | Juejin
Four JOIN Strategies Overview
| Strategy | Applicable Scenario | Core Features |
|---|---|---|
| Reduce-Side Join | Both tables large | Most generic, high Shuffle overhead |
| Map-Side Join | One table fits in memory | No Shuffle, highest efficiency |
| Semi-Join | Small table's key set can be broadcast | Pre-filters large table, reduces invalid transfer |
| Bloom Join | Ultra-large scale dataset | Probability filtering, accepts very low false positive rate |
Reduce-Side Join
Principle: The Mapper tags each record with a source identifier and uses the join key as the output key. The Shuffle phase brings all records with the same key to the same Reducer, which performs the actual join.
Features:
- Suitable for datasets of any size
- Relatively simple to implement
- Large network traffic (all data goes through Shuffle)
- Reducer can become a bottleneck (heavy reduce-side processing pressure)
Business Scenario Example
Take project data integration as an example: two tables need to be joined:
- Table 1: Project code + Project name
- Table 2: Project code + Project type + Project source
Goal: Output complete records containing all fields, equivalent to a SQL LEFT JOIN.
Core Data Class
// ProjectBean.java - implements Hadoop's Writable serialization interface
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class ProjectBean implements Writable {
    // Initialize fields to "" because DataOutput.writeUTF(null) throws a
    // NullPointerException for fields the source table does not provide
    private String projectCode = "";
    private String projectName = "";
    private String projectType = "";
    private String projectFrom = "";
    private String flag = ""; // identifies the source table ("1" or "2")

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(projectCode);
        out.writeUTF(projectName);
        out.writeUTF(projectType);
        out.writeUTF(projectFrom);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in exactly the order they were written
        this.projectCode = in.readUTF();
        this.projectName = in.readUTF();
        this.projectType = in.readUTF();
        this.projectFrom = in.readUTF();
        this.flag = in.readUTF();
    }

    // getters and setters omitted for brevity
}
Mapper Implementation
public class ReducerJoinMapper
        extends Mapper<LongWritable, Text, Text, ProjectBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Distinguish the data source by the input split's path
        // (FileSplit.toString() includes the file path)
        String path = context.getInputSplit().toString();
        String line = value.toString();
        String[] fields = line.split(",");
        ProjectBean bean = new ProjectBean();
        if (path.contains("table1")) {
            bean.setProjectCode(fields[0]);
            bean.setProjectName(fields[1]);
            bean.setFlag("1");
        } else {
            bean.setProjectCode(fields[0]);
            bean.setProjectType(fields[1]);
            bean.setProjectFrom(fields[2]);
            bean.setFlag("2");
        }
        context.write(new Text(bean.getProjectCode()), bean);
    }
}
Reducer Implementation
public class ReducerJoinReducer
        extends Reducer<Text, ProjectBean, Text, ProjectBean> {

    @Override
    protected void reduce(Text key, Iterable<ProjectBean> values, Context context)
            throws IOException, InterruptedException {
        List<ProjectBean> table1 = new ArrayList<>();
        List<ProjectBean> table2 = new ArrayList<>();
        for (ProjectBean bean : values) {
            // Hadoop reuses the same object while iterating over values,
            // so each record must be deep-copied before it is stored
            if ("1".equals(bean.getFlag())) {
                table1.add(copyBean(bean));
            } else {
                table2.add(copyBean(bean));
            }
        }
        // Cartesian product merges matching records from the two tables
        for (ProjectBean b1 : table1) {
            if (table2.isEmpty()) {
                // LEFT JOIN semantics: keep table1 records with no match
                context.write(key, b1);
                continue;
            }
            for (ProjectBean b2 : table2) {
                ProjectBean result = new ProjectBean();
                result.setProjectCode(key.toString());
                result.setProjectName(b1.getProjectName());
                result.setProjectType(b2.getProjectType());
                result.setProjectFrom(b2.getProjectFrom());
                context.write(key, result);
            }
        }
    }

    private ProjectBean copyBean(ProjectBean src) {
        ProjectBean copy = new ProjectBean();
        copy.setProjectCode(src.getProjectCode());
        copy.setProjectName(src.getProjectName());
        copy.setProjectType(src.getProjectType());
        copy.setProjectFrom(src.getProjectFrom());
        copy.setFlag(src.getFlag());
        return copy;
    }
}
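To see the tag-group-merge flow end to end without a cluster, here is a plain-Java simulation (the class and record layout are hypothetical, for illustration only). Each record is a String[] of {joinKey, flag, payload}; grouping by join key stands in for the Shuffle, and the nested loop reproduces the Reducer's cartesian merge of matched records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReduceSideJoinDemo {
    public static List<String> join(List<String[]> taggedRecords) {
        // "Shuffle": group records by join key, preserving insertion order
        Map<String, List<String[]>> groups = new LinkedHashMap<>();
        for (String[] rec : taggedRecords) {
            groups.computeIfAbsent(rec[0], k -> new ArrayList<>()).add(rec);
        }
        // "Reduce": split each group by its source flag, then merge
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, List<String[]>> e : groups.entrySet()) {
            List<String> left = new ArrayList<>();
            List<String> right = new ArrayList<>();
            for (String[] rec : e.getValue()) {
                ("1".equals(rec[1]) ? left : right).add(rec[2]);
            }
            for (String l : left) {
                for (String r : right) {
                    joined.add(e.getKey() + "," + l + "," + r);
                }
            }
        }
        return joined;
    }
}
```

Note that only keys present in both groups produce output, which is exactly why the real Reducer must buffer both lists before merging.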
Map-Side Join
Principle: Distribute the small table to all nodes via the DistributedCache. Each Mapper loads the small table into an in-memory HashMap in its setup() phase; while processing the large table, it looks up the join key directly in that map, completely skipping the Shuffle phase.
Applicable Conditions: The small table fits entirely in a single machine's memory (usually under a few hundred MB).
// Configure the distributed cache in the Driver
job.addCacheFile(new URI("/hdfs/path/to/small_table.csv"));
// Load the small table into memory in the Mapper's setup() phase
private final Map<String, String> cacheMap = new HashMap<>();

@Override
protected void setup(Context context) throws IOException {
    URI[] cacheFiles = context.getCacheFiles();
    // Read the cached file and build the in-memory lookup table
    try (BufferedReader reader = new BufferedReader(
            new FileReader(new File(cacheFiles[0].getPath())))) {
        String line;
        while ((line = reader.readLine()) != null) {
            String[] fields = line.split(",");
            cacheMap.put(fields[0], fields[1]);
        }
    }
}

// Join directly in map(); fields assume the large table's code,type,from layout
@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] fields = value.toString().split(",");
    String name = cacheMap.get(fields[0]); // in-memory lookup, no Shuffle
    if (name != null) {
        context.write(new Text(fields[0]),
                new Text(name + "," + fields[1] + "," + fields[2]));
    }
}
Semi-Join
Principle: First extract the small table's set of join keys and distribute it to the large table's Mappers. Each Mapper pre-filters the large table against that key set, emitting only records that can possibly match, which reduces the data volume entering the Shuffle.
Suitable when the small table itself is too large to cache in full, but its set of distinct join keys is small enough to broadcast. It is an optimized variant of Reduce-Side Join.
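The map-side filtering step can be sketched in plain Java (class and method names are hypothetical; in a real job the key set would be read from the DistributedCache in setup(), just as in Map-Side Join):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SemiJoinFilter {
    // Join-key set extracted from the small table and broadcast to Mappers
    private final Set<String> joinKeys;

    public SemiJoinFilter(Set<String> joinKeys) {
        this.joinKeys = joinKeys;
    }

    // Emulates the map-side pre-filter: only large-table records whose
    // join key appears in the small table's key set survive to the Shuffle
    public List<String> filter(List<String> largeTableLines) {
        List<String> survivors = new ArrayList<>();
        for (String line : largeTableLines) {
            String key = line.split(",")[0];
            if (joinKeys.contains(key)) {
                survivors.add(line);
            }
        }
        return survivors;
    }
}
```

The surviving records then go through an ordinary Reduce-Side Join, but with far less data crossing the network.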
Bloom Join
Principle: Use a Bloom Filter to compress the small table's keys into a compact probabilistic structure. Broadcast the tiny filter to all Mappers, which use it to pre-filter large-table records, achieving significant data reduction at the cost of a very low false-positive rate.
Applicable to ultra-large-scale datasets; it is the further optimization to use when even the Semi-Join key set is too large to broadcast.
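To illustrate the filtering logic independently of Hadoop (which ships its own org.apache.hadoop.util.bloom.BloomFilter), here is a minimal self-contained Bloom filter sketch; the class name and the simple seeded hash are assumptions for demonstration, not a production-quality hash:

```java
import java.util.BitSet;

public class SimpleBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int numHashes;

    public SimpleBloomFilter(int size, int numHashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.numHashes = numHashes;
    }

    // k hash functions simulated by varying the seed of a simple
    // polynomial hash (illustrative only, not collision-resistant)
    private int hash(String key, int seed) {
        int h = seed;
        for (char c : key.toCharArray()) {
            h = h * 31 + c;
        }
        return Math.floorMod(h, size);
    }

    public void add(String key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(hash(key, i + 1));
        }
    }

    // false -> key is definitely absent; true -> key is probably present
    public boolean mightContain(String key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(hash(key, i + 1))) {
                return false;
            }
        }
        return true;
    }
}
```

A Mapper would build (or load) the filter in setup() and drop any large-table record whose join key fails mightContain(); false positives only mean a few unmatched records still reach the Reducer, which discards them there, so correctness is preserved.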
Performance Comparison and Selection Guide
- Both tables large → Reduce-Side Join, consider secondary sort to reduce Reducer pressure
- One table small (fits in memory) → Map-Side Join, best performance
- Small table's key set can be broadcast → Semi-Join pre-filter + Reduce-Side Join
- Ultra-large scale → Bloom Join, accept very low false positive rate
In actual production environments, Hive's query optimizer automatically converts eligible joins to Map Join (corresponding to Map-Side Join), which can be enabled via set hive.auto.convert.join=true.