Basic SparkSQL Operations

First, start spark-shell:

$ bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.102:4040
Spark context available as 'sc' (master = local[*], app id = local-1569915192693).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

Creating a DataFrame

  • Create a DataFrame from a JSON file
scala> val df = spark.read.json("./examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
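
Note that spark.read.json expects one JSON object per line (the JSON Lines format), not a single pretty-printed array. The people.json bundled with the Spark examples is presumably of the form:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
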
  • Register a temporary view on the DataFrame
scala> df.createOrReplaceTempView("people")
scala> spark.sql("select * from people").show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

Method 1: Create a DataFrame from an RDD with toDF

scala> import spark.implicits._
import spark.implicits._

scala> val peopleRDD = sc.textFile("./examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = ./examples/src/main/resources/people.txt MapPartitionsRDD[34] at textFile at <console>:27

scala> val df = peopleRDD.map{x=>val para=x.split(",");(para(0),para(1).trim.toInt)}.toDF("name","age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> df.show
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
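
The split(",") and trim calls above assume a plain comma-separated "name, age" text file; the people.txt bundled with the Spark examples is presumably of the form:

Michael, 29
Andy, 30
Justin, 19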

Method 2: Convert to a DataFrame via a case class (the common approach)

scala> case class people(name:String,age:Int)
defined class people

scala> peopleRDD.map{x=>val para=x.split(",");people(para(0),para(1).trim.toInt) }.toDF
res21: org.apache.spark.sql.DataFrame = [name: string, age: int]

Method 3: Create a DataFrame with an explicit schema

// Import the required types
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
// Build the schema
scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
// Import the Row type
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
// Map each line to a Row that matches the schema
scala> val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[41] at map at <console>:33
// Create the DataFrame from the data and the schema
scala> val dataFrame = spark.createDataFrame(data, structType)
dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]

Creating a Dataset

Create a Dataset from a case class

scala> case class Person(name: String, age: Long)
defined class Person

scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> caseClassDS.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+

Create a Dataset from an RDD

scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS
res1: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> res1.show
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+

Converting between DataFrame and Dataset

DataFrame to Dataset

scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> case class Person(name: String, age: Long)
defined class Person

scala> df.as[Person]
res3: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> res3.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

Dataset to DataFrame

scala> case class Person(name: String, age: Long)
defined class Person

scala> val ds = Seq(Person("Andy", 32)).toDS()
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]

scala> df.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+

Converting among RDD, DataFrame and Dataset
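
All of the conversions shown piecewise above boil down to a handful of calls. A minimal sketch, reusing the peopleRDD and the people(name: String, age: Int) case class defined in the shell earlier (import spark.implicits._ is assumed):

// RDD -> DataFrame: name the columns, or go through a case class
val df  = peopleRDD.map { x => val p = x.split(","); (p(0), p(1).trim.toInt) }.toDF("name", "age")
// DataFrame -> Dataset: supply a case class that matches the columns
val ds  = df.as[people]
// Dataset -> DataFrame
val df2 = ds.toDF
// DataFrame -> RDD: rows come back as org.apache.spark.sql.Row
val rdd1 = df.rdd
// Dataset -> RDD: rows come back as instances of the case class
val rdd2 = ds.rdd
// RDD -> Dataset: build the case class instances first, then call toDS
val ds2 = peopleRDD.map { x => val p = x.split(","); people(p(0), p(1).trim.toInt) }.toDS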

What the three have in common

  1. RDD, DataFrame and Dataset are all resilient distributed datasets on the Spark platform, built to make processing very large data sets convenient;
  2. All three are lazily evaluated: creating them or applying transformations such as map does not execute anything; the traversal and computation only start when an action such as foreach is encountered (see the sketch after this list);
  3. All three share many common functions, such as filter, sorting, and so on;
  4. Many operations on DataFrame and Dataset require the implicit conversions: import spark.implicits._ (import it right after creating the SparkSession object).
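
Points 2-4 in action, as a minimal sketch reusing the df, ds and rdd2 built in the sketch above; nothing is computed until the final show, which is the action:

// point 3: filter exists on all three, with slightly different argument styles
val dfAdults  = df.filter($"age" > 20)      // DataFrame: untyped Column expression ($ comes from spark.implicits._)
val dsAdults  = ds.filter(_.age > 20)       // Dataset[people]: typed Scala lambda
val rddAdults = rdd2.filter(_.age > 20)     // RDD[people]: plain Scala lambda

// point 2: the three lines above are lazy transformations; only this action triggers the computation
dfAdults.show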

Implementation in an IDE

// from people.scala
package tech.mapan.bean

case class people(name: String, age: BigInt)

// from SparkSQLTest.scala
package tech.mapan

import org.apache.spark.sql.SparkSession
import tech.mapan.bean.people

object SparkSQLTest {
  /**
    * Conversions among DataFrame, Dataset and RDD / a first look at SparkSQL
    * @param args
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sparkSQL").master("local[*]").getOrCreate()
    import spark.implicits._

    // Create a DataFrame from a file
    val df = spark.read.json("./src/main/resources/people.json")
    df.show

    // DataFrame to Dataset
    val ds = df.as[people]
    ds.show

    // DataFrame to RDD
    val rdd1 = df.rdd
    rdd1.collect().foreach(println)

    // Dataset to RDD
    val rdd2 = ds.rdd
    rdd2.collect().foreach(println)

    // Register a temporary view
    ds.createOrReplaceTempView("persons")

    // SQL: select people older than 21
    spark.sql("SELECT * FROM persons WHERE age > 21").show

    // SQL: sum of everyone's age
    spark.sql("SELECT SUM(age) FROM persons").show

    // Stop the SparkSession
    spark.stop
  }
}