基于litemall构建的数据仓库
- 数据仓库(离线/实时)
- 推荐系统
- 用户画像
- 大数据
- Hadoop
- hive on tez
- sqoop
- spark
- kafka
- azkaban
- 后端
- spring boot
- mysql
- redis
- mongo
- openresty
- Hadoop主机
- Hadoop:3.1.2
- hive:3.1.2
- tez:0.10.1 结合对应的hive版本进行源码编译
- sqoop:1.4.7
- spark $\ge$2.4.7
- Kafka
- zookeeper:3.6.1
- Kafka:2.13-2.6.0
- CDH:6.3.2
处理用户行为数据,采用json数据格式
- openresty采集数据发送到kafka
-
创建topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 3 --topic litemall-action
-
配置nginx
-
生成工作目录
mkdir -p {litemall/logs,litemall/conf,litemall/luas}
-
配置nginx
worker_processes 1; error_log logs/error.log; events { worker_connections 1024; } user root root; # 定义用户名、用户组,防止权限问题 http { # 设置共享字典 lua_shared_dict kafka_data 10m; resolver 192.168.241.20; #使用能解析kafka域名的dns server { listen 80; location / { default_type text/html; content_by_lua_block { ngx.say("<p>hello, world</p>") } } location /process{ default_type application/json; content_by_lua_file /root/project/litemall/luas/kafka_test.lua; } } }
-
发送到kafka
local cjson=require "cjson" local producer = require "resty.kafka.producer" local broker_list = { { host = "192.168.241.180", port = 9092 }, { host = "192.168.241.181", port = 9092 }, { host = "192.168.241.182", port = 9092 }, } local topic="litemall-action" local partitions=3 local shared_data = ngx.shared.kafka_data local partitionNum=shared_data:get("count") if not partitionNum then shared_data:set("count",1) partitionNum=shared_data:get("count") end local key = ""..(partitionNum%partitions) shared_data:incr("count",1) ngx.req.read_body() local request_body = ngx.req.get_body_data() if request_body then local p = producer:new(broker_list) local offset, err = p:send(topic, key, request_body) end ngx.say(cjson.encode{code = 200, msg = request_body})
-
-
- 页面埋点
- 不同日志的处理:监听生命周期函数
- 启动日志:在
main.js
通过绑定vue
对象的mounted
事件来处理,保证只有一次请求 - 事件日志
- 在
main.js
通过绑定vue
对象的mounted
和beforeDestory
来启动或清除定时器 - 在定时器中发送事件日志,消除频繁请求日志服务器
- 采用数组来记录事件,并在发送之后清空
- 在
- 启动日志:在
- 跨域的处理
-
前端通过代理处理(修改
vue.config.js
配置文件,vue cli 3.0
)devServer: { port:6255, proxy:{ '/process': { target: process.env.VUE_LOG_BASE_API, ws: true, changeOrigin: true //允许跨域 }, '/wx': { target: process.env.VUE_APP_BASE_API, ws: true, changeOrigin: true } } },
-
后端通过nginx处理
location /process{ default_type application/json; # 添加响应头,在opthons请求返回的响应中,从而在下个请求中携带 add_header Access-Control-Allow-Credentials true; add_header Access-Control-Allow-Headers *; add_header Access-Control-Allow-Origin *; content_by_lua_file /root/project/litemall/luas/kafka_test.lua; }
-
- 不同日志的处理:监听生命周期函数
hdfs dfs -mkdir -p {/original_data/litemall/db,/original_data/litemall/log}
-
将事件日志导入hdfs
- 自定义flume的interceptor将数据拆分为start和event类型的数据,并存储到hdfs中不同的文件夹
- 打包之后的jar放入到flume的
plugins.d/interceptor/lib
文件夹下plugins.d
文件夹用于集成第三方的plugin
- flume守护进程问题
-
原因:启动配置参数
-c
设置不正确 -
解决:
FLUME_HOME=/opt/apache-flume-1.9.0-bin # 需要配置到flume安装路径下的conf文件夹 nohup $FLUME_HOME/bin/flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf/litemall_kafka_to_hdfs.conf --name a1 -Dflume.root.logger=INFO,console >$FLUME_HOME/logs/litemall 2>&1 &
-
- 处理hive报错:Cannot obtain block length for LocatedBlock
- 在跑Azkaban任务的时候出现以上错误,其原因是:文件处于
openforwrite
的状态,没有关闭 - 解决:
-
查看文件状态
hdfs fsck dir -files
-
查看路径下是否有
openforwrite的文件
hdfs fsck dir -openforwrite
-
释放租约
hdfs debug recoverLease -path filepath [-retries retry-num]
-
- 分析:
- 没有关闭flume重启了hdfs
- 在跑Azkaban任务的时候出现以上错误,其原因是:文件处于
-
将业务数据导入hdfs
- sqoop将mysql中
tinyint
的数据在HDFS 上面显示的true、false
及在hive中需要使用boolean
存储- 原因:jdbc会把tinyint 认为是
java.sql.Types.BIT
- 解决:
jdbc:mysql://localhost/test?tinyInt1isBit=false
- 原因:jdbc会把tinyint 认为是
- sqoop:源数据含有默认换行符导致的数据不正确(出现数据错位)
-
解决:import添加参数
# Drops \n, \r, and \01 from string fields when importing to Hive. --hive-drop-import-delims
-
- 支付和退款数据
- 因该数据都存在于订单表中,所以进行特殊处理
- 支付:除了分区数据以外,同时添加
pay_time
作为过滤条件 - 退款:除了分区数据以外,同时添加
refund_time
作为过滤条件
- 支付:除了分区数据以外,同时添加
- 因该数据都存在于订单表中,所以进行特殊处理
- sqoop将mysql中
-
自定义hive函数来解析events事件日志
-
hive-function
:中定义UDF和UDTF-
将打包后的jar包上传到hdfs
hdfs dfs -put hive-function.jar /user/hive/jars
-
-
注册全局函数
# udf create function litemall.parse_json_object as 'org.tlh.litemall.udf.ParseJsonObject' using jar 'hdfs:///user/hive/jars/hive-function.jar' # udtf create function litemall.extract_event_type as 'org.tlh.litemall.udtf.ExtractEventType' using jar 'hdfs:///user/hive/jars/hive-function.jar'
-
-
对于分区表,新增加列,重新覆盖写已有的分区,新增列为
null
-
原因:由于分区元数据缺少新添加的字段导致的
-
解决:往出错的分区元数据中添加这个列
alter table dwd_fact_cart_info partition(dt = '2020-12-04') add columns(add_time string comment '加购时间');
-
-
加载时间维度数据
LOAD DATA INPATH '/date_info.txt' OVERWRITE INTO TABLE dwd_dim_date_info;
- 将数据导出到mysql数据重复
- 对于以日期为基础的数据
- 如果只有一条,则mysql中日期列设置为主键
- 如果是多条,则hive中需要以日期进行分区
- 对于以日期为基础的数据
- 目前通过Sqoop从Hive的parquet抽数到关系型数据库的时候会报kitesdk找不到文件的错,这是Sqoop已知的问题
- 将需要导出的表存储方式采用默认的
textfile
- 将需要导出的表存储方式采用默认的
-
启动hiveserver
hive --service hiveserver2
-
spark配置通过域名通信
conf.set("dfs.client.use.datanode.hostname", "true")
-
注意事项
- mac环境下读取hive中lzo压缩的数据
-
使用homebrew安装lzo、lzop
brew install lzop lzo
-
编译安装hadoop-lzo
git clone [email protected]:twitter/hadoop-lzo.git mvn clean package install -DskipTests
-
- mac环境下读取hive中lzo压缩的数据
-
创建画像
namespace
create_namespace 'litemall_profile'
-
开启hbase表的复制功能
# 禁用表 disable 'litemall_profile:user_profile' # 开启表复制 alter 'litemall_profile:user_profile',{NAME=>'cf',REPLICATION_SCOPE =>1} # 启用表 enable 'litemall_profile:user_profile'
-
创建
solr
的collection-
生成配置
solrctl instancedir --generate $HOME/hbase_indexer/litemall
-
定义schema
cd $HOME/hbase_indexer/litemall/conf # 修改managed-schema配置,添加如下内容 vim managed-schema <field name="gender" type="string" indexed="true" stored="true"/> <field name="age" type="string" indexed="true" stored="true"/> <field name="consumptionAbility" type="string" indexed="true" stored="true"/> <field name="consumption" type="string" indexed="true" stored="true"/> <field name="coupon" type="string" indexed="true" stored="true"/> <field name="discountRate" type="string" indexed="true" stored="true"/> <field name="singleOrderMax" type="string" indexed="true" stored="true"/> <field name="orderFrequency" type="string" indexed="true" stored="true"/> <field name="pct" type="string" indexed="true" stored="true"/> <field name="refundRate" type="string" indexed="true" stored="true"/> <field name="rfm" type="string" indexed="true" stored="true"/> <field name="psm" type="string" indexed="true" stored="true"/> <field name="brandFavor" type="string" indexed="true" stored="true"/>
-
创建solr的collection并上传配置
# 上传配置 solrctl instancedir --create litemall $HOME/hbase_indexer/litemall # 创建collection solrctl collection --create litemall -s 3 -r 3 -m 9
-
-
创建
Lily HBase Indexer
配置-
定义
morphline 和 hbase
的映射配置cd $HOME/hbase_indexer/litemall vim morphline-hbase-mapper.xml <?xml version="1.0"?> <!-- table: hbase中表名 --> <indexer table="litemall_profile:user_profile" mapper="com.ngdata.hbaseindexer.morphline.MorphlineResultToSolrMapper"> <!-- 设置morphlines文件的路径:绝对路径或相对路径 --> <param name="morphlineFile" value="morphlines.conf"/> <!-- 设置使用的是那个ETL --> <param name="morphlineId" value="userProfile"/> </indexer>
-
创建
Morphline
配置文件(主要用于导入历史数据),定义ETL(hbase-solr的转换关系),同时将userProfile
的配置信息通过CDH控制台页面进行添加(Lily HBase Indexer
->配置
->Morphlines 文件
),注册一个新的ETLcd $HOME/hbase_indexer/litemall vim morphlines.conf # 内容如下 SOLR_LOCATOR : { # Name of solr collection collection : collection # ZooKeeper ensemble zkHost : "$ZK_HOST" } morphlines : [ { id : userProfile importCommands : ["org.kitesdk.**", "com.ngdata.**"] commands : [ { extractHBaseCells { mappings : [ { inputColumn : "cf:gender" outputField : "gender" type : string source : value } { inputColumn : "cf:age" outputField : "age" type : string source : value } { inputColumn : "cf:consumptionAbility" outputField : "consumptionAbility" type : string source : value } { inputColumn : "cf:consumption" outputField : "consumption" type : string source : value } { inputColumn : "cf:coupon" outputField : "coupon" type : string source : value } { inputColumn : "cf:discountRate" outputField : "discountRate" type : string source : value } { inputColumn : "cf:singleOrderMax" outputField : "singleOrderMax" type : string source : value } { inputColumn : "cf:orderFrequency" outputField : "orderFrequency" type : string source : value } { inputColumn : "cf:pct" outputField : "pct" type : string source : value } { inputColumn : "cf:refundRate" outputField : "refundRate" type : string source : value } { inputColumn : "cf:rfm" outputField : "rfm" type : string source : value } { inputColumn : "cf:psm" outputField : "psm" type : string source : value } { inputColumn : "cf:brandFavor" outputField : "brandFavor" type : string source : value } ] } } { logDebug { format : "output record: {}", args : ["@{}"] } } ] } ]
-
注册
Lily HBase Indexer Configuration
和Lily HBase Indexer Service
hbase-indexer add-indexer \ --name litemallIndexer \ --indexer-conf $HOME/hbase_indexer/litemall/morphline-hbase-mapper.xml \ --connection-param solr.zk=cdh-master:2181,cdh-slave-3:2181,cdh-slave-2:2181/solr \ --connection-param solr.collection=litemall \ --zookeeper cdh-master:2181,cdh-slave-3:2181,cdh-slave-2:2181
-
查看Indexer
hbase-indexer list-indexers
-
-
导入历史数据
hadoop --config /etc/hadoop/conf \ jar /opt/cloudera/parcels/CDH/lib/hbase-solr/tools/hbase-indexer-mr-*-job.jar \ --conf /etc/hbase/conf/hbase-site.xml -Dmapreduce.map.java.opts="-Xmx512m" \ -Dmapreduce.reduce.java.opts="-Xmx512m" \ --morphline-file $HOME/hbase_indexer/litemall/morphlines.conf \ --hbase-indexer-file $HOME/hbase_indexer/litemall/morphline-hbase-mapper.xml \ --zk-host cdh-master:2181,cdh-slave-3:2181,cdh-slave-2:2181/solr \ --collection litemall \ --go-live
-
-
备份其他集群的数据
-
创建快照
solrctl collection --create-snapshot litemall_bk -c litemall
-
创建备份目录
hdfs dfs -mkdir -p /backup/litemall_16 # 修改权限 sudo -u hdfs hdfs dfs -chown solr:solr /backup/litemall_16
-
准备导出(目的导出元数据合索引数据)
solrctl collection --prepare-snapshot-export litemall_bk -c litemall -d /backup/litemall_16
-
导出
solrctl collection --export-snapshot litemall_bk -s /backup/litemall_16 -d hdfs://cdh-master:8020/backup/bk
-
复制到本地
hdfs dfs -copyToLocal /backup/bk litemall tar -zcvf litemall.tar.gz litemall/
-
-
恢复到当前集群
-
创建目录上传到出的数据
hdfs dfs -mkdir -p /path/to/restore-staging hdfs dfs -copyFromLocal litemall /path/to/restore-staging
-
数据恢复
solrctl collection --restore litemall -l /path/to/restore-staging/litemall -b litemall_bk -i restore-litemall
-
查看状态
solrctl collection --request-status restore-litemall
-
-
- CDH中使用lzo压缩,本地读取数据问题(报
No LZO codec found, cannot run.
错误)-
原因:在
hadoop-common
包中使用的是SPI来加载解压缩方式,默认配置中并不包含lzo
的配置 -
解决:添加
core-site.xml
文件,并添加lzo解压缩配置<?xml version="1.0" encoding="UTF-8"?> <configuration> <property> <name>io.compression.codecs</name> <value> org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec, org.apache.hadoop.io.compress.DeflateCodec, org.apache.hadoop.io.compress.SnappyCodec, org.apache.hadoop.io.compress.Lz4Codec, com.hadoop.compression.lzo.LzoCodec, com.hadoop.compression.lzo.LzopCodec </value> </property> </configuration>
-
- 支付转换率:所选时间内,支付买家数除以访客数(支付买家数/访客数),即访客转化为支付买家的比例 $$ 支付转换率={支付买家数 \over 访客数} $$
- 客单价:每一个顾客平均购买商品的金额,也即是平均交易金额 $$ 客单价={销售额 \over 成交顾客数} $$
本项目基于或参考以下项目:
- litemall:又一个小商场系统,Spring Boot后端 Vue管理员前端 微信小程序用户前端 Vue用户移动端