面试分享(spark 实现每天访问的记录数和用户数)
最近收到一家公司面试题感悟:
要求:用户访问日志文件有两列,分别为日期和用户ID :(date,user_id) 使用spark 统计每天访问记录数和用户数。
1、每天访问记录数例子完成
结果
(2017-01-03,4)
(2017-01-02,3)
(2017-01-01,3)
每天访问的用户数(当时没有太理解)估计面试官想问新增用户数
具体实现结果为:
实现思想 是一个倒排的思想
1、找到 每个用户对应 日期 即 rdd1.map(kv=>(kv._2,kv._1))
2、对每个用户groupyby
3、找到 每个用户的最早访问时间明细(细节是每个用户时间后置1) 即rdd3.map(kv => (kv._2.min, 1))
4、用countBykey 对时间出现次数进行统计合并。
思路参考:https://blog.csdn.net/dkl12/article/details/80256688
结果为
(2017-01-03,3)
(2017-01-02,1)
(2017-01-01,3)
具体代码为:
import org.apache.spark.sql.SparkSession
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("NewUVDemo").master("local").getOrCreate()
val rdd1 = spark.sparkContext.parallelize(
Array(
("2017-01-01", "a"), ("2017-01-01", "b"), ("2017-01-01", "c"),
("2017-01-02", "a"), ("2017-01-02", "b"), ("2017-01-02", "d"),
("2017-01-03", "b"), ("2017-01-03", "e"), ("2017-01-03", "f"),("2017-01-03", "g")))
//倒排
// val rdd2 = rdd1.groupByKey().map(t=>(t._1,t._2.size))
// rdd2.foreach(println)
val rdd2 = rdd1.map(kv => (kv._2, kv._1))
//倒排后的key分组
val rdd3 = rdd2.groupByKey()
//取最小时间
val rdd4 = rdd3.map(kv => (kv._2.min, 1))
rdd4.countByKey().foreach(println)
}
}
注意:失误点当时很久没用groupyBykey 不熟练
reduceByKey和groupByKey区别与用法
(1)reduceByKey(func, numPartitions=None)
reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。一般的化 如 reduceBykey(+)
(2)groupByKey(numPartitions=None)
也就是,groupByKey也是对每个key进行操作,**但只生成一个sequence。**需要特别注意“Note”中的话,它告诉我们:如果需要对sequence进行aggregation操作(注意,groupByKey本身不能自定义操作函数),那么,选择reduceByKey/aggregateByKey更好。这是因为groupByKey不能自定义函数,我们需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作。
两种方法应用的区别为:
val words = Array(“one”, “two”, “two”, “three”, “three”, “three”)
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _)
val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))