DStream转为DF的两种方式(突破map时元组22的限制)
程序员文章站
2022-04-20 19:21:09
在进行Spark Streaming的开发时,我们常常需要将DStream转为DataFrame来进行进一步的处理, 共有两种方式,方式一: 利用map算子和tuple来完成,一般的场景下采用这种方式即可。 但是有的时候我们会遇到列数大于22的情况,这个时候会受到scala的tuple数不能超过22 ......
在进行spark streaming的开发时,我们常常需要将dstream转为dataframe来进行进一步的处理,
共有两种方式,方式一:
val spark = sparksession.builder() .appname("test") .getorcreate() import spark.implicits._ dstream.foreachrdd{ rdd => val df = rdd.map(_.split(" ")) .map(t => (t(1),t(2),t(3))) .todf("col1","col2","col3") // 业务逻辑 }
利用map算子和tuple来完成,一般的场景下采用这种方式即可。
但是有的时候我们会遇到列数大于22的情况,这个时候会受到scala的tuple数不能超过22的影响。这时可以采用方式二:
val spark = sparksession.builder() .appname("test") .getorcreate() dstream.foreachrdd{ rdd => val res:rdd[row] = rdd.map{ row => val buffer = arraybuffer.empty[any] val fields: array[string] = row.split("\\|~\\|") buffer.append(fields(0)) buffer.append(fields(1)) buffer.append(fields(2)) // 省略 buffer.append(fields(25)) row.fromseq(buffer) } val schema = structtype(seq( structfield("col1", stringtype, false), structfield("col2", stringtype, false), structfield("col3", stringtype, false), // 省略 structfield("col26", stringtype, false) )) val df: dataframe = spark.createdataframe(result, schema) // 业务逻辑 }
上一篇: 咋这么让人不省心啊