李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
【转载】scala spark创建DataFrame的多种方式
Leefs
2021-12-16 PM
1766℃
0条
[TOC]

### 一、通过RDD[Row]和StructType创建

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * 通过RDD[Row]和StructType创建DataFrame
 **/
object DataFrameDemo01 {
  def main(args: Array[String]): Unit = {
    // 通过SparkSession创建spark入口
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    println("1. 通过RDD[Row]和StructType创建")
    //1. 通过RDD[Row]和StructType创建
    val sparkRdd1: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))
    // 将RDD与Row联合
    val rowRdd: RDD[Row] = sparkRdd1.map(t => {
      val per: Array[String] = t.split(",")
      Row(per(0), per(1).toInt, per(2))
    })
    // 创建StructType实例,设置字段名和类型
    val schema: StructType = StructType(
      List(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("sex", StringType)
      )
    )
    // 创建dataFrame
    val dataFrame: DataFrame = spark.createDataFrame(rowRdd, schema)
    // 展示数据
    dataFrame.show()
    // 释放资源
    spark.stop()
  }
}
```

**输出结果**

```
1. 通过RDD[Row]和StructType创建
+----+---+---+
|name|age|sex|
+----+---+---+
|   X| 22|  M|
|   y| 21|  W|
|   N| 22|  M|
+----+---+---+
```

### 二、直接通过Rdd.toDF()创建DataFrame

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 通过RDD.toDF直接创建DataFrame
 **/
object DataFrameDemo02 {
  def main(args: Array[String]): Unit = {
    // 通过SparkSession创建spark入口
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    println("2. 直接通过Rdd.toDF()创建DataFrame")
    //创建RDD
    val sparkRdd2: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))
    //将RDD与Row联合
    val toDFRdd: RDD[(String, Int, String)] = sparkRdd2.map(t => {
      val per: Array[String] = t.split(",")
      (per(0), per(1).toInt, per(2))
    })
    import org.apache.spark.sql.functions._
    import spark.implicits._
    //写法1
    //val frame: DataFrame = toDFRdd.toDF("name", "age", "sex")
    //写法2
    val array = Array("name","age","sex")
    val frame: DataFrame = toDFRdd.toDF(array:_*)
    //展示数据(sum("age")求年龄总和,故列别名用sum_age)
    frame.agg(sum("age") as "sum_age").show()
    // 释放资源
    spark.stop()
  }
}
```

**运行结果**

```
2. 直接通过Rdd.toDF()创建DataFrame
+-------+
|sum_age|
+-------+
|     65|
+-------+
```

### 三、通过RDD和ScalaBean创建DataFrame

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 通过RDD和scalabean创建DataFrame
 **/
object DataFrameDemo03 {
  def main(args: Array[String]): Unit = {
    // 通过SparkSession创建spark入口
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    println("3. 通过RDD和ScalaBean创建DataFrame")
    //创建RDD
    val sparkRdd3: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))
    //关联ScalaBean的RDD
    val beanRdd: RDD[Per] = sparkRdd3.map(t => {
      val per: Array[String] = t.split(",")
      Per(per(0), per(1).toInt, per(2))
    })
    // 必须导入隐式转换才能使用.toDF
    import spark.implicits._
    //创建DataFrame
    val df: DataFrame = beanRdd.toDF()
    // 创建视图
    df.createTempView("t_per")
    // 查询数据
    val res: DataFrame = spark.sql("SELECT name,age FROM t_per ORDER BY age")
    // 展示数据
    res.show()
    // 释放资源
    spark.stop()
  }

  case class Per(name:String,age:Int,sex:String)
}
```

**运行结果**

```
3. 通过RDD和ScalaBean创建DataFrame
+----+---+
|name|age|
+----+---+
|   y| 21|
|   X| 22|
|   N| 22|
+----+---+
```

### 四、通过RDD联合JavaBean创建DataFrame

```scala
import com.llc.spark.hbase.bean.Person
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 通过RDD和javabean创建DataFrame
 **/
object DataFrameDemo04 {
  def main(args: Array[String]): Unit = {
    // 通过SparkSession创建spark入口
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    println("4. 通过RDD联合JavaBean创建DataFrame")
    //创建RDD
    val sparkRdd4: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))
    //关联JavaBean的RDD
    val javaBeanRdd: RDD[Person] = sparkRdd4.map(t => {
      val per: Array[String] = t.split(",")
      new Person(per(0), per(1).toInt, per(2))
    })
    //创建DataFrame
    val frame: DataFrame = spark.createDataFrame(javaBeanRdd,classOf[Person])
    // 展示数据
    frame.show()
    //释放资源
    spark.stop()
  }
}
```

**Java bean class**

```java
public class Person {
    private String name;
    private int age;
    private String sex;

    public Person(String name, int age, String sex) {
        this.name = name;
        this.age = age;
        this.sex = sex;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }
}
```

**运行结果**

```
4. 通过RDD联合JavaBean创建DataFrame
+---+----+---+
|age|name|sex|
+---+----+---+
| 22|   X|  M|
| 21|   y|  W|
| 22|   N|  M|
+---+----+---+
```

### 五、手写的通过RDD和javabean创建DataFrame

```scala
import com.llc.spark.hbase.bean.NumberWord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * 通过RDD和javabean创建DataFrame
 **/
object DataFrameDemo05 {
  def main(args: Array[String]): Unit = {
    // 通过SparkSession创建spark入口
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    println("5.手写的通过RDD和javabean创建DataFrame")
    val rdd11: RDD[Row] = spark.sparkContext.parallelize(
      Seq(
        Row(8, "bat"),
        Row(64, "mouse"),
        Row(-27, "horse")
      )
    )
    rdd11.foreach(println)
    //创建
    val newRDD: RDD[NumberWord] = rdd11.map(x => {
      val rdto = convertNumberWordDTO(x.getInt(0) + 100, x.getString(1))
      rdto
    })
    val ds: DataFrame = spark.createDataFrame(newRDD,classOf[NumberWord])
    ds.show(100,false)
    println(ds.schema)
    //StructType(StructField(number,IntegerType,false), StructField(word,StringType,true))
    //调用 someDF.schema.printTreeString()得到someDF的schema:
    ds.schema.printTreeString()
    //root
    // |-- number: integer (nullable = false)
    // |-- word: string (nullable = true)
    // 释放资源
    spark.stop()
  }

  def convertNumberWordDTO(number: Int,word:String) ={
    val rdto = new NumberWord()
    rdto.setNumber(number)
    rdto.setWord(word)
    rdto
  }
}
```

**Java bean class**

```java
public class NumberWord {
    private int number;
    private String word;

    public NumberWord() {
    }

    public NumberWord(int number, String word) {
        this.number = number;
        this.word = word;
    }

    public int getNumber() {
        return number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }
}
```

**运行结果**

```
5.手写的通过RDD和javabean创建DataFrame
[64,mouse]
[-27,horse]
[8,bat]
+------+-----+
|number|word |
+------+-----+
|108   |bat  |
|164   |mouse|
|73    |horse|
+------+-----+
StructType(StructField(number,IntegerType,false), StructField(word,StringType,true))
root
 |-- number: integer (nullable = false)
 |-- word: string (nullable = true)
```

*附原文链接地址:*

*https://blog.csdn.net/helloxiaozhe/article/details/113354054*
标签:
Spark
,
Spark SQL
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/1748.html
上一篇
19.Hive自定义UDF函数
下一篇
20.Hive自定义UDTF函数
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
6
标签云
Golang
Jenkins
数学
Spring
Zookeeper
JavaScript
MyBatis
Hadoop
数据结构
递归
Filter
Elasticsearch
前端
Nacos
LeetCode刷题
Scala
HDFS
Spark Core
JavaWEB项目搭建
MySQL
序列化和反序列化
栈
Tomcat
设计模式
Spark SQL
MyBatis-Plus
SQL练习题
SpringBoot
Kafka
字符串
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭