概述
spark sql是spark中的一个模块,负责结构化数据的处理。它跟spark rdd api不一样,spark sql提供的接口会提供更多关于数据和执行计算的信息。在内部,spark sql使用这些额外的信息去执行额外的优化。可以通过sql 和 dataset api与spark sql进行交互。当使用同一个执行引擎得到的计算结果,它是不会依赖于使用的api/语言。这意味着开发人员能更容易的在不同api进行切换。
sql
spark sql可以执行sql的查询,也可以从现有的hive中读取数据。当从另一种编程语言中运行sql时,结果将作为一个dataset/dataframe返回。可以通过命令行或者jdbc/odbc与sql进行交互。
其实spark sql的功能不仅仅只是sql的功能,它比sql拥有更多的功能。
datasets and dataframes
一个dataset是一个分部的数据集。dataset是spark 1.6中新增的一个新接口它有利于rdds和spark sql中的优化引擎。一个dataset可以jvm的对象中构建还可以被transformations函数(map,flatmap,filter,etc)操作。dataset的api可以用于scala和java。不适用于python和r语言。
一个dataframe是一个被加入列名的dataset。从概念上理解可以等同于关系型里的一张表。dataframe的构建数据源有很多,例如:结构化文件,hive中的表,外部数据库,或者已存在的rdds。dataframe api 可以用于scala,java,python 和 r语言。scala api中,dataframe进进是dataset[row]的别名。java api中为dataset
创建一个dataframes
package com.hihi.learn.sparksql
import com.hihi.learn.sparksql.datasetsdemo.person
import org.apache.spark.sql.types.{stringtype, structfield, structtype}
import org.apache.spark.sql.{dataframe, row, sparksession}
import scala.collection.mutable.arraybuffer
object dataframedemo {
case class person(name: string, age: long)
def main(args: array[string]): unit = {
// 创建sparksession,此为spark sql的入口
val spark = sparksession
.builder()
.appname("spark sql basic example")
.master("local[2]")
.getorcreate()
import spark.implicits._
//creating dataframes
val df = spark.read.format("json").load("e:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.json")
//df.show()
/*
+----+-------+
| age| name|
+----+-------+
|null|michael|
| 30| andy|
| 19| justin|
+----+-------+
*/
// 使用算子操作dataframe
//untypeddatasetoperations(df)
// 使用sql编程方式
runningsqlqueriesprogrammatically(spark, df)
// interoperating with rdds
val peopledf = spark.sparkcontext.textfile("e:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.txt")
.mappartitions(its => {
var arrperson = arraybuffer[person]()
for (it <- its) {
val arr = it.split(",")
arrperson += person(arr(0), arr(1).trim.tolong)
}
arrperson.toiterator
})
.todf().show
// programmatically specifying the schema
val personds4 = spark.sparkcontext.textfile("e:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.txt")
val schemastring = "name age"
val fields = schemastring.split(" ").map(structfield(_, stringtype, nullable = true))
val schema = structtype(fields)
val rowrdd = personds4
.map(_.split(","))
.map(attributes => row(attributes(0), attributes(1).trim))
val peopledf2 = spark.createdataframe(rowrdd, schema)
peopledf2.show()
spark.stop()
}
def untypeddatasetoperations(df:dataframe): unit = {
// 打印schema
df.printschema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// 查询 name
df.select("name").show
// +-------+
// | name|
// +-------+
// |michael|
// | andy|
// | justin|
// +-------+
// 查询age > 21的数据
df.filter("age > 21").show
// +---+----+
// |age|name|
// +---+----+
// | 30|andy|
// +---+----+
// 使用分组
df.groupby("age").count.show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
}
def runningsqlqueriesprogrammatically(spark: sparksession, df: dataframe) : unit = {
// 注册一张临时表
df.createorreplacetempview("people")
spark.sql("select name, age from people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|michael|
// | 30| andy|
// | 19| justin|
// +----+-------+
spark.sql("select name, age from people where name = 'andy'").show
// +----+---+
// |name|age|
// +----+---+
// |andy| 30|
// +----+---+
}
}
创建一个datasets
package com.hihi.learn.sparksql
import org.apache.spark.sql.{row, sparksession}
import org.apache.spark.sql.types.{stringtype, structfield, structtype}
import scala.collection.mutable.arraybuffer
object datasetsdemo {
case class person(name: string, age: long)
def main(args: array[string]): unit = {
val spark = sparksession
.builder()
.appname("datasetsdemo")
.master("local")
.getorcreate()
// creating datasets
import spark.implicits._
val personds = seq(person("hihi", 22), person("tom", 11)).tods
personds.show
val personds2 = spark.read.format("json").load("e:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.json").as[person]
personds2.show
spark.stop()
}
}