Skip to content

Commit 1d6d53b

Browse files
committed
docs:更新spark专栏
1 parent f2eb13e commit 1d6d53b

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed

docs/.vuepress/config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -970,6 +970,7 @@ module.exports = {
970970
"01-Spark的Local模式与应用开发入门",
971971
"03-SparkSQL入门",
972972
"04-SparkSQL的API编程之DataFrame",
973+
"05-快速理解SparkSQL的DataSet",
973974
]
974975
},
975976
],
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
# 05-快速理解SparkSQL的DataSet
2+
3+
## 1 定义
4+
5+
一个数据集是分布式的数据集合。Spark 1.6增加新接口Dataset,提供
6+
7+
- RDD的优点:强类型、能够使用强大lambda函数
8+
- Spark SQL优化执行引擎的优点
9+
10+
可从JVM对象构造Dataset,然后函数式转换(map、flatMap、filter等)操作。Dataset API在Scala和Java中可用。
11+
12+
Python不支持Dataset API,但由于Python动态性质,许多Dataset API优点已经能使用(可通过名称自然访问行的字段row.columnName)。R的情况类似。
13+
14+
> Python支持DataFrame API是因为DataFrame API是基于Python#Pandas库构建,而Pandas库提供强大易用的数据分析工具集。因此,Spark提供对Pandas DataFrame对象的支持,使Python使用DataFrame API非常方便。Python的Pandas也提供强类型保证,使Spark可在保持动态特性同时提供类型检查和类型推断。因此,虽Python不支持Spark的Dataset API,但它支持Spark的DataFrame API,这为Python用户提供一种方便的数据处理方式。
15+
16+
## 2 案例
17+
18+
```scala
19+
package com.javaedge.bigdata.cp04
20+
21+
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
22+
23+
object DatasetApp {
24+
25+
def main(args: Array[String]): Unit = {
26+
val projectRootPath = "/Users/javaedge/Downloads/soft/sparksql-train"
27+
val spark = SparkSession.builder()
28+
.master("local").appName("DatasetApp")
29+
.getOrCreate()
30+
import spark.implicits._
31+
32+
// 创建一个包含一条记录的Seq,这条记录包含一个名为 "JavaEdge" 年龄为 18 的人员信息
33+
val ds: Dataset[Person] = Seq(Person("JavaEdge", "18"))
34+
// 将Seq转换为一个Dataset[Person]类型数据集,该数据集只包含一条记录
35+
.toDS()
36+
ds.show()
37+
38+
val primitiveDS: Dataset[Int] = Seq(1, 2, 3).toDS()
39+
primitiveDS.map(x => x + 1).collect().foreach(println)
40+
41+
val peopleDF: DataFrame = spark.read.json(projectRootPath + "/data/people.json")
42+
val peopleDS: Dataset[Person] = peopleDF.as[Person]
43+
peopleDS.show(false)
44+
peopleDF.select("name").show()
45+
peopleDS.map(x => x.name).show()
46+
47+
spark.stop()
48+
}
49+
50+
/**
51+
* 自定义的 case class,其中包含两个属性
52+
*/
53+
private case class Person(name: String, age: String)
54+
55+
}
56+
57+
output:
58+
+--------+---+
59+
| name|age|
60+
+--------+---+
61+
|JavaEdge| 18|
62+
+--------+---+
63+
64+
2
65+
3
66+
4
67+
+----+-------+
68+
|age |name |
69+
+----+-------+
70+
|null|Michael|
71+
|30 |Andy |
72+
|19 |Justin |
73+
+----+-------+
74+
75+
+-------+
76+
| name|
77+
+-------+
78+
|Michael|
79+
| Andy|
80+
| Justin|
81+
+-------+
82+
83+
+-------+
84+
| value|
85+
+-------+
86+
|Michael|
87+
| Andy|
88+
| Justin|
89+
+-------+
90+
```
91+
92+
## 3 DataFrame V.S Dataset
93+
94+
```scala
95+
val peopleDF: DataFrame = spark.read.json(projectRootPath + "/data/people.json")
96+
val peopleDS: Dataset[Person] = peopleDF.as[Person]
97+
peopleDS.show(false)
98+
```
99+
100+
```scala
101+
// 弱语言类型,运行时才报错
102+
peopleDF.select("nameEdge").show()
103+
```
104+
105+
![](https://codeselect.oss-cn-shanghai.aliyuncs.com/image-20240324223437191.png)
106+
107+
编译期报错:
108+
109+
![](https://codeselect.oss-cn-shanghai.aliyuncs.com/image-20240324223553844.png)

0 commit comments

Comments
 (0)