Spark SQL: Relational Data Processing in Spark (2015)
April 14, 2024Spark SQLは、関係データベースのテーブルにあたる抽象データ構造のDataFrameを公開し、Spark上で関係代数によるデータ処理を実現する。
DataFrameに対する操作は、saveのような出力まで遅延される。
遅延により蓄積したクエリは、最適化器Catalystで最適化された後にバイトコードにコンパイルされる。
DataFrameのAPIは、RやPythonのdata frameのDSLに似ている。
以下のコードは、usersとyoungをDataFrameとして、条件に該当するレコードの数を出力する。
ctx = new HiveContext()
users = ctx.table(" users ")
young = users.where( users("age") < 21)
println(young.count())
Catalystは、Scalaで実装されており、quasiquotesでクエリのバイトコードを出力する。 Catalystがクエリを最適化する方法には、ルールベースとコストベースの2つがある。 最適化の前にクエリは構文木に変換され、ルールベースの後にコストベースの最適化が適用される。
ルールは、構文木を別の構文木に変換する関数であり、通常パターンマッチで実装される。 たとえば、定数の加算を定数に最適化するルールは、以下のパターンマッチのように定義できる。
tree.transform {
case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
}
コストベースの最適化は、処理するデータ量によってjoinの方法を変える。 たとえば、データ量が少なければbroadcast joinが選択される。