Spark is a DISC (data-intensive scalable computing) system written in Scala, a functional and object-oriented programming language that compiles to Java bytecode. In contrast to Hadoop MapReduce, which relies heavily on materializing intermediate results to disk for fault tolerance, Spark relies on logical logging (lineage): it keeps track of how each data chunk was produced and, if a chunk is lost, reruns the computation that produced it. By default, Spark processes data in main memory and only spills to disk when necessary.
Spark provides two main abstractions for data: datasets and dataframes. Fault tolerance is based on RDDs (Resilient Distributed Datasets), the lower-level abstraction underneath both. An RDD is divided into chunks (partitions), and for each chunk Spark records its lineage, i.e., how it was produced from input chunks. This is an effective method for ensuring fault tolerance. The exceptions are a few operations, most notably shuffles, where one output chunk may depend on many or even all input chunks.
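A minimal sketch of lineage with the RDD API follows. It can be run in spark-shell or in any Scala setup with spark-core and spark-sql on the classpath. The app name, the data, and the partition count are arbitrary choices for illustration.

`toDebugString` prints the lineage Spark has recorded. `dependencies` shows that `map` creates a narrow one-to-one dependency, while `reduceByKey` creates a shuffle dependency: the case where one output chunk depends on many input chunks.

```scala
import org.apache.spark.{OneToOneDependency, ShuffleDependency}
import org.apache.spark.sql.SparkSession

// Local session for the sketch; in spark-shell, getOrCreate() returns the running one.
val spark = SparkSession.builder().master("local[*]").appName("lineage-sketch").getOrCreate()
val sc = spark.sparkContext

// Narrow transformation: each output chunk depends on exactly one input chunk.
val pairs = sc.parallelize(1 to 10, numSlices = 2).map(x => (x % 2, x))

// Wide transformation: reduceByKey shuffles, so an output chunk may depend on all input chunks.
val sums = pairs.reduceByKey(_ + _)

// The lineage Spark would replay to recompute a lost chunk of `sums`.
println(sums.toDebugString)

val mapIsNarrow = pairs.dependencies.head.isInstanceOf[OneToOneDependency[_]]
val reduceIsWide = sums.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]
println(s"map: one-to-one = $mapIsNarrow, reduceByKey: shuffle = $reduceIsWide")
```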
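Datasets are multisets (bags) of objects of a certain type. Any Scala type for which Spark has an encoder can be used as the element type of a dataset. This includes primitive types, strings, case classes, and tuples of these. The Java API offers the same typed datasets; Python, in contrast, only exposes dataframes. Datasets can be created from Scala collections or from a variety of other sources, e.g., by reading a file. In spark-shell, the import `spark.implicits._` is done automatically; it provides `toDS()` and `toDF()` on Scala collections.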
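Here is a sketch of two ways to create a dataset, assuming a local session. The temporary file and its contents are made up so that the example needs no external data. Tuples get their encoders from `spark.implicits._`, and `spark.read.textFile` yields a `Dataset[String]` with one element per line.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("create-ds-sketch").getOrCreate()
import spark.implicits._ // provides the encoders and toDS()

// From a Scala collection of tuples (encoders exist for tuples and case classes).
val pairs: Dataset[(String, Int)] = Seq(("a", 1), ("b", 2)).toDS()

// From a file: write a small text file first so the sketch is self-contained.
val tmp = Files.createTempFile("lines", ".txt")
Files.write(tmp, "first\nsecond\nthird\n".getBytes("UTF-8"))
val lines: Dataset[String] = spark.read.textFile(tmp.toString) // one element per line

pairs.show()
lines.show()
```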
- bag of integers
val myints = Seq(1,2,3,4,10,15,1,1,1,3).toDS()
myints.show()
(spark-shell: Spark 2.4.5, Scala 2.11.12, Java 1.8.0_252)
myints: org.apache.spark.sql.Dataset[Int] = [value: int]
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|   10|
|   15|
|    1|
|    1|
|    1|
|    3|
+-----+
- list of person objects
case class Person(name: String, age: Int)
val persons = Seq(Person("Peter", 15),Person("Bob",20)).toDS()
persons.show()
defined class Person
persons: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]
+-----+---+
| name|age|
+-----+---+
|Peter| 15|
|  Bob| 20|
+-----+---+
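DataFrames are essentially relational tables. In Scala, `DataFrame` is a type alias for `Dataset[Row]`, i.e., a dataset of untyped rows with named columns. Columns can be of any type supported by Spark SQL, including nested types such as structs, arrays, and maps. Spark provides a higher-level API for running relational-algebra-style operations over dataframes. It even supports running SQL queries (Spark SQL).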
val myintdf = Seq(1, 3, 1, 1, 5).toDF()
myintdf.show()
myintdf: org.apache.spark.sql.DataFrame = [value: int]
+-----+
|value|
+-----+
|    1|
|    3|
|    1|
|    1|
|    5|
+-----+
val personDF = Seq(Person("Peter", 15), Person("Bob", 20)).toDF()
personDF.show()
personDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
+-----+---+
| name|age|
+-----+---+
|Peter| 15|
|  Bob| 20|
+-----+---+
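Because `DataFrame` is just `Dataset[Row]`, converting between the typed and the untyped view is cheap. The sketch below assumes a local session and redefines `Person` so that it is self-contained. `as[Person]` maps columns to case-class fields by name, and `toDF()` goes back to the untyped, relational view.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Same fields as the Person class used in the text (redefined to keep the sketch self-contained).
case class Person(name: String, age: Int)

val spark = SparkSession.builder().master("local[*]").appName("df-vs-ds-sketch").getOrCreate()
import spark.implicits._

// Untyped, relational view: named columns, rows of type Row.
val df: DataFrame = Seq(("Peter", 15), ("Bob", 20)).toDF("name", "age")
val rows: Dataset[Row] = df // compiles because DataFrame is a type alias for Dataset[Row]

// Typed view: columns are matched to the fields of Person by name.
val ds: Dataset[Person] = df.as[Person]

// And back to the untyped view.
val again: DataFrame = ds.toDF()

df.printSchema()
ds.filter(p => p.age > 18).show() // typed lambda over Person objects
```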
Spark datasets and dataframes provide methods for applying operations. Datasets and dataframes are immutable, so applying a transformation never modifies its input but returns a new dataset or dataframe. As for RDDs, operations are classified into transformations and actions.
Transformations are executed lazily. For instance, when filtering a dataset, the result does not need to be computed until it has to be exposed to the user, e.g., when the user asks for it to be shown or written to a file. Instead of executing a transformation as soon as it is applied to a dataset, Spark just records that the resulting dataset is obtained by applying the transformation to the input dataset. When several transformations are applied in sequence, this internally results in a tree of operators (a logical plan) that describes their combined computation.
Actions are operations that require the output to be materialized, e.g., storing the dataset in a file or showing it to the user. When an action is applied to a dataset, Spark generates an execution plan that materializes the dataset by running all of the transformations involved in its creation.
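Laziness can be observed directly. The sketch below assumes a local session and reuses the integers from the examples. It counts calls to the map function with a long accumulator. After the two transformations the count is still 0; only the `collect()` action makes Spark run the plan.

Spark only guarantees exactly-once accumulator updates inside actions. Using an accumulator in a transformation like this is therefore meant for illustration, not for production counting.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("laziness-sketch").getOrCreate()
import spark.implicits._

val ints = Seq(1, 2, 3, 4, 10, 15, 1, 1, 1, 3).toDS()

// Counts how often the map function below is actually invoked.
val mapCalls = spark.sparkContext.longAccumulator("mapCalls")

// Two transformations: Spark only records them, nothing is computed yet.
val smallDoubled = ints.map { x => mapCalls.add(1L); x * 2 }.filter(x => x < 10)
val callsBeforeAction: Long = mapCalls.value

// Prints the plan Spark derived from the recorded operators (still no computation).
smallDoubled.explain()

// An action: Spark now executes the plan, which runs the map function.
val result = smallDoubled.collect()
val callsAfterAction: Long = mapCalls.value

println(s"before: $callsBeforeAction, after: $callsAfterAction, result: ${result.mkString(",")}")
```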
- show: print the content of a dataset (an action)
myints.show()
| value |
|---|
| 1 |
| 2 |
| 3 |
| 4 |
| 10 |
| 15 |
| 1 |
| 1 |
| 1 |
| 3 |
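`show` is only one of several actions that return data to the user. Here is a small sketch, assuming a local session. `show(n)` limits the printed rows, while `take(n)` and `collect()` return the elements to the driver as a Scala array. `collect()` should only be used when the result fits in driver memory.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("show-sketch").getOrCreate()
import spark.implicits._

val ints = Seq(1, 2, 3, 4, 10, 15, 1, 1, 1, 3).toDS()

ints.show(3)                    // prints only the first 3 rows
val firstThree = ints.take(3)   // Array(1, 2, 3), returned to the driver
val everything = ints.collect() // all 10 elements; only sensible for small results
```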
- map and reduce (map is a transformation; reduce is an action that returns a single value)
val mappedInts = myints.map( x => x * 2 )
mappedInts.show()
val reducedInts = myints.reduce( (x,y) => x + y )
mappedInts: org.apache.spark.sql.Dataset[Int] = [value: int]
+-----+
|value|
+-----+
|    2|
|    4|
|    6|
|    8|
|   20|
|   30|
|    2|
|    2|
|    2|
|    6|
+-----+
reducedInts: Int = 41
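`reduce` combines values inside each partition and then merges the partial results. The function passed to it should therefore be associative and commutative.

The sketch below works under the same assumptions as before (local session, the integers from the text). It compares `reduce` with the built-in `sum` aggregate from `org.apache.spark.sql.functions`. Note that `sum` over an `int` column returns a `Long`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().master("local[*]").appName("reduce-sketch").getOrCreate()
import spark.implicits._

val ints = Seq(1, 2, 3, 4, 10, 15, 1, 1, 1, 3).toDS()

// Partial sums per partition are merged afterwards, which is why + (associative
// and commutative) is a safe reducer while, e.g., subtraction would not be.
val viaReduce: Int = ints.reduce((x, y) => x + y)

// Same result with the built-in aggregate; sum over an int column yields a Long.
val viaAgg: Long = ints.agg(sum($"value")).first().getLong(0)

println(s"reduce: $viaReduce, sum: $viaAgg")
```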
- MR-style reduce (group on the function result, then apply the reducer to each group's values). The result of grouping is one of two types:
  - an `org.apache.spark.sql.RelationalGroupedDataset`, returned by `groupBy`
  - an `org.apache.spark.sql.KeyValueGroupedDataset`, returned by `groupByKey`, as used here
val intsGrp = myints.groupByKey(x => if (x < 10) 0 else 1) // group into two groups: less than 10 (key 0) and at least 10 (key 1)
intsGrp.toString()
val intsReduced = intsGrp.reduceGroups( (x,y) => x+y )
intsReduced.show()
intsGrp: org.apache.spark.sql.KeyValueGroupedDataset[Int,Int] = KeyValueGroupedDataset: [key: [value: int], value: [value: int]]
res35: String = KeyValueGroupedDataset: [key: [value: int], value: [value: int]]
intsReduced: org.apache.spark.sql.Dataset[(Int, Int)] = [value: int, ReduceAggregator(int): int]
+-----+---------------------+
|value|ReduceAggregator(int)|
+-----+---------------------+
|    1|                   25|
|    0|                   16|
+-----+---------------------+
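The two grouping flavors mentioned above can be compared side by side. The sketch below assumes a local session and the same integers.

- `groupByKey(...).count()` works on a `KeyValueGroupedDataset`.
- `groupBy` with a column expression returns a `RelationalGroupedDataset`, which is finished with `agg`.

The column name `large` and the alias `total` are arbitrary.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().master("local[*]").appName("grouping-sketch").getOrCreate()
import spark.implicits._

val ints = Seq(1, 2, 3, 4, 10, 15, 1, 1, 1, 3).toDS()

// Typed grouping: a KeyValueGroupedDataset keyed by 0 (< 10) or 1 (>= 10).
val countPerKey = ints.groupByKey(x => if (x < 10) 0 else 1).count() // Dataset[(Int, Long)]

// Relational grouping: a RelationalGroupedDataset keyed by a Boolean column.
val sumPerGroup = ints.groupBy(($"value" >= 10).as("large")).agg(sum($"value").as("total"))

countPerKey.show()
sumPerGroup.show()
```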
- filter (SELECTION in relational algebra)
val myintsLessThanTen = myints.filter( x => x < 10)
myintsLessThanTen.show()
myintsLessThanTen: org.apache.spark.sql.Dataset[Int] = [value: int]
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    1|
|    1|
|    1|
|    3|
+-----+
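Besides a Scala lambda, `filter` also accepts a `Column` condition or a SQL expression string (`where` is an alias). The sketch below assumes a local session. One practical difference: a lambda is opaque to Spark's optimizer, whereas column and SQL conditions can be analyzed and optimized.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("filter-sketch").getOrCreate()
import spark.implicits._

val ints = Seq(1, 2, 3, 4, 10, 15, 1, 1, 1, 3).toDS()

// 1. Scala lambda (as in the text): the optimizer cannot look inside it.
val viaLambda = ints.filter(x => x < 10)
// 2. Column expression built with the $"..." syntax.
val viaColumn = ints.filter($"value" < 10)
// 3. SQL expression string; where is an alias for filter.
val viaSql = ints.where("value < 10")

viaColumn.show()
```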
- select (PROJECTION in relational algebra): `$"A"` accesses attribute `A`, and `.as("B")` renames the result of an expression to `B`
val myIntsDuped = myintsLessThanTen.select($"value".as("A"), ($"value" * 2).as("B"))
myIntsDuped.show()
myIntsDuped: org.apache.spark.sql.DataFrame = [A: int, B: int]
+---+---+
|  A|  B|
+---+---+
|  1|  2|
|  2|  4|
|  3|  6|
|  4|  8|
|  1|  2|
|  1|  2|
|  1|  2|
|  3|  6|
+---+---+
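The same projection can be written in two other ways:

- with SQL expression strings via `selectExpr`
- by renaming and adding columns with `withColumnRenamed` and `withColumn`

Here is a sketch, assuming a local session and the filtered integers from above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("select-sketch").getOrCreate()
import spark.implicits._

val small = Seq(1, 2, 3, 4, 1, 1, 1, 3).toDS()

// Same projection as myIntsDuped, written as SQL expression strings.
val viaSelectExpr = small.selectExpr("value AS A", "value * 2 AS B")

// Alternative: rename the existing column, then append a computed one.
val viaWithColumn = small.withColumnRenamed("value", "A").withColumn("B", $"A" * 2)

viaSelectExpr.show()
```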
- join (JOIN in relational algebra)
case class Address(id: Int, city: String, zip: Int)
case class LivesAt(person: String, addr: Int)
val addressDF = Seq(
Address(1,"Chicago", 60616),
Address(2,"Chicago", 60615),
Address(3, "New York", 55555)
).toDF()
val livesatDF = Seq(
LivesAt("Peter", 1),
LivesAt("Peter",3),
LivesAt("Bob", 1)
).toDF()
personDF.show()
addressDF.show()
livesatDF.show()
val whoLivesWhere = personDF.join(livesatDF, $"name" === $"person").join(addressDF, $"addr" === $"id").select($"name", $"zip")
whoLivesWhere.show()
defined class Address
defined class LivesAt
addressDF: org.apache.spark.sql.DataFrame = [id: int, city: string ... 1 more field]
livesatDF: org.apache.spark.sql.DataFrame = [person: string, addr: int]
+-----+---+
| name|age|
+-----+---+
|Peter| 15|
|  Bob| 20|
+-----+---+
+---+--------+-----+
| id|    city|  zip|
+---+--------+-----+
|  1| Chicago|60616|
|  2| Chicago|60615|
|  3|New York|55555|
+---+--------+-----+
+------+----+
|person|addr|
+------+----+
| Peter|   1|
| Peter|   3|
|   Bob|   1|
+------+----+
whoLivesWhere: org.apache.spark.sql.DataFrame = [name: string, zip: int]
+-----+-----+
| name|  zip|
+-----+-----+
|Peter|60616|
|Peter|55555|
|  Bob|60616|
+-----+-----+
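`join` also takes a join type as a third argument. The sketch below assumes a local session and redefines the address data from above.

- A `left_anti` join keeps the addresses where nobody lives.
- A `left_outer` join keeps every address, filling the `LivesAt` columns with nulls where there is no match.

```scala
import org.apache.spark.sql.SparkSession

// Same tables as in the text, redefined so the sketch is self-contained.
case class Address(id: Int, city: String, zip: Int)
case class LivesAt(person: String, addr: Int)

val spark = SparkSession.builder().master("local[*]").appName("join-types-sketch").getOrCreate()
import spark.implicits._

val addressDF = Seq(Address(1, "Chicago", 60616), Address(2, "Chicago", 60615), Address(3, "New York", 55555)).toDF()
val livesatDF = Seq(LivesAt("Peter", 1), LivesAt("Peter", 3), LivesAt("Bob", 1)).toDF()

// Left anti join: only the addresses that no LivesAt row refers to.
val nobodyLivesThere = addressDF.join(livesatDF, $"id" === $"addr", "left_anti")

// Left outer join: every address; person and addr are null where nobody lives.
val everyAddress = addressDF.join(livesatDF, $"id" === $"addr", "left_outer")

nobodyLivesThere.show()
everyAddress.show()
```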
You can also execute SQL queries directly on dataframes once they are registered as temporary views.
// createOrReplaceTempView overwrites a view with the same name, so these
// statements can be rerun without dropping the views first
personDF.createOrReplaceTempView("persons")
addressDF.createOrReplaceTempView("address")
livesatDF.createOrReplaceTempView("livesat")
spark.sql("SELECT * FROM persons").show()
| name | age |
|---|---|
| Peter | 15 |
| Bob | 20 |
spark.sql("SELECT name, zip FROM persons p, address a, livesat l WHERE p.name = l.person AND l.addr = a.id").show()
| name | zip |
|---|---|
| Peter | 60616 |
| Peter | 55555 |
| Bob | 60616 |
spark.sql("SELECT count(*) FROM persons").show()
| count(1) |
|---|
| 2 |
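SQL queries can combine joins with grouping and aggregation. The sketch below assumes a local session, redefines the three tables, and registers them with `createOrReplaceTempView`. The alias `residents` is arbitrary; it counts (person, address) pairs per city.

```scala
import org.apache.spark.sql.SparkSession

// Same tables as in the text, redefined so the sketch is self-contained.
case class Person(name: String, age: Int)
case class Address(id: Int, city: String, zip: Int)
case class LivesAt(person: String, addr: Int)

val spark = SparkSession.builder().master("local[*]").appName("sql-groupby-sketch").getOrCreate()
import spark.implicits._

val persons = Seq(Person("Peter", 15), Person("Bob", 20)).toDF()
val addresses = Seq(Address(1, "Chicago", 60616), Address(2, "Chicago", 60615), Address(3, "New York", 55555)).toDF()
val livesAt = Seq(LivesAt("Peter", 1), LivesAt("Peter", 3), LivesAt("Bob", 1)).toDF()

persons.createOrReplaceTempView("persons")
addresses.createOrReplaceTempView("address")
livesAt.createOrReplaceTempView("livesat")

// Number of (person, address) pairs per city, sorted by city name.
val perCity = spark.sql("""
  SELECT a.city, count(*) AS residents
  FROM persons p
  JOIN livesat l ON p.name = l.person
  JOIN address a ON l.addr = a.id
  GROUP BY a.city
  ORDER BY a.city
""")

perCity.show()
```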