Search This Blog

Sunday, 14 April 2019

CH7.1 Aggregations, Aggregating to complex types


    • Aggregating => Act of collecting something together and is a cornerstone of big data analytics. In an aggregation, you will specify a key or grouping and an aggregation function that specifies how you should transform one or more columns. This function must produce one result for each group, given multiple input values

    • In general, you use aggregations to summarize numerical data usually by means of some grouping. With Spark you can aggregate any kind of value into an array, list, or map,

    • In addition to working with any type of values, Spark also allows us to create the following groupings types:

    • The simplest grouping is to just summarize a complete DataFrame by performing an aggregation in a select statement.
    • A “group by” allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns.
    • A “window” gives you the ability to specify one or more keys as well as one or more aggregation functions to transform the value columns. However, the rows input to the function are somehow related to the current row.
    • A “grouping set,” which you can use to aggregate at multiple different levels. Grouping sets are available as a primitive in SQL and via rollups and cubes in DataFrames.
    • A “rollup” makes it possible for you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized hierarchically.
    • A “cube” allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized across all combinations of columns.


    scala> val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\all\\*.csv").coalesce(5)
    df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [InvoiceNo: string, StockCode: string ... 6 more fields]

    scala> df.cache()
    res0: df.type = [InvoiceNo: string, StockCode: string ... 6 more fields]

    scala> df.count
    res1: Long = 541909

    The method count is actually an action as opposed to a transformation, and so it returns immediately.
    Now, this method is a bit of an outlier because it exists as a method (in this case) as opposed to a function and is eagerly evaluated instead of a lazy transformation. We also have a count lazy function.

Aggregations Functions:


  • Count: The count function is a transformation and not action.
We can do one of two things: specify a specific column to count, or all the columns by using count(*) or count(1) to represent that we want to count every row as the literal one.
When performing a count(*), Spark will count null values (including rows containing all nulls). However, when counting an individual column, Spark will not count the null values.

scala>     df1.select(count("StockCode"),count("*")).show()
+----------------+--------+
|count(StockCode)|count(1)|
+----------------+--------+
|          541909|  541909|
+----------------+--------+


scala>     df1.selectExpr("count(1)","count(*)","count(StockCode)").show()
+--------+--------+----------------+
|count(1)|count(1)|count(StockCode)|
+--------+--------+----------------+
|  541909|  541909|          541909|
+--------+--------+----------------+

  • countDistinct : The  countDistinct function is also a transformation and gives count of unique values. This is a bit more relevant for individual columns.

  • approx_count_distinct : Use this function when an approximation to a certain degree of accuracy will work just fine (Ex counting over large datasets) .The function approx_count_distinct takes another parameter with which you can specify the maximum estimation error allowed. If we specified a rather large error , we receive an answer that is quite far off but does complete more quickly than countDistinct.  We can see much greater performance gains with larger datasets.

  • First and last : Used to get first and last values from a Dataframe. This will be based on the rows in the DataFrame, not on the values in the DataFrame. I think these functions make more dense when grouping.

  • Min and max: To extract the minimum and maximum values from a DataFrame
     
  • Sum: Used to perform sum on a particular column in DF.
     
  • SumDistinct: Used to perform sum on distinct values in a column of a DataFrame
     
  • Avg : Used to calculate average on a column of a DataFrame

scala> df1.select(
     |   count("StockCode").as("CountStockCode"),
     |   count("*").as("Countstar"),
     |   countDistinct(col("Quantity")).as("DistinctQuantity"),
     |   approx_count_distinct("Quantity",0.2).as("approxc"),
     |   first("Quantity").as("first"),
     |   last("Quantity").as("last"),
     |   max("Quantity").as("max"),
     |   min("Quantity").as("min"),
     |   sum("Quantity").as("sum"),
     |   sumDistinct("Quantity").as("sumDi"),
     |   avg("Quantity").as("avg")
     | ).show()
+--------------+---------+----------------+-------+-----+----+-----+------+-------+-----+----------------+
|CountStockCode|Countstar|DistinctQuantity|approxc|first|last|  max|   min|    sum|sumDi|             avg|
+--------------+---------+----------------+-------+-----+----+-----+------+-------+-----+----------------+
|        541909|   541909|             722|    639|    6|   3|80995|-80995|5176450|29310|9.55224954743324|
+--------------+---------+----------------+-------+-----+----+-----+------+-------+-----+----------------+


scala>

scala> df1.selectExpr(
     |   "count(1)",
     |   "count(*)",
     |   "count(StockCode)",
     |   "countDistinct(Quantity) as DistinctQuantity",
     |   "approx_count_distinct(Quantity,0.2) as approcx",
     |   "first(Quantity) as first",
     |   "last(Quantity) as last",
     |   "max(Quantity) as max",
     |   "min(Quantity) as min",
     |   "sum(Quantity) as sum",
     |   "sumDistinct(Quantity) as sumDi",
     |   "avg(Quantity) as avg"
     | ).show()

Following are the functions under org.apache.spark.sql.functions package.
Note that these functions are returning a Column type. Also note that almost all functions have overloaded versions that take String column names and other that takes Column type.

def count(columnName: String): TypedColumn[Any, Long]
Aggregate function: returns the number of items in a group.

def count(e: Column): Column
Aggregate function: returns the number of items in a group.

def countDistinct(columnName: String, columnNames: String*): Column
Aggregate function: returns the number of distinct items in a group.

def countDistinct(expr: Column, exprs: Column*): Column
Aggregate function: returns the number of distinct items in a group.

def approx_count_distinct(columnName: String, rsd: Double): Column
Aggregate function: returns the approximate number of distinct items in a group.
rsd - maximum estimation error allowed (default = 0.05)

def approx_count_distinct(e: Column, rsd: Double): Column
Aggregate function: returns the approximate number of distinct items in a group.
rsd - maximum estimation error allowed (default = 0.05)

def approx_count_distinct(columnName: String): Column
Aggregate function: returns the approximate number of distinct items in a group.

def approx_count_distinct(e: Column): Column
Aggregate function: returns the approximate number of distinct items in a group.

def first(columnName: String): Column
Aggregate function: returns the first value of a column in a group.
The function by default returns the first values it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def first(e: Column): Column
Aggregate function: returns the first value in a group.
The function by default returns the first values it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def first(columnName: String, ignoreNulls: Boolean): Column
Aggregate function: returns the first value of a column in a group.
The function by default returns the first values it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def first(e: Column, ignoreNulls: Boolean): Column
Aggregate function: returns the first value in a group.
The function by default returns the first values it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def last(columnName: String): Column
Aggregate function: returns the last value of the column in a group.
The function by default returns the last values it sees. It will return the last non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def last(e: Column): Column
Aggregate function: returns the last value in a group.
The function by default returns the last values it sees. It will return the last non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def last(columnName: String, ignoreNulls: Boolean): Column
Aggregate function: returns the last value of the column in a group.
The function by default returns the last values it sees. It will return the last non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def last(e: Column, ignoreNulls: Boolean): Column
Aggregate function: returns the last value in a group.
The function by default returns the last values it sees. It will return the last non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def max(columnName: String): Column
Aggregate function: returns the maximum value of the column in a group.

def max(e: Column): Column
Aggregate function: returns the maximum value of the expression in a group.

def min(columnName: String): Column
Aggregate function: returns the minimum value of the column in a group.

def min(e: Column): Column
Aggregate function: returns the minimum value of the expression in a group.

def sum(columnName: String): Column
Aggregate function: returns the sum of all values in the given column.

def sum(e: Column): Column
Aggregate function: returns the sum of all values in the expression.

def sumDistinct(columnName: String): Column
Aggregate function: returns the sum of distinct values in the expression.

def sumDistinct(e: Column): Column
Aggregate function: returns the sum of distinct values in the expression.

def avg(columnName: String): Column
Aggregate function: returns the average of the values in a group.

def avg(e: Column): Column
Aggregate function: returns the average of the values in a group.

def mean(columnName: String): Column
Aggregate function: returns the average of the values in a group. Alias for avg.

def mean(e: Column): Column
Aggregate function: returns the average of the values in a group. Alias for avg.

  • Variance and Standard Deviation:  These both measures of the spread of the data around the mean. The variance is the average of the squared differences from the mean, and the standard deviation is the square root of the variance. Spark has both the formula for the sample standard deviation as well as the formula for the population standard deviation.
These are fundamentally different statistical formulae.
By default, Spark performs the formula for the sample standard deviation or variance if you use the variance or stddev functions. 

scala>     df1.select(
     |     stddev("Quantity"),
     |       stddev_pop("Quantity"),
     |         stddev_samp("Quantity"),
     |       variance("Quantity"),
     |       var_pop("Quantity"),
     |       var_samp("Quantity")
     |           ).show()
+---------------------+--------------------+---------------------+------------------+-----------------+------------------+
|stddev_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|var_samp(Quantity)|var_pop(Quantity)|var_samp(Quantity)|
+---------------------+--------------------+---------------------+------------------+-----------------+------------------+
|   218.08115785023455|  218.08095663447835|   218.08115785023455| 47559.39140929892|47559.30364660923| 47559.39140929892|
+---------------------+--------------------+---------------------+------------------+-----------------+------------------+

def var_pop(columnName: String): Column
Aggregate function: returns the population variance of the values in a group.

def var_pop(e: Column): Column
Aggregate function: returns the population variance of the values in a group.

def var_samp(columnName: String): Column
Aggregate function: returns the unbiased variance of the values in a group.

def var_samp(e: Column): Column
Aggregate function: returns the unbiased variance of the values in a group.

def variance(columnName: String): Column
Aggregate function: alias for var_samp.

def variance(e: Column): Column
Aggregate function: alias for var_samp.

def stddev(columnName: String): Column
Aggregate function: alias for stddev_samp.

def stddev(e: Column): Column
Aggregate function: alias for stddev_samp.

def stddev_pop(columnName: String): Column
Aggregate function: returns the population standard deviation of the expression in a group.

def stddev_pop(e: Column): Column
Aggregate function: returns the population standard deviation of the expression in a group.

def stddev_samp(columnName: String): Column
Aggregate function: returns the sample standard deviation of the expression in a group.

def stddev_samp(e: Column): Column
Aggregate function: returns the sample standard deviation of the expression in a group.

  • Skewness and kurtosis:  Skewness and kurtosis are both measurements of extreme points in your data. Skewness measures the asymmetry of the values in your data around the mean, whereas kurtosis is a measure of the tail of data. 

  • Covariance and Correlation: Correlation measures the Pearson correlation coefficient, which is scaled between –1 and +1. The covariance is scaled according to the inputs in the data.Like the var function, covariance can be calculated either as the sample covariance or the population covariance. Therefore it can be important to specify which formula you want to use. Correlation has no notion of this and therefore does not have calculations for population or sample. 

scala>     df.select(
     |       skewness("Quantity"),
     |       kurtosis("Quantity"),
     |       corr("Quantity","StockCode"),
     |       covar_pop("Quantity","StockCode"),
     |       covar_samp("Quantity","StockCode")
     |     ).show()
+--------------------+------------------+-------------------------+------------------------------+-------------------------------+
|  skewness(Quantity)|kurtosis(Quantity)|corr(Quantity, StockCode)|covar_pop(Quantity, StockCode)|covar_samp(Quantity, StockCode)|
+--------------------+------------------+-------------------------+------------------------------+-------------------------------+
|-0.26407557610528376|119768.05495530753|     0.003148487638324...|             12062.85169025026|             12062.876166204545|
+--------------------+------------------+-------------------------+------------------------------+-------------------------------+

def skewness(columnName: String): Column
Aggregate function: returns the skewness of the values in a group.

def skewness(e: Column): Column
Aggregate function: returns the skewness of the values in a group.

def kurtosis(columnName: String): Column
Aggregate function: returns the kurtosis of the values in a group.

def kurtosis(e: Column): Column
Aggregate function: returns the kurtosis of the values in a group.

def corr(columnName1: String, columnName2: String): Column
Aggregate function: returns the Pearson Correlation Coefficient for two columns.

def corr(column1: Column, column2: Column): Column
Aggregate function: returns the Pearson Correlation Coefficient for two columns.

def covar_pop(columnName1: String, columnName2: String): Column
Aggregate function: returns the population covariance for two columns.

def covar_pop(column1: Column, column2: Column): Column
Aggregate function: returns the population covariance for two columns.

def covar_samp(columnName1: String, columnName2: String): Column
Aggregate function: returns the sample covariance for two columns.

def covar_samp(column1: Column, column2: Column): Column
Aggregate function: returns the sample covariance for two columns.


Aggregating to Complex types:
 
  • We can collect a list of values present in a given column or only the unique values by collecting to a set. We can use this to carry out some more programmatic access later on in the pipeline or pass the entire collection in a user-defined function (UDF).

def collect_list(columnName: String): Column
Aggregate function: returns a list of objects with duplicates.

def collect_list(e: Column): Column
Aggregate function: returns a list of objects with duplicates.

def collect_set(columnName: String): Column
Aggregate function: returns a set of objects with duplicate elements eliminated.

def collect_set(e: Column): Column
Aggregate function: returns a set of objects with duplicate elements eliminated.

scala> df.agg(collect_set("Country").as("set1"), collect_list("Country").as("list1")).show
+--------------------+--------------------+
|                set1|               list1|
+--------------------+--------------------+
|[Portugal, Italy,...|[United Kingdom, ...|
+--------------------+--------------------+


scala> df.agg(collect_set("Country").as("set1"), collect_list("Country").as("list1")).selectExpr("size(set1)","size(list1)").show
+----------+-----------+
|size(set1)|size(list1)|
+----------+-----------+
|        38|     541909|
+----------+-----------+

In above example we are using agg method on dataframes. Normally we have seen that method being used on RelationalGroupedDatasets.
The agg method on dataframes allows aggregation over entire datasets.

def agg(expr: Column, exprs: Column*): DataFrame
Aggregates on the entire Dataset without groups.
// ds.agg(...) is a shorthand for ds.groupBy().agg(...)
ds.agg(max($"age"), avg($"salary"))
ds.groupBy().agg(max($"age"), avg($"salary"))

def agg(exprs: Map[String, String]): DataFrame
(Scala-specific) Aggregates on the entire Dataset without groups.
// ds.agg(...) is a shorthand for ds.groupBy().agg(...)
ds.agg(Map("age" -> "max", "salary" -> "avg"))
ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))

def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
(Scala-specific) Aggregates on the entire Dataset without groups.
// ds.agg(...) is a shorthand for ds.groupBy().agg(...)
ds.agg("age" -> "max", "salary" -> "avg")
ds.groupBy().agg("age" -> "max", "salary" -> "avg")


No comments:

Post a Comment