Spark 2.0之Spark SQL入门

个人认为Spark SQL应该是Spark BDAS各个组件中最简单的了,在Spark 2.0中,Spark SQL变得更加简单了。本文不再介绍概念性的内容,仅仅通过一个示例来熟悉Spark 2.0中的Spark SQL。本文使用的数据下载自 DATA.GOV.UK,点击即可下载。

创建 SparkSession

这一步是Spark 2.0与之前版本的主要区别之一,2.0时代我们不再需要一步一个脚印地去创建SparkConf, SparkContext, SQLContext/HiveContext等等,只需要一个SparkSession就齐活了。在spark-shell中,Spark会为我们自动创建一个名为spark的SparkSession对象,这里我们与之保持一致:

1
2
3
4
5
6
7
val spark = SparkSession.builder().appName("Spark-2.0-SQL-APP-1").master("local")
.config("spark.some.config.option", "some-value")
.enableHiveSupport()
.getOrCreate()

import spark.implicits._
import spark.sql

这里首先指定了appName以及master运行方式,然后添加了一些配置选项,还启用了Hive支持(后面要把数据持久化到Hive仓库中)。两个import语句对于新手可能有点别扭,这里简单提一句,后面的toDF()方法和“$”操作符,需要第一个import的隐式转换。第二个import更容易理解一些,引入spark.sql之后,后面在需要调用spark.sql()方法来执行SQL语句的时候,直接使用sql()即可。注意这里的spark不是包名,而是我们刚刚创建的名为spark的SparkSession对象。

定义 case class

根据数据格式定义一个case class,用于对数据进行模式匹配。在Scala 2.10版本之前,case class中的字段不能超过22个。不过从目前最新的Scala 2.11版本开始,已经没有这个限制了。

1
case class Energy(Name: String, Postcode: String, Date: String, Unit: String, MeterReading: Int, Output: Int, CapacityKW: Double)

获取 DataFrame

DataFrame是Spark SQL中非常重要的一个抽象,在概念上大致等同于关系数据库中的表。在Scala API中,DataFrame是Dataset[Row]的别名。

1
2
3
4
5
6
val columns = "Name,Postcode,Date,Unit,Meter Reading,Output,Installed Capacity KW"
val df = spark.sparkContext.textFile("input/energy_generation_wc_140114.csv")
.filter { !_.contains(columns) }
.map { _.split(",") }
.map { p => Energy(p(0), p(1), p(2), p(3), p(4).trim().toInt, p(5).trim().toInt, p(6).trim().toDouble) }
.toDF()

这里我们从SparkSession对象中获取SparkContext对象,用来读取CSV文件,然后过滤掉表头,对每行数据进行“,”拆分,然后映射为case class对象,这些操作都是Spark中最普通的RDD操作,最后我们调用toDF()方法把RDD转为DataFrame。

操作 DataFrame

1
df.printSchema()

1
2
val jsonData = df.toJSON
jsonData.take(5).foreach { println }

1
2
df.show(10)
df.select("Name", "Date", "Output").show()

1
2
df.filter($"Output" > 30 and $"Output" < 70).show()
df.filter($"Output" < 30 or $"Output" > 70).show()

1
2
3
df.groupBy("Name").count().show()
df.groupBy("Date").count().sort("Date").show()
df.groupBy("Date").sum("Output").sort("Date").show()

使用 SQL

1
2
3
4
5
df.createOrReplaceTempView("energy")

sql("SELECT * FROM energy LIMIT 10").show()
sql("SELECT COUNT(*) AS CNT FROM energy").show()
sql("SELECT Date, SUM(Output) AS Output_Sum FROM energy GROUP BY Date ORDER BY Date").show()

使用 UDF

1
2
spark.udf.register("doubleOutput", { output: String => output.toInt * 2 })
sql("SELECT Output, doubleOutput(Output) AS Double_Output FROM energy LIMIT 10").show()

持久化表

1
2
3
4
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_energy")

val hdf = spark.table("hive_energy")
hdf.show(10)



完整代码

Scala 部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.qinm.tools.spark.sql

import scala.reflect.runtime.universe

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode

object sparksql1 extends App {

val spark = SparkSession.builder().appName("Spark-2.0-SQL-APP-1").master("local")
.config("spark.some.config.option", "some-value")
.enableHiveSupport()
.getOrCreate()

import spark.implicits._
import spark.sql

case class Energy(Name: String, Postcode: String, Date: String, Unit: String, MeterReading: Int, Output: Int, CapacityKW: Double)

val columns = "Name,Postcode,Date,Unit,Meter Reading,Output,Installed Capacity KW"
val df = spark.sparkContext.textFile("input/energy_generation_wc_140114.csv")
.filter { !_.contains(columns) }
.map { _.split(",") }
.map { p => Energy(p(0), p(1), p(2), p(3), p(4).trim().toInt, p(5).trim().toInt, p(6).trim().toDouble) }
.toDF()

df.show(10)
df.printSchema()

val jsonData = df.toJSON
jsonData.take(5).foreach { println }

df.select("Name", "Date", "Output").show()

df.filter($"Output" > 30 and $"Output" < 70).show()
df.filter($"Output" < 30 or $"Output" > 70).show()

df.groupBy("Name").count().show()
df.groupBy("Date").count().sort("Date").show()
df.groupBy("Date").sum("Output").sort("Date").show()

df.createOrReplaceTempView("energy")

sql("SELECT * FROM energy LIMIT 10").show()
sql("SELECT COUNT(*) AS CNT FROM energy").show()
sql("SELECT Date, SUM(Output) AS Output_Sum FROM energy GROUP BY Date ORDER BY Date").show()

spark.udf.register("doubleOutput", { output: String => output.toInt * 2 })
sql("SELECT Output, doubleOutput(Output) AS Double_Output FROM energy LIMIT 10").show()

df.write.mode(SaveMode.Overwrite).saveAsTable("hive_energy")

val hdf = spark.table("hive_energy")
hdf.show(10)
}

Maven pom 部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.0.0</version>
</dependency>