This is article 10 in the Big Data series. We implement a complete MapReduce WordCount program in Java and, along the way, dig into the MapReduce programming model and Hadoop's serialization mechanism.
Complete illustrated version (with full code): CSDN Original | Juejin
## Why Hadoop Doesn't Use Native Java Serialization

Java's native serialization (`Serializable`) produces byte streams laden with class metadata, so they are large and slow to transmit. Hadoop's `Writable` serialization mechanism is:

- More compact, so transmission is more efficient
- Optimized specifically for RPC and MapReduce data transfer
- Able to read from and write to byte streams directly
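Hadoop itself isn't needed to see the size difference. The sketch below mimics the `Writable` contract (`write(DataOutput)` / `readFields(DataInput)`) with plain JDK classes and compares it against `java.io.Serializable`; the class names `BoxedInt` and `IntRecord` are illustrative, not Hadoop types:

```java
import java.io.*;

public class WritableVsSerializable {
    // Java-serializable holder: the stream carries full class metadata
    static class BoxedInt implements Serializable {
        int value;
        BoxedInt(int value) { this.value = value; }
    }

    // Hand-rolled imitation of Hadoop's Writable contract: raw field bytes only
    static class IntRecord {
        int value;
        IntRecord(int value) { this.value = value; }
        void write(DataOutput out) throws IOException { out.writeInt(value); }
        void readFields(DataInput in) throws IOException { value = in.readInt(); }
    }

    public static void main(String[] args) throws IOException {
        // Serialize with java.io.Serializable
        ByteArrayOutputStream javaBytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(javaBytes)) {
            oos.writeObject(new BoxedInt(42));
        }

        // Serialize Writable-style: exactly 4 bytes for an int
        ByteArrayOutputStream rawBytes = new ByteArrayOutputStream();
        new IntRecord(42).write(new DataOutputStream(rawBytes));

        System.out.println("Serializable:   " + javaBytes.size() + " bytes");
        System.out.println("Writable-style: " + rawBytes.size() + " bytes");
    }
}
```

The Writable-style stream is exactly the 4 bytes of the `int`; the `Serializable` stream is dozens of bytes because it embeds the class descriptor.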
## Core Writable Types

| Java Type | Hadoop Writable |
|---|---|
| String | Text |
| int | IntWritable |
| long | LongWritable |
| float | FloatWritable |
| boolean | BooleanWritable |
## Maven Dependencies

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.9.0</version>
    </dependency>
</dependencies>
```
## 1. WordCountMapper

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Generic parameters: <InputKeyType, InputValueType, OutputKeyType, OutputValueType>
 * Input:  line offset (LongWritable), line text (Text)
 * Output: word (Text), count (IntWritable)
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final Text word = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line on whitespace
        String[] words = value.toString().split("\\s+");
        for (String w : words) {
            if (w.isEmpty()) {
                continue; // leading whitespace yields an empty first token; skip it
            }
            word.set(w);
            context.write(word, one); // emit (word, 1)
        }
    }
}
```
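One subtlety of `split("\\s+")` is worth checking in isolation: a line that starts with whitespace produces an empty first token, which would otherwise be counted as a "word". A quick JDK-only check:

```java
public class SplitCheck {
    public static void main(String[] args) {
        String[] parts = "  hello world".split("\\s+");
        // The leading whitespace match produces an empty first element:
        // ["", "hello", "world"]
        System.out.println(parts.length);        // 3
        System.out.println(parts[0].isEmpty());  // true
    }
}
```

This is why the mapper should skip empty tokens before emitting.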
## 2. WordCountReducer

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Input:  word (Text), list of counts (Iterable<IntWritable>)
 * Output: word (Text), total count (IntWritable)
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}
```
## 3. WordCountDriver

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "WordCount");

        // Set the jar's main class
        job.setJarByClass(WordCountDriver.class);

        // Set Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Set output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set input/output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
## Package and Run

```bash
# Package as a jar (Maven)
mvn clean package -DskipTests

# Submit to a Hadoop cluster
hadoop jar wordcount.jar WordCountDriver /test/input /wcoutput3

# Run in local mode (no cluster required)
hadoop jar wordcount.jar WordCountDriver file:///local/input file:///local/output
```
## MapReduce Execution Flow

Input File → [InputFormat Split] → Map (parallel) → Shuffle (sort/group) → Reduce → Output File

Each input split (by default, one per HDFS block) corresponds to one Map Task, and Map Tasks execute in parallel. The Shuffle phase sorts the map output and groups records that share the same key; the Reduce phase then aggregates each group to produce the final result.
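To make the flow concrete, here is a JDK-only simulation of the three phases on in-memory data; no Hadoop is involved, and the class name and sample lines are illustrative. The map step emits (word, 1) pairs, the shuffle step groups them by key in sorted order (a `TreeMap` standing in for Hadoop's sort/group), and the reduce step sums each group:

```java
import java.util.*;

public class MiniWordCount {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello world", "hello hadoop");

        // Map phase: emit (word, 1) for every token
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String w : line.split("\\s+")) {
                if (!w.isEmpty()) {
                    emitted.add(new AbstractMap.SimpleEntry<>(w, 1));
                }
            }
        }

        // Shuffle phase: group values by key, sorted by key
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> e : emitted) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }

        // Reduce phase: sum each key's values (tab-separated, like TextOutputFormat)
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) sum += v;
            System.out.println(e.getKey() + "\t" + sum);
        }
        // Prints:
        // hadoop   1
        // hello    2
        // world    1
    }
}
```

The tab-separated key/value output mirrors what the real job writes to `part-r-00000` via the default `TextOutputFormat`.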
For the complete Maven project (including pom.xml and the log4j configuration), see the CSDN original.
Next article: Big Data 11 - MapReduce JOIN Operation