Spark SQL: Relational Data Processing in Spark (2015)
April 14, 2024Spark SQL provides a DataFrame API, an abstract data type equivalent to a table in a relational database. The DataFrame objects can be manipulated in a manner consistent with relational algebra. For example, the API includes various relational operators such as where and group by, similar to data frames in R and Python. The DataFrame objects are evaluated lazily, meaning they are not evaluated until certain output operations, like count, are performed. Catalyst, an optimizer written in Scala, optimizes these lazy queries and then compiles them into Java bytecode.
The DataFrame API is a DSL similar to data frames in R and Python. For instance, the Scala code below counts the number of users under 21 using the DataFrame API. In this code, users and young are DataFrames.
ctx = new HiveContext()
users = ctx.table("users")
young = users.where(users("age") < 21)
println(young.count())
Catalyst generates Java bytecode with quasiquotes, allowing for the programmatic construction of ASTs and their compilation at runtime. This optimizer supports both rule-based and cost-based optimizations. The queries are transformed into ASTs, and rule-based optimizations are applied to the tree, followed by cost-based optimization.
A rule is a function that maps a tree to another one, which can be computed more efficiently. Pattern matching in Scala can be used to implement such a rule. For example, the code below transforms an add operation between constants into a constant representation.
tree.transform {
case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
}
As of 2015, cost-based optimization was only used to select the join algorithm. Spark SQL processed small relations with a broadcast join, utilizing a peer-to-peer broadcast facility available in Spark.