欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

用户画像第二章(企业级360°用户画像_数据调研及ETL)

程序员文章站 2022-05-01 14:28:14
...

第二部分:业务数据调研及ETL
整个用户画像(UserProfile)项目中,数据、业务及技术流程图如下所示:
用户画像第二章(企业级360°用户画像_数据调研及ETL)

其中数据源存储在业务系统数据库:MySQL 数据库中,采用SQOOP全量/增量将数据抽取到HDFS(Hive表中),通过转换为HFile文件加载到HBase表。
1)、编写MapReduce程序
2)、编写Spark程序(推荐时用Spark编程)

用户画像第二章(企业级360°用户画像_数据调研及ETL)

1)、为什么将订单相关数据【订单数据和订单商品数据】存储到HBase表中????
特点:数据量比较大
存储HBase:存储海量数据、查询检索
2)、实际项目来说【访问行为日志】数据存储到Hive表中
数据仓库分层:
ODS层、DW层和APP层
3)、特殊:模拟的所有业务数据存储在RDBMs表中,为了简化整个项目开发,重点在于标签开发,将所有数据迁移到HBase表中。

1、电商数据(数据源)
所有的业务数据,都是编写程序模拟产生的,直接保存到MySQL数据库的表中。
1.1、MySQL数据库
在数据库【tags_dat】中包含四张表:

1)、订单商品表:tbl_goods
2)、用户表:tbl_users
3)、行为日志表:tbl_logs
4)、订单数据表:tbl_orders

用户画像第二章(企业级360°用户画像_数据调研及ETL)
1.2、表的结构
电商系统中四张表的结构如下,数据存储在MySQL数据库中(为了方便模拟业务数据,存储MySQL表)。
1.2.1、数据库Database
数据库**tags_dat,构建语句如下:
用户画像第二章(企业级360°用户画像_数据调研及ETL)
1.2.2、订单商品信息表:tbl_goods
电商网站中订单商品
goods**基本信息表,总共97个字段,除去主键ID外96个字段。

CREATE TABLE `tbl_goods` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `siteId` int(10) unsigned NOT NULL,
  `isTest` tinyint(1) unsigned NOT NULL COMMENT '是否是测试网单',
  `hasRead` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否已读,测试字段',
  `supportOneDayLimit` tinyint(1) unsigned NOT NULL COMMENT '是否支持24小时限时达',
  `orderId` int(10) unsigned NOT NULL,
  `cOrderSn` varchar(50) NOT NULL COMMENT 'child order sn 子订单编码 C0919293',
  `isBook` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '是否是预订网单',
  `cPaymentStatus` smallint(3) unsigned NOT NULL COMMENT '子订单付款状态',
  `cPayTime` int(10) unsigned NOT NULL COMMENT '子订单付款时间',
  `productType` varchar(50) NOT NULL COMMENT '商品类型',
  `productId` int(10) unsigned NOT NULL COMMENT '抽象商品id(可能是商品规格,也可能是套装,由商品类型决定)',
  `productName` varchar(100) NOT NULL COMMENT '商品名称:可能是商品名称加颜色规格,也可能是套装名称',
  `sku` varchar(60) NOT NULL COMMENT '货号',
  `price` decimal(10,2) unsigned NOT NULL COMMENT '商品单价',
  `number` smallint(5) unsigned NOT NULL COMMENT '数量',
  `lockedNumber` int(10) unsigned NOT NULL COMMENT '曾经锁定的库存数量',
  `unlockedNumber` int(10) unsigned NOT NULL COMMENT '曾经解锁的库存数量',
  `productAmount` decimal(10,2) NOT NULL COMMENT '此字段专为同步外部订单而加,商品总金额=price*number+shippingFee-优惠金额,但优惠金额没在本系统存储',
  `balanceAmount` decimal(10,2) unsigned NOT NULL COMMENT '余额扣减',
  `couponAmount` decimal(10,2) unsigned NOT NULL COMMENT '优惠券抵扣金额',
  `esAmount` decimal(10,2) unsigned NOT NULL COMMENT '节能补贴金额',
  `giftCardNumberId` int(10) unsigned NOT NULL COMMENT '礼品卡号ID,关联GiftCardNumbers表的主键',
  `usedGiftCardAmount` decimal(10,2) unsigned NOT NULL COMMENT '礼品卡抵用的金额',
  `couponLogId` int(10) unsigned NOT NULL COMMENT '使用的优惠券记录ID',
  `activityPrice` decimal(10,2) unsigned NOT NULL COMMENT '活动价,当有活动价时price的值来源于activityPrice',
  `activityId` int(10) unsigned NOT NULL COMMENT '活动ID',
  `cateId` int(11) NOT NULL COMMENT '分类ID',
  `brandId` int(11) NOT NULL COMMENT '品牌ID',
  `netPointId` int(10) NOT NULL COMMENT '网点id',
  `shippingFee` decimal(10,2) NOT NULL COMMENT '配送费用',
  `settlementStatus` tinyint(1) NOT NULL COMMENT '结算状态0 未结算 1已结算 ',
  `receiptOrRejectTime` int(10) unsigned NOT NULL COMMENT '确认收货时间或拒绝收货时间',
  `isWmsSku` tinyint(1) NOT NULL COMMENT '是否淘宝小家电',
  `sCode` varchar(10) NOT NULL COMMENT '库位编码',
  `tsCode` varchar(10) NOT NULL DEFAULT '' COMMENT '转运库房编码',
  `tsShippingTime` int(11) NOT NULL DEFAULT '0' COMMENT '转运时效(小时)',
  `status` smallint(3) NOT NULL COMMENT '状态',
  `productSn` varchar(60) NOT NULL COMMENT '商品条形码',
  `invoiceNumber` varchar(60) NOT NULL COMMENT '运单号',
  `expressName` varchar(255) NOT NULL COMMENT '快递公司',
  `invoiceExpressNumber` varchar(60) NOT NULL COMMENT '发票快递单号',
  `postMan` varchar(20) NOT NULL COMMENT '送货人',
  `postManPhone` varchar(15) NOT NULL COMMENT '送货人电话',
  `isNotice` smallint(5) NOT NULL COMMENT '是否开启预警',
  `noticeType` smallint(5) NOT NULL,
  `noticeRemark` varchar(255) NOT NULL,
  `noticeTime` varchar(8) NOT NULL COMMENT '预警时间',
  `shippingTime` int(10) NOT NULL COMMENT '发货时间',
  `lessOrderSn` varchar(50) NOT NULL COMMENT 'less 订单号',
  `waitGetLesShippingInfo` tinyint(1) unsigned NOT NULL COMMENT '是否等待获取LES物流配送节点信息',
  `getLesShippingCount` int(10) unsigned NOT NULL COMMENT '已获取LES配送节点信息的次数',
  `outping` varchar(20) NOT NULL COMMENT '出库凭证',
  `lessShipTime` int(10) NOT NULL COMMENT 'less出库时间',
  `closeTime` int(10) unsigned NOT NULL COMMENT '网单完成关闭或取消关闭时间',
  `isReceipt` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '是否需要发票',
  `isMakeReceipt` int(1) NOT NULL DEFAULT '1' COMMENT '开票状态',
  `receiptNum` text NOT NULL COMMENT '发票号',
  `receiptAddTime` varchar(30) NOT NULL COMMENT '开票时间',
  `makeReceiptType` tinyint(3) NOT NULL COMMENT '开票类型 0 初始值 1 库房开票  2 共享开票',
  `shippingMode` varchar(60) NOT NULL COMMENT '物流模式,值为B2B2C或B2C',
  `lastTimeForShippingMode` int(10) unsigned NOT NULL COMMENT '最后修改物流模式的时间',
  `lastEditorForShippingMode` varchar(200) NOT NULL COMMENT '最后修改物流模式的管理员',
  `systemRemark` text NOT NULL COMMENT '系统备注,不给用户显示',
  `tongshuaiWorkId` int(11) NOT NULL DEFAULT '-1' COMMENT '统帅定制作品ID',
  `orderPromotionId` int(10) unsigned NOT NULL COMMENT '下单立减活动ID',
  `orderPromotionAmount` decimal(10,2) unsigned NOT NULL COMMENT '下单立减金额',
  `externalSaleSettingId` int(10) unsigned NOT NULL COMMENT '淘宝套装设置ID',
  `recommendationId` int(10) unsigned NOT NULL COMMENT '推荐购买ID',
  `hasSendAlertNum` tinyint(1) unsigned NOT NULL COMMENT '是否已发送了购买数据报警邮件(短信)',
  `isNoLimitStockProduct` tinyint(1) unsigned NOT NULL COMMENT '是否是无限制库存量的商品',
  `hpRegisterDate` int(11) DEFAULT '0' COMMENT 'HP注册时间',
  `hpFailDate` int(11) DEFAULT '0' COMMENT 'HP派工失败时间',
  `hpFinishDate` int(11) DEFAULT '0' COMMENT 'HP派工成功时间',
  `hpReservationDate` int(11) NOT NULL DEFAULT '0' COMMENT 'HP回传预约送货时间',
  `shippingOpporunity` tinyint(4) NOT NULL DEFAULT '0' COMMENT '活动订单发货时机,0定金发货 1尾款发货',
  `isTimeoutFree` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否超时免单 0未设置 1是 2否',
  `itemShareAmount` decimal(10,2) DEFAULT '0.00' COMMENT '订单优惠价格分摊',
  `lessShipTInTime` int(10) DEFAULT '0' COMMENT 'less转运入库时间',
  `lessShipTOutTime` int(10) DEFAULT '0' COMMENT 'less转运出库时间',
  `cbsSecCode` varchar(10) DEFAULT '' COMMENT 'cbs库位',
  `points` int(11) DEFAULT '0' COMMENT '网单使用积分',
  `modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间',
  `splitFlag` tinyint(3) unsigned NOT NULL COMMENT '拆单标志,0:未拆单;1:拆单后旧单;2:拆单后新单',
  `splitRelateCOrderSn` varchar(50) NOT NULL COMMENT '拆单关联单号',
  `channelId` tinyint(4) DEFAULT '0' COMMENT '区分EP和商城',
  `activityId2` int(11) NOT NULL DEFAULT '0' COMMENT '运营活动id',
  `pdOrderStatus` int(4) NOT NULL DEFAULT '0' COMMENT '日日单状态',
  `omsOrderSn` varchar(20) NOT NULL DEFAULT '' COMMENT '集团OMS单号',
  `couponCode` varchar(20) NOT NULL DEFAULT '' COMMENT '优惠码编码',
  `couponCodeValue` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '优惠码优惠金额',
  `storeId` int(10) unsigned NOT NULL DEFAULT '0',
  `storeType` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '店铺类型',
  `stockType` varchar(10) NOT NULL DEFAULT 'WA',
  `o2oType` tinyint(3) unsigned NOT NULL DEFAULT '1' COMMENT 'o2o网单类型1非O2O网单2线下用户分销商城3商城分销旗舰店4创客',
  `brokerageType` varchar(100) DEFAULT NULL,
  `ogColor` varchar(30) DEFAULT NULL COMMENT '算法预留字段',
  PRIMARY KEY (`id`),
  KEY `orderId` (`orderId`),
  KEY `sCode` (`sCode`),
  KEY `cOrderSn` (`cOrderSn`),
  KEY `netPointId` (`netPointId`),
  KEY `isNotice` (`isNotice`),
  KEY `noticeTime` (`noticeTime`),
  KEY `closeTime` (`closeTime`),
  KEY `cPayTime` (`cPayTime`),
  KEY `productId` (`productId`),
  KEY `activityId` (`activityId`),
  KEY `idx_OrderProducts_cPaymentStatus_status` (`cPaymentStatus`,`status`),
  KEY `idx_OrderProducts_waitGetLesShippingInfo` (`waitGetLesShippingInfo`),
  KEY `idx_OrderProducts_status` (`status`),
  KEY `idx_OrderProducts_sku` (`sku`),
  KEY `ix_OrderProducts_lessShipTime` (`lessShipTime`),
  KEY `modified` (`modified`),
  KEY `ix_OrderProducts_receiptAddTime` (`receiptAddTime`),
  KEY `idx_pdOrderStatus` (`pdOrderStatus`),
  KEY `idx_lessShipTInTime` (`lessShipTInTime`)
) ENGINE=InnoDB AUTO_INCREMENT=3853572 DEFAULT CHARSET=utf8;

1.2.3、会员信息表:tbl_users
电商网站中用户基本信息表,总共38个字段,除去主键ID外共37个字段信息。

CREATE TABLE `tbl_users` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `siteId` int(10) unsigned NOT NULL,
  `avatarImageFileId` varchar(255) DEFAULT NULL,
  `email` varchar(120) DEFAULT NULL,
  `username` varchar(60) NOT NULL COMMENT '用户名',
  `password` varchar(32) DEFAULT NULL COMMENT '密码',
  `salt` varchar(10) DEFAULT NULL COMMENT '扰码',
  `registerTime` int(10) unsigned NOT NULL COMMENT '注册时间',
  `lastLoginTime` int(10) unsigned DEFAULT NULL COMMENT '最后登录时间',
  `lastLoginIp` varchar(15) DEFAULT NULL COMMENT '最后登录ip',
  `memberRankId` int(10) unsigned DEFAULT NULL COMMENT '特殊会员等级id,0表示非特殊会员等级',
  `bigCustomerId` int(10) unsigned DEFAULT NULL COMMENT '所属的大客户ID',
  `lastAddressId` int(10) unsigned DEFAULT NULL COMMENT '上次使用的收货地址',
  `lastPaymentCode` varchar(20) DEFAULT NULL COMMENT '上次使用的支付方式',
  `gender` tinyint(3) unsigned DEFAULT NULL COMMENT '性别:0保密1男2女',
  `birthday` date DEFAULT NULL COMMENT '生日',
  `qq` varchar(20) DEFAULT NULL,
  `job` varchar(60) DEFAULT NULL COMMENT '职业;1学生、2公务员、3军人、4警察、5教师、6白领',
  `mobile` varchar(15) DEFAULT NULL COMMENT '手机',
  `politicalFace` int(1) unsigned DEFAULT NULL COMMENT '政治面貌:1群众、2党员、3无党派人士',
  `nationality` varchar(20) DEFAULT NULL COMMENT '国籍:1*、2中国香港、3中国澳门、4中国*、5其他',
  `validateCode` varchar(32) DEFAULT NULL COMMENT '找回密码时的验证code',
  `pwdErrCount` tinyint(3) DEFAULT NULL COMMENT '密码输入错误次数',
  `source` varchar(20) DEFAULT NULL COMMENT '会员来源',
  `marriage` varchar(60) DEFAULT NULL COMMENT '婚姻状况:1未婚、2已婚、3离异',
  `money` decimal(15,2) DEFAULT NULL COMMENT '账户余额',
  `moneyPwd` varchar(32) DEFAULT NULL COMMENT '余额支付密码',
  `isEmailVerify` tinyint(1) DEFAULT NULL COMMENT '是否验证email',
  `isSmsVerify` tinyint(1) DEFAULT NULL COMMENT '是否验证短信',
  `smsVerifyCode` varchar(30) DEFAULT NULL COMMENT '邮件验证码',
  `emailVerifyCode` varchar(30) DEFAULT NULL COMMENT '短信验证码',
  `verifySendCoupon` tinyint(1) DEFAULT NULL COMMENT '是否验证发送优惠券',
  `canReceiveEmail` tinyint(1) DEFAULT NULL COMMENT '是否接收邮件',
  `modified` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间',
  `channelId` tinyint(4) DEFAULT '0' COMMENT '??EP???',
  `grade_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '等级ID',
  `nick_name` varchar(60) NOT NULL DEFAULT '' COMMENT '昵称',
  `is_blackList` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '是否黑名单 : 0:非黑名单  1:黑名单',
  PRIMARY KEY (`id`),
  KEY `siteId` (`siteId`,`email`),
  KEY `memberRankId` (`memberRankId`)
) ENGINE=InnoDB AUTO_INCREMENT=951 DEFAULT CHARSET=utf8;

1.2.4、行为日志表:tbl_logs
电商网站中用户浏览网站访问行为日志数据(浏览数据),总共11个字段,此类数据属于最多。

CREATE TABLE `tbl_logs` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `log_id` varchar(50) DEFAULT NULL,
  `remote_ip` varchar(50) DEFAULT NULL,
  `site_global_ticket` varchar(250) DEFAULT NULL,
  `site_global_session` varchar(250) DEFAULT NULL,
  `global_user_id` varchar(50) DEFAULT NULL,
  `cookie_text` mediumtext,
  `user_agent` varchar(250) DEFAULT NULL,
  `ref_url` varchar(250) DEFAULT NULL,
  `loc_url` varchar(250) DEFAULT NULL,
  `log_time` varchar(50) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `log_time` (`log_time`)
) ENGINE=MyISAM AUTO_INCREMENT=1160286 DEFAULT CHARSET=utf8;

1.2.5、订单数据表:tbl_orders
电商网站中用户购买物品下单的订单数据,总共112个字段,记录每个订单详细信息。

CREATE TABLE `tbl_orders` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `siteId` int(10) unsigned NOT NULL,
  `isTest` tinyint(1) unsigned NOT NULL COMMENT '是否是测试订单',
  `hasSync` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否已同步(临时添加)',
  `isBackend` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '是否为后台添加的订单',
  `isBook` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '是否为后台添加的订单',
  `isCod` tinyint(1) unsigned NOT NULL COMMENT '是否是货到付款订单',
  `notAutoConfirm` tinyint(1) unsigned NOT NULL COMMENT '是否是非自动确认订单',
  `isPackage` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '是否为套装订单',
  `packageId` int(10) unsigned NOT NULL COMMENT '套装ID',
  `orderSn` varchar(50) NOT NULL COMMENT '订单号',
  `relationOrderSn` varchar(50) NOT NULL COMMENT '关联订单编号',
  `memberId` int(10) unsigned NOT NULL COMMENT '会员id',
  `predictId` int(10) unsigned NOT NULL COMMENT '会员购买预测ID',
  `memberEmail` varchar(120) NOT NULL COMMENT '会员邮件',
  `addTime` int(10) unsigned NOT NULL,
  `syncTime` int(10) unsigned NOT NULL COMMENT '同步到此表中的时间',
  `orderStatus` smallint(3) NOT NULL COMMENT '订单状态',
  `payTime` int(10) unsigned NOT NULL COMMENT '在线付款时间',
  `paymentStatus` smallint(3) unsigned NOT NULL COMMENT '付款状态:0 买家未付款 1 买家已付款 ',
  `receiptConsignee` varchar(20) NOT NULL COMMENT '发票收件人',
  `receiptAddress` varchar(255) NOT NULL COMMENT '发票地址',
  `receiptZipcode` varchar(20) NOT NULL COMMENT '发票邮编',
  `receiptMobile` varchar(30) NOT NULL COMMENT '发票联系电话',
  `productAmount` decimal(10,2) unsigned NOT NULL COMMENT '商品金额,等于订单中所有的商品的单价乘以数量之和',
  `orderAmount` decimal(10,2) unsigned NOT NULL COMMENT '订单总金额,等于商品总金额+运费',
  `paidBalance` decimal(10,2) unsigned NOT NULL COMMENT '余额账户支付总金额',
  `giftCardAmount` decimal(10,2) unsigned NOT NULL COMMENT '礼品卡抵用金额',
  `paidAmount` decimal(10,2) unsigned NOT NULL COMMENT '已支付金额',
  `shippingAmount` decimal(10,2) NOT NULL COMMENT '淘宝运费',
  `totalEsAmount` decimal(10,2) unsigned NOT NULL COMMENT '网单中总的节能补贴金额之和',
  `usedCustomerBalanceAmount` decimal(10,2) unsigned NOT NULL COMMENT '使用的客户的余额支付金额',
  `customerId` int(10) unsigned NOT NULL COMMENT '用余额支付的客户ID',
  `bestShippingTime` varchar(100) NOT NULL COMMENT '最佳配送时间描述',
  `paymentCode` varchar(20) NOT NULL COMMENT '支付方式code',
  `payBankCode` varchar(20) NOT NULL COMMENT '网银代码',
  `paymentName` varchar(60) NOT NULL COMMENT '支付方式名称',
  `consignee` varchar(60) NOT NULL COMMENT '收货人',
  `originRegionName` varchar(255) NOT NULL COMMENT '原淘宝收货地址信息',
  `originAddress` varchar(255) NOT NULL COMMENT '原淘宝收货人详细收货信息',
  `province` int(10) unsigned NOT NULL COMMENT '收货地址中国省份',
  `city` int(10) unsigned NOT NULL COMMENT '收货地址中的城市',
  `region` int(10) unsigned NOT NULL COMMENT '收货地址中城市中的区',
  `street` int(10) unsigned NOT NULL COMMENT '街道ID',
  `markBuilding` int(10) NOT NULL COMMENT '标志建筑物',
  `poiId` varchar(64) NOT NULL DEFAULT '' COMMENT '标建ID',
  `poiName` varchar(100) DEFAULT '' COMMENT '标建名称',
  `regionName` varchar(200) NOT NULL COMMENT '地区名称(如:北京 北京 昌平区 兴寿镇)',
  `address` varchar(255) NOT NULL COMMENT '收货地址中用户输入的地址,一般是区以下的详细地址',
  `zipcode` varchar(20) NOT NULL COMMENT '收货地址中的邮编',
  `mobile` varchar(15) NOT NULL COMMENT '收货人手机号',
  `phone` varchar(20) NOT NULL COMMENT '收货人固定电话号',
  `receiptInfo` text NOT NULL COMMENT '发票信息,序列化数组array(''title'' =>.., ''receiptType'' =>..,''needReceipt'' => ..,''companyName'' =>..,''taxSpotNum'' =>..,''regAddress''=>..,''regPhone''=>..,''bank''=>..,''bankAccount''=>..)',
  `delayShipTime` int(10) unsigned NOT NULL COMMENT '延迟发货日期',
  `remark` text NOT NULL COMMENT '订单备注',
  `bankCode` varchar(255) DEFAULT NULL COMMENT '银行代码,用于银行直链支付',
  `agent` varchar(255) DEFAULT NULL COMMENT '处理人',
  `confirmTime` int(11) DEFAULT NULL COMMENT '确认时间',
  `firstConfirmTime` int(10) unsigned NOT NULL COMMENT '首次确认时间',
  `firstConfirmPerson` varchar(200) NOT NULL COMMENT '第一次确认人',
  `finishTime` int(11) DEFAULT NULL COMMENT '订单完成时间',
  `tradeSn` varchar(255) DEFAULT NULL COMMENT '在线支付交易流水号',
  `signCode` varchar(20) NOT NULL COMMENT '收货确认码',
  `source` varchar(30) NOT NULL COMMENT '订单来源',
  `sourceOrderSn` varchar(60) NOT NULL COMMENT '外部订单号',
  `onedayLimit` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否支持24小时限时达',
  `logisticsManner` int(1) NOT NULL COMMENT '物流评价',
  `afterSaleManner` int(1) NOT NULL COMMENT '售后评价',
  `personManner` int(1) NOT NULL COMMENT '人员态度',
  `visitRemark` varchar(400) NOT NULL COMMENT '回访备注',
  `visitTime` int(11) NOT NULL COMMENT '回访时间',
  `visitPerson` varchar(20) NOT NULL COMMENT '回访人',
  `sellPeople` varchar(20) NOT NULL COMMENT '销售代表',
  `sellPeopleManner` int(1) NOT NULL COMMENT '销售代表服务态度',
  `orderType` tinyint(2) NOT NULL COMMENT '订单类型 默认0 团购预付款 团购正式单 2',
  `hasReadTaobaoOrderComment` tinyint(1) unsigned NOT NULL COMMENT '是否已读取过淘宝订单评论',
  `memberInvoiceId` int(10) unsigned NOT NULL COMMENT '订单发票ID,MemberInvoices表的主键',
  `taobaoGroupId` int(10) unsigned NOT NULL COMMENT '淘宝万人团活动ID',
  `tradeType` varchar(100) NOT NULL COMMENT '交易类型,值参考淘宝',
  `stepTradeStatus` varchar(100) NOT NULL COMMENT '分阶段付款的订单状态,值参考淘宝',
  `stepPaidFee` decimal(10,2) NOT NULL COMMENT '分阶段付款的已付金额',
  `depositAmount` decimal(10,2) unsigned NOT NULL COMMENT '定金应付金额',
  `balanceAmount` decimal(10,2) unsigned NOT NULL COMMENT '尾款应付金额',
  `autoCancelDays` int(10) unsigned NOT NULL COMMENT '未付款过期的天数',
  `isNoLimitStockOrder` tinyint(1) unsigned NOT NULL COMMENT '是否是无库存限制订单',
  `ccbOrderReceivedLogId` int(10) unsigned NOT NULL COMMENT '建行订单接收日志ID',
  `ip` varchar(50) NOT NULL COMMENT '订单来源IP,针对商城前台订单',
  `isGiftCardOrder` tinyint(1) unsigned NOT NULL COMMENT '是否为礼品卡订单',
  `giftCardDownloadPassword` varchar(200) NOT NULL COMMENT '礼品卡下载密码',
  `giftCardFindMobile` varchar(20) NOT NULL COMMENT '礼品卡密码找回手机号',
  `autoConfirmNum` int(10) unsigned NOT NULL COMMENT '已自动确认的次数',
  `codConfirmPerson` varchar(100) NOT NULL COMMENT '货到付款确认人',
  `codConfirmTime` int(11) NOT NULL COMMENT '货到付款确认时间',
  `codConfirmRemark` varchar(255) NOT NULL COMMENT '货到付款确认备注',
  `codConfirmState` tinyint(1) unsigned NOT NULL COMMENT '货到侍确认状态0无需未确认,1待确认,2确认通过可以发货,3确认无效,订单可以取消',
  `paymentNoticeUrl` text NOT NULL COMMENT '付款结果通知URL',
  `addressLon` decimal(9,6) NOT NULL COMMENT '地址经度',
  `addressLat` decimal(9,6) NOT NULL COMMENT '地址纬度',
  `smConfirmStatus` tinyint(4) NOT NULL COMMENT '标建确认状态。1 = 初始状态;2 = 已发HP,等待确认;3 = 待人工处理;4 = 待自动处理;5 = 已确认',
  `smConfirmTime` int(10) NOT NULL COMMENT '请求发送HP时间,格式为时间戳',
  `smManualTime` int(10) DEFAULT '0' COMMENT '转人工确认时间',
  `smManualRemark` varchar(200) DEFAULT '' COMMENT '转人工确认备注',
  `isTogether` tinyint(3) unsigned NOT NULL COMMENT '货票通行',
  `isNotConfirm` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否是无需确认的订单',
  `tailPayTime` int(11) NOT NULL DEFAULT '0' COMMENT '尾款付款时间',
  `points` int(11) DEFAULT '0' COMMENT '网单使用积分',
  `modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间',
  `channelId` tinyint(4) DEFAULT '0' COMMENT '区分EP和商城',
  `isProduceDaily` int(2) NOT NULL DEFAULT '0' COMMENT '是否日日单(1:是,0:否)',
  `couponCode` varchar(20) NOT NULL DEFAULT '' COMMENT '优惠码编码',
  `couponCodeValue` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '优惠码优惠金额',
  `ckCode` varchar(200) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_Orders_ordersn` (`orderSn`),
  KEY `memberId` (`memberId`),
  KEY `agent` (`agent`),
  KEY `addTime` (`addTime`),
  KEY `payTime` (`payTime`),
  KEY `orderStatus` (`orderStatus`),
  KEY `sourceOrderSn` (`sourceOrderSn`),
  KEY `smConfirmStatus` (`smConfirmStatus`),
  KEY `idx_orders_source_orderStatus_hasReadTaobaoOrderComment` (`source`,`orderStatus`,`hasReadTaobaoOrderComment`),
  KEY `idx_order_mobile` (`mobile`),
  KEY `idx_orders_codConfirmState` (`codConfirmState`),
  KEY `modified` (`modified`),
  KEY `tailPayTime` (`tailPayTime`),
  KEY `ix_Orders_syncTime` (`syncTime`),
  KEY `ix_Orders_relationOrderSn` (`relationOrderSn`),
  KEY `ix_Orders_consignee` (`consignee`),
  KEY `idx_firstConfirmTime` (`firstConfirmTime`),
  KEY `ix_Orders_paymentStatus` (`paymentStatus`)
) ENGINE=InnoDB AUTO_INCREMENT=120128 DEFAULT CHARSET=utf8;

数据导入
设置mysql 导入数据允许的最大包大小:

set global max_allowed_packet=1024102432;

导入数据

source /opt/tags_dat.sql;

此表中目前的数据量为:125463条

mysql> SELECT COUNT(1) FROM tags_dat.tbl_goods ;
±---------+
| COUNT(1) |
±---------+
| 125463 |
±---------+

此表中目前的数据量为:950条

mysql> SELECT COUNT(1) FROM tags_dat.tbl_users ;
±---------+
| COUNT(1) |
±---------+
| 950 |
±---------+

此表中目前的数据量为:376983条

mysql> SELECT COUNT(1) FROM tags_dat.tbl_logs;
±---------+
| COUNT(1) |
±---------+
| 376983 |
±---------+

此表中目前的数据量为:120125条

mysql> SELECT COUNT(1) FROM tags_dat.tbl_orders;
±---------+
| COUNT(1) |
±---------+
| 120125 |
±---------+

1.3、Hive 数据仓库
将MySQL数据库中表的数据导入到Hive表中,以便加载到HBase表中。
用户画像第二章(企业级360°用户画像_数据调研及ETL)
启动HiveMetastore服务和HiveServer2服务,使用beeline命令行连接,相关命令如下:

[aaa@qq.com ~]# /export/servers/hive/bin/beeline
Beeline version 1.1.0-cdh5.14.0 by Apache Hive
beeline> !connect jdbc:hive2://bd001:10000
scan complete in 2ms
Connecting to jdbc:hive2://bd001:10000
Enter username for jdbc:hive2://bd001:10000: root
Enter password for jdbc:hive2://bd001:10000: ****
Connected to: Apache Hive (version 1.1.0-cdh5.14.0)
Driver: Hive JDBC (version 1.1.0-cdh5.14.0)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://bd001:10000>

1.3.1、创建表
创建Hive中数据库Database:

CREATE DATABASE tags_dat;

根据MySQL数据库表在Hive数据仓库中构建相应的表:
行为日志表:tbl_logs

/export/servers/sqoop/bin/sqoop create-hive-table
–connect jdbc:mysql://bd001:3306/tags_dat
–table tbl_logs
–username root
–password 123456
–hive-table tags_dat2.tbl_logs
–fields-terminated-by ‘\t’
–lines-terminated-by ‘\n’

商品表:tbl_goods

/export/servers/sqoop/bin/sqoop create-hive-table
–connect jdbc:mysql://bd001:3306/tags_dat
–table tbl_goods
–username root
–password 123456
–hive-table tags_dat2.tbl_goods
–fields-terminated-by ‘\t’
–lines-terminated-by ‘\n’

订单数据表:tbl_orders

/export/servers/sqoop/bin/sqoop create-hive-table
–connect jdbc:mysql://bd001:3306/tags_dat
–table tbl_orders
–username root
–password 123456
–hive-table tags_dat2.tbl_orders
–fields-terminated-by ‘\t’
–lines-terminated-by ‘\n’

用户信息表:tbl_users

/export/servers/sqoop/bin/sqoop create-hive-table
–connect jdbc:mysql://bd001:3306/tags_dat
–table tbl_orders
–username root
–password 123456
–hive-table tags_dat2.tbl_orders
–fields-terminated-by ‘\t’
–lines-terminated-by ‘\n’

用户信息表:tbl_users

/export/servers/sqoop/bin/sqoop create-hive-table
–connect jdbc:mysql://bd001:3306/tags_dat
–table tbl_users
–username root
–password 123456
–hive-table tags_dat2.tbl_users
–fields-terminated-by ‘\t’
–lines-terminated-by ‘\n’

1.3.2、导入数据至Hive表
使用Sqoop将MySQL数据库表中的数据导入到Hive表中(本质就是存储在HDFS上),具体命令如下。
行为日志表:tbl_logs

/export/servers/sqoop/bin/sqoop import
–connect jdbc:mysql://bd001:3306/tags_dat
–username root
–password 123456
–table tbl_logs
–direct
–hive-overwrite
–delete-target-dir
–fields-terminated-by ‘\t’
–lines-terminated-by ‘\n’
–hive-table tags_dat2.tbl_logs
–hive-import
–num-mappers 20

商品表:tbl_goods

/export/servers/sqoop/bin/sqoop import
–connect jdbc:mysql://bd001:3306/tags_dat
–username root
–password 123456
–table tbl_goods
–direct
–hive-overwrite
–delete-target-dir
–fields-terminated-by ‘\t’
–lines-terminated-by ‘\n’
–hive-table tags_dat2.tbl_goods
–hive-import
–num-mappers 5

订单数据表:tbl_orders

/export/servers/sqoop/bin/sqoop import
–connect jdbc:mysql://bd001:3306/tags_dat
–username root
–password 123456
–table tbl_orders
–direct
–hive-overwrite
–delete-target-dir
–fields-terminated-by ‘\t’
–lines-terminated-by ‘\n’
–hive-table tags_dat2.tbl_orders
–hive-import
–num-mappers 10

用户信息表:tbl_users

/export/servers/sqoop/bin/sqoop import
–connect jdbc:mysql://bd001:3306/tags_dat
–username root
–password 123456
–table tbl_users
–direct
–hive-overwrite
–delete-target-dir
–fields-terminated-by ‘\t’
–lines-terminated-by ‘\n’
–hive-table tags_dat2.tbl_users
–hive-import
–num-mappers 1

2、数据导入(Import)
将MySQL表中业务数据导入大数据平台中如HBase表,方案如下所示:
用户画像第二章(企业级360°用户画像_数据调研及ETL)
2.1、HBase 表设计
电商网站中各类数据(用户信息数据、用户访问日志数据及用户订单数据)存储到HBase表中,便于检索和分析构建电商用户画像,有如下几张表:

hbase(main):008:0> list
TABLE
tbl_logs
tbl_orders
tbl_users
tbl_goods
=> [“tbl_logs”, “tbl_orders”, “tbl_users”, “tbl_goods”]

用户基本信息:tbl_users

-- 1、如果用户表存在先删除
hbase(main):013:0> disable 'tbl_users'
hbase(main):014:0> drop 'tbl_users'
-- 或者清空表
hbase(main):015:0> truncate 'tbl_users'
-- 2、创建用户表
hbase(main):016:0> create 'tbl_users2','detail'

hbase(main):019:0> desc "tbl_users"
Table tbl_users is ENABLED
tbl_users
COLUMN FAMILIES DESCRIPTION
{NAME => 'detail', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}

hbase(main):020:0> count "tbl_users"
950 row(s) in 0.1910 seconds

HBase表中有1个Region:
用户画像第二章(企业级360°用户画像_数据调研及ETL)
行为日志数据:tbl_logs,共376983条数据

# 1、如果行为表存在先删除
hbase(main):008:0> disable 'tbl_logs'
hbase(main):009:0> drop 'tbl_logs'
# 2、创建行为表
hbase(main):010:0> create 'tbl_logs2', 'detail', SPLITS => ['49394']
# 3、查看表中一条数据
hbase(main):011:0> scan 'tbl_logs', { LIMIT => 1 }
hbase(main):012:0> desc  "tbl_logs"
Table tbl_logs is ENABLED
tbl_logs
COLUMN FAMILIES DESCRIPTION
{NAME => 'detail', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}

hbase(main):004:0> count "tbl_logs"
376983 row(s) in 63.4920 seconds

用户画像第二章(企业级360°用户画像_数据调研及ETL)
用户订单数据:tbl_orders,共120125条

-- 1、如果订单表存在先删除
hbase(main):012:0> disable 'tbl_orders'
hbase(main):013:0> drop 'tbl_orders'
-- 2、创建订单表
hbase(main):014:0> create 'tbl_orders2','detail'

hbase(main):015:0> desc "tbl_orders"
Table tbl_orders is ENABLED
tbl_orders
COLUMN FAMILIES DESCRIPTION
{NAME => 'detail', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}


hbase(main):016:0> count "tbl_orders"
120125 row(s) in 38.9170 seconds

用户画像第二章(企业级360°用户画像_数据调研及ETL)
订单商品数据:tbl_goods

-- 1、如果用户表存在先删除
hbase(main):013:0> disable 'tbl_goods'
hbase(main):014:0> drop 'tbl_goods'
-- 或者清空表
hbase(main):015:0> truncate 'tbl_goods2'
-- 2、创建用户表
hbase(main):016:0> create 'tbl_goods2','detail'

hbase(main):019:0> desc "tbl_goods"
Table tbl_users is ENABLED
tbl_users
COLUMN FAMILIES DESCRIPTION
{NAME => 'detail', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}

hbase(main):016:0> count "tbl_goods"
120125 row(s) in 38.9170 seconds

2.2、Sqoop直接导入
可以使用SQOOP将MySQL表的数据导入到HBase表中,指定表的名称、列簇及RowKey,范例如下所示:

/export/servers/sqoop/bin/sqoop import
-D sqoop.hbase.add.row.key=true
–connect jdbc:mysql://bd001:3306/tags_dat
–username root
–password 123456
–table tbl_users
–hbase-create-table
–hbase-table tbl_users2
–column-family detail
–hbase-row-key id
–num-mappers 2

参数含义解释:

1、-D sqoop.hbase.add.row.key=true
是否将rowkey相关字段写入列族中,默认为false,默认情况下你将在列族中看不到任何row key中的字段。注意,该参数必须放在import之后。
2、–hbase-create-table 如果hbase中该表不存在则创建
3、–hbase-table 对应的hbase表名
4、–hbase-row-key hbase表中的rowkey,注意格式
5、–column-family hbase表的列族

知识拓展:如何使用SQOOP进行增量导入数据至HBase表,范例命令如下:

/export/servers/sqoop/bin/sqoop import
-D sqoop.hbase.add.row.key=true
–connect jdbc:mysql://bd001:3306/tags_dat
–username root
–password 123456
–table tbl_logs

–hbase-create-table
–hbase-table tag_logs
–column-family detail
–hbase-row-key id
–num-mappers 20
–incremental lastmodified
–check-column log_time
–last-value ‘2019-08-13 00:00:00’ \

相关增量导入参数说明:

1、–incremental lastmodified 增量导入支持两种模式 append 递增的列;lastmodified时间戳。
2、–check-column 增量导入时参考的列
3、–last-value 最小值,这个例子中表示导入2019-08-13 00:00:00到今天的值
用户画像第二章(企业级360°用户画像_数据调研及ETL)

2.3、HBase ImportTSV
ImportTSV功能描述:

将tsv(也可以是csv,每行数据中各个字段使用分隔符分割)格式文本数据,加载到HBase表中。
1)、采用Put方式加载导入
2)、采用BulkLoad方式批量加载导入

使用如下命令,查看HBase官方自带工具类使用说明:

HADOOP_HOME=/export/servers/hadoop
HBASE_HOME=/export/servers/hbase
HADOOP_CLASSPATH=${HBASE_HOME}/bin/hbase mapredcp????{HBASE_HOME}/conf ${HADOOP_HOME}/bin/yarn jar ${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.14.0.jar

执行上述命令提示如下信息:

An example program must be given as the first argument.
Valid program names are:
CellCounter: Count cells in HBase table.
WALPlayer: Replay WAL files.
completebulkload: Complete a bulk data load.
copytable: Export a table from local cluster to peer cluster.
export: Write table data to HDFS.
exportsnapshot: Export the specific snapshot to a given FileSystem.
import: Import data written by Export.
importtsv: Import data in TSV format.
rowcounter: Count rows in HBase table.
verifyrep: Compare the data from tables in two different clusters.

必须给出一个示例程序作为第一个参数。
有效的程序名是:
CellCounter:对HBase表中的单元格进行计数。
WALPlayer:重放WAL文件。
completebulkload:完成批量数据加载。
copytable:将表从本地群集导出到对等群集。
export:将表数据写入HDFS。
exportsnapshot:将特定快照导出到给定的文件系统。
import:导入通过导出写入的数据。
importtsv:导入TSV格式的数据。
rowcounter:计算HBase表中的行数。
verifyrep:比较两个不同集群中表中的数据。

其中importtsv就是将文本文件(比如CSV、TSV等格式)数据导入HBase表工具类,使用说明如下:

Usage: importtsv -Dimporttsv.columns=a,b,c
The column names of the TSV data must be specified using the -Dimporttsv.columns
option. This option takes the form of comma-separated column names, where each
column name is either a simple column family, or a columnfamily:qualifier. The special column name HBASE_ROW_KEY is used to designate that this column should be used as the row key for each imported record.
To instead generate HFiles of data to prepare for a bulk data load, pass the option:
-Dimporttsv.bulk.output=/path/for/output
‘-Dimporttsv.separator=|’ - eg separate on pipes instead of tabs
For performance consider the following options:
-Dmapreduce.map.speculative=false
-Dmapreduce.reduce.speculative=false

分别演示采用直接Put方式和HFile文件方式将数据导入HBase表,命令如下:
其一、直接导入Put方式

HADOOP_HOME=/export/servers/hadoop
HBASE_HOME=/export/servers/hbase
HADOOP_CLASSPATH=${HBASE_HOME}/bin/hbase mapredcp????{HBASE_HOME}/conf ${HADOOP_HOME}/bin/yarn jar ${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.14.0.jar
importtsv
-Dimporttsv.columns=HBASE_ROW_KEY,detail:log_id,detail:remote_ip,detail:site_global_ticket,detail:site_global_session,detail:global_user_id,detail:cookie_text,detail:user_agent,detail:ref_url,detail:loc_url,detail:log_time
tbl_logs2
/user/hive/warehouse/tags_dat2.db/tbl_logs

上述命令本质上运行一个MapReduce应用程序,将文本文件中每行数据转换封装到Put对象,然后插入到HBase表中。

回顾一下:
采用Put方式向HBase表中插入数据流程:
Put
-> WAL 预写日志
-> MemStore(内存) ,当达到一定大写Spill到磁盘上:StoreFile(HFile)
思考:
对海量数据插入,能否将数据直接保存为HFile文件,然后加载到HBase表中

其二、转换为HFile文件,再加载至表

# 生成HFILES文件
HADOOP_HOME=/export/servers/hadoop
HBASE_HOME=/export/servers/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf ${HADOOP_HOME}/bin/yarn jar ${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.14.0.jar \
importtsv \
-Dimporttsv.bulk.output=hdfs://bd001:8020/datas/output_hfile/tbl_tag_logs \
-Dimporttsv.columns=HBASE_ROW_KEY,detail:log_id,detail:remote_ip,detail:site_global_ticket,detail:site_global_session,detail:global_user_id,detail:cookie_text,detail:user_agent,detail:ref_url,detail:loc_url,detail:log_time \
tbl_logs2 \
/user/hive/warehouse/tags_dat2.db/tbl_logs

# 将HFILE文件加载到表中
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf ${HADOOP_HOME}/bin/yarn jar \
${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.14.0.jar \
completebulkload \
hdfs://bd001:8020/datas/output_hfile/tbl_tag_logs \
tbl_logs2

缺点:

1)、ROWKEY不能是组合主键
只能是某一个字段
2)、当表中列很多时,书写-Dimporttsv.columns值时很麻烦,容易出错

2.4、HBase Bulkload
在大量数据需要写入HBase时,通常有put方式和bulkLoad两种方式。
1、put方式为单条插入,在put数据时会先将数据的更新操作信息和数据信息写入WAL,在写入到WAL后,数据就会被放到MemStore中,当MemStore满后数据就会被flush到磁盘(即形成HFile文件),在这种写操作过程会涉及到flush、split、compaction等操作,容易造成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统性能,避免这些问题最好的方法就是使用BulkLoad的方式来加载数据到HBase中.

val put = new Put(rowKeyByts)
put.addColumn(cf, column, value)
put.addColumn(cf, column, value)
put.addColumn(cf, column, value)
put.addColumn(cf, column, value)
table.put(put)

用户画像第二章(企业级360°用户画像_数据调研及ETL)
2、BulkLoader利用HBase数据按照HFile格式存储在HDFS的原理,使用MapReduce直接批量生成HFile格式文件后,RegionServers再将HFile文件移动到相应的Region目录下
用户画像第二章(企业级360°用户画像_数据调研及ETL)

1)、Extract,异构数据源数据导入到 HDFS 之上。
2)、Transform,通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile。
3)、Load,HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的 HDFS 目录上,该过程可能涉及到文件切分。
1、不会触发WAL预写日志,当表还没有数据时进行数据导入不会产生Flush和Split。
2、减少接口调用的消耗,是一种快速写入的优化方式。
Spark读写HBase之使用Spark自带的API以及使用Bulk Load将大量数据导入HBase:
https://www.jianshu.com/p/b6c5a5ba30af

Bulkload过程主要包括三部分:

1、从数据源(通常是文本文件或其他的数据库)提取数据并上传到HDFS。
抽取数据到HDFS。和Hbase并没有关系,所以大家可以选用自己擅长的方式进行。
2、利用MapReduce作业处理事先准备的数据 。
这一步需要一个MapReduce作业,并且大多数情况下还需要我们自己编写Map函数,而Reduce函数不需要我们考虑,由HBase提供。
该作业需要使用rowkey(行键)作为输出Key;
KeyValue、Put或者Delete作为输出Value。
MapReduce作业需要使用HFileOutputFormat2来生成HBase数据文件。
为了有效的导入数据,需要配置HFileOutputFormat2使得每一个输出文件都在一个合适的区域中。为了达到这个目的,MapReduce作业会使用Hadoop的TotalOrderPartitioner类根据表的key值将输出分割开来。
HFileOutputFormat2的方法configureIncrementalLoad()会自动的完成上面的工作。
3、告诉RegionServers数据的位置并导入数据。
这一步是最简单的,通常需要使用LoadIncrementalHFiles(更为人所熟知是completebulkload工具),将文件在HDFS上的位置传递给它,它就会利用RegionServer将数据导入到相应的区域。

2.5、编写MapReduce导入
将MySQL表的数据先导入到HDFS文件中(比如TSV格式),编写MapReduce将文本文件数据转换为HFile文件,加载到HBase表中。
第一步、Hive中创建表

/export/servers/sqoop/bin/sqoop create-hive-table
–connect jdbc:mysql://bd001:3306/tags_dat
–table tbl_logs
–username root
–password 123456
–hive-table tags_dat2.tbl_logs
–fields-terminated-by ‘\t’
–lines-terminated-by ‘\n’

第二步、导入MySQL表数据到Hive表

/export/servers/sqoop/bin/sqoop import
–connect jdbc:mysql://bd001:3306/tags_dat
–username root
–password 123456
–table tbl_logs
–direct
–hive-overwrite
–delete-target-dir
–fields-terminated-by ‘\t’
–lines-terminated-by ‘\n’
–hive-table tags_dat2.tbl_logs
–hive-import
–num-mappers 20

第三步、编写MapReduce导入数据至HBase表
其一、创建HBase 表,设置预分区

create ‘tbl_logs’, ‘detail’, SPLITS => [‘49394’]

其二、工具类Constants,定义常量值

package cn.itcast.tags.etl.mr;

import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;
import java.util.List;

/**
 * 定义常量
 */
interface Constants {
	// hive表数据目录
	String INPUT_PATH = "hdfs://bd001:8020/user/hive/warehouse/tags_dat.db/tbl_logs";
	// 生成的hfile目录
	String HFILE_PATH = "hdfs://bd001:8020/datas/output_hfile/tbl_logs";
	// 表名
	String TABLE_NAME = "tbl_logs";
	// 列簇名称
	byte[] COLUMN_FAMILY = Bytes.toBytes("detail");

	// 表字段
	List<byte[]> list = new ArrayList<byte[]>() {
		private static final long serialVersionUID = -6125158551837044300L;

		{ //
			add(Bytes.toBytes("id"));
			add(Bytes.toBytes("log_id"));
			add(Bytes.toBytes("remote_ip"));
			add(Bytes.toBytes("site_global_ticket"));
			add(Bytes.toBytes("site_global_session"));
			add(Bytes.toBytes("global_user_id"));
			add(Bytes.toBytes("cookie_text"));
			add(Bytes.toBytes("user_agent"));
			add(Bytes.toBytes("ref_url"));
			add(Bytes.toBytes("loc_url"));
			add(Bytes.toBytes("log_time"));
		} //
	};

}

其三、MapReduce程序(本地运行)
使用Java语言,编写MapReduce程序,读取Hive表中数据文件,使用HFileOutputFormat2输出格式,保存数据至HFile文件,再加载到HBase表中。

package cn.itcast.tags.etl.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * 将Hive表数据转换为HFile文件并移动HFile到HBase
 */
public class LoadLogsToHBaseMapReduce
		extends Configured implements Tool {

	// 连接HBase Connection对象
	private static Connection connection = null ;

	/**
	 * 定义Mapper类,读取CSV格式数据,转换为Put对象,存储HBase表
	 */
	static class LoadLogsToHBase extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// 按照分隔符分割数据,分隔符为 逗号
			String[] split = value.toString().split("\\t");
			if (split.length == Constants.list.size()) {
				// 构建Put对象,将每行数据转换为Put
				Put put = new Put(Bytes.toBytes(split[0]));
				for (int i = 1; i < Constants.list.size(); i++) {
					put.addColumn(//
							Constants.COLUMN_FAMILY, //
							Constants.list.get(i), //
							Bytes.toBytes(split[i]) //
					);
				}
				// 将数据输出
				context.write(new ImmutableBytesWritable(put.getRow()), put);
			}
		}

	}

	@Override
	public int run(String[] args) throws Exception {
		// a. 获取配置信息对象
		Configuration configuration = super.getConf() ;

		// b. 构建Job对象Job
		Job job = Job.getInstance(configuration);
		job.setJobName(this.getClass().getSimpleName());
		job.setJarByClass(LoadLogsToHBaseMapReduce.class);

		// c. 设置Job
		FileInputFormat.addInputPath(job, new Path(Constants.INPUT_PATH));
		job.setMapperClass(LoadLogsToHBase.class);
		// TODO: 设置输出格式为HFileOutputFormat2
		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(Put.class);
		job.setOutputFormatClass(HFileOutputFormat2.class);

		// TODO: 判断输出目录是否存在,如果存在就删除
		FileSystem hdfs = FileSystem.get(configuration) ;
		Path outputPath = new Path(Constants.HFILE_PATH) ;
		if(hdfs.exists(outputPath)){
			hdfs.delete(outputPath, true) ;
		}
		// d. 设置输出路径
		FileOutputFormat.setOutputPath(job, outputPath);

		// TODO:获取HBase Table,对HFileOutputFormat2进行设置
		Table table = connection.getTable(TableName.valueOf(Constants.TABLE_NAME));
		HFileOutputFormat2.configureIncrementalLoad( //
				job, //
				table, //
				connection.getRegionLocator(TableName.valueOf(Constants.TABLE_NAME)) //
		);

		// 提交运行Job,返回是否执行成功
		boolean isSuccess = job.waitForCompletion(true);
		return isSuccess ? 0 : 1;
	}


	public static void main(String[] args) throws Exception {
		// 获取Configuration对象,读取配置信息
		Configuration configuration = HBaseConfiguration.create();
		// 获取HBase 连接Connection对象
		connection = ConnectionFactory.createConnection(configuration);
		
		// 运行MapReduce将数据文件转换为HFile文件
		int status = ToolRunner.run(configuration, new LoadLogsToHBaseMapReduce(), args);
		System.out.println("HFile文件生成完毕!~~~");

		// TODO:运行成功时,加载HFile文件数据到HBase表中
		if (0 == status) {
			// 获取HBase Table句柄
			Admin admin = connection.getAdmin();
			Table table = connection.getTable(TableName.valueOf(Constants.TABLE_NAME));
			// 加载数据到表中
			LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
			load.doBulkLoad(
					new Path(Constants.HFILE_PATH), //
					admin, //
					table, //
					connection.getRegionLocator(TableName.valueOf(Constants.TABLE_NAME)) //
			);
			System.out.println("HFile文件移动完毕!~~~");
		}
	}

}

2.6、编写Spark 程序导入
企业中大规模数据存储于HBase背景:

项目中有需求,要频繁地、快速地向一个表中初始化数据。因此如何加载数据,如何提高速度是需要解决的问题。
一般来说,作为数据存储系统会分为检索和存储两部分。检索是对外暴露数据查询接口。存储一是要实现数据按固定规则存储到存储介质中(如磁盘、内存等),另一方面还需要向外暴露批量装载的工具。如DB2的 db2load 工具,在关闭掉日志的前提下,写入速度能有显著提高。

HBase数据库提供批量导入数据至表功能,相关知识点如下:

1、Hbase 中LoadIncrementalHFiles 支持向Hbase 写入HFile 文件
2、写入的HFile 文件要求是排序的(rowKey,列簇,列)
3、关键是绕过Hbase regionServer,直接写入Hbase文件
4、Spark RDD的repartitionAndSortWithinPartitions 方法可以高效地实现分区并排序
5、JAVA util.TreeMap 是红黑树的实现,能很好的实现排序的要求

编写应用开发流程如下:

1、对待写入的数据按Key值构造util.TreeMap 树结构。目的是按Key值构造匹配Hbase 的排序结构
2、转换成RDD,使用repartitionAndSortWithinPartitions算子 对Key值分区并排序
3、调用RDD的saveAsNewAPIHadoopFile 算子,生成HFile文件
4、调用Hbase: LoadIncrementalHFiles 将HFile文件Load 到Hbase 表中

1)、批量加载数据类:HBaseBulkLoader,代码如下:

package cn.itcast.tags.etl.hfile

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable.TreeMap

/**
 * 将数据存储文本文件转换为HFile文件,加载到HBase表中
 */
object HBaseBulkLoader {
	
	def main(args: Array[String]): Unit = {
		// 应用执行时传递5个参数:数据类型、HBase表名称、表列簇、输入路径及输出路径
		/*
		args = Array("1", "tbl_tag_logs", "detail", "/user/hive/warehouse/tags_dat.db/tbl_logs", "/datas/output_hfile/tbl_tag_logs")
		args = Array("2", "tbl_tag_goods", "detail", "/user/hive/warehouse/tags_dat.db/tbl_goods", "/datas/output_hfile/tbl_tag_goods")
		args = Array("3", "tbl_tag_users", "detail", "/user/hive/warehouse/tags_dat.db/tbl_users", "/datas/output_hfile/tbl_tag_users")
		args = Array("4", "tbl_tag_orders", "detail", "/user/hive/warehouse/tags_dat.db/tbl_orders", "/datas/output_hfile/tbl_tag_orders")
		*/
		if(args.length != 5){
			println("Usage: required params: <DataType> <HBaseTable> <Family> <InputDir> <OutputDir>")
			System.exit(-1)
		}
		// 将传递赋值给变量, 其中数据类型:1Log、2Good、3User、4Order
		val Array(dataType, tableName, family, inputDir, outputDir) = args
		
		// 依据参数获取处理数据schema
		val fieldNames = dataType.toInt match {
			case 1 => TableFieldNames.LOG_FIELD_NAMES
			case 2 => TableFieldNames.GOODS_FIELD_NAMES
			case 3 => TableFieldNames.USER_FIELD_NAMES
			case 4 => TableFieldNames.ORDER_FIELD_NAMES
		}
		
		// 1. 构建SparkContext实例对象
		val sc: SparkContext = {
			// a. 创建SparkConf,设置应用配置信息
			val sparkConf = new SparkConf()
				.setMaster("local[2]")
				.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
				.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
			// b. 传递SparkContext创建对象
			SparkContext.getOrCreate(sparkConf)
		}
		
		// 2. 读取文本文件数据,转换格式
		val keyValuesRDD: RDD[(ImmutableBytesWritable, KeyValue)] = sc
			.textFile(inputDir)
			// 过滤数据
			.filter(line => null != line)
			// 提取数据字段,构建二元组(RowKey, KeyValue)
			/*
				Key: rowkey + cf + column + version(timestamp)
				Value: ColumnValue
			 */
			.flatMap{line => getLineToData(line, family, fieldNames)}
			// TODO: 对数据做字典排序
			.sortByKey()
		
		// TODO:构建Job,设置相关配置信息,主要为输出格式
		// a. 读取配置信息
		val conf: Configuration = HBaseConfiguration.create()
		// b. 如果输出目录存在,删除
		val dfs = FileSystem.get(conf)
		val outputPath: Path = new Path(outputDir)
		if(dfs.exists(outputPath)){
			dfs.delete(outputPath, true)
		}
		dfs.close()
		
		// TODO:c. 配置HFileOutputFormat2输出
		val conn = ConnectionFactory.createConnection(conf)
		val htableName = TableName.valueOf(tableName)
		val table: Table = conn.getTable(htableName)
		HFileOutputFormat2.configureIncrementalLoad(
			Job.getInstance(conf), //
			table, //
			conn.getRegionLocator(htableName)//
		)
		
		// TODO: 3. 保存数据为HFile文件
		keyValuesRDD.saveAsNewAPIHadoopFile(
			outputDir, //
			classOf[ImmutableBytesWritable], //
			classOf[KeyValue], //
			classOf[HFileOutputFormat2], //
			conf //
		)
		
		// TODO:4. 将输出HFile加载到HBase表中
		val load = new LoadIncrementalHFiles(conf)
		load.doBulkLoad(outputPath, conn.getAdmin, table, conn.getRegionLocator(htableName))
		
		// 应用结束,关闭资源
		sc.stop()
	}
	
	/**
	 * 依据不同表的数据文件,提取对应数据,封装到KeyValue对象中
	 */
	def getLineToData(line: String, family: String,
	                  fieldNames: TreeMap[String, Int]): List[(ImmutableBytesWritable, KeyValue)] = {
		val length = fieldNames.size
		// 分割字符串
		val fieldValues: Array[String] = line.split("\\t", -1)
		if(null == fieldValues || fieldValues.length != length) return Nil
		
		// 获取id,构建RowKey
		val id: String = fieldValues(0)
		val rowKey = Bytes.toBytes(id)
		val ibw: ImmutableBytesWritable = new ImmutableBytesWritable(rowKey)
		
		// 列簇
		val columnFamily: Array[Byte] = Bytes.toBytes(family)
		
		// 构建KeyValue对象
		fieldNames.toList.map{ case (fieldName, fieldIndex) =>
			// KeyValue实例对象
			val keyValue = new KeyValue(
				rowKey, //
				columnFamily, //
				Bytes.toBytes(fieldName), //
				Bytes.toBytes(fieldValues(fieldIndex)) //
			)
			// 返回
			(ibw, keyValue)
		}
	}
	
}

2)、将HBase数据库中不同表的字段信息封装object对象中,代码如下:

package cn.itcast.tags.etl.hfile

import scala.collection.immutable.TreeMap

/**
  * HBase 中各个表的字段名称,存储在TreeMap中
  */
object TableFieldNames{

	// TODO: 使用TreeMap为qualifier做字典序排序

	// a. 行为日志数据表的字段
	val LOG_FIELD_NAMES: TreeMap[String, Int] = TreeMap(
		("id", 0),
		("log_id", 1),
		("remote_ip", 2),
		("site_global_ticket", 3),
		("site_global_session", 4),
		("global_user_id", 5),
		("cookie_text", 6),
		("user_agent", 7),
		("ref_url", 8),
		("loc_url", 9),
		("log_time", 10)
	)

	// b. 商品信息数据表的字段
	val GOODS_FIELD_NAMES: TreeMap[String, Int] = TreeMap(
		("id", 0),
		("siteid", 1),
		("istest", 2),
		("hasread", 3),
		("supportonedaylimit", 4),
		("orderid", 5),
		("cordersn", 6),
		("isbook", 7),
		("cpaymentstatus", 8),
		("cpaytime", 9),
		("producttype", 10),
		("productid", 11),
		("productname", 12),
		("sku", 13),
		("price", 14),
		("number", 15),
		("lockednumber", 16),
		("unlockednumber", 17),
		("productamount", 18),
		("balanceamount", 19),
		("couponamount", 20),
		("esamount", 21),
		("giftcardnumberid", 22),
		("usedgiftcardamount", 23),
		("couponlogid", 24),
		("activityprice", 25),
		("activityid", 26),
		("cateid", 27),
		("brandid", 28),
		("netpointid", 29),
		("shippingfee", 30),
		("settlementstatus", 31),
		("receiptorrejecttime", 32),
		("iswmssku", 33),
		("scode", 34),
		("tscode", 35),
		("tsshippingtime", 36),
		("status", 37),
		("productsn", 38),
		("invoicenumber", 39),
		("expressname", 40),
		("invoiceexpressnumber", 41),
		("postman", 42),
		("postmanphone", 43),
		("isnotice", 44),
		("noticetype", 45),
		("noticeremark", 46),
		("noticetime", 47),
		("shippingtime", 48),
		("lessordersn", 49),
		("waitgetlesshippinginfo", 50),
		("getlesshippingcount", 51),
		("outping", 52),
		("lessshiptime", 53),
		("closetime", 54),
		("isreceipt", 55),
		("ismakereceipt", 56),
		("receiptnum", 57),
		("receiptaddtime", 58),
		("makereceipttype", 59),
		("shippingmode", 60),
		("lasttimeforshippingmode", 61),
		("lasteditorforshippingmode", 62),
		("systemremark", 63),
		("tongshuaiworkid", 64),
		("orderpromotionid", 65),
		("orderpromotionamount", 66),
		("externalsalesettingid", 67),
		("recommendationid", 68),
		("hassendalertnum", 69),
		("isnolimitstockproduct", 70),
		("hpregisterdate", 71),
		("hpfaildate", 72),
		("hpfinishdate", 73),
		("hpreservationdate", 74),
		("shippingopporunity", 75),
		("istimeoutfree", 76),
		("itemshareamount", 77),
		("lessshiptintime", 78),
		("lessshiptouttime", 79),
		("cbsseccode", 80),
		("points", 81),
		("modified", 82),
		("splitflag", 83),
		("splitrelatecordersn", 84),
		("channelid", 85),
		("activityid2", 86),
		("pdorderstatus", 87),
		("omsordersn", 88),
		("couponcode", 89),
		("couponcodevalue", 90),
		("storeid", 91),
		("storetype", 92),
		("stocktype", 93),
		("o2otype", 94),
		("brokeragetype", 95),
		("ogcolor", 96)
	)


	// c. 用户信息数据表的字段
	val USER_FIELD_NAMES: TreeMap[String, Int] = TreeMap(
		("id", 0),
		("siteid", 1),
		("avatarimagefileid", 2),
		("email", 3),
		("username", 4),
		("password", 5),
		("salt", 6),
		("registertime", 7),
		("lastlogintime", 8),
		("lastloginip", 9),
		("memberrankid", 10),
		("bigcustomerid", 11),
		("lastaddressid", 12),
		("lastpaymentcode", 13),
		("gender", 14),
		("birthday", 15),
		("qq", 16),
		("job", 17),
		("mobile", 18),
		("politicalface", 19),
		("nationality", 20),
		("validatecode", 21),
		("pwderrcount", 22),
		("source", 23),
		("marriage", 24),
		("money", 25),
		("moneypwd", 26),
		("isemailverify", 27),
		("issmsverify", 28),
		("smsverifycode", 29),
		("emailverifycode", 30),
		("verifysendcoupon", 31),
		("canreceiveemail", 32),
		("modified", 33),
		("channelid", 34),
		("grade_id", 35),
		("nick_name", 36),
		("is_blacklist", 37)
	)

	// d. 订单数据表的字段
	val ORDER_FIELD_NAMES: TreeMap[String, Int] = TreeMap(
		("id", 0),
		("siteid", 1),
		("istest", 2),
		("hassync", 3),
		("isbackend", 4),
		("isbook", 5),
		("iscod", 6),
		("notautoconfirm", 7),
		("ispackage", 8),
		("packageid", 9),
		("ordersn", 10),
		("relationordersn", 11),
		("memberid", 12),
		("predictid", 13),
		("memberemail", 14),
		("addtime", 15),
		("synctime", 16),
		("orderstatus", 17),
		("paytime", 18),
		("paymentstatus", 19),
		("receiptconsignee", 20),
		("receiptaddress", 21),
		("receiptzipcode", 22),
		("receiptmobile", 23),
		("productamount", 24),
		("orderamount", 25),
		("paidbalance", 26),
		("giftcardamount", 27),
		("paidamount", 28),
		("shippingamount", 29),
		("totalesamount", 30),
		("usedcustomerbalanceamount", 31),
		("customerid", 32),
		("bestshippingtime", 33),
		("paymentcode", 34),
		("paybankcode", 35),
		("paymentname", 36),
		("consignee", 37),
		("originregionname", 38),
		("originaddress", 39),
		("province", 40),
		("city", 41),
		("region", 42),
		("street", 43),
		("markbuilding", 44),
		("poiid", 45),
		("poiname", 46),
		("regionname", 47),
		("address", 48),
		("zipcode", 49),
		("mobile", 50),
		("phone", 51),
		("receiptinfo", 52),
		("delayshiptime", 53),
		("remark", 54),
		("bankcode", 55),
		("agent", 56),
		("confirmtime", 57),
		("firstconfirmtime", 58),
		("firstconfirmperson", 59),
		("finishtime", 60),
		("tradesn", 61),
		("signcode", 62),
		("source", 63),
		("sourceordersn", 64),
		("onedaylimit", 65),
		("logisticsmanner", 66),
		("aftersalemanner", 67),
		("personmanner", 68),
		("visitremark", 69),
		("visittime", 70),
		("visitperson", 71),
		("sellpeople", 72),
		("sellpeoplemanner", 73),
		("ordertype", 74),
		("hasreadtaobaoordercomment", 75),
		("memberinvoiceid", 76),
		("taobaogroupid", 77),
		("tradetype", 78),
		("steptradestatus", 79),
		("steppaidfee", 80),
		("depositamount", 81),
		("balanceamount", 82),
		("autocanceldays", 83),
		("isnolimitstockorder", 84),
		("ccborderreceivedlogid", 85),
		("ip", 86),
		("isgiftcardorder", 87),
		("giftcarddownloadpassword", 88),
		("giftcardfindmobile", 89),
		("autoconfirmnum", 90),
		("codconfirmperson", 91),
		("codconfirmtime", 92),
		("codconfirmremark", 93),
		("codconfirmstate", 94),
		("paymentnoticeurl", 95),
		("addresslon", 96),
		("addresslat", 97),
		("smconfirmstatus", 98),
		("smconfirmtime", 99),
		("smmanualtime", 100),
		("smmanualremark", 101),
		("istogether", 102),
		("isnotconfirm", 103),
		("tailpaytime", 104),
		("points", 105),
		("modified", 106),
		("channelid", 107),
		("isproducedaily", 108),
		("couponcode", 109),
		("couponcodevalue", 110),
		("ckcode", 111)
	)

}

运行此应用程序时,需传递相关参数:

// 应用执行时传递5个参数:数据类型、HBase表名称、表列簇、输入路径及输出路径
/*
args = Array("1", "tbl_tag_logs", "detail", "/user/hive/warehouse/tags_dat.db/tbl_logs", "/datas/output_hfile/tbl_tag_logs")
args = Array("2", "tbl_tag_goods", "detail", "/user/hive/warehouse/tags_dat.db/tbl_goods", "/datas/output_hfile/tbl_tag_goods")
args = Array("3", "tbl_tag_users", "detail", "/user/hive/warehouse/tags_dat.db/tbl_users", "/datas/output_hfile/tbl_tag_users")
args = Array("4", "tbl_tag_orders", "detail", "/user/hive/warehouse/tags_dat.db/tbl_orders", "/datas/output_hfile/tbl_tag_orders")
*/

// IDEA中测试执行,传递应用程序参数
// a. 行为日志
1 tbl_tag_logs detail /user/hive/warehouse/tags_dat.db/tbl_logs /datas/output_hfile/tbl_logs

// b. 商品信息
2 tbl_tag_goods detail /user/hive/warehouse/tags_dat.db/tbl_goods /datas/output_hfile/tbl_goods

// c. 用户信息
3 tbl_tag_users detail /user/hive/warehouse/tags_dat.db/tbl_users /datas/output_hfile/tbl_users

// d. 订单数据
4 tbl_tag_orders detail /user/hive/warehouse/tags_dat.db/tbl_orders /datas/output_hfile/tbl_orders

Spark Bulkload常见错误解析:

1)、调用 saveAsNewAPIHadoopFile 方法抛出 “Added a key not lexically larger than previous” 的异常是因为排序问题导致
2)、ImmutableBytesWritable 无法序列化的异常。通过如下 java 代码设置即可(读者可以用 scala 实现) sparkConf.set(“spark.serializer”,“org.apache.spark.serializer.KryoSerializer”);sparkConf.registerKryoClasses(newClass[]{org.apache.hadoop.hbase.io.ImmutableBytesWritable.class});
3)、比较器无法序列化的异常。让比较器实现 Serializable 接口即可。
4)、driver 中初始化的对象 于在 RDD 的 action 或者 transformation 中无法获取的异常,需要做 broadcast。
5)、可以通过动态设置 executor 个数来优化整体任务执行效率。

3、用户画像数据
基于电商数据,构建各个用户画像信息,开发项目时,设计的数据库和表(简易版本)如下:

profile_tags:
tbl_basic_tag:标签表
tbl_model:模型表

数据库:profile_tags

mysql> show create database profile_tags ;
±---------±------------------------------------------------------------------+
| Database | Create Database |
±---------±------------------------------------------------------------------+
| profile_tags | CREATE DATABASE profile_tags /*!40100 DEFAULT CHARACTER SET utf8 */ |
±---------±------------------------------------------------------------------+

标签表:tbl_basic_tag

CREATE TABLE tbl_basic_tag (
id bigint(20) NOT NULL AUTO_INCREMENT,
name varchar(50) DEFAULT NULL COMMENT ‘标签名称’,
industry varchar(30) DEFAULT NULL COMMENT ‘行业、子行业、业务类型、标签、属性’,
rule varchar(300) DEFAULT NULL COMMENT ‘标签规则’,
business varchar(100) DEFAULT NULL COMMENT ‘业务描述’,
level int(11) DEFAULT NULL COMMENT ‘标签等级’,
pid bigint(20) DEFAULT NULL COMMENT ‘父标签ID’,
ctime datetime DEFAULT NULL COMMENT ‘创建时间’,
utime datetime DEFAULT NULL COMMENT ‘修改时间’,
state int(11) DEFAULT NULL COMMENT ‘状态:1申请中、2开发中、3开发完成、4已上线、5已下线、6已禁用’,
remark varchar(100) DEFAULT NULL COMMENT ‘备注’,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=384 DEFAULT CHARSET=utf8 COMMENT=‘基础标签表’;

模型表:tbl_model

CREATE TABLE tbl_model (
id bigint(20) NOT NULL AUTO_INCREMENT,
tag_id bigint(20) DEFAULT NULL,
model_name varchar(200) DEFAULT NULL,
model_main varchar(200) DEFAULT NULL,
model_path varchar(200) DEFAULT NULL,
sche_time varchar(200) DEFAULT NULL,
ctime datetime DEFAULT NULL,
utime datetime DEFAULT NULL,
state int(11) DEFAULT NULL,
args varchar(100) DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=35 DEFAULT CHARSET=utf8;

构建用户表:"tbl_profile",存储在HBase表中

hbase(main):025:0> desc “tbl_profile”
Table tbl_profile is ENABLED
tbl_profile, {TABLE_ATTRIBUTES => {coprocessor$1 => ‘hdfs://bd001:8020/apps/hbase/coprocessor/hbase-coprocessor-ext.jar|cn.itcast.tag.model.tools.hbase.HBaseSolrIndexCoprocessor|1001’}
COLUMN FAMILIES DESCRIPTION
{NAME => ‘item’, BLOOMFILTER => ‘ROW’, VERSIONS => ‘1’, IN_MEMORY => ‘false’, KEEP_DELETED_CELLS => ‘FALSE’, DATA_BLOCK_ENCODING => ‘NONE’, TTL => ‘FOREVER’, COMPRESSION => ‘NONE’, MIN_VERSIONS => ‘0’,BLOCKCACHE => ‘true’, BLOCKSIZE => ‘65536’, REPLICATION_SCOPE => ‘0’}
{NAME => ‘user’, BLOOMFILTER => ‘ROW’, VERSIONS => ‘1’, IN_MEMORY => ‘false’, KEEP_DELETED_CELLS => ‘FALSE’, DATA_BLOCK_ENCODING => ‘NONE’, TTL => ‘FOREVER’, COMPRESSION => ‘NONE’, MIN_VERSIONS => ‘0’,BLOCKCACHE => ‘true’, BLOCKSIZE => ‘65536’, REPLICATION_SCOPE => ‘0’}

样例数据:

hbase(main):023:0> scan “tbl_profile”, {LIMIT => 4 }
ROW COLUMN+CELL
1 column=user:tagIds, timestamp=1566812409742, value=1,2,3
1 column=user:userId, timestamp=1566812409742, value=1
2 column=user:tagIds, timestamp=1568098888426, value=235,241
2 column=user:userId, timestamp=1568098888426, value=2
10 column=user:tagIds, timestamp=1568098888426, value=234,243
10 column=user:userId, timestamp=1568098888426, value=10
100 column=user:tagIds, timestamp=1568098888428, value=241
100 column=user:userId, timestamp=1568098888428, value=100

相关标签: 用户画像