First, start the spark-shell:
```
$ bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http:
Spark context available as 'sc' (master = local[*], app id = local-1569915192693).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.
```
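The banner notes that the log level can be adjusted from the shell. As a small aside, a quieter session can be had with a single call (the "ERROR" level here is just an illustration; any standard log4j level name works):

```scala
// Reduce console noise for the rest of the session; WARN is the default in the shell.
sc.setLogLevel("ERROR")
```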
Creating a DataFrame
```
scala> val df = spark.read.json("./examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
```
```
scala> df.createOrReplaceTempView("people")

scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
```
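A view registered with createOrReplaceTempView is scoped to the current SparkSession and disappears when the session ends. If the view should be visible across sessions within the same application, Spark also provides global temporary views, which live in the system-reserved global_temp database; a minimal sketch (the view name people_global is just an example):

```scala
// Global temporary views outlive the creating session and must be qualified with global_temp.
df.createGlobalTempView("people_global")
spark.sql("SELECT * FROM global_temp.people_global").show
```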
Method 1: create a DataFrame from an RDD with toDF

```
scala> import spark.implicits._
import spark.implicits._

scala> val peopleRDD = sc.textFile("./examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = ./examples/src/main/resources/people.txt MapPartitionsRDD[34] at textFile at <console>:27

scala> val df = peopleRDD.map{ x => val para = x.split(","); (para(0), para(1).trim.toInt) }.toDF("name", "age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> df.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
```
Method 2: convert a case-class RDD to a DataFrame (the common approach)

```
scala> case class people(name: String, age: Int)
defined class people

scala> peopleRDD.map{ x => val para = x.split(","); people(para(0), para(1).trim.toInt) }.toDF
res21: org.apache.spark.sql.DataFrame = [name: string, age: int]
```
Method 3: create a DataFrame with an explicit schema

```
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val data = peopleRDD.map{ x => val para = x.split(","); Row(para(0), para(1).trim.toInt) }
data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[41] at map at <console>:33

scala> val dataFrame = spark.createDataFrame(data, structType)
dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]
```
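To confirm that the explicit schema was applied as intended, printSchema on the resulting DataFrame is a quick sanity check (expected output shown as comments):

```scala
// The column names and types should match the StructType built above.
dataFrame.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = true)

dataFrame.show()
```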
Creating a Dataset

From a case class:

```
scala> case class Person(name: String, age: Long)
defined class Person

scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> caseClassDS.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
```
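The same toDS() machinery works for common primitive types as well, since Spark ships encoders for them; a tiny illustration:

```scala
// A Dataset of Ints built from a local Seq; map uses the usual typed API.
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect()   // returns Array(2, 3, 4)
```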
Creating a Dataset from an RDD:

```
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> peopleRDD.map(line => { val para = line.split(","); Person(para(0), para(1).trim.toInt) }).toDS
res1: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> res1.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
```
Converting between DataFrame and Dataset

DataFrame to Dataset:

```
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> case class Person(name: String, age: Long)
defined class Person

scala> df.as[Person]
res3: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> res3.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
```
Dataset to DataFrame:

```
scala> case class Person(name: String, age: Long)
defined class Person

scala> val ds = Seq(Person("Andy", 32)).toDS()
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]

scala> df.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
```
Converting among the three (RDD, DataFrame, Dataset)
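To pull the conversion paths together in one place, the sketch below assumes a spark-shell session (so spark.implicits._ is already in scope) and the Person case class defined earlier:

```scala
// Round trip between the three abstractions, using the Person case class from above.
val rdd = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(a => Person(a(0), a(1).trim.toLong))

val df      = rdd.toDF()      // RDD -> DataFrame (columns named after the case class fields)
val ds      = df.as[Person]   // DataFrame -> Dataset
val ds2     = rdd.toDS()      // RDD -> Dataset directly
val df2     = ds.toDF()       // Dataset -> DataFrame
val rddBack = ds.rdd          // Dataset -> RDD[Person]
```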
What the three have in common

- RDD, DataFrame, and Dataset are all distributed, resilient datasets on the Spark platform, and all make it convenient to process very large data.
- All three are lazy: creating them or applying transformations such as map does not execute anything immediately; computation only starts when an action such as foreach is encountered (see the sketch after this list).
- All three share many common functions, such as filter, sorting, and so on.
- Many operations on DataFrame and Dataset require importing the implicit conversions: import spark.implicits._ (import it right after creating the SparkSession object).
Implementation in an IDE
```scala
package tech.mapan.bean

case class people(name: String, age: BigInt)
```
```scala
package tech.mapan

import org.apache.spark.sql.SparkSession
import tech.mapan.bean.people

object SparkSQLTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sparkSQL").master("local[*]").getOrCreate()
    import spark.implicits._

    val df = spark.read.json("./src/main/resources/people.json")
    df.show

    val ds = df.as[people]
    ds.show

    val rdd1 = df.rdd
    rdd1.collect().foreach(println)

    val rdd2 = ds.rdd
    rdd2.collect().foreach(println)

    ds.createOrReplaceTempView("persons")
    spark.sql("SELECT * FROM persons WHERE age > 21").show
    spark.sql("SELECT SUM(age) FROM persons").show

    spark.stop()
  }
}
```
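To build the standalone program above outside the shell, the spark-sql artifact needs to be on the classpath. A rough sbt sketch, with versions matching the 2.1.1 / Scala 2.11.8 shell shown earlier (adjust to your own environment):

```scala
// build.sbt (sketch): keep the Scala and Spark versions in line with your cluster.
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.1"
```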