- Spark runs the same transformations, regardless of the language, in exactly the same way. We can express our business logic in SQL or in DataFrames (in R, Python, Scala, or Java), and Spark will compile that logic down to an underlying plan (which you can see in the explain plan) before actually executing the code.
- With Spark SQL, we can register any DataFrame as a table or view (a temporary table) and query it using pure SQL. There is no performance difference between writing SQL queries and writing DataFrame code; both “compile” to the same underlying plan.
- We can convert any DataFrame into a table or view with one simple method call.
scala> flightData2015.createOrReplaceTempView("flight_data_2015")
To query the table, we use the spark.sql function, which returns a new DataFrame.
A SQL query against a DataFrame returns another DataFrame. This makes it possible to specify transformations in whichever manner is most convenient at any given point in time, without sacrificing any efficiency. Have a look at the explain plans below; they are identical.
|
scala> val flightData2015 = spark.read.option("inferSchema", "true").option("header", "true").csv("data/flight-data/csv/2015-summary.csv")
flightData2015: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> flightData2015.createOrReplaceTempView("flight_data_2015")

scala> val sqlWay = spark.sql("""SELECT DEST_COUNTRY_NAME, count(1)
     |   FROM flight_data_2015
     |   GROUP BY DEST_COUNTRY_NAME""")
sqlWay: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, count(1): bigint]

scala> val dataFrameWay = flightData2015.groupBy("DEST_COUNTRY_NAME").count()
dataFrameWay: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, count: bigint]

scala> sqlWay.explain
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#27, 3000)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#27] Batched: false, Format: CSV, Location: InMemoryFileIndex[xxxxxxx/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>

scala> dataFrameWay.explain
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#27, 3000)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#27] Batched: false, Format: CSV, Location: InMemoryFileIndex[xxxxxxxdata/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
|
- The following shows a few more examples of using DataFrame functions and SQL. The underlying execution plans are the same:
|
scala> spark.sql("SELECT max(count) from flight_data_2015").take(1)
res5: Array[org.apache.spark.sql.Row] = Array([370002])
|
scala> import org.apache.spark.sql.functions.max
import org.apache.spark.sql.functions.max

scala> flightData2015.select(max("count")).take(1)
res6: Array[org.apache.spark.sql.Row] = Array([370002])
|
scala> val maxSql = spark.sql("""
     | SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
     | FROM flight_data_2015
     | GROUP BY DEST_COUNTRY_NAME
     | ORDER BY sum(count) DESC
     | LIMIT 5
     | """)
maxSql: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, destination_total: bigint]

scala> maxSql.show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+
|
scala> import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.functions.desc

scala> flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort(desc("destination_total")).limit(5).show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+
|
The following is what the explain plan looks like:
|
scala> flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort(desc("destination_total")).limit(5).explain
== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#125L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#27,destination_total#125L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[sum(cast(count#29 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#27, 3000)
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[partial_sum(cast(count#29 as bigint))])
         +- *(1) FileScan csv [DEST_COUNTRY_NAME#27,count#29] Batched: false, Format: CSV, Location: InMemoryFileIndex[xxxxxxxdata/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
|
The execution plan is a directed acyclic graph (DAG) of transformations, each resulting in a new immutable DataFrame, on which we call an action to generate a result.
- The first step is to read in the data. Spark does not actually read it in until an action is called on that DataFrame or one derived from the original DataFrame.
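As a quick sketch of that laziness (assuming a live SparkSession named spark and the same CSV path used above), the read call returns immediately; Spark only scans the file once an action is invoked:
|
// Lazy: building the DataFrame does not read the file yet.
val flights = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("data/flight-data/csv/2015-summary.csv")

// The action below is what actually triggers the file scan.
flights.take(3)
|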
- The second step is our grouping (groupBy). After grouping, we have a RelationalGroupedDataset, which is a fancy name for a DataFrame that has a grouping specified but needs the user to specify an aggregation before it can be queried further.
|
scala> flightData2015.groupBy("DEST_COUNTRY_NAME")
res13: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: [grouping expressions: [DEST_COUNTRY_NAME: string], value: [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field], type: GroupBy]
|
- The third step is to specify the aggregation. The result of the sum method call is a new DataFrame.
|
scala> flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count")
res15: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, sum(count): bigint]
|
- The fourth step is a simple renaming. We use the withColumnRenamed method, which takes two arguments: the original column name and the new column name.
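As a sketch, here is just the renaming step pulled out of the chain shown earlier (the intermediate name renamed is only for illustration):
|
// withColumnRenamed(existingName, newName): rename the generated sum(count) column.
val renamed = flightData2015
  .groupBy("DEST_COUNTRY_NAME")
  .sum("count")
  .withColumnRenamed("sum(count)", "destination_total")
|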
- The fifth step sorts the data so that if we were to take results off the top of the DataFrame, they would have the largest values in the destination_total column. We used a function called desc. The desc function does not return a string but a Column. In general, many DataFrame methods will accept strings (as column names), Column types, or expressions. Columns and expressions are actually the exact same thing (see the short sketch after the next snippet).
|
scala> desc("destination_total")
res16: org.apache.spark.sql.Column = destination_total DESC NULLS LAST
|
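Because columns and expressions are interchangeable, the same sort can be written in either form. A brief sketch using the flightData2015 DataFrame loaded earlier (both lines express the same ordering):
|
import org.apache.spark.sql.functions.{col, desc}

// desc("count") returns a Column; so does col("count").desc.
flightData2015.sort(desc("count")).show(5)
flightData2015.sort(col("count").desc).show(5)
|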
- Penultimately, we’ll specify a limit. This just specifies that we only want to return the first five values in our final DataFrame instead of all the data.
- The last step is our action! Now we actually begin the process of collecting the results of our DataFrame, and Spark will give us back a list or array in the language that we’re executing.
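Putting the last steps together, a minimal sketch of the same chain makes the boundary explicit: limit is still a transformation that returns a new DataFrame, and only the closing take actually runs the job and returns an Array of Rows to the driver:
|
import org.apache.spark.sql.functions.desc

// Every call up to and including limit(5) is a transformation; nothing executes yet.
val topFive = flightData2015
  .groupBy("DEST_COUNTRY_NAME")
  .sum("count")
  .withColumnRenamed("sum(count)", "destination_total")
  .sort(desc("destination_total"))
  .limit(5)

// The action triggers execution and brings the results back to the driver.
val rows: Array[org.apache.spark.sql.Row] = topFive.take(5)
|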
Although this explain plan doesn’t match our exact “conceptual plan,” all of the pieces are there. You can see the limit statement as well as the orderBy (in the first line). You can also see how our aggregation happens in two phases, in the partial_sum calls. This is because summing a list of numbers is commutative, and Spark can perform the sum partition by partition. Of course, we can see how we read in the DataFrame as well.
Naturally, we don’t always need to collect the data. We can also write it out to any data source that Spark supports.
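For example, instead of collecting maxSql, we could write it out (a sketch with a hypothetical output path; any format Spark supports would work here):
|
// Writes a directory of part files; the path below is only illustrative.
maxSql.write
  .mode("overwrite")
  .csv("/tmp/flight-data/destination_totals")
|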