Skip to content
forked from DataSky/HData

一个支持多数据源的ETL数据导入/导出工具

Notifications You must be signed in to change notification settings

How2RideABoar/HData

 
 

Repository files navigation

HData

HData是一个异构的ETL数据导入/导出工具,致力于使用一个工具解决不同数据源(JDBC、Hive、HDFS、HBase、MongoDB、FTP、Http、CSV、Excel、Kafka等)之间数据交换的问题。HData在设计上同时参考了开源的Sqoop、DataX,却与之有不同的实现。HData采用“框架 插件”的结构,具有较好的扩展性,框架相当于数据缓冲区,插件则为访问不同的数据源提供实现。

HData

【HData特性】

1、异构数据源之间高速数据传输;

2、跨平台独立运行;

3、数据传输过程全内存操作,不读写磁盘;

4、插件式扩展。

HData Design

【HData设计】

  • 配置文件:XML格式,配置Reader、Writer的参数(如:并行度、数据库连接地址、账号、密码等);

  • Reader:数据读取模块,负责从数据源读取数据并写入RingBuffer;

  • Splitter:根据配置文件中Reader的并行度构造相应数据的ReaderConfig对象供Reader使用,以实现数据的并行读取;

  • RingBuffer:来自Disruptor的高性能环形数据缓冲区,基于事件监听模式的异步实现,采用无锁方式针对CPU缓存优化,在此用于Reader和Writer的数据交换;

  • Writer:数据写入模块,负责从RingBuffer中读取数据并写入目标数据源。

HData框架通过配置读取解析、RingBuffer 缓冲区、线程池封装等技术,统一处理了数据传输中的基本问题,并提供Reader、Splitter、Writer插件接口,基于此可以方便地开发出各种插件,以满足各种数据源访问的需求。

【编译&运行】

  • 编译

执行 mvn clean package -Pmake-package 命令,执行成功后将会生成压缩包 ./build/hdata.tar.gz ,然后解压即可。

编译时也可以夹带自己的配置, 如:mvn clean package -Pcdh5 -Pmake-package

  • 运行
1、命令行配置方式

./bin/hdata --reader READER_NAME -Rk1=v1 -Rk2=v2 --writer WRITER_NAME -Wk1=v1 -Wk2=v2

READER_NAME、WRITER_NAME分别为读/写插件的名称,例如:jdbc、hive Reader插件的参数配置以-R为前缀,Writer插件的参数配置以-W为前缀。

例子(Mysql -> Hive):

./bin/hdata --reader jdbc -Rurl="jdbc:mysql://127.0.0.1:3306/testdb" -Rdriver="com.mysql.jdbc.Driver" -Rtable="testtable" -Rusername="username" -Rpassword="password" -Rparallelism=3 --writer hive -Wmetastore.uris="thrift://127.0.0.1:9083" -Whdfs.conf.path="/path/to/hdfs-site.xml" -Wdatabase="default" -Wtable="testtable" -Whadoop.user="hadoop" -Wparallelism=2

2、XML配置方式

job.xml

<?xml version="1.0" encoding="UTF-8"?>

<job id="job_example">
    <reader name="jdbc">
		<url>jdbc:mysql://127.0.0.1:3306/testdb</url>
		<driver>com.mysql.jdbc.Driver</driver>
		<table>testtable</table>
		<username>username</username>
		<password>password</password>
		<parallelism>3</parallelism>
	</reader>

	<writer name="hive">
		<metastore.uris>thrift://127.0.0.1:9083</metastore.uris>
		<hdfs.conf.path>/path/to/hdfs-site.xml</hdfs.conf.path>
		<database>default</database>
		<table>testtable</table>
		<hadoop.user>hadoop</hadoop.user>
		<parallelism>2</parallelism>
	</writer>
</job>

运行命令:

./bin/hdata -f /path/to/job.xml

【配置参数】

其中,参数parallelism为读/写并行度,所有插件均有该参数,默认为1。

设置合理的parallelism参数可提高性能。

【Reader配置参数】

  • console

无配置参数,一般仅用于测试

  • jdbc
参数 是否必选 描述
driver JDBC驱动类名,如:com.mysql.jdbc.Driver
url JDBC连接地址,如: jdbc:mysql://localhost:3306/db
username 数据库用户名
password 数据库密码
table 表名(包含数据库名或schema名),如:db.table,也支持分表,例如:table[001-100]
columns 字段名,多个字段用逗号“,”分隔。不填则选取所有字段。
exclude.columns 排除的字段名,多个字段用逗号“,”分隔
where 查询条件,如:day=’20140418’
sql 自定义查询SQL
split.by 并行读取切分的字段
max.size.per.fetch 单次执行SQL获取的最多记录数
null.string 替换当字符串类型的字段值为NULL时的值
null.non.string 替换当非字符串类型的字段值为NULL时的值
field.wrap.replace.string 若字符串字段中存在换行符时需要替换的值
number.format 小数类型字段的输出格式
keyword.escaper 关键字转义字符,默认为`
  • hive
参数 是否必选 描述
metastore.uris Hive Metastore连接地址,如:thrift://localhost:9083, 默认: HiveConf.getVar(ConfVars.METASTOREURIS) 即 hive-site.xml 中的 hive.metastore.uris
database 数据库名,默认:default
table 表名
partitions 分区,例如: visit_date='2016-07-07'
hadoop.user 具有HDFS读权限的用户名
hdfs.conf.path hdfs-site.xml配置文件路径
select.columns 选择读取的字段
convert.null 设置值为NULL时对应的字符串,默认:"NULL"
  • hdfs
参数 是否必选 描述
dir HDFS目录路径,如:hdfs://192.168.1.1:8020/user/dir1
filename 文件名,支持正则表达式
schema 输出的字段定义
fields.separator 字段分隔符,默认:\0001
encoding 文件编码,默认:UTF-8
hadoop.user 具有HDFS读权限的用户名
hdfs.conf.path hdfs-site.xml配置文件路径
  • hbase
参数 是否必选 描述
zookeeper.quorum Zookeeper连接地址,如:192.168.1.16,192.168.1.17
zookeeper.client.port Zookeeper客户端端口,默认:2181
table 表名
start.rowkey Rowkey起始值
end.rowkey Rowkey结束值
columns 读取的列,如::rowkey,cf:start_ip,cf:end_ip
schema 输出的字段定义,如:id,start_ip,end_ip
zookeeper.znode.parent hbase使用的Zookeeper根节点
  • http
参数 是否必选 描述
url URL链接
encoding 编码,默认UTF-8
  • kafka
参数 是否必选 描述
topic 需要消费的topic
group.id consumer组id
zookeeper.connect Zookeeper连接地址,如:198.168.12.34:2181
consumer.stream.count 数据消费流的数量,默认为1
encoding 编码,默认UTF-8
max.fetch.size 最大fetch数,默认:100000
max.wait.second 最大等待时间(单位:秒),默认:300
partition.id 默认:0
start.offset 需要消费的起始offset
fields.separator 字段分隔符,默认\t
schema 输出的字段定义,如:id,start_ip,end_ip

其他配置请参考:Kafka Configuration

  • ftp
参数 是否必选 描述
host FTP连接地址,如:192.168.1.1
port FTP端口,默认:21
username 用户名
password 密码
dir FTP目录,如:/input/dir
filename 文件名,支持正则表达式
recursive 是否递归搜索文件,默认:false
encoding 文件编码,默认:UTF-8
fields.separator 字段分隔符,默认:\t
schema 输出的字段定义
fields.count.filter 符合的字段数,不符合则过滤记录
  • mongodb
参数 是否必选 描述
uri MongoDB连接地址,如:mongodb://localhost/test.ip
query 查询语句,如:{"city":"XXX"}
  • csv
参数 是否必选 描述
path 本地文件路径
start.row 数据起始行数,默认:1
encoding 编码,默认:UTF-8
  • excel
参数 是否必选 描述
path 文件路径
include.column.names 是否包含列名,默认:false

【Writer配置参数】

  • jdbc
参数 是否必选 描述
driver JDBC驱动类名,如:com.mysql.jdbc.Driver
url JDBC连接地址,如: jdbc:mysql://localhost:3306/db
username 数据库用户名
password 数据库密码
table 表名(包含数据库名或schema名),如:db.table
batch.insert.size 批量插入的记录数,默认值:10000
schema 字段名配置,一般用于writer和reader的字段名不一致时
keyword.escaper 关键字转义字符,默认为`
upsert.columns 指定 Upsert 的字段列表,逗号分隔,目前仅支持 Mysql,默认为空(即不启用 upsert)
  • hive
参数 是否必选 描述
metastore.uris Hive Metastore连接地址,如:thrift://localhost:9083, 默认: HiveConf.getVar(ConfVars.METASTOREURIS) 即 hive-site.xml 中的 hive.metastore.uris
database 数据库名,默认:default
table 表名
partitions 分区条件,如:day='20140418'
hadoop.user 具有HDFS写权限的用户名
hdfs.conf.path hdfs-site.xml配置文件路径
  • hdfs
参数 是否必选 描述
path HDFS文件路径,如:hdfs://192.168.1.1:8020/user/1.txt
fields.separator 字段分隔符,默认:\t
line.separator 行分隔符,默认:\n
encoding 文件编码,默认:UTF-8
compress.codec 压缩编码,如:org.apache.hadoop.io.compress.GzipCodec
hadoop.user 具有HDFS写权限的用户名
max.file.size.mb 单个文件最大大小限制(单位:MB)
partition.date.index 日期字段索引值,起始值为0
partition.date.format 日期格式,如:yyyy-MM-dd
hdfs.conf.path hdfs-site.xml配置文件路径
  • hbase
参数 是否必选 描述
zookeeper.quorum Zookeeper连接地址,如:192.168.1.16,192.168.1.17
zookeeper.client.port Zookeeper客户端端口,默认:2181
table 表名
columns 列名,如::rowkey,cf:start_ip
batch.insert.size 批量插入的记录数,默认值:10000
zookeeper.znode.parent hbase使用的Zookeeper根节点
  • kafka
参数 是否必选 描述
topic 需要消费的topic
fields.separator 字段分隔符,默认\t

其他配置请参考:Kafka Configuration

  • ftp
参数 是否必选 描述
host FTP连接地址,如:192.168.1.1
port FTP端口,默认:21
username 用户名
password 密码
path FTP保存目录
encoding 文件编码,默认:UTF-8
fields.separator 字段分隔符,默认:\t
line.separator 行分隔符,默认\n
gzip.compress 是否启用gzip压缩,默认:false
  • mongodb
参数 是否必选 描述
uri MongoDB连接地址,如:mongodb://localhost/test.ip
query 查询语句,如:{"city":"XXX"}
  • csv
参数 是否必选 描述
path 本地文件路径
separator 字段分隔符,默认逗号","
encoding 编码,默认:UTF-8
  • excel
参数 是否必选 描述
path 文件路径
include.column.names 是否包含列名,默认:false
  • console

无配置参数,一般仅用于测试

【Reader的并行度切分依据】

Reader 并行度切分依据
jdbc 数值型主键(或配置指定列)的大小范围。默认会根据查询条件查找max(id)和min(id),然后根据id的区间范围均分,每个线程负责读取其中一份区间的数据
hive 文件数量
hbase region数量
hdfs 文件数量
mongodb _id区间
ftp 文件数量
http http接口中的offset参数
kafka 无。使用High-Level Consumer API
csv 文件数量
excel

赞助

如果您觉得该项目对您有帮助,请扫描下方二维码对我进行鼓励,以便我更好的维护和更新,谢谢支持!

支付宝 微信

About

一个支持多数据源的ETL数据导入/导出工具

Resources

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Java 98.4%
  • Other 1.6%