
Wednesday, 10 April 2019

CH2/3 DataFrames and SQL Example, Execution plans


  • Spark can run the same transformations in exactly the same way, regardless of the language. We can express our business logic in SQL or with DataFrames (in R, Python, Scala, or Java), and Spark will compile that logic down to an underlying plan (which you can inspect with explain) before actually executing the code.
     
  • With Spark SQL, we can register any DataFrame as a table or view (a temporary table) and query it using pure SQL. There is no performance difference between writing SQL queries and writing DataFrame code: both “compile” to the same underlying plan.

  • We can convert any DataFrame into a table or view with one simple method call.

scala> flightData2015.createOrReplaceTempView("flight_data_2015")

To query the table, we use the spark.sql function, which returns a new DataFrame.

Because a SQL query against a DataFrame returns another DataFrame, we can specify transformations in whichever manner is most convenient at any given point in time, without sacrificing any efficiency.
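To make the “same underlying plan” idea concrete, here is a toy model in plain Scala. It does not use Spark, and every name in it (`Plan`, `Scan`, `Aggregate`, `parseSql`, `Frame`, `Grouped`, `table`) is hypothetical. Two front ends, a deliberately tiny SQL parser and a DataFrame-style builder, both produce the same plan value, so anything downstream of planning cannot tell which one the user wrote. Spark's real front ends build Catalyst plans, which are far richer than this sketch.

```scala
// Toy logical plan: a hypothetical stand-in for Spark's Catalyst plans
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Aggregate(groupBy: String, agg: String, child: Plan) extends Plan

// Front end 1: a deliberately tiny "SQL parser" that only understands
//   SELECT <col>, count(1) FROM <table> GROUP BY <col>
val GroupCount =
  """(?is)\s*SELECT\s+(\w+)\s*,\s*count\(1\)\s+FROM\s+(\w+)\s+GROUP\s+BY\s+(\w+)\s*""".r

def parseSql(sql: String): Plan = sql match {
  case GroupCount(col, tbl, key) if col == key => Aggregate(key, "count", Scan(tbl))
  case _ => throw new IllegalArgumentException(s"unsupported query: $sql")
}

// Front end 2: a DataFrame-style builder that records what the user asked for
final case class Frame(plan: Plan) {
  def groupBy(col: String): Grouped = Grouped(col, plan)
}
final case class Grouped(col: String, child: Plan) {
  def count(): Frame = Frame(Aggregate(col, "count", child))
}
def table(name: String): Frame = Frame(Scan(name))

val sqlWay = parseSql(
  """SELECT DEST_COUNTRY_NAME, count(1)
    |FROM flight_data_2015
    |GROUP BY DEST_COUNTRY_NAME""".stripMargin)

val dataFrameWay = table("flight_data_2015").groupBy("DEST_COUNTRY_NAME").count().plan

println(sqlWay == dataFrameWay) // both front ends built the same Plan value
```

The design point is that optimization and execution only ever see the plan, which is why choosing between SQL and DataFrame code is a matter of convenience rather than performance.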
 
Have a look at the explain plans below. They are identical.


scala>     val flightData2015 = spark.read.option("inferSchema", "true").option("header", "true").csv("data/flight-data/csv/2015-summary.csv")
flightData2015: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala>     flightData2015.createOrReplaceTempView("flight_data_2015")

scala>     val sqlWay = spark.sql(s"""SELECT DEST_COUNTRY_NAME, count(1)
     |                            FROM flight_data_2015
     |                            GROUP BY DEST_COUNTRY_NAME""")
sqlWay: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, count(1): bigint]

scala>     val dataFrameWay = flightData2015.groupBy("DEST_COUNTRY_NAME").count()
dataFrameWay: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, count: bigint]

scala>     sqlWay.explain
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#27, 3000)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#27] Batched: false, Format: CSV, Location: InMemoryFileIndex[xxxxxxx/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>

scala>     dataFrameWay.explain
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#27, 3000)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#27] Batched: false, Format: CSV, Location: InMemoryFileIndex[xxxxxxxdata/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>

  • The following shows a few more examples of using DataFrame functions and SQL side by side; again, the underlying execution plans are the same:

scala> spark.sql("SELECT max(count) from flight_data_2015").take(1)
res5: Array[org.apache.spark.sql.Row] = Array([370002])

scala> flightData2015.select(max("count")).take(1)
res6: Array[org.apache.spark.sql.Row] = Array([370002])

scala> val maxSql = spark.sql("""
     | SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
     | FROM flight_data_2015
     | GROUP BY DEST_COUNTRY_NAME
     | ORDER BY sum(count) DESC
     | LIMIT 5
     | """)
maxSql: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, destination_total: bigint]

scala> maxSql.show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+

scala> flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort(desc("destination_total")).limit(5).show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+
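For readers more at home with ordinary collections, the same group, sum, sort and limit pipeline can be written against an in-memory Scala list. This is only an analogy (no Spark, no distribution), and the rows below are made-up sample data, not the 2015 flight summary; `Flight` is a hypothetical case class mirroring the CSV's three columns.

```scala
// One row of the hypothetical sample data, mirroring the CSV's three columns
final case class Flight(destCountry: String, originCountry: String, count: Int)

val flights = List(
  Flight("United States", "Romania", 15),
  Flight("United States", "Croatia", 1),
  Flight("Canada", "United States", 8),
  Flight("Mexico", "United States", 7),
  Flight("Canada", "Mexico", 2),
  Flight("Japan", "United States", 4),
  Flight("United Kingdom", "United States", 5),
  Flight("Egypt", "United States", 3)
)

val top5: List[(String, Long)] =
  flights
    .groupBy(_.destCountry)                                             // GROUP BY DEST_COUNTRY_NAME
    .map { case (dest, rows) => dest -> rows.map(_.count.toLong).sum } // sum(count) AS destination_total
    .toList
    .sortBy { case (_, total) => -total }                               // ORDER BY destination_total DESC
    .take(5)                                                            // LIMIT 5

// Print in roughly the same two-column shape as show()
top5.foreach { case (dest, total) => println(f"$dest%17s|$total%17d") }
```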

Here is what the explain plan of the DataFrame version looks like:

scala> flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort(desc("destination_total")).limit(5).explain
== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#125L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#27,destination_total#125L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[sum(cast(count#29 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#27, 3000)
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[partial_sum(cast(count#29 as bigint))])
         +- *(1) FileScan csv [DEST_COUNTRY_NAME#27,count#29] Batched: false, Format: CSV, Location: InMemoryFileIndex[xxxxxxxdata/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
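The TakeOrderedAndProject node at the top of the plan above shows that Spark did not plan a full sort followed by a separate limit: the two are fused into a single top-N operator. As far as I understand it, each partition keeps only its best `limit` rows and those small candidate lists are then merged. Below is a plain-Scala sketch of that idea using a bounded priority queue; `topN` and the three partitions are hypothetical, and this is not Spark's actual implementation.

```scala
import scala.collection.mutable

// Keep the n largest elements seen so far in a heap of at most n elements.
// With the reversed ordering, the heap's head is the smallest element kept.
def topN[A](rows: Iterator[A], n: Int)(implicit ord: Ordering[A]): List[A] = {
  val heap = mutable.PriorityQueue.empty[A](ord.reverse)
  rows.foreach { row =>
    if (heap.size < n) heap.enqueue(row)
    else if (n > 0 && ord.gt(row, heap.head)) {
      heap.dequeue()     // drop the current smallest candidate
      heap.enqueue(row)  // and keep the better row instead
    }
  }
  heap.toList.sorted(ord.reverse) // largest first
}

// (destination, total) rows spread across three hypothetical partitions
val byTotal: Ordering[(String, Long)] = Ordering.by[(String, Long), Long](_._2)
val partitions = List(
  List("United States" -> 16L, "Egypt" -> 3L),
  List("Canada" -> 10L, "Japan" -> 4L, "Peru" -> 1L),
  List("Mexico" -> 7L, "United Kingdom" -> 5L)
)

// Phase 1: each partition keeps at most 5 candidates; phase 2: merge the candidates
val candidates = partitions.flatMap(p => topN(p.iterator, 5)(byTotal))
val top5 = topN(candidates.iterator, 5)(byTotal)
top5.foreach(println)
```

Memory per partition stays bounded by the limit, which is why fusing sort and limit is cheaper than sorting everything first.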

The execution plan is a directed acyclic graph (DAG) of transformations, each resulting in a new immutable DataFrame, on which we call an action to generate a result.
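Building up a chain of transformations and running it only when an action is called has a small analogue in Scala's own collection views: transformations on a view are only recorded, and nothing is computed until a strict operation (playing the role of a Spark action) forces it. This is an analogy in standard Scala (2.13 or later assumed), not Spark; the `evaluations` counter exists only to make evaluation visible.

```scala
// Count how many times the "transformation" actually runs
var evaluations = 0

val source = List(15, 1, 8, 7, 2)

// Transformations on a view are recorded, not executed (like building a DataFrame)
val plan = source.view
  .map { n => evaluations += 1; n * 2 }
  .filter(_ > 4)

val beforeAction = evaluations // still 0: nothing has run yet

// A strict operation plays the role of an action and forces the whole pipeline
val result = plan.toList

println(s"before action: $beforeAction, after action: $evaluations, result: $result")
```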
 
  • The first step is to read in the data. Spark does not actually read it in until an action is called on that DataFrame or on one derived from it. (Because this example sets inferSchema, Spark does scan the file once when the DataFrame is defined, but only to infer the column types.)
  • The second step is our grouping (groupBy). After grouping we have a RelationalGroupedDataset, which is a fancy name for a DataFrame that has a grouping specified but needs the user to specify an aggregation before it can be queried further.

scala> flightData2015.groupBy("DEST_COUNTRY_NAME")
res13: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: [grouping expressions: [DEST_COUNTRY_NAME: string], value: [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field], type: GroupBy]

  • The third step is to specify the aggregation. The result of the sum method call is a new DataFrame, whose summed column is named sum(count).

scala> flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count")
res15: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, sum(count): bigint]

  • The fourth step is a simple renaming. We use the withColumnRenamed method, which takes two arguments: the original column name and the new column name.

  • The fifth step sorts the data so that, if we were to take results off the top of the DataFrame, they would have the largest values in the destination_total column. We use a function called desc, which returns not a string but a Column. In general, many DataFrame methods accept either strings (as column names) or Column types or expressions. Columns and expressions are essentially the same thing: a Column is a wrapper around an expression.

scala> desc("destination_total")
res16: org.apache.spark.sql.Column = destination_total DESC NULLS LAST

  • Penultimately, we specify a limit. This means we want only the first five rows of our final DataFrame instead of all the data.
     
  • The last step is our action! Only now does Spark actually execute the plan and compute the results of our DataFrame. Here the action is show(), which prints the rows; with collect(), Spark would instead give us back a list or array in the language we’re using.
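The second step's point, that a grouped dataset cannot be queried until an aggregation is supplied, is something Spark enforces through types: groupBy returns a RelationalGroupedDataset rather than a DataFrame, and its aggregation methods (such as sum and count) return a DataFrame again. The plain-Scala sketch below mimics that design with hypothetical classes `FlightRow`, `MiniFrame` and `MiniGrouped`; it runs eagerly on an in-memory list and is not Spark's implementation.

```scala
// A hypothetical row type and an eager, in-memory stand-in for a DataFrame
final case class FlightRow(dest: String, count: Long)

final case class MiniFrame(rows: List[FlightRow]) {
  // Like DataFrame.groupBy: the result is NOT a MiniFrame, so it has no show()
  def groupByDest: MiniGrouped = MiniGrouped(rows.groupBy(_.dest))
  def show(): Unit = rows.foreach(r => println(s"${r.dest}\t${r.count}"))
}

// Like RelationalGroupedDataset: the only way back to a frame is an aggregation
final case class MiniGrouped(groups: Map[String, List[FlightRow]]) {
  def sum: MiniFrame =
    MiniFrame(groups.toList.map { case (dest, rs) => FlightRow(dest, rs.map(_.count).sum) })
  def count: MiniFrame =
    MiniFrame(groups.toList.map { case (dest, rs) => FlightRow(dest, rs.size.toLong) })
}

val flights = MiniFrame(List(FlightRow("Canada", 8), FlightRow("Mexico", 7), FlightRow("Canada", 2)))
val grouped = flights.groupByDest // a MiniGrouped: grouped.show() would not compile
val summed  = grouped.sum         // back to a MiniFrame, which can be shown
summed.show()
```

Making the intermediate result a different type turns “forgot to aggregate” into a compile-time error in Scala rather than a runtime surprise.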

Although this explain plan doesn’t match our exact “conceptual plan,” all of the pieces are there. You can see the limit as well as the orderBy in the first line (TakeOrderedAndProject). You can also see that our aggregation happens in two phases, in the partial_sum calls. This is because summing a list of numbers is commutative and associative, so Spark can compute a partial sum partition by partition and then combine those partial sums. And, of course, the FileScan at the bottom shows how we read in the DataFrame.
Naturally, we don’t always need to collect the data. We can also write it out to any data source that Spark supports.
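The two-phase aggregation visible in the plan above (a partial_sum per partition, an Exchange hashpartitioning shuffle, then a final sum) can be mimicked with ordinary Scala collections. This is an illustration under simplifying assumptions, not Spark's code: the three input partitions are hypothetical, `key.hashCode` stands in for Spark's own hash function, and the shuffle uses a hypothetical 2 partitions instead of the 3000 in the plan. The sketch checks that the two-phase result matches a single global sum, which holds because addition is associative and commutative.

```scala
// (destination, count) rows spread across three hypothetical input partitions
val inputPartitions: List[List[(String, Long)]] = List(
  List("United States" -> 15L, "Canada" -> 8L, "United States" -> 1L),
  List("Mexico" -> 7L, "Canada" -> 2L),
  List("United States" -> 4L, "Mexico" -> 1L)
)

// Phase 1 (partial_sum): aggregate locally inside one partition, no data movement
def partialSums(part: List[(String, Long)]): Map[String, Long] =
  part.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2).sum }

// Exchange (hashpartitioning): route each partial result to partition hash(key) mod n
val numShufflePartitions = 2
def targetPartition(key: String): Int =
  java.lang.Math.floorMod(key.hashCode, numShufflePartitions)

val shuffled: Map[Int, List[(String, Long)]] =
  inputPartitions.flatMap(partialSums).groupBy { case (k, _) => targetPartition(k) }

// Phase 2 (sum): every key now lives in exactly one partition, so a local sum is final
val finalSums: Map[String, Long] =
  shuffled.values.flatMap(part => partialSums(part)).toMap

// Same answer as summing everything in one place
val globalSums = partialSums(inputPartitions.flatten)
println(finalSums == globalSums)
```

Only the small partial results cross the shuffle, not every input row, which is the practical payoff of splitting the aggregation in two.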
