This is article 10 in the Big Data series. In it we implement a complete MapReduce WordCount program in Java and take a closer look at the MapReduce programming model and Hadoop's serialization mechanism.

Complete illustrated version (with full code): CSDN Original | Juejin

Why Hadoop Doesn’t Use Native Java Serialization

Java's native serialization (Serializable) produces byte streams that carry a lot of class metadata, making them large and slow to transmit. Hadoop's Writable serialization mechanism is:

  • More compact, higher transmission efficiency
  • Optimized specifically for RPC and MapReduce data transmission
  • Supports direct read/write to byte streams
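The size difference is easy to demonstrate with the JDK alone. A minimal sketch comparing Java native serialization of an int against the raw 4-byte encoding that IntWritable.write(DataOutput) boils down to (SerializationSizeDemo and its helper methods are illustrative names, not part of the Hadoop API):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

public class SerializationSizeDemo {

    // Java native serialization: the stream carries a header plus the
    // full class descriptor for java.lang.Integer, not just the value.
    static int javaSerializedSize(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(Integer.valueOf(value));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.size();
    }

    // Writable-style encoding: just the raw 4-byte big-endian int,
    // with no class metadata at all.
    static int writableStyleSize(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(out)) {
            dos.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.size();
    }

    public static void main(String[] args) {
        System.out.println("Java serialization: " + javaSerializedSize(1) + " bytes");
        System.out.println("Writable-style:     " + writableStyleSize(1) + " bytes");
    }
}
```

On a typical JDK the native stream comes out at around 80 bytes for a single int, against exactly 4 bytes for the Writable-style encoding.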

Core Writable Types

Java Type    Hadoop Writable
String       Text
int          IntWritable
long         LongWritable
float        FloatWritable
boolean      BooleanWritable

Maven Dependencies

<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.9.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.9.0</version>
  </dependency>
</dependencies>

1. WordCountMapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * Generic parameters: <InputKeyType, InputValueType, OutputKeyType, OutputValueType>
 * Input: byte offset of the line within the file (LongWritable), line text (Text)
 * Output: word (Text), count (IntWritable)
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text word = new Text();
    private IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split line on runs of whitespace
        String[] words = value.toString().split("\\s+");
        for (String w : words) {
            if (w.isEmpty()) {
                continue;  // skip the empty token from blank lines or leading whitespace
            }
            word.set(w);
            context.write(word, one);  // Output (word, 1)
        }
    }
}
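The map logic itself is plain string handling and can be checked without a cluster. A sketch of the same tokenization in pure Java, with a List of entries standing in for context.write (MapLogicDemo and mapLine are illustrative names, not Hadoop API):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MapLogicDemo {

    // Split a line on runs of whitespace and emit a (word, 1) pair per
    // token, skipping the empty token that a blank line or a line with
    // leading whitespace would otherwise produce
    static List<Map.Entry<String, Integer>> mapLine(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String w : line.split("\\s+")) {
            if (!w.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(w, 1));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(mapLine("hello world hello"));
        // [hello=1, world=1, hello=1]
    }
}
```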

2. WordCountReducer

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * Input: Word (Text), List of counts (Iterable<IntWritable>)
 * Output: Word (Text), Total count (IntWritable)
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}
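Between map and reduce, the framework groups values by key; the reducer's sum loop can then be mimicked in plain Java. A sketch under those assumptions (ShuffleReduceDemo is an illustrative name; a TreeMap stands in for MapReduce's key sorting):

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleReduceDemo {

    // Group identical keys and sum their counts, mirroring the
    // sum loop in WordCountReducer.reduce()
    static Map<String, Integer> shuffleAndReduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> result = new TreeMap<>();  // keys sorted, as in MR output
        for (Map.Entry<String, Integer> p : pairs) {
            result.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = Arrays.asList(
                new AbstractMap.SimpleEntry<>("hello", 1),
                new AbstractMap.SimpleEntry<>("world", 1),
                new AbstractMap.SimpleEntry<>("hello", 1));
        System.out.println(shuffleAndReduce(pairs));  // {hello=2, world=1}
    }
}
```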

3. WordCountDriver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "WordCount");

        // Set Jar main class
        job.setJarByClass(WordCountDriver.class);

        // Set Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Set output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set input/output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Package and Run

# Package as jar (Maven)
mvn clean package -DskipTests

# Submit to Hadoop cluster
hadoop jar wordcount.jar WordCountDriver /test/input /wcoutput3

# Run in local mode (without starting cluster)
hadoop jar wordcount.jar WordCountDriver file:///local/input file:///local/output

MapReduce Execution Flow

Input File → [InputFormat Split] → Map (Parallel) → Shuffle (Sort/Group) → Reduce → Output File

Each Block corresponds to one Map Task, and multiple Map Tasks execute in parallel. The Shuffle phase sorts the intermediate data and groups records with the same key, and the Reduce phase aggregates each group to produce the final result.
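As a single-process mental model, the whole flow fits in a few lines of Java streams (WordCountPipelineDemo is an illustrative name; Hadoop of course distributes these stages across many tasks):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class WordCountPipelineDemo {

    // The pipeline in miniature: each line plays the role of a split,
    // flatMap is the Map phase, and groupingBy + counting plays the
    // role of Shuffle (group by key) plus Reduce (sum per key)
    static Map<String, Long> wordCount(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\s+")))  // Map
                .filter(w -> !w.isEmpty())
                .collect(Collectors.groupingBy(                      // Shuffle + Reduce
                        w -> w, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("hello world", "hello hadoop")));
        // {hadoop=1, hello=2, world=1}
    }
}
```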

Complete Maven project (including pom.xml and log4j configuration) see CSDN Original.

Next article: Big Data 11 - MapReduce JOIN Operation