欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

推荐

程序员文章站 2022-07-13 08:06:58
...

import os

配置spark driver和pyspark运行时,所使用的python解释器路径

由于miniconda3中默认存在一个python3.7的版本,jupyter默认也使用的是这个版本,故:设置pyspark的解释器为miniconda3的解释器

PYSPARK_PYTHON = “/root/miniconda3/bin/python3”
JAVA_HOME=’/root/bigdata/jdk1.8.0_181’

当存在多个版本时,不指定很可能会导致出错

os.environ[“PYSPARK_PYTHON”] = PYSPARK_PYTHON
os.environ[“PYSPARK_DRIVER_PYTHON”] = PYSPARK_PYTHON
os.environ[‘JAVA_HOME’]=JAVA_HOME

spark配置信息

from pyspark import SparkConf
from pyspark.sql import SparkSession

SPARK_APP_NAME = “preprocessingBehaviorLog”
SPARK_URL = “spark://192.168.199.126:7077”

conf = SparkConf() # 创建spark config对象
config = (
(“spark.app.name”, SPARK_APP_NAME), # 设置启动的spark的app名称,没有提供,将随机产生一个名称
(“spark.executor.memory”, “2g”), # 设置该app启动时占用的内存用量,默认1g
(“spark.master”, SPARK_URL), # spark master的地址
(“spark.executor.cores”, “2”), # 设置spark executor使用的CPU核心数
# 以下三项配置,可以控制执行器数量

(“spark.dynamicAllocation.enabled”, True),

(“spark.dynamicAllocation.initialExecutors”, 1), # 1个执行器

(“spark.shuffle.service.enabled”, True)

(‘spark.sql.pivotMaxValues’, ‘99999’), # 当需要pivot DF,且值很多时,需要修改,默认是10000

)

查看更详细配置及说明:https://spark.apache.org/docs/latest/configuration.html

conf.setAll(config)

利用config对象,创建spark session

spark = SparkSession.builder.config(conf=conf).getOrCreate()

从hdfs中加载csv文件为DataFrame

从hdfs加载CSV文件为DataFrame

df = spark.read.csv(“file:///root/jupyter_code/behavior_log_less.csv”, header=True)
df.show() # 查看dataframe,默认显示前20条

大致查看一下数据类型

df.printSchema() # 打印当前dataframe的结构

从hdfs加载数据为dataframe,并设置结构
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

构建结构对象

schema = StructType([
StructField(“userId”, IntegerType()),
StructField(“timestamp”, LongType()),
StructField(“btag”, StringType()),
StructField(“cateId”, IntegerType()),
StructField(“brandId”, IntegerType())
])

从hdfs加载数据为dataframe,并设置结构

behavior_log_df = spark.read.csv(“file:///root/jupyter_code/behavior_log_less.csv”, header=True, schema=schema)
behavior_log_df.show()
behavior_log_df.count()

分析数据集字段的类型和格式
查看是否有空值
查看每列数据的类型
查看每列数据的类别情况
print(“查看userId的数据情况:”, behavior_log_df.groupBy(“userId”).count().count())

约113w用户

#注意:behavior_log_df.groupBy(“userId”).count() 返回的是一个dataframe,这里的count计算的是每一个分组的个数,但当前还没有进行计算

当调用df.count()时才开始进行计算,这里的count计算的是dataframe的条目数,也就是共有多少个分组

print(“查看btag的数据情况:”, behavior_log_df.groupBy(“btag”).count().collect()) # collect会把计算结果全部加载到内存,谨慎使用

只有四种类型数据:pv、fav、cart、buy

这里由于类型只有四个,所以直接使用collect,把数据全部加载出来

pivot透视操作,把某列里的字段值转换成行并进行聚合运算(pyspark.sql.GroupedData.pivot)
如果透视的字段中的不同属性值超过10000个,则需要设置spark.sql.pivotMaxValues,否则计算过程中会出现错误。文档介绍。

统计每个用户对各类商品的pv、fav、cart、buy数量

cate_count_df = behavior_log_df.groupBy(behavior_log_df.userId, behavior_log_df.cateId).pivot(“btag”,[“pv”,“fav”,“cart”,“buy”]).count()
cate_count_df.printSchema() # 此时还没有开始计算

统计每个用户对各个品牌的pv、fav、cart、buy数量并保存结果

统计每个用户对各个品牌的pv、fav、cart、buy数量

brand_count_df = behavior_log_df.groupBy(behavior_log_df.userId, behavior_log_df.brandId).pivot(“btag”,[“pv”,“fav”,“cart”,“buy”]).count()

brand_count_df.show() # 同上

113w * 46w

由于运算时间比较长,所以这里先将结果存储起来,供后续其他操作使用

写入数据时才开始计算

cate_count_df.write.csv(“file:///root/jupyter_code1/middle_result/preprocessing_dataset/cate_count.csv”, header=True)
brand_count_df.write.csv("file:///root/jupyter_code1/middle_result/preprocessing_dataset/brand_cou

7.2 根据用户对类目偏好打分训练ALS模型
根据您统计的次数 + 打分规则 ==> 偏好打分数据集 ==> ALS模型

spark ml的模型训练是基于内存的,如果数据过大,内存空间小,迭代次数过多的话,可能会造成内存溢出,报错

设置Checkpoint的话,会把所有数据落盘,这样如果异常退出,下次重启后,可以接着上次的训练节点继续运行

但该方法其实指标不治本,因为无法防止内存溢出,所以还是会报错

如果数据量大,应考虑的是增加内存、或限制迭代次数和训练数据量级等

spark.sparkContext.setCheckpointDir(“hdfs://node-teach1:8020/checkPoint/”)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType

构建结构对象

schema = StructType([
StructField(“userId”, IntegerType()),
StructField(“cateId”, IntegerType()),
StructField(“pv”, IntegerType()),
StructField(“fav”, IntegerType()),
StructField(“cart”, IntegerType()),
StructField(“buy”, IntegerType())
])

从hdfs加载CSV文件

cate_count_df = spark.read.csv(“file:///root/jupyter_code1/middle_result/preprocessing_dataset/cate_count.csv”, header=True, schema=schema)
cate_count_df.printSchema()
cate_count_df.first() # 第一行数据

处理每一行数据:r表示row对象
def process_row®:
# 处理每一行数据:r表示row对象

# 偏好评分规则:
#     m: 用户对应的行为次数
#     该偏好权重比例,次数上限仅供参考,具体数值应根据产品业务场景权衡
#     pv: if m<=20: score=0.2*m; else score=4
#     fav: if m<=20: score=0.4*m; else score=8
#     cart: if m<=20: score=0.6*m; else score=12
#     buy: if m<=20: score=1*m; else score=20

# 注意这里要全部设为浮点数,spark运算时对类型比较敏感,要保持数据类型都一致
pv_count = r.pv if r.pv else 0.0
fav_count = r.fav if r.fav else 0.0
cart_count = r.cart if r.cart else 0.0
buy_count = r.buy if r.buy else 0.0

pv_score = 0.2*pv_count if pv_count<=20 else 4.0
fav_score = 0.4*fav_count if fav_count<=20 else 8.0
cart_score = 0.6*cart_count if cart_count<=20 else 12.0
buy_score = 1.0*buy_count if buy_count<=20 else 20.0

rating = pv_score + fav_score + cart_score + buy_score
# 返回用户ID、分类ID、用户对分类的偏好打分
return r.userId, r.cateId, rating

返回一个PythonRDD类型

返回一个PythonRDD类型,此时还没开始计算

cate_count_df.rdd.map(process_row).toDF([“userId”, “cateId”, “rating”])

用户对商品类别的打分数据

用户对商品类别的打分数据

map返回的结果是rdd类型,需要调用toDF方法转换为Dataframe

cate_rating_df = cate_count_df.rdd.map(process_row).toDF([“userId”, “cateId”, “rating”])

注意:toDF不是每个rdd都有的方法,仅局限于此处的rdd

可通过该方法获得 user-cate-matrix

但由于cateId字段过多,这里运算量比很大,机器内存要求很高才能执行,否则无法完成任务

请谨慎使用

但好在我们训练ALS模型时,不需要转换为user-cate-matrix,所以这里可以不用运行

cate_rating_df.groupBy(“userId”).povit(“cateId”).min(“rating”)

用户对类别的偏好打分数据

cate_rating_df

使用pyspark中的ALS矩阵分解方法实现CF评分预测

文档地址:https://spark.apache.org/docs/2.2.2/api/python/pyspark.ml.html?highlight=vectors#module-pyspark.ml.recommendation

from pyspark.ml.recommendation import ALS # ml:dataframe, mllib:rdd

利用打分数据,训练ALS模型

checkpointInterval:每迭代n次,就会做一次cache,为了处理内存不足造成的问题。

als = ALS(userCol=‘userId’, itemCol=‘cateId’, ratingCol=‘rating’, checkpointInterval=2)

此处训练时间较长

model = als.fit(cate_rating_df)

模型训练好后,调用方法进行使用,具体API查看

model.recommendForAllUsers(N) 给所有用户推荐TOP-N个物品

ret = model.recommendForAllUsers(3)

由于是给所有用户进行推荐,此处运算时间也较长

ret.show()

推荐结果存放在recommendations列中,

ret.select(“recommendations”).show()

model.recommendForUserSubset 给部分用户推荐TOP-N个物品

注意:recommendForUserSubset API,2.2.2版本中无法使用

dataset = spark.createDataFrame([[1],[2],[3]])
dataset = dataset.withColumnRenamed("_1", “userId”)
ret = model.recommendForUserSubset(dataset, 3)

只给部分用推荐,运算时间短

ret.show()
ret.collect() # 注意: collect会将所有数据加载到内存,慎用

transform中提供userId和cateId可以对打分进行预测,利用打分结果排序后

将模型进行存储

model.save(“file:///root/jupyter_code1/tmp_models/userCateRatingALSModel.obj”)

测试存储的模型

from pyspark.ml.recommendation import ALSModel

从hdfs加载之前存储的模型

als_model = ALSModel.load(“file:///root/jupyter_code1/tmp_models/userCateRatingALSModel.obj”)

model.recommendForAllUsers(N) 给用户推荐TOP-N个物品

result = als_model.recommendForAllUsers(3)
result.show()