Help needed: converting data to Vectors for the KMeans algorithm

程序员文章站 2024-03-24 14:17:22

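The problem: in a KMeans job, I build the vectors inside a map operator in two different ways, and the results differ. Why does the first approach add so many extra columns?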

1. The data, test1.csv, is as follows:

[Screenshot of test1.csv; per the output below, its rows are 1,2,3 / 4,5,6 / 7,8,9]

2. The complete code is as follows:

val rawData = sc.textFile("E:\\test1.csv")
    println("----11122221-----")
    rawData.foreach(println )
    val labelsAndData = rawData.map{ line =>

      val label = line.split(',').toString
      println("lable:...."+label)
      val vector = Vectors.dense(label.map(_.toDouble).toArray)
      println("vector11111:......"+vector)
      (label,vector)
      /**
        * Or write it like this
        */
      val label2 = line.split(',')
      val aa2 = label2.map(_.toDouble)
      val vector2 = Vectors.dense(label2.map(_.toDouble))
      println("vector22222:...."+vector2)
      (label2,vector2)
    }


    labelsAndData.foreach(println )
    val data = labelsAndData.values
    println("---------******---------")
    println("data:"+data)
    data.foreach(println )

    val dataAsArray = data.map(_.toArray)
    println("dataAsArray:"+dataAsArray)
    dataAsArray.foreach(println )
    val sums = dataAsArray.reduce(
      (a,b) => a.zip(b).map( t => t._1 + t._2)
    )
    for(ele <- sums) println(ele)
    println("sums 数量:"+sums.length)

The output is as follows:

17/06/01 13:29:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:60540 (size: 9.8 KB, free: 1132.5 MB)
17/06/01 13:29:48 INFO SparkContext: Created broadcast 0 from textFile at zip.scala:72
----11122221-----
17/06/01 13:29:48 INFO FileInputFormat: Total input paths to process : 1
17/06/01 13:29:48 INFO SparkContext: Starting job: foreach at zip.scala:74
17/06/01 13:29:48 INFO DAGScheduler: Got job 0 (foreach at zip.scala:74) with 1 output partitions
17/06/01 13:29:48 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at zip.scala:74)
17/06/01 13:29:48 INFO DAGScheduler: Parents of final stage: List()
17/06/01 13:29:48 INFO DAGScheduler: Missing parents: List()
17/06/01 13:29:48 INFO DAGScheduler: Submitting ResultStage 0 (E:\test1.csv MapPartitionsRDD[1] at textFile at zip.scala:72), which has no missing parents
17/06/01 13:29:48 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.0 KB, free 120.5 KB)
17/06/01 13:29:48 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1799.0 B, free 122.2 KB)
17/06/01 13:29:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:60540 (size: 1799.0 B, free: 1132.5 MB)
17/06/01 13:29:48 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
17/06/01 13:29:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (E:\test1.csv MapPartitionsRDD[1] at textFile at zip.scala:72)
17/06/01 13:29:48 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/06/01 13:29:48 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2120 bytes)
17/06/01 13:29:48 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/06/01 13:29:48 INFO HadoopRDD: Input split: file:/E:/test1.csv:0+21
17/06/01 13:29:48 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/06/01 13:29:48 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/06/01 13:29:48 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/06/01 13:29:48 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/06/01 13:29:48 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
1,2,3
4,5,6
7,8,9
17/06/01 13:29:48 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2044 bytes result sent to driver
17/06/01 13:29:48 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 60 ms on localhost (1/1)
17/06/01 13:29:48 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/06/01 13:29:48 INFO DAGScheduler: ResultStage 0 (foreach at zip.scala:74) finished in 0.070 s
17/06/01 13:29:48 INFO DAGScheduler: Job 0 finished: foreach at zip.scala:74, took 0.136096 s
17/06/01 13:29:48 INFO SparkContext: Starting job: foreach at zip.scala:94
17/06/01 13:29:48 INFO DAGScheduler: Got job 1 (foreach at zip.scala:94) with 1 output partitions
17/06/01 13:29:48 INFO DAGScheduler: Final stage: ResultStage 1 (foreach at zip.scala:94)
17/06/01 13:29:48 INFO DAGScheduler: Parents of final stage: List()
17/06/01 13:29:48 INFO DAGScheduler: Missing parents: List()
17/06/01 13:29:48 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[2] at map at zip.scala:75), which has no missing parents
17/06/01 13:29:48 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.1 KB, free 125.3 KB)
17/06/01 13:29:48 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1865.0 B, free 127.1 KB)
17/06/01 13:29:48 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:60540 (size: 1865.0 B, free: 1132.5 MB)
17/06/01 13:29:48 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
17/06/01 13:29:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[2] at map at zip.scala:75)
17/06/01 13:29:48 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
17/06/01 13:29:48 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 2120 bytes)
lable:....[Ljava.lang.String;@3d38491e
17/06/01 13:29:48 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
17/06/01 13:29:48 INFO HadoopRDD: Input split: file:/E:/test1.csv:0+21
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,51.0,100.0,51.0,56.0,52.0,57.0,49.0,101.0]
vector22222:....[1.0,2.0,3.0]
([Ljava.lang.String;@751013ad,[1.0,2.0,3.0])
lable:....[Ljava.lang.String;@7607112c
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,55.0,54.0,48.0,55.0,49.0,49.0,50.0,99.0]
vector22222:....[4.0,5.0,6.0]
([Ljava.lang.String;@5884de0f,[4.0,5.0,6.0])
lable:....[Ljava.lang.String;@385daf7b
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,51.0,56.0,53.0,100.0,97.0,102.0,55.0,98.0]
vector22222:....[7.0,8.0,9.0]
([Ljava.lang.String;@4da54ea6,[7.0,8.0,9.0])
17/06/01 13:29:48 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 2044 bytes result sent to driver
17/06/01 13:29:48 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 12 ms on localhost (1/1)
17/06/01 13:29:48 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
17/06/01 13:29:48 INFO DAGScheduler: ResultStage 1 (foreach at zip.scala:94) finished in 0.013 s
17/06/01 13:29:48 INFO DAGScheduler: Job 1 finished: foreach at zip.scala:94, took 0.033489 s
---------******---------
data:MapPartitionsRDD[3] at values at zip.scala:95
17/06/01 13:29:48 INFO SparkContext: Starting job: foreach at zip.scala:98
17/06/01 13:29:48 INFO DAGScheduler: Got job 2 (foreach at zip.scala:98) with 1 output partitions
17/06/01 13:29:48 INFO DAGScheduler: Final stage: ResultStage 2 (foreach at zip.scala:98)
17/06/01 13:29:48 INFO DAGScheduler: Parents of final stage: List()
17/06/01 13:29:48 INFO DAGScheduler: Missing parents: List()
17/06/01 13:29:48 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[3] at values at zip.scala:95), which has no missing parents
17/06/01 13:29:48 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.3 KB, free 130.4 KB)
lable:....[Ljava.lang.String;@4f707786
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,52.0,102.0,55.0,48.0,55.0,55.0,56.0,54.0]
vector22222:....[1.0,2.0,3.0]
[1.0,2.0,3.0]
lable:....[Ljava.lang.String;@57978dea
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,53.0,55.0,57.0,55.0,56.0,100.0,101.0,97.0]
vector22222:....[4.0,5.0,6.0]
[4.0,5.0,6.0]
lable:....[Ljava.lang.String;@20028443
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,50.0,48.0,48.0,50.0,56.0,52.0,52.0,51.0]
vector22222:....[7.0,8.0,9.0]
[7.0,8.0,9.0]
17/06/01 13:29:48 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1951.0 B, free 132.3 KB)
17/06/01 13:29:48 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:60540 (size: 1951.0 B, free: 1132.5 MB)
17/06/01 13:29:48 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
17/06/01 13:29:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[3] at values at zip.scala:95)
17/06/01 13:29:48 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
17/06/01 13:29:48 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2120 bytes)
17/06/01 13:29:48 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
17/06/01 13:29:48 INFO HadoopRDD: Input split: file:/E:/test1.csv:0+21
17/06/01 13:29:48 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 2044 bytes result sent to driver
dataAsArray:MapPartitionsRDD[4] at map at zip.scala:100
17/06/01 13:29:48 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 21 ms on localhost (1/1)
17/06/01 13:29:48 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
17/06/01 13:29:48 INFO DAGScheduler: ResultStage 2 (foreach at zip.scala:98) finished in 0.022 s
17/06/01 13:29:48 INFO DAGScheduler: Job 2 finished: foreach at zip.scala:98, took 0.016712 s
17/06/01 13:29:48 INFO SparkContext: Starting job: foreach at zip.scala:102
17/06/01 13:29:48 INFO DAGScheduler: Got job 3 (foreach at zip.scala:102) with 1 output partitions
17/06/01 13:29:48 INFO DAGScheduler: Final stage: ResultStage 3 (foreach at zip.scala:102)
17/06/01 13:29:48 INFO DAGScheduler: Parents of final stage: List()
17/06/01 13:29:48 INFO DAGScheduler: Missing parents: List()
17/06/01 13:29:48 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[4] at map at zip.scala:100), which has no missing parents
17/06/01 13:29:48 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 3.4 KB, free 135.7 KB)
17/06/01 13:29:48 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1988.0 B, free 137.7 KB)
17/06/01 13:29:48 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:60540 (size: 1988.0 B, free: 1132.5 MB)
17/06/01 13:29:48 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006
17/06/01 13:29:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[4] at map at zip.scala:100)
17/06/01 13:29:48 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
17/06/01 13:29:48 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 2120 bytes)
17/06/01 13:29:48 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
lable:....[Ljava.lang.String;@4f304a06
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,52.0,102.0,51.0,48.0,52.0,97.0,48.0,54.0]
vector22222:....[1.0,2.0,3.0]
[D@…
lable:....[Ljava.lang.String;@d158d30
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,100.0,49.0,53.0,56.0,100.0,51.0,48.0]
vector22222:....[4.0,5.0,6.0]
[D@…
lable:....[Ljava.lang.String;@199c4dc7
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,49.0,57.0,57.0,99.0,52.0,100.0,99.0,55.0]
vector22222:....[7.0,8.0,9.0]
[D@…
17/06/01 13:29:49 INFO HadoopRDD: Input split: file:/E:/test1.csv:0+21
17/06/01 13:29:49 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 2044 bytes result sent to driver
17/06/01 13:29:49 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 20 ms on localhost (1/1)
17/06/01 13:29:49 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 
17/06/01 13:29:49 INFO DAGScheduler: ResultStage 3 (foreach at zip.scala:102) finished in 0.020 s
17/06/01 13:29:49 INFO DAGScheduler: Job 3 finished: foreach at zip.scala:102, took 0.031459 s
17/06/01 13:29:49 INFO BlockManagerInfo: Removed broadcast_3_piece0 on localhost:60540 in memory (size: 1951.0 B, free: 1132.5 MB)
17/06/01 13:29:49 INFO SparkContext: Starting job: reduce at zip.scala:103
17/06/01 13:29:49 INFO DAGScheduler: Got job 4 (reduce at zip.scala:103) with 1 output partitions
17/06/01 13:29:49 INFO DAGScheduler: Final stage: ResultStage 4 (reduce at zip.scala:103)
17/06/01 13:29:49 INFO DAGScheduler: Parents of final stage: List()
17/06/01 13:29:49 INFO DAGScheduler: Missing parents: List()
17/06/01 13:29:49 INFO DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[4] at map at zip.scala:100), which has no missing parents
17/06/01 13:29:49 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 3.4 KB, free 135.9 KB)
17/06/01 13:29:49 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 1982.0 B, free 137.8 KB)
lable:....[Ljava.lang.String;@4f867a2b
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,52.0,102.0,56.0,54.0,55.0,97.0,50.0,98.0]
vector22222:....[1.0,2.0,3.0]
lable:....[Ljava.lang.String;@4d6c48b7
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,52.0,100.0,54.0,99.0,52.0,56.0,98.0,55.0]
vector22222:....[4.0,5.0,6.0]
lable:....[Ljava.lang.String;@5ad75e7
17/06/01 13:29:49 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:60540 (size: 1982.0 B, free: 1132.5 MB)
17/06/01 13:29:49 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006
17/06/01 13:29:49 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (MapPartitionsRDD[4] at map at zip.scala:100)
17/06/01 13:29:49 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
17/06/01 13:29:49 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 4, localhost, partition 0,PROCESS_LOCAL, 2120 bytes)
17/06/01 13:29:49 INFO Executor: Running task 0.0 in stage 4.0 (TID 4)
17/06/01 13:29:49 INFO HadoopRDD: Input split: file:/E:/test1.csv:0+21
17/06/01 13:29:49 INFO Executor: Finished task 0.0 in stage 4.0 (TID 4). 2130 bytes result sent to driver
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,53.0,97.0,100.0,55.0,53.0,101.0,55.0]
vector22222:....[7.0,8.0,9.0]
12.0
15.0
18.0
sums 数量:3
17/06/01 13:29:49 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 4) in 10 ms on localhost (1/1)
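
The last three values in the log (12.0, 15.0, 18.0) come from the reduce step, which adds the row arrays element by element. Below is a minimal plain-Scala sketch of that step, assuming an in-memory Seq in place of the RDD; the object and method names are made up for illustration.

```scala
// Hypothetical names; a local Seq stands in for the RDD[Array[Double]].
object ColumnSums {
  // Pair up the elements at the same index and add them, as in the reduce above.
  def addRows(a: Array[Double], b: Array[Double]): Array[Double] =
    a.zip(b).map { case (x, y) => x + y }

  // Fold all rows into a single array of per-column totals.
  def columnSums(rows: Seq[Array[Double]]): Array[Double] =
    rows.reduce((a, b) => addRows(a, b))

  def main(args: Array[String]): Unit = {
    val rows = Seq(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0), Array(7.0, 8.0, 9.0))
    println(columnSums(rows).mkString(", ")) // 12.0, 15.0, 18.0
  }
}
```

Note that zip truncates to the shorter of the two arrays, so rows of unequal length would silently lose columns rather than raise an error.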


Note this part of the output:

vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,51.0,100.0,51.0,56.0,52.0,57.0,49.0,101.0]
vector22222:....[1.0,2.0,3.0]
([Ljava.lang.String;@751013ad,[1.0,2.0,3.0])
lable:....[Ljava.lang.String;@7607112c
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,55.0,54.0,48.0,55.0,49.0,49.0,50.0,99.0]
vector22222:....[4.0,5.0,6.0]
([Ljava.lang.String;@5884de0f,[4.0,5.0,6.0])
lable:....[Ljava.lang.String;@385daf7b
vector11111:......[91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0,51.0,56.0,53.0,100.0,97.0,102.0,55.0,98.0]
vector22222:....[7.0,8.0,9.0]
([Ljava.lang.String;@4da54ea6,[7.0,8.0,9.0])
As shown, the first approach produces many extra columns, while the vector produced by the second approach is correct.
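
One detail that may explain why the rest of the job still sees correct vectors: in Scala, the value of a block is its last expression, so the earlier `(label,vector)` tuple in the map body appears to be built and then thrown away, and only `(label2,vector2)` is returned. The tuples printed in the log carry the three-element vectors, which is consistent with this. A small sketch with hypothetical names:

```scala
// Hypothetical helper that mirrors the shape of the map body above.
object BlockValue {
  def pick(line: String): (String, Int) = {
    val first = ("whole line", line.length)
    (first._1, first._2) // built, then discarded: not the last expression
    val second = ("field count", line.split(',').length)
    (second._1, second._2) // last expression: this is the block's value
  }

  def main(args: Array[String]): Unit =
    println(pick("1,2,3")) // (field count,3)
}
```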

Analysis:

The first approach:

val labelsAndData = rawData.map{ line =>

      val label = line.split(',').toString
      println("lable:...."+label)
      val vector = Vectors.dense(label.map(_.toDouble).toArray)
      println("vector11111:......"+vector)

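Vectors.dense requires an Array[Double], as the screenshot below showed. If the argument in

val vector = Vectors.dense(label.map(_.toDouble))

is not converted to an Array, compilation fails. So the first approach splits the line with split, converts the result to a String, and then converts it back to an Array when building the vector, but the result is wrong.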

[Screenshot omitted]
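
A likely explanation for the extra columns, sketched under the assumption that the code inside the map behaves like plain Scala: `toString` on an `Array[String]` does not join the fields. It returns the JVM's default text for the array object, `[Ljava.lang.String;@` followed by a hex hash, which is what the `lable:....` lines in the log show. `label` is therefore a `String`, and `label.map(_.toDouble)` walks over its characters, turning each `Char` into its numeric code. The object and method names below are made up for illustration.

```scala
// Hypothetical names; no Spark needed to reproduce the effect.
object ToStringPitfall {
  // Same shape as the first approach: map every Char of a String to a Double.
  // Char.toDouble returns the character's numeric code, it does not parse digits.
  def charCodes(s: String): Array[Double] = s.map(_.toDouble).toArray

  def main(args: Array[String]): Unit = {
    val fields: Array[String] = "1,2,3".split(',')
    // Arrays inherit Object.toString: "[Ljava.lang.String;@" plus a hash that varies per run.
    val label: String = fields.toString
    println(label)
    println(charCodes(label).mkString("[", ",", "]"))
    // The fixed prefix alone accounts for the first 20 "columns":
    println(charCodes("[Ljava.lang.String;@").mkString(","))
    // 91.0,76.0,106.0,97.0,118.0,97.0,46.0,108.0,97.0,110.0,103.0,46.0,83.0,116.0,114.0,105.0,110.0,103.0,59.0,64.0
  }
}
```

These 20 leading values match the start of every vector11111 line in the log. The trailing values are the character codes of the hash digits, which would explain why they change from run to run and why a 7-digit hash such as d158d30 gives a vector with 27 entries instead of 28.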

The second approach splits each line with split directly, and then no extra conversion to Array is needed when building the Vectors.
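
For comparison, here is a plain-Scala sketch of the second approach. In it, `_.toDouble` runs on each `String` field and parses its text, rather than running on individual characters. If a readable label is wanted, `mkString` joins the fields, unlike `toString`. The names are hypothetical, and the resulting `Array[Double]` is what would then be passed to `Vectors.dense`.

```scala
// Hypothetical names; the Spark call itself is left out so the sketch runs standalone.
object ParseCsvLine {
  // Split first, then parse each field: String.toDouble turns the text "4" into 4.0.
  def parseLine(line: String): Array[Double] =
    line.split(',').map(_.toDouble)

  // Join the fields back into readable text instead of printing the array's identity.
  def labelOf(line: String): String =
    line.split(',').mkString(",")

  def main(args: Array[String]): Unit = {
    println(parseLine("4,5,6").mkString("[", ",", "]")) // [4.0,5.0,6.0]
    println(labelOf("4,5,6"))                           // 4,5,6
  }
}
```

Both approaches end with an `Array[Double]`, so the static types agree; only the contents differ. A non-numeric field would make `toDouble` throw a NumberFormatException, so real input may need validation first.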

Inspecting the type of each expression:

[Four screenshots of the inferred types omitted]

The types are the same, so why do the results differ? The second result is correct; why does the first one have so many extra columns?