Skip to content

Latest commit

 

History

History
379 lines (307 loc) · 10.5 KB

File metadata and controls

379 lines (307 loc) · 10.5 KB

Examples for Data Processing in Spark

Spark

Spark is a DISC (data-intensive scalable computing) system that is written in Scala, a functional programming language that is compiled to Java byte code. In contrast to Hadoop MapReduce which uses excessive materialization for fault tolerance, Spark relies on logical logging (lineage) to keep track of how a data chunk was produced and if a chunk is lost, reruns this computation. Per default Spark processes data in main memory and only spills to disk if necessary.

Datasets and DataFrames

Spark provides two main abstractions for data: datasets and dataframes. Fault tolerance is based on RDDs (Resilient Distributed Dataset) which are datasets where for each chunk we record its lineage (how it was produced from input chunks). With the exception of a few operations, namely shuffle, where one chunk may depend on many or even all input chunks, this is an effective methods for ensuring fault tolerance.

Datasets

Datasets are multisets of objects of a certain type. Any Scala types (or Java or Python types when adapters for these languages are used) can be used as the base type of a set. Datasets can be created from Scala collections or from a variety of other sources, e.g., reading from a file.

  • bag of integers
val myints = Seq(1,2,3,4,10,15,1,1,1,3).toDS()
myints.show()
USING JAVA HOME /Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home
20/04/21 09:44:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/04/21 09:44:30 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://192.168.0.7:4041
Spark context available as 'sc' (master = local[*], app id = local-1587480270585).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_252)
Type in expressions to have them evaluated.
Type :help for more information.

scala> Loading /var/folders/f7/xnz8v_b13td37s3k4w8t5j2h0000gn/T/babel-MriqEC/spark-shell-vars-aMz8R3.scala...
myints: org.apache.spark.sql.Dataset[Int] = [value: int]
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|   10|
|   15|
|    1|
|    1|
|    1|
|    3|
+-----+

  • list of person objects
case class Person(name: String, age: Int)
val persons = Seq(Person("Peter", 15),Person("Bob",20)).toDS()
persons.show()
defined class Person
persons: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]
+-----+---+
| name|age|
+-----+---+
|Peter| 15|
|  Bob| 20|
+-----+---+

:

DataFrames

DataFrames are essentially relational tables. Fields can still be of any Scala type. Spark provides a higher-level API for running relational algebra-style operations over data frames and even has support for running SQL queries (SparkSQL).

val myintdf = Seq((1),(3),(1),(1),(5)).toDF()
myintdf.show()
myintdf: org.apache.spark.sql.DataFrame = [value: int]
+-----+
|value|
+-----+
|    1|
|    3|
|    1|
|    1|
|    5|
+-----+

val personDF = Seq(Person("Peter", 15),Person("Bob",20)).toDS()
personDF.show()
personDF: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]
+-----+---+
| name|age|
+-----+---+
|Peter| 15|
|  Bob| 20|
+-----+---+

:

Transformations and Actions

Spark dataframes and datasets provide functions for applying operations. The result of calling an operation to a dataset/frame is a new dataframe (Spark’s datasets and dataframes are immutable). Operations on RDDs are classified into transformations and actions.

Transformations

Transformations are operations that can be executed lazily. For instance, when filtering a dataset, the result of this operation does not need to be computed until it has to be exposed to the user, e.g., when the user requests the result to be visualized or written to a file. Transformations in Spark are lazy: instead of executing a transformation directly when a transformation is applied to a dataset, Spark just records that the resulting dataset is the result of applying the transformation to the input dataset. When several transformations are applied in sequence to a dataset this internally results in the construction of a tree of operators which describe the combined computation of these transformations.

Actions

Actions are operations that require the output of the operation to be materialized, e.g., storing the dataset in a file or showing it to the user. When an action is applied to a dataset, then Spark generates an execution plan to materialize the dataset by running all of the transformations involved in its creation.

Example dataset transformations and actions

  • show - print dataset content
myints.show()
value
1
2
3
4
10
15
1
1
1
3
  • map and reduce
val mappedInts = myints.map( x => x * 2 )
mappedInts.show()
val reducedInts = myints.reduce( (x,y) => x + y )
mappedInts: org.apache.spark.sql.Dataset[Int] = [value: int]
+-----+
|value|
+-----+
|    2|
|    4|
|    6|
|    8|
|   20|
|   30|
|    2|
|    2|
|    2|
|    6|
+-----+

reducedInts: Int = 41
  • MR-style reduce (group on function result and then apply reducer to each group’s values). The result of grouping is either a org.apache.spark.sql.RelationalGroupedDataset or org.apache.spark.sql.KeyValueGroupedDataset
val intsGrp = myints.groupByKey(x => if (x < 10) 0 else 1) // group into two groups: less than 10 and larger than 10
intsGrp.toString()
val intsReduced = intsGrp.reduceGroups( (x,y) => x+y )
intsReduced.show()
intsGrp: org.apache.spark.sql.KeyValueGroupedDataset[Int,Int] = KeyValueGroupedDataset: [key: [value: int], value: [value: int]]
res35: String = KeyValueGroupedDataset: [key: [value: int], value: [value: int]]
intsReduced: org.apache.spark.sql.Dataset[(Int, Int)] = [value: int, ReduceAggregator(int): int]
+-----+---------------------+
|value|ReduceAggregator(int)|
+-----+---------------------+
|    1|                   25|
|    0|                   16|
+-----+---------------------+

  • filter (SELECTION in relational algebra)
val myintsLessThanTen = myints.filter( x => x < 10)
myintsLessThanTen.show()
myintsLessThanTen: org.apache.spark.sql.Dataset[Int] = [value: int]
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    1|
|    1|
|    1|
|    3|
+-----+

  • select (PROJECTION in relational algebra)
    • $"A" accesses attribute A
    • .as("B") renames the result of an expression as B
val myIntsDuped = myintsLessThanTen.select($"value".as("A"), ($"value" * 2).as("B"))
myIntsDuped.show()
myIntsDuped: org.apache.spark.sql.DataFrame = [A: int, B: int]
+---+---+
|  A|  B|
+---+---+
|  1|  2|
|  2|  4|
|  3|  6|
|  4|  8|
|  1|  2|
|  1|  2|
|  1|  2|
|  3|  6|
+---+---+

  • join (JOIN in relational algebra)
case class Address(id: Int, city: String, zip: Int)
case class LivesAt(person: String, addr: Int)

val addressDF = Seq(
  Address(1,"Chicago", 60616),
  Address(2,"Chicago", 60615),
  Address(3, "New York", 55555)
).toDF()

val livesatDF = Seq(
  LivesAt("Peter", 1),
  LivesAt("Peter",3),
  LivesAt("Bob", 1)
).toDF()

personDF.show()
addressDF.show()
livesatDF.show()

val whoLivesWhere = personDF.join(livesatDF, $"name" === $"person").join(addressDF, $"addr" === $"id").select($"name", $"zip")
whoLivesWhere.show()
defined class Address
defined class LivesAt
addressDF: org.apache.spark.sql.DataFrame = [id: int, city: string ... 1 more field]
livesatDF: org.apache.spark.sql.DataFrame = [person: string, addr: int]
+-----+---+
| name|age|
+-----+---+
|Peter| 15|
|  Bob| 20|
+-----+---+

+---+--------+-----+
| id|    city|  zip|
+---+--------+-----+
|  1| Chicago|60616|
|  2| Chicago|60615|
|  3|New York|55555|
+---+--------+-----+

+------+----+
|person|addr|
+------+----+
| Peter|   1|
| Peter|   3|
|   Bob|   1|
+------+----+

whoLivesWhere: org.apache.spark.sql.DataFrame = [name: string, zip: int]
+-----+-----+
| name|  zip|
+-----+-----+
|Peter|60616|
|Peter|55555|
|  Bob|60616|
+-----+-----+

Spark SQL

You can also directly execute SQL code on dataframes when they are registered as tables.

spark.sqlContext.dropTempTable("persons")
spark.sqlContext.dropTempTable("address")
spark.sqlContext.dropTempTable("livesat")
personDF.createTempView("persons")
addressDF.createTempView("address")
livesatDF.createTempView("livesat")

spark.sql("SELECT * FROM persons").show()
nameage
Peter15
Bob20
spark.sql("SELECT name, zip FROM persons p, address a, livesat l WHERE p.name = l.person AND l.addr = a.id").show()
namezip
Peter60616
Peter55555
Bob60616
spark.sql("SELECT count(*) FROM persons").show()
count(1)
2