Big Data 192 - DataX 3.0 Architecture & Practice
TL;DR
- Scenario: Offline sync across heterogeneous data sources such as MySQL, HDFS, Hive, OTS, and ODPS; batch migration and data warehouse ETL.
- Conclusion: DataX uses a Framework + Reader/Writer plugin design that reduces mesh (point-to-point) integration to a star topology; the keys are Job splitting, TaskGroup concurrency, and speed/errorLimit control.
- Output: Deployable installation and operation instructions, a version/dependency matrix, and diagnosis-and-fix cards for common errors.
Version Matrix
| Item | Description |
|---|---|
| DataX Core Positioning | Offline data sync framework, Reader/Writer plugin abstracts heterogeneous data source sync |
| Running Environment | Linux + JDK 1.8 or above (recommend 1.8) + Python 2 or 3 |
| Official Download & Run | Extract the archive, enter bin, and run via python datax.py job.json |
| Job Core Config | job.setting.speed + job.setting.errorLimit as common control plane |
Overview
DataX is a high-performance offline data sync tool/platform independently developed by Alibaba Group and widely used internally. As an important component in Alibaba’s big data ecosystem, it specifically solves heterogeneous data source sync challenges in enterprise data integration.
Regarding data source support, DataX has strong compatibility and currently supports:
- Relational databases: MySQL, Oracle, SQL Server, PostgreSQL
- Big data ecosystem components: HDFS, Hive, HBase
- Alibaba Cloud services: AnalyticDB (ADS), TableStore (OTS), MaxCompute (ODPS)
- Distributed database: DRDS (Alibaba Cloud Distributed Relational Database Service)
DataX reconstructs data sync chain using star topology:
- Core Layer: DataX as central scheduling and transmission engine
- Access Layer: Each data source accesses DataX core via plugin mode
- Transmission Layer: Unified data format conversion and transmission channel
This architecture’s advantages:
- Scalability: New data sources only need to develop corresponding plugins, no need to modify core architecture
- Maintainability: All sync tasks managed through unified platform
- Reliability: Built-in fault detection and retry mechanism
- Performance Optimization: Supports concurrency control and flow control
Typical application scenarios:
- Data warehouse ETL: Regularly sync business system data to data warehouse
- Cross-cloud data migration: Transfer data between different cloud providers
- Data backup: Implement offsite backup of important data
- Data analysis: Prepare training data for scenarios like machine learning
DataX Core Architecture
As Alibaba’s open-sourced high-performance offline data sync tool, DataX’s core architecture uses Framework+plugin mode design. This modular architecture gives the system high flexibility and extensibility. The framework abstracts key functions in data sync process into three core components:
1. Reader Data Collection Module
- Responsible for collecting data from various data sources, supports multiple data formats and protocols
- Built-in rich data source plugins
- Implements parallel data extraction through sharding strategy, improving collection efficiency
- Example scenario: Reading tens of millions of order data from MySQL database
2. Writer Data Write Module
- Responsible for writing processed data to target storage system
- Supports multiple write modes: full overwrite, incremental append, conditional update
- Provides data validation and error retry mechanism
- Typical application: Writing cleaned data to Hive data warehouse
3. Framework Core
- Acts as the data transmission hub, connecting Reader and Writer efficiently
- Key technical features: memory buffer management, flow control, concurrency scheduling, data conversion, dirty data handling
After years of development and community contributions, DataX has built a complete plugin ecosystem:
- Supports 30+ data source plugins
- Covers mainstream database systems (MySQL/Oracle/PostgreSQL, etc.)
- Compatible with big data ecosystem (HDFS/Hive/HBase, etc.)
- Supports file systems (FTP/SFTP, etc.)
Core Modules
Job Management
DataX calls a single data sync job a Job. When DataX receives a Job request, it starts an independent process to execute the entire data sync flow. As the job's central management node, the Job module is mainly responsible for:
- Data cleanup (including preprocessing and post-processing)
- Sub-task splitting (decompose single job into multiple parallel Tasks)
- Task scheduling and status monitoring
Task Splitting Mechanism
After the Job starts, it chooses a splitting strategy based on the characteristics of the source data:
- For RDBMS data sources, typically split by primary key range or sharding key
- For HDFS and other file systems, can split by file blocks
- Each Task is responsible for processing a portion of data
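The primary-key range split described above can be sketched in Python. This is an illustrative sketch, not DataX's actual splitting code; the function name and even-slicing policy are assumptions:

```python
def split_pk_range(min_pk, max_pk, num_slices):
    """Split the inclusive key range [min_pk, max_pk] into contiguous slices."""
    total = max_pk - min_pk + 1
    step = -(-total // num_slices)        # ceiling division: rows per slice
    ranges = []
    start = min_pk
    while start <= max_pk:
        end = min(start + step - 1, max_pk)
        ranges.append((start, end))       # one Task would process each (start, end) slice
        start = end + 1
    return ranges

print(split_pk_range(1, 100, 3))          # [(1, 34), (35, 68), (69, 100)]
```

Each resulting range becomes the WHERE clause of one Task's extraction query, which is what allows the Tasks to read in parallel.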
Task Scheduling (Scheduler)
After splitting, Tasks are reorganized through Scheduler module:
- By default, each TaskGroup contains 5 concurrent Tasks
- TaskGroup count = ceil(total Tasks / Tasks per TaskGroup)
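The grouping rule above can be expressed directly (a sketch; `tasks_per_group` mirrors the stated default of 5 Tasks per TaskGroup):

```python
import math

def task_group_count(total_tasks, tasks_per_group=5):
    """TaskGroup count = ceil(total Tasks / Tasks per group)."""
    return math.ceil(total_tasks / tasks_per_group)

print(task_group_count(12))               # 12 Tasks -> 3 TaskGroups
```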
Task Execution Flow
Each Task follows standard processing flow after starting:
- Reader thread: Read data from source
- Channel thread: Responsible for data transmission and buffering
- Writer thread: Write data to target
- Three threads achieve efficient pipeline operation through memory queue
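The pipeline above can be approximated with a bounded queue. This is a simplified two-thread sketch where the queue plays the Channel role; it is not DataX's implementation, only an illustration of how a bounded buffer gives pipelining plus backpressure:

```python
import queue
import threading

def run_pipeline(source_rows):
    """Minimal reader -> channel -> writer pipeline over a bounded memory queue."""
    channel = queue.Queue(maxsize=4)      # bounded buffer acts as flow control
    sink = []
    SENTINEL = object()                   # end-of-data marker

    def reader():
        for row in source_rows:           # read rows from the source
            channel.put(row)              # blocks when the buffer is full
        channel.put(SENTINEL)

    def writer():
        while True:
            row = channel.get()
            if row is SENTINEL:
                break
            sink.append(row)              # "write" the row to the target

    threads = [threading.Thread(target=reader), threading.Thread(target=writer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sink

print(run_pipeline(["r1", "r2", "r3"]))   # ['r1', 'r2', 'r3']
```

Because the queue is bounded, a slow Writer automatically throttles the Reader, which is the same idea behind Channel-level flow control.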
Job Status Management
Job continuously monitors all TaskGroups during execution:
- Success condition: All TaskGroups complete with no errors
- Failure handling: Any TaskGroup failure means overall failure
- Status feedback: the exit code indicates the result (0 = success, non-zero = failure)
Core Advantages
- Reliable data quality monitoring
- Rich data transformation functions
- Precise speed control
- Powerful sync performance
- Robust fault tolerance mechanism
- Minimalist usage experience
Official documentation: https://github.com/alibaba/DataX/blob/master/introduction.md
Download Project
cd /opt/software
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
tar -zxvf datax.tar.gz -C /opt/servers/
Environment Variables
vim /etc/profile
# datax
export DATAX_HOME=/opt/servers/datax
export PATH=$PATH:$DATAX_HOME/bin
source /etc/profile
Reader, Writer
DataX 3.0 provides Reader and Writer plugins, each plugin has one or more splitting strategies:
Common Reader plugins:
- mysqlreader: reads data from MySQL databases
- txtfilereader: reads data from local files
- hdfsreader: reads data from HDFS files and Hive tables
- streamreader: reads data from an in-memory stream (often used for testing)
- httpreader: reads data from an HTTP URL
Common Writer plugins:
- hdfswriter: writes data to HDFS and Hive tables
- mysqlwriter: writes data to MySQL
- streamwriter: writes data to an in-memory stream (often used for testing)
JSON Template
- The entire config file describes a single Job
- A Job has two config sections: content and setting. content describes the source and destination; setting describes the job itself
- content is divided into reader and writer, describing the source and the destination respectively
- The speed item in setting controls how many concurrent channels (tasks) the job runs
Job Basic Config
{
"job": {
"content": [{
"reader": {
"name": "",
"parameter": {}
},
"writer": {
"name": "",
"parameter": {}
}
}],
"setting": {
"speed": {},
"errorLimit": {}
}
}
}
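Hand-editing nested JSON like the skeleton above is error-prone, so it can help to generate it programmatically. A small hypothetical helper (`build_job` is not part of DataX, just an illustration of the content/setting layout):

```python
import json

def build_job(reader_name, reader_params, writer_name, writer_params,
              speed=None, error_limit=None):
    """Assemble a DataX job dict with the content/setting layout shown above."""
    return {
        "job": {
            "content": [{
                "reader": {"name": reader_name, "parameter": reader_params},
                "writer": {"name": writer_name, "parameter": writer_params},
            }],
            "setting": {
                "speed": speed or {},
                "errorLimit": error_limit or {},
            },
        }
    }

job = build_job("streamreader", {"sliceRecordCount": 10, "column": []},
                "streamwriter", {"print": True},
                speed={"channel": 1})
print(json.dumps(job, indent=2))          # ready to save as job.json
```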
Job Setting Config
{
"job": {
"content": [{
"reader": {
"name": "",
"parameter": {}
},
"writer": {
"name": "",
"parameter": {}
}
}],
"setting": {
"speed": {
"channel": 1,
"byte": 104857600
},
"errorLimit": {
"record": 10,
"percentage": 0.05
}
}
}
}
- job.setting.speed (flow control): the Job supports user-defined speed control; the channel value controls concurrency and the byte value caps byte-level throughput
- job.setting.errorLimit (dirty-data control): the Job supports user-defined dirty-data monitoring and alerting, with a maximum dirty-record threshold (record) and a dirty-data percentage threshold (percentage). When dirty data exceeds the specified count or percentage during transmission, DataX exits with an error.
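The errorLimit semantics described above can be sketched as a predicate. Whether DataX uses strict or inclusive comparisons is an assumption here; the sketch uses strictly-greater-than:

```python
def error_limit_exceeded(dirty, total, record_limit=None, pct_limit=None):
    """Return True when dirty records break either threshold (sketch semantics)."""
    if record_limit is not None and dirty > record_limit:
        return True
    if pct_limit is not None and total > 0 and dirty / total > pct_limit:
        return True
    return False

# Thresholds matching the config above: record=10, percentage=0.05
print(error_limit_exceeded(11, 1000, record_limit=10, pct_limit=0.05))  # True  (11 > 10 records)
print(error_limit_exceeded(4, 50, record_limit=10, pct_limit=0.05))     # True  (8% > 5%)
print(error_limit_exceeded(3, 1000, record_limit=10, pct_limit=0.05))   # False
```

Note that either threshold alone is enough to fail the Job, so a small table with a few bad rows can trip the percentage limit even when the absolute count is low.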
Application Cases
Stream => Stream
{
"job": {
"content": [{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{"type": "string", "value": "hello DataX"},
{"type": "string", "value": "DataX Stream To Stream"},
{"type": "string", "value": "数据迁移工具"}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "GBK",
"print": true
}
}
}],
"setting": {
"speed": {
"channel": 2
}
}
}
}
Execute script:
python $DATAX_HOME/bin/datax.py /data/lagoudw/json/stream2stream.json
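Wrapping the launch in Python makes the exit-code convention (0 = success, non-zero = failure) easy to act on from scheduling scripts. A sketch, with `datax_home` assumed to match the DATAX_HOME configured earlier:

```python
import subprocess
import sys

def datax_command(job_json_path, datax_home="/opt/servers/datax"):
    """Build the DataX launch command (datax_home is an assumed install path)."""
    return [sys.executable, f"{datax_home}/bin/datax.py", job_json_path]

def run_datax(job_json_path, datax_home="/opt/servers/datax"):
    """Run a DataX job; a zero exit code means all TaskGroups completed."""
    result = subprocess.run(datax_command(job_json_path, datax_home))
    return result.returncode == 0

# Usage (requires a DataX installation):
# if not run_datax("/data/lagoudw/json/stream2stream.json"):
#     raise SystemExit("DataX job failed - check the TaskGroup logs")
```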
Error Quick Reference
| Symptom | Diagnosis | Fix |
|---|---|---|
| python: command not found / Python version error | Python not installed, or the default python points to an incompatible version; check with which python, python -V, python3 -V | Install Python (2 or 3) and ensure the startup command uses the actual interpreter (python3 datax.py ... or configure a symlink) |
| java: command not found / fails immediately on start | JDK not installed or PATH not in effect; check with java -version, echo $JAVA_HOME | Install JDK 1.8+, configure environment variables, and reload the profile |
| Permission denied (script execution or target-path write failure) | No permission on the DataX directory or the target directory; check owner and permissions with ls -l | Adjust directory permissions/owner; create the target path in advance; avoid writing as a user without permission |
| Task runs, then the Job fails overall (non-zero exit) | Any TaskGroup failure fails the whole Job; check the log/console failure stack and locate the failing reader/writer first | Reduce concurrency (channel), then check plugin parameters and connectivity; enable finer-grained logging if needed |
| Exits immediately with a dirty-data-exceeded error | job.setting.errorLimit triggered (record/percentage threshold exceeded); check the error output for record/percentage details | Adjust the errorLimit threshold, or fix the source data's field/type mapping; write dirty data to a table/file for backtracking |
| Speed far below expectations / target under heavy pressure | speed not configured or unreasonable (channel/byte), or the target write path is the bottleneck; compare source read speed, Channel rate, and target write TPS | Adjust channel to control concurrency first, then use byte to rate-limit; ramp up gradually with the target's capacity as the ceiling |
| Connection refused / timeout (MySQL etc.) | Network unreachable, port/account permissions, whitelist/firewall; check with telnet host port, nc -vz, and a manual database login | Fix network and auth; confirm database/table permissions and connection parameters (timezone, charset) |
| Garbled characters (console or output file) | Reader/writer encoding config inconsistent with the actual data; compare the source database charset, the job JSON encoding, and the output | Unify encodings; set encoding explicitly for file writers; avoid mixing GBK and UTF-8 |