目录
- 一. DataX简介
- 1.1 DataX概述
- 1.2 DataX支持的数据源
- 二. DataX架构原理
- 2.1 DataX设计理念
- 2.2 DataX框架设计
- 2.3 DataX运行流程
- 2.4 DataX调度决策思路
- 2.5 DataX 与 Sqoop 对比
- 三. DataX使用
- 3.1 DataX使用概述
- 3.1.1 DataX任务提交命令
- 3.1.2 DataX配置文件格式
- 3.2 同步MySQL数据到HDFS案例
- 3.2.1 MySQLReader之TableMode
- 3.2.2 MySQLReader之QuerySQLMode
- 3.2.3 DataX传参
- 3.3 同步HDFS数据到MySQL案例
- 四. DataX优化
- 4.1 速度控制
- 4.2 内存调整
一. DataX简介
1.1 DataX概述
DataX
是阿里巴巴开源的一个异构数据源离线同步工具
,致力于实现包括关系型数据库
(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间
稳定高效的数据同步
功能。
源码地址:阿里云DataX源码
1.2 DataX支持的数据源
二. DataX架构原理
2.1 DataX设计理念
为了解决异构数据源同步
问题,DataX
将复杂的网状的同步链路
变成了星型数据链路
,DataX 作为中间传输载体
负责连接各种数据源
。当需要接入一个新的数据源的时候,只需要将此数据源对接
到DataX
,便能跟已有的数据源做到无缝数据同步。
2.2 DataX框架设计
DataX
本身作为离线数据同步框架
,采用 Framework + plugin架构
构建。将数据源读取和写入
抽象成为 Reader/Writer 插件
,纳入到整个同步框架中。
其中:
Reader:数据采集
模块,负责采集数据源的数据,将数据发送给Framework
。
Writer:数据写入
模块,负责不断向Framework取数据
,并将数据写入到目的端
。
Framework:用于连接reader和writer
,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换
等核心技术问题。
2.3 DataX运行流程
注:
Job:单个数据同步
的作业,称为一个Job
,一个Job
启动一个进程
。
Task:根据不同数据源的切分策略,一个Job会切分为多个Task
,Task
是DataX作业的最小单元
,每个Task
负责一部分数据的同步
工作。
TaskGroup:Scheduler调度模块
会对Task
进行分组
,每个Task组称为一个Task Group
。每个Task Group负责以一定的并发度运行
其所分得的Task,单个Task Group的并发度为5
。
Reader–>Channel–>Writer:每个Task启动后,都会固定启动Reader–>Channel–>Writer的线程来完成同步工作。
2.4 DataX调度决策思路
举例来说,用户提交了一个 DataX 作业,并且配置了总的并发度为 20,目的是对一个有 100 张分表的 mysql 数据源进行同步。DataX 的调度决策思路是:
1)DataX Job 根据分库分表切分策略,将同步工作分成 100 个 Task。
2)根据配置的总的并发度 20,以及每个 Task Group 的并发度 5,DataX 计算共需要分配 4 个TaskGroup。
3)4 个 TaskGroup 平分 100 个 Task,每一个 TaskGroup 负责运行 25 个 Task。
注:
默认DataX会把Mysql中的1个表切分成1个Task。
2.5 DataX 与 Sqoop 对比
三. DataX使用
3.1 DataX使用概述
3.1.1 DataX任务提交命令
用户只需根据自己同步数据的数据源
和目的地
选择相应的Reader
和 Writer
,并将 Reader 和 Writer 的信息配置在一个 json 文件
中,然后执行如下命令提交数据同步任务即可:
3.1.2 DataX配置文件格式
使用如下命令查看 DataX 配置文件模板
:
配置文件模板如下,json 最外层
是一个 job
,job
包含setting
和 content
两部分,其中setting
用于对整个 job 进行配置
,content 用户配置数据源和目的地
。
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": "",
"where": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [],
"compress": "",
"defaultFS": "",
"fieldDelimiter": "",
"fileName": "",
"fileType": "",
"path": "",
"writeMode": ""
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
Reader 和 Writer 的具体参数可参考官方文档,地址如下:DataX配置
3.2 同步MySQL数据到HDFS案例
案例要求:同步 gmall 数据库中 base_province 表数据到 HDFS 的/base_province 目录
需求分析:要实现该功能,需选用 MySQLReader
和 HDFSWriter
,MySQLReader
具有两种模式
分别是 TableMode
和 QuerySQLMode
,前者
使用 table,column,where 等属性
声明需要同步的数据;后者
使用一条 SQL 查询语句
声明需要同步的数据。
下面分别使用两种模式进行演示。
3.2.1 MySQLReader之TableMode
1)编写配置文件
(1)创建配置文件 base_province.json
(2)内容如下:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2"
],
"where": "id>=3",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/gmall"
],
"table": [
"base_province"
]
}
],
"password": "000000",
"splitPk": "",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "base_province",
"fileType": "text",
"path": "/base_province",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2)配置文件说明
(1)Reader 参数说明
(2) Writer 参数说明
注意事项:
HFDS Writer
并未提供nullFormat
参数:也就是用户并不能自定义 null 值
写到 HFDS 文件
中的存储格式。默认情况下,HFDS Writer
会将null 值存储
为空字符串('')
,而Hive
默认的null 值存储格式
为\N
。所以后期将 DataX 同步的文件导入 Hive 表就会出现问题。
解决该问题的方案有两个:
一是修改 DataX HDFS Writer 的源码,增加自定义 null 值存储格式的逻辑。
二是在 Hive 中建表时指定 null 值存储格式为空字符串(‘’),如:
DROP TABLE IF EXISTS base_province;
CREATE EXTERNAL TABLE base_province
(
`id` STRING COMMENT '编号',
`name` STRING COMMENT '省份名称',
`region_id` STRING COMMENT '地区 ID',
`area_code` STRING COMMENT '地区编码',
`iso_code` STRING COMMENT '旧版 ISO-3166-2 编码,供可视化使用',
`iso_3166_2` STRING COMMENT '新版 IOS-3166-2 编码,供可视化使用'
) COMMENT '省份表'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/base_province/';
(3)Setting 参数说明
3)提交任务
(1)在 HDFS 创建/base_province 目录
使用DataX
向 HDFS
同步数据时,需确保目标路径已存在
:
登录HDFS-namenode节点
查看目录生成
情况:
(2)执行如下命令
4)查看结果
(1)DataX 打印日志
查看数据库原数据
,发现原数据
有34条
,而DataX打印日志
最终读出
只有32条
:
为什么出现上述情况呢?
原因是,我们的base_province.json配置文件
中有where限定条件
,该条件只筛选表中id>=3的数据
记录,所以最终读出
的才是32条!
(2)查看 HDFS 文件
注:此处需要用管道,借助zcat命令查看数据,否则会出现乱码!
3.2.2 MySQLReader之QuerySQLMode
1)编写配置文件
(1)新增配置文件 base_province_sql.json
(2)配置文件内容如下:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/gmall"
],
"querySql": [
"select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
]
}
],
"password": "000000",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "base_province",
"fileType": "text",
"path": "/base_province",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2)配置文件说明
(1)Reader 参数说明
3)提交任务
(1)清空历史数据
(2)执行如下命令
4)查看结果
(1)DataX 打印日志
(2)查看 HDFS 文件
可以看到
MySQLReader
的两种模式
最终都达到了同样的效果
,那么这两种mode有什么区别
?
答:TableMode只能查询一张表
,而QuerySQLMode
可以通过join等操作关联查询多张表
,并且可以完成更复杂的查询工作(通过修改json文件中reader模块中的querySql语句).
3.2.3 DataX传参
通常情况下,离线数据同步
任务需要每日定时重复
执行,故 HDFS
上的目标路径
通常会包含一层日期
,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变
的,因此 DataX
配置文件中 HDFS Writer
的path
参数的值应该是动态
的。为实现这一效果,就需要使用 DataX 传参
的功能。
DataX 传参的用法如下,在 JSON 配置文件
中使用${param}
引用参数
,在提交任务
时使用-p"-Dparam=value"
传入参数值,具体示例如下。
1)编写配置文件
(1)修改配置文件 base_province.json
(2)配置文件内容如下
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2"
],
"where": "id>=3",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/gmall"
],
"table": [
"base_province"
]
}
],
"password": "000000",
"splitPk": "",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://hadoop102:8020",
"fieldDelimiter": "\t",
"fileName": "base_province",
"fileType": "text",
"path": "/base_province/${dt}",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2)提交任务
(1)创建目标路径
(2)执行如下命令
注:此处通过-p"-Ddt=2023-06-19"命令来实现传入到指定目录下。
3)查看结果
3.3 同步HDFS数据到MySQL案例
案例要求:同步 HDFS 上的/base_province 目录下的数据到 MySQL gmall 数据库下的test_province 表。
需求分析:要实现该功能,需选用 HDFSReader
和 MySQLWriter
。
1)编写配置文件
(1)创建配置文件 test_province.json
(2)配置文件内容如下
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"defaultFS": "hdfs://hadoop102:8020",
"path": "/base_province",
"column": [
"*"
],
"fileType": "text",
"compress": "gzip",
"encoding": "UTF-8",
"nullFormat": "\\N",
"fieldDelimiter": "\t"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "000000",
"connection": [
{
"table": [
"test_province"
],
"jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8"
}
],
"column": [
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2"
],
"writeMode": "replace"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2)配置文件说明
(1)Reader 参数说明
(2)Writer 参数说明
注:此处
WriteMode
有三种模式
,分别如下:insert into(insert)
: 当要写入的Mysql表无主键
时,为正常写入
;当有主键
时,一旦出现重复数据
,会出现数据重复异常
。replace into(replace)
: 要求写入的Mysql表必须有主键
,且当主键
存在重复
时,会delete
对应整行数据
,然后再insert
。ON DUPLICATE KEY UPDATE(update)
: 要求写入的Mysql表必须有主键
,该模式是在原有数据记录
的基础上作修改
,而不做删除
操作。
3)提交任务
(1)在 MySQL 中创建 gmall.test_province 表
DROP TABLE IF EXISTS `test_province`;
CREATE TABLE `test_province` (
`id` bigint(20) NOT NULL,
`name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT
NULL,
`region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT
NULL,
`area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT
NULL,
`iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT
NULL,
`iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL
DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT =
Dynamic;
(2)执行如下命令
4)查看结果
(1)DataX 打印日志
(2)查看 MySQL 目标表数据
为什么DataX日志中读出记录总数为64,而实际写入Mysql中的数据只有32条呢?
由于DataX
从HDFS
中读取
的时候,会从根目录
依次读取其中文件
与子目录
,而2023-06-19子目录下
又保存有一份
与根目录下gz文件同样的文件
,所以DataX
实际读取的数据为2*32=64条
:而又因为
Writer写入的策略
选用为replace
,该策略会对重复的数据作覆盖
(仅保留其中一份数据),所以写入Mysql
中的数据就只有32条
:
四. DataX优化
4.1 速度控制
DataX3.0 提供了包括通道(并发)、记录流、字节流
三种流控模式
,可以随意控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。
关键优化参数如下:
注意事项:
1.若配置
了总 record 限速
,则必须配置单个 channel
的record 限速
(避免类似“数据倾斜”的现象出现)
2.若配置
了总 byte 限速
,则必须配置单个 channe
的byte 限速
(避免类似“数据倾斜”的现象出现)
3.若配置
了总 record 限速
和总 byte 限速
,channel 并发数参数
就会失效
。因为配置了总record 限速和总 byte 限速之后,实际 channel 并发数是通过计算
得到的:
计算公式为:
min(总 byte 限速/单个 channel 的 byte 限速,总 record 限速/单个 channel 的 record 限速)
4.2 内存调整
当提升 DataX Job 内 Channel 并发数
时,内存的占用会显著增加,因为 DataX 作为数据交换通道在内存中会缓存较多的数据。例如 Channel 中会有一个 Buffer,作为临时的数据交换的缓冲区,而在部分 Reader 和 Writer 的中,也会存在一些 Buffer,为了防止 OOM 等错误
,需调大JVM的堆内存
。
建议将内存设置为 4G 或者 8G,这个也可以根据实际情况来调整。文章来源:https://uudwc.com/A/wo388
调整 JVM xms xmx 参数的两种方式:一种是直接更改 datax.py 脚本
;另一种是在启动
的时候,加上对应的参数
,如下:文章来源地址https://uudwc.com/A/wo388
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json