欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

DStream转为DF的两种方式(突破map时元组22的限制)

程序员文章站 2022-04-20 19:21:09
在进行Spark Streaming的开发时,我们常常需要将DStream转为DataFrame来进行进一步的处理, 共有两种方式,方式一: 利用map算子和tuple来完成,一般的场景下采用这种方式即可。 但是有的时候我们会遇到列数大于22的情况,这个时候会受到scala的tuple数不能超过22 ......

在进行spark streaming的开发时,我们常常需要将dstream转为dataframe来进行进一步的处理,
共有两种方式,方式一:

val spark = sparksession.builder()
  .appname("test")
  .getorcreate()
import spark.implicits._
dstream.foreachrdd{ rdd =>
  val df = rdd.map(_.split(" "))
    .map(t => (t(1),t(2),t(3)))
    .todf("col1","col2","col3")
  // 业务逻辑
}

利用map算子和tuple来完成,一般的场景下采用这种方式即可。

但是有的时候我们会遇到列数大于22的情况,这个时候会受到scala的tuple数不能超过22的影响。这时可以采用方式二:

val spark = sparksession.builder()
  .appname("test")
  .getorcreate()
dstream.foreachrdd{ rdd =>
  val res:rdd[row] = rdd.map{ row =>
    val buffer = arraybuffer.empty[any]
    val fields: array[string] = row.split("\\|~\\|")
    buffer.append(fields(0))
    buffer.append(fields(1))
    buffer.append(fields(2))
    // 省略
    buffer.append(fields(25))
    row.fromseq(buffer)
  } 
  val schema = structtype(seq(
    structfield("col1", stringtype, false),
    structfield("col2", stringtype, false),
    structfield("col3", stringtype, false),
    // 省略
    structfield("col26", stringtype, false)
  ))
  val df: dataframe = spark.createdataframe(result, schema)
  // 业务逻辑
}