SparkSQL使用说明与DataFrame创建

Spark SQL使用说明与DataFrame创建

启动spark-shell

由于spark-shell演示具有显而易见的有点,因此本文主要基于spark-shell对Spark SQL的使用进行介绍

我使用的是最新版本的Spark 2.3.0来进行演示

首先我们启动spark-shell,等待进入Scala REPL:

创建DataFrame

Spark SQL的所有操作都是基于其内部定义的一个叫做DataFrame(Spark2.0后它变成了DataSet[Row]的类型别名)的结构的,因此,我们首先需要创建DataFrame。

创建DataFrame的方式有很多种,比如json\csv\parquet等,在Spark 2.0以后,可以通过sparkSession.read得到DataFrameReader来读取各种支持类型的文件,从而得到对应的DataFrame。

不过我们既然使用了spark-shell,就更随意一点了。如果你现在就是为了处理一份数据,那么你可以直接用sparkSession.read里的方法来加载这份数据文件。如果你只是为了学习,或者研究某种数据结构的处理方式,那么你可以使用case class的方式来构造你的实验DataFrame:

case class DemoClass(arg0: Int, arg: String)

首先我们定义一个case class作为我们DataFrame的模版

之后我们造一些我们的测试数据:

scala> val test = (1 to 100).map(i => DemoClass(i, s"String_$i"))
test: scala.collection.immutable.IndexedSeq[DemoClass] = Vector(DemoClass(1,String_1), DemoClass(2,String_2), DemoClass(3,String_3), DemoClass(4,String_4), DemoClass(5,String_5), DemoClass(6,String_6), DemoClass(7,String_7), DemoClass(8,String_8), DemoClass(9,String_9), DemoClass(10,String_10), DemoClass(11,String_11), DemoClass(12,String_12), DemoClass(13,String_13), DemoClass(14,String_14), DemoClass(15,String_15), DemoClass(16,String_16), DemoClass(17,String_17), DemoClass(18,String_18), DemoClass(19,String_19), DemoClass(20,String_20), DemoClass(21,String_21), DemoClass(22,String_22), DemoClass(23,String_23), DemoClass(24,String_24), DemoClass(25,String_25), DemoClass(26,String_26), DemoClass(27,String_27), DemoClass(28,String_28), DemoClass(29,String_29), DemoClass(30,String_30), D...

这样,我们就获得了一个长度为100的DemoClass列表

之后我们运行:

scala> val df = spark.createDataFrame(test)
df: org.apache.spark.sql.DataFrame = [arg0: int, arg: string]

这样我们就轻松获得了一个我们自定义类型的DataFrame:

我们分别使用printSchema和show来获取关于这个DataFrame的内部信息。

通过printSchema我们可以将DataFrame的数据结构打印出来,我们可以获得的是列名和其对应的列类型以及是否可以为null。从pringSchema的打印形式上我们可以看出,它是按树的形式来打印的,所以DataFrame的数据结构可以有很多层级的。

通过show默认打印了前20行数据,我们可以看出这个DataFrame有两列,并且其前20行数据。

从上图我们看到,使用Seq[case class]来创建DataFrame,我们得到的列名和列类型就是case class的各个成员变量的名字和类型。这样,我们就可以造出多种多样的数据结构来进行测试了。

/**********************************************我是分割线**********************************************/

下面我们来举个稍微复杂一点的例子:

scala> case class A(a: Int, b: Double, c: String)
defined class A

scala> case class B(a: A, b: List[Double], c: Map[Int, String])
defined class B

scala> case class C(a: A, b: B, c: Array[String], d: Map[A, B])
defined class C

scala> def a_gen(i: Int) = A(i, i * 0.1, s"a_$i")
a_gen: (i: Int)A

scala> def b_gen(i: Int) = B(a_gen(i), (0 to i).map(_ / 0.1).toList, (0 to i).map(j => (j, s"value_$j")).toMap)
b_gen: (i: Int)B

scala> def c_gen(i: Int) = C(a_gen(i), b_gen(i), (0 to i).map(j => s"str_$j").toArray, (0 to i).map(j => (a_gen(j), b_gen(j))).toMap)
c_gen: (i: Int)C

scala> (1 to 100).map(c_gen)
res4: scala.collection.immutable.IndexedSeq[C] = Vector(C(A(1,0.1,a_1),B(A(1,0.1,a_1),List(0.0, 10.0),Map(0 -> value_0, 1 -> value_1)),[Ljava.lang.String;@17483bb2,Map(A(0,0.0,a_0) -> B(A(0,0.0,a_0),List(0.0),Map(0 -> value_0)), A(1,0.1,a_1) -> B(A(1,0.1,a_1),List(0.0, 10.0),Map(0 -> value_0, 1 -> value_1)))), C(A(2,0.2,a_2),B(A(2,0.2,a_2),List(0.0, 10.0, 20.0),Map(0 -> value_0, 1 -> value_1, 2 -> value_2)),[Ljava.lang.String;@7f415d8e,Map(A(0,0.0,a_0) -> B(A(0,0.0,a_0),List(0.0),Map(0 -> value_0)), A(1,0.1,a_1) -> B(A(1,0.1,a_1),List(0.0, 10.0),Map(0 -> value_0, 1 -> value_1)), A(2,0.2,a_2) -> B(A(2,0.2,a_2),List(0.0, 10.0, 20.0),Map(0 -> value_0, 1 -> value_1, 2 -> value_2)))), C(A(3,0.30000000000000004,a_3),B(A(3,0.30000000000000004,a_3),List(0.0, 10.0, 20.0, 30.0),Map(0 -> value_0, ...

我们从schema来进行分析

我们使用C类列表来创建DataFrame,C类共有四个成员变量a\b\c\d,因此root下有a\b\c\d四列

a为类A,我们从schema中可以看出a字段下面有三个子字段,分别对应类A的三个成员变量。剩下的b\c\d列都是同理。

我们可以看出

1、List和Array转换成了array子树结构

2、Map转换成了key\value子树结构

3、类转换成了struct子树结构

(0)
上一篇 2022年3月21日
下一篇 2022年3月21日

相关推荐