
Sunday, 14 April 2019

CH7.4 Grouping Set, Rollup, Cube and grouping_id()



Grouping Set:
 
  • Grouping Sets: Used to perform aggregations across multiple groups. Grouping sets are a low-level tool for combining several sets of aggregations together, giving you the ability to specify arbitrary aggregations in a group-by statement.

  • Grouping sets depend on null values to mark aggregation levels. If you do not filter out null values in the source data, you will get incorrect results. This applies to cubes, rollups, and grouping sets.

  • The GROUPING SETS operator is only available in SQL. To achieve the same thing with DataFrames, you use the rollup and cube methods, which give you the same results.
     
  • To include the total number of items, regardless of customer or stock code, we simply add that aggregation level to our grouping set as well, as shown in the second query below. The result is, effectively, the union of several different groupings.

val dfNoNull = dfWithDate.na.drop()  // drop rows containing nulls so they are not confused with the nulls that mark aggregation levels
dfNoNull.createOrReplaceTempView("dfNoNull")
 
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode
GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC

SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode
GROUPING SETS((customerId, stockCode),())
ORDER BY CustomerId DESC, stockCode DESC
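
If you want to stay in Scala, the same GROUPING SETS query can be issued through spark.sql against the dfNoNull temporary view registered above. A small sketch; the total_quantity alias is added here only for readability:

// Sketch: run the GROUPING SETS query above through spark.sql,
// using the "dfNoNull" temp view registered earlier.
val groupingSetsDF = spark.sql("""
  SELECT customerId, stockCode, sum(Quantity) AS total_quantity
  FROM dfNoNull
  GROUP BY customerId, stockCode
  GROUPING SETS ((customerId, stockCode), ())
  ORDER BY customerId DESC, stockCode DESC
""")
groupingSetsDF.show()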

Rollups:
 
  • Grouping sets and group-by are for explicit groupings. A rollup is a multidimensional aggregation that performs a variety of group-by-style calculations for us automatically.

  • The following example creates a rollup that looks across time (with our new Date column) and space (with the Country column), producing a new DataFrame that includes the grand total over all dates, the total for each date in the DataFrame, and the subtotal for each country on each date in the DataFrame:

scala>     val dfNoNull = dfWithDate.na.drop()
dfNoNull: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 7 more fields]

scala>     val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity")).selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity").orderBy("Date")
rolledUpDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Date: date, Country: string ... 1 more field]

scala>     rolledUpDF.show()
+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       5176450|
|2010-12-01|        Norway|          1852|
|2010-12-01|        France|           449|
|2010-12-01|          null|         26814|
|2010-12-01|     Australia|           107|
|2010-12-01|   Netherlands|            97|
|2010-12-01|       Germany|           117|
|2010-12-01|          EIRE|           243|
|2010-12-01|United Kingdom|         23949|
|2010-12-02|       Germany|           146|
|2010-12-02|          null|         21023|
|2010-12-02|          EIRE|             4|
|2010-12-02|United Kingdom|         20873|
|2010-12-03|        Poland|           140|
|2010-12-03|   Switzerland|           110|
|2010-12-03|        France|           239|
|2010-12-03|          null|         14830|
|2010-12-03|         Italy|           164|
|2010-12-03|      Portugal|            65|
|2010-12-03|         Spain|           400|
+----------+--------------+--------------+
only showing top 20 rows

The rows where you see null values are where you will find the totals. A null in both rollup columns specifies the grand total across both of those columns.
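
A quick way to pull out a particular level is to filter on those nulls. The following is a small sketch against the rolledUpDF built above:

// Rows with a null Country are the per-date subtotals (plus the grand-total row,
// where Date is also null); the row with nulls in both columns is the grand total.
rolledUpDF.where("Country IS NULL").show()
rolledUpDF.where("Date IS NULL AND Country IS NULL").show()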

  • Following is the definition of the rollup method on DataFrame. Note that it returns a RelationalGroupedDataset, which lets us use all the available aggregate functions:

def rollup(col1: String, cols: String*): RelationalGroupedDataset
Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.
This is a variant of rollup that can only group by existing columns using column names (i.e. cannot construct expressions).
// Compute the average for all numeric columns rolluped by department and group.
ds.rollup("department", "group").avg()
// Compute the max age and average salary, rolluped by department and gender.
ds.rollup($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))

def rollup(cols: Column*): RelationalGroupedDataset
Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.
// Compute the average for all numeric columns rolluped by department and group.
ds.rollup($"department", $"group").avg()
// Compute the max age and average salary, rolluped by department and gender.
ds.rollup($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))



 
Cube:
 
  • A cube takes the rollup to a level deeper. Rather than treating elements hierarchically, a cube does the same thing across all dimensions. The method call is quite similar, but instead of calling rollup, we call cube. Note that the definition shows that this also returns a RelationalGroupedDataset.

def cube(col1: String, cols: String*): RelationalGroupedDataset
Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.
This is a variant of cube that can only group by existing columns using column names (i.e. cannot construct expressions).
// Compute the average for all numeric columns cubed by department and group.
ds.cube("department", "group").avg()
// Compute the max age and average salary, cubed by department and gender.
ds.cube($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))

def cube(cols: Column*): RelationalGroupedDataset
Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.
// Compute the average for all numeric columns cubed by department and group.
ds.cube($"department", $"group").avg()
// Compute the max age and average salary, cubed by department and gender.
ds.cube($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))
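
If you prefer SQL, the same kind of cube can be expressed with WITH CUBE (or by listing every combination in a GROUPING SETS clause). A small sketch, again assuming the dfNoNull temporary view registered earlier:

// Sketch: GROUP BY ... WITH CUBE is the SQL equivalent of the DataFrame cube below.
spark.sql("""
  SELECT Date, Country, sum(Quantity) AS total_quantity
  FROM dfNoNull
  GROUP BY Date, Country WITH CUBE
  ORDER BY Date
""").show()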

scala>     val dfNoNull = dfWithDate.na.drop()
dfNoNull: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 7 more fields]

scala>     val cubedDF = dfNoNull.cube("Date", "Country").agg(sum("Quantity")).selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity").orderBy("Date")
cubedDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Date: date, Country: string ... 1 more field]

scala>     cubedDF.show()
+----+--------------------+--------------+
|Date|             Country|total_quantity|
+----+--------------------+--------------+
|null|               Japan|         25218|
|null|            Portugal|         16180|
|null|             Germany|        117448|
|null|                null|       5176450|
|null|           Australia|         83653|
|null|              Cyprus|          6317|
|null|           Singapore|          5234|
|null|                 RSA|           352|
|null|         Unspecified|          3300|
|null|United Arab Emirates|           982|
|null|                 USA|          1034|
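
Unlike the rollup, the cube also includes a total per country across all dates (the rows with a null Date near the top of the output). A small sketch of how to pull those rows out of the cubedDF built above:

// Rows with a null Date but a non-null Country are per-country totals across all
// dates, a level that rollup("Date", "Country") does not produce.
cubedDF.where("Date IS NULL AND Country IS NOT NULL").show()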

Grouping Metadata:
 
  • When using cubes and rollups, the grouping_id() function allows you to query the aggregation level so that you can easily filter rows down accordingly. It gives us a column specifying the level of aggregation that we have in our result set.
     
scala> dfNoNull.rollup("customerId", "stockCode").agg(grouping_id().as("gid"), sum("Quantity")).where("gid > 1").show
+----------+---------+---+-------------+
|customerId|stockCode|gid|sum(Quantity)|
+----------+---------+---+-------------+
|      null|     null|  3|      5176450|
+----------+---------+---+-------------+


scala> dfNoNull.rollup("customerId", "stockCode").agg(grouping_id().as("gid"), sum("Quantity")).where("gid >= 1").show
+----------+---------+---+-------------+
|customerId|stockCode|gid|sum(Quantity)|
+----------+---------+---+-------------+
|     18245|     null|  1|         1781|
|     14995|     null|  1|          103|
|     13934|     null|  1|          390|
|     12670|     null|  1|         1391|
|     14947|     null|  1|          179|
|     16400|     null|  1|          172|
