写在前面: 博主是一名软件工程系大数据应用开发专业大二的学生,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,
写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新
。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/
尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影
。我希望在最美的年华,做最好的自己
!
本篇博客,博主为大家带来的是大数据实战【千亿级数仓】阶段二的内容。
通过之前的预告,先来回顾一下我们需要掌握的技能。
-
学习、掌握kettle的使用、使用kettle将项目需求所需的数据在MySQL同步到Hive。
-
使用sqoop,将剩余的数据在MySQL同步到Hive。
关于Kettle的具体使用情况,体贴的博主就不在这里赘述太多了,毕竟之前关于Kettle的使用说明的博客可花了不少心思。
关于Kettle的详情,感兴趣的朋友可以进入Kettle专栏
接下来讲的是,如何使用Kettle将项目所需要的数据从MySQL同步到Hive中。
首先我们将快速在MySQL中创建好原始表的sql文件复制到DataGrip的新建文件夹下
然后选中右键执行
执行完毕,我们集群的MySQL下就会创建一个新的数据库itcast_shop,数据库下又会有诸多已经创建好的数据表
这些表正是在阶段一中提到的那八十多个表
然而,我们本次项目中真正用到的就只有这里面中的10个
现在表全在MySQL中了,我们要做的就是使用Kettle将这10个表同步到Hive中。然后将剩下的表用Sqoop导入到Hive。
这里肯定就有朋友要问了,为什么不全部都用Sqoop同步,还要分两种方式来同步数据,不是自找麻烦么?
确实没错,但这里使用Kettle是为了让我们对kettle的使用更熟练,毕竟Kettle的功能有多强大,相信看过博主前面的介绍kettle博文的朋友都知道。
因为使用Kettle导入10个表的数据到Hive,因此我们需要先在Hive中将这些数据表先创建出来。
执行下面的建表语句
-- 创建ods层订单表
drop table if exists `itcast_ods`.`itcast_orders`;
create EXTERNAL table `itcast_ods`.`itcast_orders`(
orderId bigint,
orderNo string,
shopId bigint,
userId bigint,
orderStatus bigint,
goodsMoney double,
deliverType bigint,
deliverMoney double,
totalMoney double,
realTotalMoney double,
payType bigint,
isPay bigint,
areaId bigint,
userAddressId bigint,
areaIdPath string,
userName string,
userAddress string,
userPhone string,
orderScore bigint,
isInvoice bigint,
invoiceClient string,
orderRemarks string,
orderSrc bigint,
needPay double,
payRand bigint,
orderType bigint,
isRefund bigint,
isAppraise bigint,
cancelReason bigint,
rejectReason bigint,
rejectOtherReason string,
isClosed bigint,
goodsSearchKeys string,
orderunique string,
receiveTime string,
deliveryTime string,
tradeNo string,
dataFlag bigint,
createTime string,
settlementId bigint,
commissionFee double,
scoreMoney double,
useScore bigint,
orderCode string,
extraJson string,
orderCodeTargetId bigint,
noticeDeliver bigint,
invoiceJson string,
lockCashMoney double,
payTime string,
isBatch bigint,
totalPayFee bigint
)
partitioned by (dt string)
STORED AS PARQUET;
-- 创建ods层订单明细表
drop table if exists `itcast_ods`.`itcast_order_goods`;
create EXTERNAL table `itcast_ods`.`itcast_order_goods`(
ogId bigint,
orderId bigint,
goodsId bigint,
goodsNum bigint,
goodsPrice double,
payPrice double,
goodsSpecId bigint,
goodsSpecNames string,
goodsName string,
goodsImg string,
extraJson string,
goodsType bigint,
commissionRate double,
goodsCode string,
promotionJson string,
createtime string
)
partitioned by (dt string)
STORED AS PARQUET;
-- 创建ods层店铺表
drop table if exists `itcast_ods`.`itcast_shops`;
create EXTERNAL table `itcast_ods`.`itcast_shops`(
shopId bigint,
shopSn string,
userId bigint,
areaIdPath string,
areaId bigint,
isSelf bigint,
shopName string,
shopkeeper string,
telephone string,
shopCompany string,
shopImg string,
shopTel string,
shopQQ string,
shopWangWang string,
shopAddress string,
bankId bigint,
bankNo string,
bankUserName string,
isInvoice bigint,
invoiceRemarks string,
serviceStartTime bigint,
serviceEndTime bigint,
freight bigint,
shopAtive bigint,
shopStatus bigint,
statusDesc string,
dataFlag bigint,
createTime string,
shopMoney double,
lockMoney double,
noSettledOrderNum bigint,
noSettledOrderFee double,
paymentMoney double,
bankAreaId bigint,
bankAreaIdPath string,
applyStatus bigint,
applyDesc string,
applyTime string,
applyStep bigint,
shopNotice string,
rechargeMoney double,
longitude double,
latitude double,
mapLevel bigint,
BDcode string,
modifyTime string
)
partitioned by (dt string)
STORED AS PARQUET;
-- 创建ods层商品表
drop table if exists `itcast_ods`.`itcast_goods`;
create EXTERNAL table `itcast_ods`.`itcast_goods`(
goodsId bigint,
goodsSn string,
productNo string,
goodsName string,
goodsImg string,
shopId bigint,
goodsType bigint,
marketPrice double,
shopPrice double,
warnStock bigint,
goodsStock bigint,
goodsUnit string,
goodsTips string,
isSale bigint,
isBest bigint,
isHot bigint,
isNew bigint,
isRecom bigint,
goodsCatIdPath string,
goodsCatId bigint,
shopCatId1 bigint,
shopCatId2 bigint,
brandId bigint,
goodsDesc string,
goodsStatus bigint,
saleNum bigint,
saleTime string,
visitNum bigint,
appraiseNum bigint,
isSpec bigint,
gallery string,
goodsSeoKeywords string,
illegalRemarks string,
dataFlag bigint,
createTime string,
isFreeShipping bigint,
goodsSerachKeywords string,
modifyTime string
)
partitioned by (dt string)
STORED AS PARQUET;
-- 创建ods层组织机构表
drop table `itcast_ods`.`itcast_org`;
create EXTERNAL table `itcast_ods`.`itcast_org`(
orgId bigint,
parentId bigint,
orgName string,
orgLevel bigint,
managerCode string,
isdelete bigint,
createTime string,
updateTime string,
isShow bigint,
orgType bigint
)
partitioned by (dt string)
STORED AS PARQUET;
-- 创建ods层商品分类表
drop table if exists `itcast_ods`.`itcast_goods_cats`;
create EXTERNAL table `itcast_ods`.`itcast_goods_cats`(
catId bigint,
parentId bigint,
catName string,
isShow bigint,
isFloor bigint,
catSort bigint,
dataFlag bigint,
createTime string,
commissionRate double,
catImg string,
subTitle string,
simpleName string,
seoTitle string,
seoKeywords string,
seoDes string,
catListTheme string,
detailTheme string,
mobileCatListTheme string,
mobileDetailTheme string,
wechatCatListTheme string,
wechatDetailTheme string,
cat_level bigint
)
partitioned by (dt string)
STORED AS PARQUET;
-- 创建ods层用户表
drop table if exists `itcast_ods`.`itcast_users`;
create EXTERNAL table `itcast_ods`.`itcast_users`(
userId bigint,
loginName string,
loginSecret bigint,
loginPwd string,
userType bigint,
userSex bigint,
userName string,
trueName string,
brithday string,
userPhoto string,
userQQ string,
userPhone string,
userEmail string,
userScore bigint,
userTotalScore bigint,
lastIP string,
lastTime string,
userFrom bigint,
userMoney double,
lockMoney double,
userStatus bigint,
dataFlag bigint,
createTime string,
payPwd string,
rechargeMoney double,
isInform bigint
)
partitioned by (dt string)
STORED AS PARQUET;
-- 创建ods层退货表
drop table if exists `itcast_ods`.`itcast_order_refunds`;
create EXTERNAL table `itcast_ods`.`itcast_order_refunds`(
id bigint,
orderId bigint,
goodsId bigint,
refundTo bigint,
refundReson bigint,
refundOtherReson string,
backMoney double,
refundTradeNo string,
refundRemark string,
refundTime string,
shopRejectReason string,
refundStatus bigint,
createTime string
)
partitioned by (dt string)
STORED AS PARQUET;
-- 创建ods层地址表
drop table if exists `itcast_ods`.`itcast_user_address`;
create EXTERNAL table `itcast_ods`.`itcast_user_address`(
addressId bigint,
userId bigint,
userName string,
otherName string,
userPhone string,
areaIdPath string,
areaId bigint,
userAddress string,
isDefault bigint,
dataFlag bigint,
createTime string
)
partitioned by (dt string)
STORED AS PARQUET;
-- 创建ods层支付方式表
drop table if exists `itcast_ods`.`itcast_payments`;
create EXTERNAL table `itcast_ods`.`itcast_payments`(
id bigint,
payCode string,
payName string,
payDesc string,
payOrder bigint,
payConfig string,
enabled bigint,
isOnline bigint,
payFor string
)
partitioned by (dt string)
STORED AS PARQUET;
执行完毕,此时数据库中就创建好了10个空的表
接下来我们就需要通过Kettle读取MySQL中的数据,输出到各个hive表存储在HDFS的路径下的parquent文件中即可。
相信看到这里的朋友,对于Kettle已经相当熟练了,所以我这里就不再像第一次教学Kettle那样细讲了。
我们根据需求,需要使用到表输入组件,字段选择(根据业务添加),parquent输出组件。
我们将所需要的组件连接起来,因为需要同时同步10个表的数据,所以我们也构造了10个"线路"
组件连接好了之后,让我们来看看如何单独设置每个的内容
首先双击空白处,我们需要设置一个kettle中的参数,方便我们调用,用来做数据分区使用
然后就可以进行设置表的输入了,需要注意的地方有如下四个
如果不放心,还可以选择预览数据
字段选择中,如果没有其他的特殊情况,我们这里默认就获取字段
然后我们就可以设置parquent文件的输出了
需要注意位置要设置成HDFS,然后在预览中选择需要导入Hive表在HDFS上的元数据的路径。
另外建议勾选上,覆盖已存在文件,这样我们就反复运行程序而无需担心每次都要换个输出路径了~
默认也都是获取所有的字段,然后我们就可以设置压缩格式Snappy
,就可以点击确定了。
上面演示的是一个表从MySQL读取到输出Parquent的过程,因为这里我们涉及到了十个表,所以需要操作十次…
待到10个表的流程都完成,直接运行然后在命令行上修复分区数据也是一样的
但是都操作到这里了,我们还是换一种优雅
的方式
首先我们新建一个作业
在作业
界面,我们获取到这些组件,并连接起来
Start 我们无需操作,后面挂的小锁代表着无需任何条件即可执行
关于转换组件的设置,是一个重点
这里的路径需要设置成我们前面已经创建的转换文件在本地的路径
接着就在SQL组件中,连接上hive,并编写需要执行的SQL脚本
待到设置完毕,我们就可以运行这个Job了
正常情况下,我们可以在执行完毕之后,查询之前创建的Hive数据表,可以发现10张表都已经有了数据
Kettle如何实现MySQL同步到Hive已经说完了。下面我们来整点“刺激”的!
关于全量导入mysql表数据到Hive,有以下两种方法:
首先,进入到Sqoop的安装目录下,cd /export/servers/sqoop-1.4.6.bin__hadoop-2.0.4-alpha
方式一:先复制表结构到hive中再导入数据
将关系型数据的表结构复制到hive中
bin/sqoop create-hive-table \
--connect jdbc:mysql://节点IP:3306/mysql数据库 \
--table mysql数据表名 \
--username mysql账户 \
--password mysql密码 \
--hive-table 数据库.需要输出的表名
从关系数据库导入文件到hive中
bin/sqoop import \
--connect jdbc:mysql://节点IP:3306/mysql数据库\
--username mysql账户 \
--password mysql密码 \
--table mysql数据表 \
--hive-table 数据库.需要输出的表名 \
--hive-import \
--m 1
方式二:直接复制表结构数据到hive中
bin/sqoop import \
--connect jdbc:mysql://节点IP:3306/mysql数据库\
--username mysql账户 \
--password mysql密码 \
--table mysql数据表 \
--hive-import \
--m 1 \
--hive-database hive数据库;
如果用方式二想把数据导出到分区表,可以用下面这种方式
sqoop import \
--connect jdbc:mysql://IP节点:3306/mysql数据库 \
--username mysql账户 \
--password mysql密码 \
--where "查询条件" \
--target-dir /user/hive/warehouse/xxx输出路径/ \
--table mysql数据表--m 1
注意:
用这两种方法都可以实现从MySQL同步数据到Hive,区别就在于方式二输出的hive表名与mysql输入的表名是一样的,方式一可以自己定义hive输出表的名字
小结
大数据实战【千亿级数仓】阶段二需要大家熟练Kettle的基本使用,项目所需数据的从MySQL到Hive同步以及使用Sqoop同步其他数据。
如果以上过程中出现了任何的纰漏错误,烦请大佬们指正
受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波