
Sunday, 14 April 2019

CH7.2 DataFrame Grouping



Grouping:

  • So far we have only performed DataFrame-level operations. We can also perform calculations per group. Grouping is lazily performed, and once an aggregation is specified it returns another DataFrame.

  • We do this grouping in two phases: first we specify the column(s) on which we would like to group, and then we specify the aggregation(s). The first step returns a RelationalGroupedDataset, and the second step returns a DataFrame. We can group on any number of columns, as sketched below.
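
          A minimal sketch of the two phases, assuming the same retail DataFrame df used in the examples below (with a string StockCode column and a numeric Quantity column):

import org.apache.spark.sql.{DataFrame, RelationalGroupedDataset}
import org.apache.spark.sql.functions.sum

// Phase 1: specify the grouping column(s) -- returns a RelationalGroupedDataset
val grouped: RelationalGroupedDataset = df.groupBy("StockCode")

// Phase 2: specify the aggregation(s) -- returns a DataFrame
val totals: DataFrame = grouped.agg(sum("Quantity"))

// Nothing is computed until an action such as show() is called
totals.show(5)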

          Following is the definition of the groupBy method on DataFrames:

def groupBy(col1: String, cols: String*): RelationalGroupedDataset
Groups the Dataset using the specified columns, so that we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.
This is a variant of groupBy that can only group by existing columns using column names (i.e. cannot construct expressions).
// Compute the average for all numeric columns grouped by department.
ds.groupBy("department").avg()
// Compute the max age and average salary, grouped by department and gender.
ds.groupBy($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))

def groupBy(cols: Column*): RelationalGroupedDataset
Groups the Dataset using the specified columns, so we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.
// Compute the average for all numeric columns grouped by department.
ds.groupBy($"department").avg()
// Compute the max age and average salary, grouped by department and gender.
ds.groupBy($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))

          In the following examples we can see that groupBy returns a RelationalGroupedDataset. A RelationalGroupedDataset has no schema or printSchema member, so we cannot inspect its schema directly.

scala> df.groupBy("StockCode")
res21: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: [grouping expressions: [StockCode: string], value: [InvoiceNo: string, StockCode: string ... 6 more fields], type: GroupBy]

scala> df.groupBy("StockCode").printSchema
<console>:26: error: value printSchema is not a member of org.apache.spark.sql.RelationalGroupedDataset
       df.groupBy("StockCode").printSchema
                               ^
scala> df.groupBy("StockCode").schema
<console>:26: error: value schema is not a member of org.apache.spark.sql.RelationalGroupedDataset
       df.groupBy("StockCode").schema
                               ^

  • Once we have the RelationalGroupedDataset, we can run several aggregation functions on top of it. The following shows some simple examples.

scala>     df.groupBy("StockCode").count().show(2)
+---------+-----+
|StockCode|count|
+---------+-----+
|    22728|  810|
|    21889|  607|
+---------+-----+
only showing top 2 rows


scala>     df.groupBy("StockCode").max("Quantity").show(2)
+---------+-------------+
|StockCode|max(Quantity)|
+---------+-------------+
|    22728|          500|
|    21889|          288|
+---------+-------------+
only showing top 2 rows


scala>     df.groupBy("StockCode").min("Quantity").show(2)
+---------+-------------+
|StockCode|min(Quantity)|
+---------+-------------+
|    22728|          -16|
|    21889|          -24|
+---------+-------------+
only showing top 2 rows


scala>     df.groupBy("StockCode").sum("Quantity").show(2)
+---------+-------------+
|StockCode|sum(Quantity)|
+---------+-------------+
|    22728|         5323|
|    21889|         6377|
+---------+-------------+
only showing top 2 rows


scala>     df.groupBy("StockCode").mean("Quantity").show(2)
+---------+------------------+
|StockCode|     avg(Quantity)|
+---------+------------------+
|    22728| 6.571604938271605|
|    21889|10.505766062602966|
+---------+------------------+
only showing top 2 rows


scala>     df.groupBy("StockCode").avg("Quantity").show(2)
+---------+------------------+
|StockCode|     avg(Quantity)|
+---------+------------------+
|    22728| 6.571604938271605|
|    21889|10.505766062602966|
+---------+------------------+
only showing top 2 rows

         Following are the method definitions available on RelationalGroupedDataset. Note that they return a DataFrame.

def avg(colNames: String*): DataFrame
Compute the mean value for each numeric column for each group. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the mean values for them.

def count(): DataFrame
Count the number of rows for each group. The resulting DataFrame will also contain the grouping columns.

def max(colNames: String*): DataFrame
Compute the max value for each numeric column for each group. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the max values for them.

def mean(colNames: String*): DataFrame
Compute the average value for each numeric column for each group. This is an alias for avg. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the average values for them.

def min(colNames: String*): DataFrame
Compute the min value for each numeric column for each group. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the min values for them.

def sum(colNames: String*): DataFrame
Compute the sum for each numeric column for each group. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the sum for them.
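
          Since these methods take a vararg of column names, several numeric columns can be aggregated in one call. A small sketch, assuming the retail df also contains a numeric UnitPrice column (one of its "6 more fields"):

// Sum two numeric columns per StockCode; with no arguments, sum() would
// aggregate every numeric column in the DataFrame
df.groupBy("StockCode").sum("Quantity", "UnitPrice").show(2)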


  • We can also take advantage of the agg method available on RelationalGroupedDataset, which makes it possible to pass in arbitrary expressions that just need to have some aggregation specified. We can even alias a column after transforming it for later use in the data flow (see the sketch after the examples below).

scala>     df.groupBy("StockCode").agg(count("Quantity"),max("Quantity"),min("Quantity"),sum("Quantity"),mean("Quantity"),avg("Quantity")).show(3)
+---------+---------------+-------------+-------------+-------------+------------------+------------------+
|StockCode|count(Quantity)|max(Quantity)|min(Quantity)|sum(Quantity)|     avg(Quantity)|     avg(Quantity)|
+---------+---------------+-------------+-------------+-------------+------------------+------------------+
|    22728|            810|          500|          -16|         5323| 6.571604938271605| 6.571604938271605|
|    21889|            607|          288|          -24|         6377|10.505766062602966|10.505766062602966|
|   90210B|              7|           12|            1|           41| 5.857142857142857| 5.857142857142857|
+---------+---------------+-------------+-------------+-------------+------------------+------------------+
only showing top 3 rows

scala>     df.groupBy("CustomerId").agg(collect_set("StockCode")).show(2)
+----------+----------------------+
|CustomerId|collect_set(StockCode)|
+----------+----------------------+
|     13285|  [22663, 23008, 23...|
|     13623|  [22755, 22697, 22...|
+----------+----------------------+
only showing top 2 rows
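
          As mentioned above, the columns inside agg can be transformed and aliased for later use in the data flow. A small sketch (the alias names are purely illustrative):

import org.apache.spark.sql.functions.{count, expr}

df.groupBy("StockCode").agg(
  count("Quantity").alias("quan"),                 // rename the aggregate column
  expr("sum(Quantity)").alias("total_quantity")    // aggregation written as a SQL expression
).show(2)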


         Following are the method definitions of the agg method. Note that no variant takes a list of strings.

def agg(expr: Column, exprs: Column*): DataFrame
Compute aggregates by specifying a series of aggregate columns. Note that this function by default retains the grouping columns in its output. To not retain grouping columns, set spark.sql.retainGroupColumns to false.
The available aggregate methods are defined in org.apache.spark.sql.functions.
// Selects the age of the oldest employee and the aggregate expense for each department
// Scala:
import org.apache.spark.sql.functions._
df.groupBy("department").agg(max("age"), sum("expense"))
// Java:
import static org.apache.spark.sql.functions.*;
df.groupBy("department").agg(max("age"), sum("expense"));
Note that before Spark 1.4, the default behavior is to NOT retain grouping columns. To change to that behavior, set config variable spark.sql.retainGroupColumns to false.
// Scala, 1.3.x:
df.groupBy("department").agg($"department", max("age"), sum("expense"))
// Java, 1.3.x:
df.groupBy("department").agg(col("department"), max("age"), sum("expense"));

def agg(exprs: Map[String, String]): DataFrame
(Scala-specific) Compute aggregates by specifying a map from column name to aggregate methods. The resulting DataFrame will also contain the grouping columns.
The available aggregate methods are avg, max, min, sum, count.
// Selects the age of the oldest employee and the aggregate expense for each department
df.groupBy("department").agg(Map(
  "age" -> "max",
  "expense" -> "sum"
))

def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
(Scala-specific) Compute aggregates by specifying the column names and aggregate methods. The resulting DataFrame will also contain the grouping columns.
The available aggregate methods are avg, max, min, sum, count.
// Selects the age of the oldest employee and the aggregate expense for each department
df.groupBy("department").agg(
  "age" -> "max",
  "expense" -> "sum"
)
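
          The pair variant above is not shown in the REPL examples that follow, so here is a small sketch against the same retail df (it is equivalent to the Map form demonstrated below):

// (column name -> aggregate method) pairs; available methods: avg, max, min, sum, count
df.groupBy("CustomerId").agg(
  "Quantity" -> "sum",
  "InvoiceDate" -> "max"
).show(3)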

  • We can also specify our transformations as a Map in which the key is the column name and the value is the aggregation function (as a string) we would like to perform. Column names can be reused if we specify multiple entries inline.

scala>     val map1=Map("Quantity" -> "sum","InvoiceDate" -> "max")
map1: scala.collection.immutable.Map[String,String] = Map(Quantity -> sum, InvoiceDate -> max)

scala>     df.groupBy("CustomerId").agg(map1);
res44: org.apache.spark.sql.DataFrame = [CustomerId: int, sum(Quantity): bigint ... 1 more field]

scala>     df.groupBy("CustomerId").agg(map1).show(3)
+----------+-------------+----------------+
|CustomerId|sum(Quantity)|max(InvoiceDate)|
+----------+-------------+----------------+
|     12940|          406| 9/30/2011 10:46|
|     13285|         2051|  7/1/2011 14:53|
|     13623|          286| 5/13/2011 16:12|
+----------+-------------+----------------+
only showing top 3 rows



