TL;DR
- 场景:离线同步 MySQL/HDFS/Hive/OTS/ODPS 等异构数据源,批量迁移与数仓 ETL。
- 结论:DataX 用 Framework + Reader/Writer 插件把”网状对接”降维成”星型链路”,关键靠 Job 切分 + TaskGroup 并发 + speed/errorLimit 控制。
- 产出:可直接落地的安装运行口径、版本依赖矩阵、以及常见报错的定位与修复卡片。
版本矩阵
| 项目 | 说明 |
|---|---|
| DataX 核心定位 | 离线数据同步框架,Reader/Writer 插件抽象异构数据源同步 |
| 运行环境口径 | Linux + JDK 1.8 或以上(推荐 1.8)+ Python 2 或 3 |
| 官方下载与运行入口口径 | 解压后进入 bin,通过 python datax.py job.json 运行 |
| 作业核心配置口径 | job.setting.speed + job.setting.errorLimit 作为常用控制面 |
基本概述
DataX是阿里巴巴集团自主研发并在内部广泛使用的一款高性能离线数据同步工具/平台。作为阿里大数据生态体系中的重要组件,它专门用于解决企业级数据集成过程中的异构数据源同步难题。
在数据源支持方面,DataX具备强大的兼容性,目前已实现对多种主流数据库和大数据存储系统的支持,包括:
- 关系型数据库:MySQL、Oracle、SQL Server、PostgreSQL等
- 大数据生态组件:HDFS、Hive、HBase等
- 阿里云服务:AnalyticDB(ADS)、TableStore(OTS)、MaxCompute(ODPS)
- 分布式数据库:DRDS(阿里云分布式关系型数据库服务)
DataX采用星型拓扑结构重构了数据同步链路:
- 核心层:DataX作为中央调度和传输引擎
- 接入层:各数据源通过插件化方式接入DataX核心
- 传输层:统一的数据格式转换和传输通道
这种架构的优势体现在:
- 扩展性:新增数据源只需开发对应插件,无需修改核心架构
- 维护性:所有同步任务通过统一平台管理
- 可靠性:内置故障检测和重试机制
- 性能优化:支持并发控制和流量控制
典型应用场景包括:
- 数据仓库ETL过程:将业务系统数据定期同步到数据仓库
- 跨云数据迁移:在不同云服务商之间转移数据
- 数据备份:实现重要数据的异地备份
- 数据分析:为机器学习等场景准备训练数据
DataX核心架构
DataX作为阿里巴巴开源的高性能离线数据同步工具,其核心架构采用Framework+plugin模式设计,这种模块化架构使得系统具有高度的灵活性和可扩展性。该框架将数据同步过程中的关键功能抽象为三大核心组件:
1. Reader数据采集模块
- 负责从各类数据源采集数据,支持多种数据格式和协议
- 内置丰富的数据源插件
- 通过分片策略实现并行数据抽取,提高采集效率
- 示例场景:从MySQL数据库中读取千万级订单数据
2. Writer数据写入模块
- 负责将处理后的数据写入目标存储系统
- 支持多种写入模式:全量覆盖、增量追加、条件更新
- 提供数据校验和错误重试机制
- 典型应用:将清洗后的数据写入Hive数据仓库
3. Framework核心框架
- 作为数据传输中枢,实现Reader和Writer的高效对接
- 关键技术特性:内存缓冲管理、流量控制、并发调度、数据转换、脏数据处理
经过多年发展和社区贡献,DataX已经构建了完善的插件生态体系:
- 支持30+种数据源插件
- 覆盖主流数据库系统(MySQL/Oracle/PostgreSQL等)
- 兼容大数据生态系统(HDFS/Hive/HBase等)
- 支持文件系统(FTP/SFTP等)
核心模块
Job(作业)管理
DataX完成单个数据同步的作业称为Job。当DataX接收到一个Job请求时,会启动一个独立的进程来执行整个数据同步流程。Job模块作为作业的中枢管理节点,主要负责以下核心功能:
- 数据清理(包括预处理和后处理)
- 子任务切分(将单一作业分解为多个并行Task)
- 任务调度和状态监控
Task切分机制
Job启动后,会根据源端数据特性采用不同的切分策略:
- 对于RDBMS数据源,通常按照主键范围或分片键切分
- 对于HDFS等文件系统,可按文件块切分
- 每个Task负责处理一部分数据
任务调度(Scheduler)
切分后的Task会通过Scheduler模块进行重组:
- 默认每个TaskGroup包含5个并发Task
- TaskGroup数量 = ceil(总Task数/并发数)
Task执行流程
每个Task启动后遵循标准处理流程:
- Reader线程:从数据源读取数据
- Channel线程:负责数据传输和缓冲
- Writer线程:将数据写入目标端
- 三线程通过内存队列实现高效流水线作业
作业状态管理
Job运行期间会持续监控所有TaskGroup:
- 成功条件:所有TaskGroup均完成且无错误
- 失败处理:任一TaskGroup失败即整体失败
- 状态反馈:通过进程退出码标识(0成功,非0失败)
核心优势
- 可靠的数据质量监控
- 丰富的数据转换功能
- 精准的速度控制
- 强劲的同步性能
- 健壮的容错机制
- 极简的使用体验
官方网站:https://github.com/alibaba/DataX/blob/master/introduction.md
下载项目
cd /opt/software
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
环境变量
vim /etc/profile
# datax
export DATAX_HOME=/opt/servers/datax
export PATH=$PATH:$DATAX_HOME/bin
Reader、Writer
Data3.0提供Reader插件和Writer插件,每种插件都有一种和多种切分策略:
"reader": {
"name": "mysqlreader", //从mysql数据库获取数据
"name": "txtfilereader", //从本地获取数据
"name": "hdfsreader", //从hdfs文件、hive表获取数据
"name": "streamreader", //从stream流获取数据(常用于测试)
"name": "httpreader", //从http URL获取数据
}
"writer": {
"name":"hdfswriter", //向hdfs,hive表写入数据
"name":"mysqlwriter", //向mysql写入数据
"name":"streamwriter", //向stream流写入数据(常用于测试)
}
JSON模板
- 整个配置文件就是一个Job描述
- Job下面有两个配置项,content和setting,其中content用来描述该任务的源和目的端的信息,setting用来描述任务本身的信息
- content又分为两部分,reader和writer,分别用来描述源端和目的端的信息
- setting中的speed项表示同时起几个并发执行该任务
Job基本配置
{
"job": {
"content": [{
"reader": {
"name": "",
"parameter": {}
},
"writer": {
"name": "",
"parameter": {}
}
}],
"setting": {
"speed": {},
"errorLimit": {}
}
}
}
Job Setting配置
{
"job": {
"content": [{
"reader": {
"name": "",
"parameter": {}
},
"writer": {
"name": "",
"parameter": {}
}
}],
"setting": {
"speed": {
"channel": 1,
"byte": 104857600
},
"errorLimit": {
"record": 10,
"percentage": 0.05
}
}
}
}
- job.setting.speed 流量控制:Job支持用户对速度的自定义控制,channel的值可以控制同步时的并发数,byte的值可以控制同步时的速度
- job.setting.errorLimit 脏数据控制:job支持用户对于脏数据的自定义监管和告警,包括对脏数据最大记录阈值(record)值或者脏数据占比阈值(percentage),当Job传输过程中出现脏数据大于用户指定的数量、百分比,DataJob报错退出。
应用案例
Stream => Stream
{
"job": {
"content": [{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{"type": "String", "value": "hello DataX"},
{"type": "string", "value": "DataX Stream To Stream"},
{"type": "string", "value": "数据迁移工具"}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "GBK",
"print": true
}
}
}],
"setting": {
"speed": {
"channel": 2
}
}
}
}
执行脚本:
python $DATAX_HOME/bin/datax.py /data/lagoudw/json/stream2stream.json
错误速查
| 症状 | 根因定位 | 修复 |
|---|---|---|
python: command not found / 运行报 Python 版本异常 | 机器未装 Python,或默认 python 指向不兼容版本 | which python / python -V / python3 -V 安装 Python(2 或 3),并确保启动命令使用实际存在的解释器(python3 datax.py ... 或配置软链) |
java: command not found / 启动即失败 | 未安装 JDK 或 PATH 未生效 | java -version / echo $JAVA_HOME 安装 JDK 1.8+,配置环境变量并重新加载 profile |
Permission denied(执行脚本/写目标路径失败) | DataX 目录或目标目录无权限 | ls -l / 查看目标路径属主与权限 调整目录权限/属主;目标路径提前创建;避免用无权限用户写入 |
| 任务跑着跑着整体失败(非 0 退出码) | 任一 TaskGroup 失败导致 Job 失败 | 先看 log/控制台失败栈;定位失败的 reader/writer 先缩小并发(channel),再定位插件参数/连通性;必要时开更细日志 |
| 报错后立刻退出,提示脏数据超限 | job.setting.errorLimit 触发(record/percentage 超阈值) | 看错误输出是否出现 record/percentage 相关信息 调整 errorLimit 阈值;或修正源数据字段/类型映射;把脏数据落库/落文件再回溯 |
| 速度远低于预期 / 目标端压力大 | speed 未配置或不合理(channel/byte),或目标端写入瓶颈 | 对比源端读速、Channel 速率、目标端写入 TPS 先调 channel 控并发,再用 byte 做限速;以目标端可承受为上限逐步压测 |
Connection refused / 超时(MySQL 等) | 网络不通、端口/账号权限、白名单/防火墙 | telnet host port / nc -vz / 数据库登录测试 修通网络与鉴权;确认库表权限与时区/字符集等连接参数 |
| 字符集乱码(控制台/落地文件) | writer/reader encoding 配置与实际不一致 | 对比源库字符集、任务 JSON encoding、落地结果 统一编码;文件类 writer 明确设置 encoding;避免混用 GBK/UTF-8 |