1- # 04-Spark SQL API编程之 DataFrame
1+ # 04-SparkSQL的API编程之DataFrame
22
33## 1 SparkSession
44
@@ -95,9 +95,9 @@ output:
9595
9696## 2 DataFrame
9797
98- 最早在R语言数据分析包中提出,表示一种类似表格的数据结构,其中行和列都可以有命名 。
98+ 最早在R语言数据分析包中提出,表示一种类似表格的数据结构,行列都可有命名 。
9999
100- Spark的DataFrame是基于RDD(弹性分布式数据集)的一种高级抽象 ,类似关系型数据库的表格。Spark DataFrame可看作带有模式(Schema)的RDD,而Schema则是由结构化数据类型(如字符串、整型、浮点型等)和字段名组成。
100+ Spark的DataFrame是基于RDD的一种高级抽象 ,类似关系型数据库的表格。Spark DataFrame可看作带有模式(Schema)的RDD,而Schema则是由结构化数据类型(如字符串、整型、浮点型等)和字段名组成。
101101
102102### 2.1 命名变迁
103103
@@ -112,24 +112,26 @@ Spark 1.3版本开始,SchemaRDD重命名为DataFrame,以更好反映其API
112112
113113因此,DataFrame已成Spark SQL核心组件,广泛应用于数据分析、数据挖掘。
114114
115- ## 3 数据分析选型: PySpark V.S R 语言
115+ ## 3 PySpark V.S R
116116
117- 1 . 数据规模:如果需要处理大型数据集,则使用PySpark更为合适,因为它可以在分布式计算集群上运行,并且能够处理较大规模的数据。而R语言则可能会受限于单机内存和计算能力。
118- 2 . 熟练程度:如果你或你的团队已经很熟悉Python,那么使用PySpark也许更好一些,因为你们不需要再去学习新的编程语言。相反,如果已经对R语言很熟悉,那么继续使用R语言也许更为方便。
119- 3 . 生态系统:Spark生态系统提供了许多额外的库和工具,例如Spark Streaming和GraphX等,这些库和工具可以与PySpark无缝集成。而R语言的生态系统也有一些类似的库和工具,但相对来说可选择性就更少一些。
117+ 数分选型:
120118
121- 总之,选择使用哪种工具进行数据分析应该基于具体情况进行考虑。如果需要处理大规模数据集,并需要与Spark生态系统集成,那么PySpark可能更适合;如果更加熟悉R语言,或者数据量较小,那么使用R语言也可以做到高效的数据分析。
119+ 1 . 数据规模:处理大型数据集,PySpark更合适,因为它可以在分布式计算集群上运行,且能处理较大规模数据。而R语言可能受限于单机内存和计算能力
120+ 2 . 熟练度:若你的团队熟悉Python,PySpark更好;对R熟悉,继续使用R语言
121+ 3 . 生态系统:Spark生态系统提供许多额外库和工具,如Spark Streaming和GraphX,这些库和工具可以与PySpark无缝集成。而R语言的生态系统也有一些类似的库和工具,但相对来说可选择性就更少
122122
123123## 4 深入理解
124124
125- Dataset是一个分布式数据集,提供RDD强类型和使用强大的lambda函数的能力,并结合了Spark SQL优化的执行引擎。Dataset可以从JVM对象构建而成,并通过函数式转换(如map、flatMap、filter等)进行操作。Scala和Java都支持Dataset API,但Python没有对Dataset API提供支持。由于Python是一种动态语言,许多Dataset API的优点已经自然地可用,例如可以通过名称访问行的字段。R语言也有类似的特点。
125+ Dataset,一个分布式数据集,提供RDD强类型和使用强大的lambda函数能力,并结合了Spark SQL优化的执行引擎。
126+
127+ 可从JVM对象构建而成,并通过函数式转换(如map、flatMap、filter等)进行操作。Scala和Java都支持Dataset API,但Python没有支持。由于Python是一种动态语言,许多Dataset API的优点已经自然地可用,例如可以通过名称访问行的字段。R语言也有类似的特点。
126128
127129DataFrame,具有命名列的Dataset,类似:
128130
129131- 关系数据库中的表
130132- Python中的数据框
131133
132- 但内部有更多优化功能。DataFrame可从各种数据源构建,如:
134+ 但内部有更多优化功能。DataFrame可从各种数据源构建,如:
133135
134136- 结构化数据文件
135137- Hive表
@@ -145,7 +147,7 @@ DataFrame API 在 Scala、Java、Python 和 R 都可用。在Scala和Java中,D
145147
146148## 5 实战
147149
148- People.json
150+ ### 5.1 数据文件: People.json
149151
150152``` json
151153{"name" :" Michael" }
@@ -162,14 +164,14 @@ object DataFrameAPIApp {
162164
163165 def main (args : Array [String ]): Unit = {
164166
167+ val projectRootPath = " /Users/javaedge/Downloads/soft/sparksql-train"
165168 val spark = SparkSession .builder()
166169 .master(" local" ).appName(" DataFrameAPIApp" )
167170 .getOrCreate()
168171 import spark .implicits ._
169172
170173
171- val people : DataFrame = spark.read.json(
172- " /Users/javaedge/Downloads/sparksql-train/data/people.json" )
174+ val people : DataFrame = spark.read.json(projectRootPath + " /data/people.json" )
173175
174176 // 查看DF的内部结构:列名、列的数据类型、是否可以为空
175177 people.printSchema()
@@ -178,8 +180,6 @@ object DataFrameAPIApp {
178180 people.show()
179181 }
180182}
181-
182-
183183output:
184184root
185185 |-- age : long (nullable = true )
195195```
196196
197197``` scala
198- // DF里面有两列 ,只要name列 ==> select name from people
199- // 两个 API 一样的 ,只是参数不同,使用稍有不同
198+ // DF里有两列 ,只要name列 ==> select name from people
199+ // 两个 API 一样 ,只是参数不同,使用稍不同
200200people.select(" name" ).show()
201201people.select($" name" ).show()
202202
@@ -216,7 +216,6 @@ output:
216216people.filter($" age" > 21 ).show()
217217people.filter(" age > 21" ).show()
218218
219-
220219output:
221220+---+----+
222221| age| name|
@@ -239,40 +238,76 @@ output:
239238+----+-----+
240239```
241240
242- ### createOrReplaceTempView
241+ ### 5.2 createOrReplaceTempView
243242
244- 若现在,我就想完全使用 SQL 查询了,怎么实现 DF 到表的转换呢 ?
243+ 若想完全用 SQL 查询,咋实现 DF 到表的转换 ?
245244
246- Spark SQL用来将一个 DataFrame 注册成一个临时表(Temporary Table)的方法。之后可使用 Spark SQL 语法及已注册的表名对 DataFrame 进行查询和操作。
245+ Spark SQL用来将一个 DataFrame 注册成一个临时表(Temporary Table)的方法。之后可用 Spark SQL 语法及已注册的表名对 DataFrame 进行查询和操作。
247246
248- 允许为 DataFrame 指定一个名称,并将其保存为一个临时表。该表只存在于当前 SparkSession 的上下文,不会在元数据存储中注册表,也不会在磁盘创建任何文件。因此,临时表在SparkSession终止后就会被删 。
247+ 允许为 DataFrame 指定一个名称,并将其保存为一个临时表。该表只存在于当前 SparkSession 的上下文,不会在元数据存储中注册表,也不会在磁盘创建任何文件。因此,临时表在SparkSession终止后被删 。
249248
250- 一旦临时表被注册,就可使用 SQL 或 DSL 对其查询。如 :
249+ 一旦临时表被注册,可用 SQL 或 DSL 对其查询:
251250
252- ``` sql
251+ ``` scala
252+ // 使用给定名称创建本地临时视图。此临时视图的生存期与用于创建此数据集的 SparkSession 生存期相关联
253253people.createOrReplaceTempView(" people" )
254-
255254spark.sql(" select name from people where age > 21" ).show()
256255```
257256
258- ### 大文件处理
257+ ### 5.3 大文件处理
259258
260259``` scala
261- val zips : DataFrame = spark.read.json(" /Users/javaedge/Downloads/sparksql-train /data/zips.json" )
260+ val zips : DataFrame = spark.read.json(projectRootPath + " /data/zips.json" )
262261zips.printSchema()
263262
264263zips.show(5 )
265264```
266265
267- loc信息没用展示全,超过一定长度就使用...来展示,默认只显示前20条:show() ==> show(20) ==> show(numRows, truncate = true)
268-
266+ loc信息没用展示全,超过一定长度就使用...展示,默认只显示前20条:
269267
268+ - show() ==>
269+ - show(20) ==>
270+ - show(numRows, truncate = true)
270271
271- ![ ] ( https://img-blog.csdnimg.cn/26fb9ff918b346e58da51e8b1e293cd7.png )
272+ ``` bash
273+ root
274+ | -- _id: string (nullable = true)
275+ | -- city: string (nullable = true)
276+ | -- loc: array (nullable = true)
277+ | | -- element: double (containsNull = true)
278+ | -- pop: long (nullable = true)
279+ | -- state: string (nullable = true)
280+
281+ +-----+-----------+--------------------+-----+-----+
282+ | _id| city| loc| pop| state|
283+ +-----+-----------+--------------------+-----+-----+
284+ | 01001| AGAWAM| [-72.622739, 42.0...| 15338| MA|
285+ | 01002| CUSHMAN| [-72.51565, 42.37...| 36963| MA|
286+ | 01005| BARRE| [-72.108354, 42.4...| 4546| MA|
287+ | 01007| BELCHERTOWN| [-72.410953, 42.2...| 10579| MA|
288+ | 01008| BLANDFORD| [-72.936114, 42.1...| 1240| MA|
289+ +-----+-----------+--------------------+-----+-----+
290+ only showing top 5 rows
291+ ` ` `
272292
273293不想被截断就这样:
274294
275- ![ ] ( https://img-blog.csdnimg.cn/5a8a8a4cdf66418c88a28a6acfc62c6b.png )
295+ ` ` ` scala
296+ zips.show(5, truncate = false)
297+ ` ` `
298+
299+ ` ` ` bash
300+ +-----+-----------+-----------------------+-----+-----+
301+ | _id | city | loc | pop | state|
302+ +-----+-----------+-----------------------+-----+-----+
303+ | 01001| AGAWAM | [-72.622739, 42.070206]| 15338| MA |
304+ | 01002| CUSHMAN | [-72.51565, 42.377017] | 36963| MA |
305+ | 01005| BARRE | [-72.108354, 42.409698]| 4546 | MA |
306+ | 01007| BELCHERTOWN| [-72.410953, 42.275103]| 10579| MA |
307+ | 01008| BLANDFORD | [-72.936114, 42.182949]| 1240 | MA |
308+ +-----+-----------+-----------------------+-----+-----+
309+ only showing top 5 rows
310+ ` ` `
276311
277312` ` ` scala
278313zips.head(3).foreach(println)
@@ -282,13 +317,17 @@ zips.take(5)
282317
283318Output:
284319
285- ![ ] ( https://img-blog.csdnimg.cn/0afe2e5405ba445387ef1bfb5d342af8.png )
320+ ` ` ` bash
321+ [01001,AGAWAM,WrappedArray(-72.622739, 42.070206),15338,MA]
322+ [01002,CUSHMAN,WrappedArray(-72.51565, 42.377017),36963,MA]
323+ [01005,BARRE,WrappedArray(-72.108354, 42.409698),4546,MA]
324+ ` ` `
286325
287326# #### head(n: Int)
288327
289- Spark的DataFrame API中的一个方法,可以返回一个包含前n行数据的数组。这个方法通常用于快速检查一个DataFrame的前几行数据 ,以了解数据集的大致结构和内容。
328+ Spark的DataFrame API中的一个方法,返回一个包含前n行数据的数组。通常用于快速检查一个DataFrame的前几行数据 ,以了解数据集的大致结构和内容。
290329
291- - 先对DataFrame使用` .limit(n) ` 方法 ,限制返回行数前n行
330+ - 先对DataFrame使用` .limit(n)` ,限制返回行数前n行
292331- 然后使用` queryExecution` 方法生成一个Spark SQL查询计划
293332- 最后使用` collectFromPlan` 方法收集数据并返回一个包含前n行数据的数组
294333
@@ -334,7 +373,7 @@ output:
334373+-----+------------+-----+-----+
335374` ` `
336375
337- 可惜啊 ,我不会写代码,可以使用 MySQL 语法吗?
376+ 可惜 ,我不会写代码,懒得记这些 API,可用 MySQL 语法吗?
338377
339378` ` ` scala
340379zips.createOrReplaceTempView(" zips" )
@@ -344,29 +383,37 @@ spark.sql("select _id,city,pop,state" +
344383 " limit 10" ).show()
345384` ` `
346385
347- ### import spark.implicits._ 作用
386+ # ## 5.4 import spark.implicits._
387+
388+ Scala中用Spark进行数分时经常用到,为将隐式转换函数导入当前作用域。这些隐式转换函数包含许多DataFrame、Dataset的转换方法,如将RDD转为DataFrame或将元组转换为Dataset等。
348389
349- 在Scala中使用Apache Spark进行数据分析时经常用到的,它的作用是将隐式转换函数导入当前作用域中。这些隐式转换函数包含了许多DataFrame和Dataset的转换方法,例如将RDD转换为DataFrame或将元组转换为Dataset等 。
390+ 这行代码使用SparkSession对象中的implicits属性,该属性返回一个类型为org.apache.spark.sql.SQLImplicits的实例,用于将常见 Scala 对象转换为 Datasets 的隐式方法的集合 。
350391
351- 具体来说,这行代码使用了SparkSession对象中的implicits属性,该属性返回了一个类型为org.apache.spark.sql.SQLImplicits的实例。 通过调用该实例的方法,可以将各种Scala数据类型 (如case class、元组等)与Spark SQL中的数据类型(如Row、DataFrame、Dataset等)之间进行转换 ,从而方便地进行数据操作和查询。
392+ 通过调用该实例的方法,可将各种Scala数据类型 (如case class、元组等)与Spark SQL中的数据类型(如Row、DataFrame、Dataset等)之间转换 ,从而方便地进行数据操作和查询。
352393
353394在使用许多Spark SQL API的时候,往往需要使用这行代码将隐式转换函数导入当前上下文,以获得更加简洁和易于理解的代码编写方式。
354395
355- #### 如果不导入会咋样
396+ # ### 若不导入,会咋样?
397+
398+ 若不导入` spark.implicits._` 会导致编译错误或运行时异常。因为进行DataFrame和Dataset操作时,需用到一些隐式转换函数。不导入,则这些隐式转换函数无法被自动引入当前上下文,就需手动导入这些函数,编码麻烦。
399+
400+ 如进行RDD和DataFrame之间的转换时,如不导入` spark.implicits._` ,则需手导:
356401
357- 如果不导入` spark.implicits._ ` 会导致编译错误或者运行时异常。因为在进行DataFrame和Dataset的操作时,需要使用到一些隐式转换函数。如果没有导入` spark.implicits._ ` ,则这些隐式转换函数无法被自动引入当前上下文,就需要手动地导入这些函数,这样会使编码变得比较麻烦。
402+ - ` org.apache.spark.sql.Row`
403+ - ` ` org.apache.spark.sql.functions._` 等包
404+ - 并通过调用`toDF ()` 方法将RDD转换为DataFrame
358405
359- 例如,在进行RDD和DataFrame之间的转换时,如果不导入 ` spark.implicits._ ` ,则需要手动导入 ` org.apache.spark.sql.Row ` 以及 ` org.apache.spark.sql.functions._ ` 等包,并通过调用 ` toDF() ` 方法将RDD转换为DataFrame。 而有了导入` spark.implicits._ ` 后,只需要直接调用RDD对象的 ` toDF() ` 方法即可完成转换 。
406+ 而有了导入`spark.implicits._`后,只需直接调用RDD对象的 `toDF ()` 即完成转换 。
360407
361- 因此,为了简化编码,通常会在Scala中使用Spark SQL时导入` spark.implicits._ ` ,从而获得更加简洁易读的代码 。
408+ 因此,为简化编码,通常在Scala中用Spark SQL时导入` spark.implicits._` 。
362409
363410# ### 案例
364411
365412` ` ` scala
366413people.select($"name").show ()
367414` ` `
368415
369- 如果不导入 ` spark.implicits._ ` ,则可以手动创建一个 ` Column ` 对象来进行筛选操作。例如,可以使用 ` col ` 函数来创建一个 ` Column ` 对象,然后在 ` select ` 方法中使用该列 :
416+ 不导入 ` spark.implicits._` ,可手动创建一个 ` Column` 对象来进行筛选操作。如用 ` col` 函数创建一个 ` Column` 对象,然后在 ` select` 使用该列 :
370417
371418` ` ` scala
372419import org.apache.spark.sql.functions.col
@@ -375,4 +422,4 @@ val selected = people.select(col("name"))
375422selected.show ()
376423` ` `
377424
378- 这样就可以实现与 ` people.select($"name").show() ` 相同的效果,但需要手动创建 ` Column ` 对象。显然,在编写复杂的数据操作时,手动创建 ` Column ` 对象可能会变得非常繁琐和困难,因此通常情况下我们会选择使用隐式转换函数,从而更加方便地使用DataFrame的API 。
425+ 即可实现与 `people.select($"name").show ()` 相同效果,但需手动创建 ` Column` 对象。显然,编写复杂数据操作时,手创 ` Column` 对象非常繁琐,因此通常选用隐式转换函数,从而更方便使用DataFrame的API 。
0 commit comments