
Sunday, 14 April 2019

CH7.1 Aggregations, Aggregating to complex types


    • Aggregating => The act of collecting something together; it is a cornerstone of big data analytics. In an aggregation, you specify a key or grouping and an aggregation function that specifies how you should transform one or more columns. This function must produce one result for each group, given multiple input values.

    • In general, you use aggregations to summarize numerical data, usually by means of some grouping. With Spark you can aggregate any kind of value into an array, list, or map.

    • In addition to working with any type of values, Spark also allows us to create the following grouping types (a quick sketch follows this list):

    • The simplest grouping is to just summarize a complete DataFrame by performing an aggregation in a select statement.
    • A “group by” allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns.
    • A “window” gives you the ability to specify one or more keys as well as one or more aggregation functions to transform the value columns. However, the rows input to the function are somehow related to the current row.
    • A “grouping set,” which you can use to aggregate at multiple different levels. Grouping sets are available as a primitive in SQL and via rollups and cubes in DataFrames.
    • A “rollup” makes it possible for you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized hierarchically.
    • A “cube” allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized across all combinations of columns.
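A minimal sketch of these grouping styles, assuming the retail DataFrame df loaded in the next snippet (windows, rollups, and cubes are covered in detail in later sections):

import org.apache.spark.sql.functions.sum

// simplest: aggregate over the whole DataFrame in a select
df.select(sum("Quantity")).show()

// group by one or more keys
df.groupBy("InvoiceNo").agg(sum("Quantity")).show()

// rollup: hierarchical subtotals (Country, Country + InvoiceNo, plus a grand total)
df.rollup("Country", "InvoiceNo").agg(sum("Quantity")).show()

// cube: subtotals across all combinations of the keys
df.cube("Country", "InvoiceNo").agg(sum("Quantity")).show()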


    scala> val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\all\\*.csv").coalesce(5)
    df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [InvoiceNo: string, StockCode: string ... 6 more fields]

    scala> df.cache()
    res0: df.type = [InvoiceNo: string, StockCode: string ... 6 more fields]

    scala> df.count
    res1: Long = 541909

    The method count is actually an action as opposed to a transformation, and so it returns immediately.
    This method is a bit of an outlier because it exists as a method (in this case) rather than a function, and it is eagerly evaluated instead of being a lazy transformation. There is also a lazy count function, shown in the sketch below.
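For contrast, a minimal sketch of the lazy count function from org.apache.spark.sql.functions (nothing is computed until an action such as show is called):

import org.apache.spark.sql.functions.count

val counted = df.select(count("*"))   // transformation only, evaluated lazily
counted.show()                        // the count is computed here, when the action runs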

Aggregation Functions:


  • Count: The count function is a transformation, not an action.
We can do one of two things: specify a specific column to count, or count every row by using count(*) or count(1) (the literal one represents counting each row as one).
When performing a count(*), Spark will count null values (including rows containing all nulls). However, when counting an individual column, Spark will not count the null values (a short sketch after the examples below illustrates this).

scala>     df1.select(count("StockCode"),count("*")).show()
+----------------+--------+
|count(StockCode)|count(1)|
+----------------+--------+
|          541909|  541909|
+----------------+--------+


scala>     df1.selectExpr("count(1)","count(*)","count(StockCode)").show()
+--------+--------+----------------+
|count(1)|count(1)|count(StockCode)|
+--------+--------+----------------+
|  541909|  541909|          541909|
+--------+--------+----------------+
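To see the null-handling difference described above, a minimal sketch (assuming, as is typical of this retail dataset, that the Description column contains some null values):

// count on a specific column skips nulls; count(*) counts every row
df1.select(count("Description"), count("*")).show()
// if Description contains nulls, the first count will be smaller than the second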

  • countDistinct: The countDistinct function is also a transformation and gives the count of unique values. This is a bit more relevant for individual columns.

  • approx_count_distinct: Use this function when an approximation to a certain degree of accuracy will work just fine (e.g., counting over large datasets). The approx_count_distinct function takes another parameter with which you can specify the maximum estimation error allowed. If we specify a rather large error, we receive an answer that is quite far off but completes more quickly than countDistinct. The performance gains are much greater with larger datasets.

  • First and last: Used to get the first and last values from a DataFrame. These are based on the rows in the DataFrame, not on the values in the DataFrame. I think these functions make more sense when used with grouping.

  • Min and max: Used to extract the minimum and maximum values from a DataFrame.

  • Sum: Used to sum the values in a particular column of a DataFrame.

  • sumDistinct: Used to sum only the distinct values in a column of a DataFrame.

  • Avg: Used to calculate the average of a column of a DataFrame.

scala> df1.select(
     |   count("StockCode").as("CountStockCode"),
     |   count("*").as("Countstar"),
     |   countDistinct(col("Quantity")).as("DistinctQuantity"),
     |   approx_count_distinct("Quantity",0.2).as("approxc"),
     |   first("Quantity").as("first"),
     |   last("Quantity").as("last"),
     |   max("Quantity").as("max"),
     |   min("Quantity").as("min"),
     |   sum("Quantity").as("sum"),
     |   sumDistinct("Quantity").as("sumDi"),
     |   avg("Quantity").as("avg")
     | ).show()
+--------------+---------+----------------+-------+-----+----+-----+------+-------+-----+----------------+
|CountStockCode|Countstar|DistinctQuantity|approxc|first|last|  max|   min|    sum|sumDi|             avg|
+--------------+---------+----------------+-------+-----+----+-----+------+-------+-----+----------------+
|        541909|   541909|             722|    639|    6|   3|80995|-80995|5176450|29310|9.55224954743324|
+--------------+---------+----------------+-------+-----+----+-----+------+-------+-----+----------------+


scala>

scala> df1.selectExpr(
     |   "count(1)",
     |   "count(*)",
     |   "count(StockCode)",
     |   "countDistinct(Quantity) as DistinctQuantity",
     |   "approx_count_distinct(Quantity,0.2) as approcx",
     |   "first(Quantity) as first",
     |   "last(Quantity) as last",
     |   "max(Quantity) as max",
     |   "min(Quantity) as min",
     |   "sum(Quantity) as sum",
     |   "sumDistinct(Quantity) as sumDi",
     |   "avg(Quantity) as avg"
     | ).show()

Following are the relevant functions in the org.apache.spark.sql.functions package.
Note that these functions return a Column type. Also note that almost all of them have overloaded versions: one that takes a String column name and another that takes a Column.

def count(columnName: String): TypedColumn[Any, Long]
Aggregate function: returns the number of items in a group.

def count(e: Column): Column
Aggregate function: returns the number of items in a group.

def countDistinct(columnName: String, columnNames: String*): Column
Aggregate function: returns the number of distinct items in a group.

def countDistinct(expr: Column, exprs: Column*): Column
Aggregate function: returns the number of distinct items in a group.

def approx_count_distinct(columnName: String, rsd: Double): Column
Aggregate function: returns the approximate number of distinct items in a group.
rsd - maximum estimation error allowed (default = 0.05)

def approx_count_distinct(e: Column, rsd: Double): Column
Aggregate function: returns the approximate number of distinct items in a group.
rsd - maximum estimation error allowed (default = 0.05)

def approx_count_distinct(columnName: String): Column
Aggregate function: returns the approximate number of distinct items in a group.

def approx_count_distinct(e: Column): Column
Aggregate function: returns the approximate number of distinct items in a group.

def first(columnName: String): Column
Aggregate function: returns the first value of a column in a group.
The function by default returns the first values it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def first(e: Column): Column
Aggregate function: returns the first value in a group.
The function by default returns the first values it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def first(columnName: String, ignoreNulls: Boolean): Column
Aggregate function: returns the first value of a column in a group.
The function by default returns the first values it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def first(e: Column, ignoreNulls: Boolean): Column
Aggregate function: returns the first value in a group.
The function by default returns the first values it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def last(columnName: String): Column
Aggregate function: returns the last value of the column in a group.
The function by default returns the last values it sees. It will return the last non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def last(e: Column): Column
Aggregate function: returns the last value in a group.
The function by default returns the last values it sees. It will return the last non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def last(columnName: String, ignoreNulls: Boolean): Column
Aggregate function: returns the last value of the column in a group.
The function by default returns the last values it sees. It will return the last non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def last(e: Column, ignoreNulls: Boolean): Column
Aggregate function: returns the last value in a group.
The function by default returns the last values it sees. It will return the last non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.

def max(columnName: String): Column
Aggregate function: returns the maximum value of the column in a group.

def max(e: Column): Column
Aggregate function: returns the maximum value of the expression in a group.

def min(columnName: String): Column
Aggregate function: returns the minimum value of the column in a group.

def min(e: Column): Column
Aggregate function: returns the minimum value of the expression in a group.

def sum(columnName: String): Column
Aggregate function: returns the sum of all values in the given column.

def sum(e: Column): Column
Aggregate function: returns the sum of all values in the expression.

def sumDistinct(columnName: String): Column
Aggregate function: returns the sum of distinct values in the expression.

def sumDistinct(e: Column): Column
Aggregate function: returns the sum of distinct values in the expression.

def avg(columnName: String): Column
Aggregate function: returns the average of the values in a group.

def avg(e: Column): Column
Aggregate function: returns the average of the values in a group.

def mean(columnName: String): Column
Aggregate function: returns the average of the values in a group. Alias for avg.

def mean(e: Column): Column
Aggregate function: returns the average of the values in a group. Alias for avg.

  • Variance and standard deviation: These are both measures of the spread of the data around the mean. The variance is the average of the squared differences from the mean, and the standard deviation is the square root of the variance. Spark has both the formula for the sample standard deviation and the formula for the population standard deviation.
These are fundamentally different statistical formulae.
By default, Spark applies the sample formula when you use the variance or stddev functions.

scala>     df1.select(
     |     stddev("Quantity"),
     |       stddev_pop("Quantity"),
     |         stddev_samp("Quantity"),
     |       variance("Quantity"),
     |       var_pop("Quantity"),
     |       var_samp("Quantity")
     |           ).show()
+---------------------+--------------------+---------------------+------------------+-----------------+------------------+
|stddev_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|var_samp(Quantity)|var_pop(Quantity)|var_samp(Quantity)|
+---------------------+--------------------+---------------------+------------------+-----------------+------------------+
|   218.08115785023455|  218.08095663447835|   218.08115785023455| 47559.39140929892|47559.30364660923| 47559.39140929892|
+---------------------+--------------------+---------------------+------------------+-----------------+------------------+

def var_pop(columnName: String): Column
Aggregate function: returns the population variance of the values in a group.

def var_pop(e: Column): Column
Aggregate function: returns the population variance of the values in a group.

def var_samp(columnName: String): Column
Aggregate function: returns the unbiased variance of the values in a group.

def var_samp(e: Column): Column
Aggregate function: returns the unbiased variance of the values in a group.

def variance(columnName: String): Column
Aggregate function: alias for var_samp.

def variance(e: Column): Column
Aggregate function: alias for var_samp.

def stddev(columnName: String): Column
Aggregate function: alias for stddev_samp.

def stddev(e: Column): Column
Aggregate function: alias for stddev_samp.

def stddev_pop(columnName: String): Column
Aggregate function: returns the population standard deviation of the expression in a group.

def stddev_pop(e: Column): Column
Aggregate function: returns the population standard deviation of the expression in a group.

def stddev_samp(columnName: String): Column
Aggregate function: returns the sample standard deviation of the expression in a group.

def stddev_samp(e: Column): Column
Aggregate function: returns the sample standard deviation of the expression in a group.

  • Skewness and kurtosis: Skewness and kurtosis are both measurements of extreme points in your data. Skewness measures the asymmetry of the values in your data around the mean, whereas kurtosis is a measure of the tails of the data.

  • Covariance and correlation: Correlation measures the Pearson correlation coefficient, which is scaled between -1 and +1. The covariance is scaled according to the inputs in the data. Like the var function, covariance can be calculated either as the sample covariance or the population covariance, so it can be important to specify which formula you want. Correlation has no notion of this and therefore has no population or sample variants.

scala>     df.select(
     |       skewness("Quantity"),
     |       kurtosis("Quantity"),
     |       corr("Quantity","StockCode"),
     |       covar_pop("Quantity","StockCode"),
     |       covar_samp("Quantity","StockCode")
     |     ).show()
+--------------------+------------------+-------------------------+------------------------------+-------------------------------+
|  skewness(Quantity)|kurtosis(Quantity)|corr(Quantity, StockCode)|covar_pop(Quantity, StockCode)|covar_samp(Quantity, StockCode)|
+--------------------+------------------+-------------------------+------------------------------+-------------------------------+
|-0.26407557610528376|119768.05495530753|     0.003148487638324...|             12062.85169025026|             12062.876166204545|
+--------------------+------------------+-------------------------+------------------------------+-------------------------------+

def skewness(columnName: String): Column
Aggregate function: returns the skewness of the values in a group.

def skewness(e: Column): Column
Aggregate function: returns the skewness of the values in a group.

def kurtosis(columnName: String): Column
Aggregate function: returns the kurtosis of the values in a group.

def kurtosis(e: Column): Column
Aggregate function: returns the kurtosis of the values in a group.

def corr(columnName1: String, columnName2: String): Column
Aggregate function: returns the Pearson Correlation Coefficient for two columns.

def corr(column1: Column, column2: Column): Column
Aggregate function: returns the Pearson Correlation Coefficient for two columns.

def covar_pop(columnName1: String, columnName2: String): Column
Aggregate function: returns the population covariance for two columns.

def covar_pop(column1: Column, column2: Column): Column
Aggregate function: returns the population covariance for two columns.

def covar_samp(columnName1: String, columnName2: String): Column
Aggregate function: returns the sample covariance for two columns.

def covar_samp(column1: Column, column2: Column): Column
Aggregate function: returns the sample covariance for two columns.


Aggregating to Complex types:
 
  • We can collect a list of the values present in a given column, or only the unique values by collecting to a set. We can use this for more programmatic access later in the pipeline, or pass the entire collection to a user-defined function (UDF).

def collect_list(columnName: String): Column
Aggregate function: returns a list of objects with duplicates.

def collect_list(e: Column): Column
Aggregate function: returns a list of objects with duplicates.

def collect_set(columnName: String): Column
Aggregate function: returns a set of objects with duplicate elements eliminated.

def collect_set(e: Column): Column
Aggregate function: returns a set of objects with duplicate elements eliminated.

scala> df.agg(collect_set("Country").as("set1"), collect_list("Country").as("list1")).show
+--------------------+--------------------+
|                set1|               list1|
+--------------------+--------------------+
|[Portugal, Italy,...|[United Kingdom, ...|
+--------------------+--------------------+


scala> df.agg(collect_set("Country").as("set1"), collect_list("Country").as("list1")).selectExpr("size(set1)","size(list1)").show
+----------+-----------+
|size(set1)|size(list1)|
+----------+-----------+
|        38|     541909|
+----------+-----------+

In the above example we use the agg method directly on a DataFrame. Normally we have seen that method being used on a RelationalGroupedDataset.
The agg method on a DataFrame aggregates over the entire Dataset without groups (see the sketch after the definitions below).

def agg(expr: Column, exprs: Column*): DataFrame
Aggregates on the entire Dataset without groups.
// ds.agg(...) is a shorthand for ds.groupBy().agg(...)
ds.agg(max($"age"), avg($"salary"))
ds.groupBy().agg(max($"age"), avg($"salary"))

def agg(exprs: Map[String, String]): DataFrame
(Scala-specific) Aggregates on the entire Dataset without groups.
// ds.agg(...) is a shorthand for ds.groupBy().agg(...)
ds.agg(Map("age" -> "max", "salary" -> "avg"))
ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))

def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
(Scala-specific) Aggregates on the entire Dataset without groups.
// ds.agg(...) is a shorthand for ds.groupBy().agg(...)
ds.agg("age" -> "max", "salary" -> "avg")
ds.groupBy().agg("age" -> "max", "salary" -> "avg")


Saturday, 13 April 2019

CH6.1 Working with Booleans, Numbers


Where to look for API:

Following are the places where one should look for transformations and functions:



  • The package org.apache.spark.sql.functions contains a variety of functions for a range of different data types. Normally the entire package is imported because its functions are used so frequently.


 
Converting to Spark types:

  • The lit function is used to convert a type in another language into its corresponding Spark representation.

scala> df.select(lit(5), lit("five"), lit(5.0)).take(3)
res34: Array[org.apache.spark.sql.Row] = Array([5,five,5.0], [5,five,5.0], [5,five,5.0])

Working with Booleans:

  • Booleans are the foundation for all filtering. Boolean statements consist of four elements: and, or, true, and false. We use these simple structures to build logical statements that evaluate to either true or false. These statements are often used as conditional requirements: a row of data must either pass the test (evaluate to true) or it will be filtered out.

scala> val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day\\2010-12-01.csv")
df: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]

scala> spark.conf.set("spark.sql.shuffle.partitions",5)

scala>     df.where(col("InvoiceNo").equalTo(536365)).select("InvoiceNo", "Description").show(5, false)
+---------+-----------------------------------+
|InvoiceNo|Description                        |
+---------+-----------------------------------+
|536365   |WHITE HANGING HEART T-LIGHT HOLDER |
|536365   |WHITE METAL LANTERN                |
|536365   |CREAM CUPID HEARTS COAT HANGER     |
|536365   |KNITTED UNION FLAG HOT WATER BOTTLE|
|536365   |RED WOOLLY HOTTIE WHITE HEART.     |
+---------+-----------------------------------+
only showing top 5 rows

Following are the methods available on the Column type to perform tests.
Note that most of the methods have operator-equivalent forms. The operator forms are preferred in Scala.
The verbose method forms are preferred in Java.

def &&(other: Any): Column
Boolean AND.

def and(other: Column): Column
Boolean AND.
// Scala: The following selects people that are in school and employed at the same time.
people.select( people("inSchool") && people("isEmployed") )
// Java:
people.select( people("inSchool").and(people("isEmployed")) );
--------------------------------------------------------------------------------------------------------

def ||(other: Any): Column
Boolean OR.

def or(other: Column): Column
Boolean OR.
// Scala: The following selects people that are in school or employed.
people.filter( people("inSchool") || people("isEmployed") )
// Java:
people.filter( people("inSchool").or(people("isEmployed")) );
--------------------------------------------------------------------------------------------------------

def =!=(other: Any): Column
Inequality test.

def notEqual(other: Any): Column
Inequality test.
// Scala:
df.select( df("colA") !== df("colB") )
df.select( !(df("colA") === df("colB")) )
// Java:
import static org.apache.spark.sql.functions.*;
df.filter( col("colA").notEqual(col("colB")) );
--------------------------------------------------------------------------------------------------------

def <(other: Any): Column
Less than.

def lt(other: Any): Column
Less than.
// Scala: The following selects people younger than 21.
people.select( people("age") < 21 )
// Java:
people.select( people("age").lt(21) );
--------------------------------------------------------------------------------------------------------

def <=(other: Any): Column
Less than or equal to.

def leq(other: Any): Column
Less than or equal to.
// Scala: The following selects people age 21 or younger than 21.
people.select( people("age") <= 21 )
// Java:
people.select( people("age").leq(21) );

--------------------------------------------------------------------------------------------------------
def <=>(other: Any): Column
Equality test that is safe for null values.

def eqNullSafe(other: Any): Column
Equality test that is safe for null values.
--------------------------------------------------------------------------------------------------------

def ===(other: Any): Column
Equality test.

def equalTo(other: Any): Column
 Equality test.
// Scala:
df.filter( df("colA") === df("colB") )
// Java
import static org.apache.spark.sql.functions.*;
df.filter( col("colA").equalTo(col("colB")) );
--------------------------------------------------------------------------------------------------------

def >(other: Any): Column
Greater than.

def gt(other: Any): Column
Greater than.
// Scala: The following selects people older than 21.
people.select( people("age") > lit(21) )
// Java:
import static org.apache.spark.sql.functions.*;
people.select( people("age").gt(21) );
--------------------------------------------------------------------------------------------------------

def >=(other: Any): Column
Greater than or equal to an expression.

def geq(other: Any): Column
Greater than or equal to an expression.
// Scala: The following selects people age 21 or older than 21.
people.select( people("age") >= 21 )
// Java:
people.select( people("age").geq(21) )

--------------------------------------------------------------------------------------------------------


  • Scala has some particular semantics regarding the use of == and ===. In Spark, if you want to filter by equality you should use === (equal) or =!= (not equal). You can also use the not function and the equalTo method. Note that this is only needed when we use the Column type; if we are writing string expressions, it is not required.

scala> df.where(col("InvoiceNo") === 536365).select("InvoiceNo", "Description").show(5, false)

Recollect that === is a method that we are invoking on the Column type.


  • We know that we can specify multiple filters using the "and" and "or" methods.
In Spark, you should always chain together and filters as sequential filters.
The reason is that even if Boolean statements are expressed serially (one after the other), Spark will flatten all of these filters into one statement and perform the filter at the same time, creating the and statement for us. Multiple or statements, however, need to be specified in the same statement.

Below, note how we define the condition as a Column. We can create such Column instances wherever we want; however, they will be analyzed only in the context of a DataFrame.

scala>     val priceFilter = col("UnitPrice") > 600
priceFilter: org.apache.spark.sql.Column = (UnitPrice > 600)

scala>     val descripFilter = col("Description").contains("POSTAGE")
descripFilter: org.apache.spark.sql.Column = contains(Description, POSTAGE)

scala>     df.where(col("StockCode").isin("DOT")).where(priceFilter.or(descripFilter)).show()
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      null|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+


def isin(list: Any*): Column
A boolean expression that is evaluated to true if the value of this expression is contained by the evaluated values of the arguments.

def contains(other: Any): Column
Contains the other element.



  • Boolean expressions are not just reserved for filters. We can also add them as additional columns.

scala>     val DOTCodeFilter = col("StockCode") === "DOT"
DOTCodeFilter: org.apache.spark.sql.Column = (StockCode = DOT)

scala>     val priceFilter = col("UnitPrice") > 600
priceFilter: org.apache.spark.sql.Column = (UnitPrice > 600)

scala>     val descripFilter = col("Description").contains("POSTAGE")
descripFilter: org.apache.spark.sql.Column = contains(Description, POSTAGE)

scala>     df.withColumn("isExpensive", DOTCodeFilter.and(priceFilter.or(descripFilter))).where("isExpensive").select("unitPrice", "isExpensive").show(5)
+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+


scala> val pfilter =expr("UnitPrice > 600 AND StockCode ='DOT'")
pfilter: org.apache.spark.sql.Column = ((UnitPrice > 600) AND (StockCode = DOT))

scala> df.where(pfilter)
res45: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [InvoiceNo: string, StockCode: string ... 6 more fields]

scala> df.where(pfilter).show
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+




scala>     df.createOrReplaceTempView("dfTable")

scala> val resu=spark.sql(s"""
     | SELECT UnitPrice, (StockCode = 'DOT' AND
     |   (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive
     | FROM dfTable
     | WHERE (StockCode = 'DOT' AND
     |        (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))
     | """)


As shown above, it is often easier to simply express filters as SQL statements than to use the programmatic DataFrame interface, and Spark SQL allows us to do this without paying any performance penalty.
 
  • Be cautious with null data when creating Boolean expressions. If there is a null in your data, we can use eqNullSafe to perform a null-safe equivalence test, as sketched below.
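A minimal sketch of the null-safe test on the Description column (the literal "hello" is just an illustrative value):

// === yields null when either side is null; eqNullSafe yields true or false instead
df.where(col("Description").eqNullSafe("hello")).show()

// operator form of the same test
df.where(col("Description") <=> "hello").show()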


Working with Numbers:
 
  • Following shows an example of using the pow function on numeric values. Note how we can construct the Column first and then use it.
We show multiple ways of performing the same computation: using col, using expr, and using selectExpr.

scala>     /*Using col*/
     |     import org.apache.spark.sql.functions.{expr, pow}
import org.apache.spark.sql.functions.{expr, pow}

scala>     var fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
fabricatedQuantity: org.apache.spark.sql.Column = (POWER((Quantity * UnitPrice), 2.0) + 5)

scala>     df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)
+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows


scala>     /*Using Expr*/
     |     fabricatedQuantity = expr("(pow((Quantity * UnitPrice),2))+5")
fabricatedQuantity: org.apache.spark.sql.Column = (pow((Quantity * UnitPrice), 2) + 5)

scala>     df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)
+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows


scala>     /*Using selectExpr*/
     |     df.selectExpr("CustomerId","(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)
+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows

  • Following shows how to round numbers. To round to a whole number, we can cast the value to an integer. Spark also has more detailed functions for performing this explicitly and to a certain level of precision.

By default, the round function rounds up if you're exactly in between two numbers. You can round down by using the bround function.

scala>     df.select(round(col("UnitPrice"), 1).alias("rounded"), col("UnitPrice")).show(5)
+-------+---------+
|rounded|UnitPrice|
+-------+---------+
|    2.6|     2.55|
|    3.4|     3.39|
|    2.8|     2.75|
|    3.4|     3.39|
|    3.4|     3.39|
+-------+---------+
only showing top 5 rows


scala>     df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)
+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
|          3.0|           2.0|
+-------------+--------------+
only showing top 2 rows


scala>     df.selectExpr("round(2.5)","bround(2.5)").show(2)
+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|            3|             2|
|            3|             2|
+-------------+--------------+
only showing top 2 rows


  • Following shows how to compute the correlation between two columns. Using the stat method we get an instance of the DataFrameStatFunctions class, on which we can run statistical methods.

Following shows the definition of the stat method on a DataFrame:

def stat: DataFrameStatFunctions
Returns a DataFrameStatFunctions for working statistic functions support.
// Finding frequent items in column with name 'a'.
ds.stat.freqItems(Seq("a"))

Note how using the corr function directly in a select is simpler than going through the stat method (a few more stat methods are sketched after these examples).

scala> import org.apache.spark.sql.functions.{corr}
import org.apache.spark.sql.functions.corr

scala>     df.stat.corr("Quantity", "UnitPrice")
res0: Double = -0.04112314436835551

scala>     df.select(corr("Quantity", "UnitPrice")).show()
+-------------------------+
|corr(Quantity, UnitPrice)|
+-------------------------+
|     -0.04112314436835551|
+-------------------------+
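A few other methods available on DataFrameStatFunctions, sketched against the same DataFrame (output omitted; approxQuantile's last argument is the allowed relative error):

// frequent items in a column, as shown in the scaladoc above
df.stat.freqItems(Seq("StockCode")).show()

// approximate median of UnitPrice with 5% relative error
val medianUnitPrice = df.stat.approxQuantile("UnitPrice", Array(0.5), 0.05)

// cross tabulation of two columns
df.stat.crosstab("StockCode", "Quantity").show(5)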

  • Following shows how to compute summary statistics for a column or a set of columns. The method describe is used to compute these statistics. It computes the count, mean, standard deviation, min, and max for all numeric and string columns.

def describe(cols: String*): DataFrame
Computes statistics for numeric and string columns, including count, mean, stddev, min, and max.
If no columns are given, this function computes statistics for all numerical or string columns.
This function is meant for exploratory data analysis, as we make no guarantee about the backward compatibility of the schema of the resulting Dataset. If you want to programmatically compute summary statistics, use the agg function instead.
ds.describe("age", "height").show()
// output:
// summary age   height
// count   10.0  10.0
// mean    53.3  178.05
// stddev  11.6  15.7
// min     18.0  163.0
// max     92.0  192.0


scala> df.describe()
res3: org.apache.spark.sql.DataFrame = [summary: string, InvoiceNo: string ... 6 more fields]

scala> df.describe().show
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                null| 8.627413127413128| 4.151946589446603|15661.388719512195|          null|
| stddev|72.89447869788873|17407.897548583845|                null|26.371821677029203|15.638659854603892|1854.4496996893627|          null|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
|    max|          C536548|              POST|ZINC WILLIE WINKI...|               600|            607.49|           18229.0|United Kingdom|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+


scala> df.describe("Quantity","Country").show
+-------+------------------+--------------+
|summary|          Quantity|       Country|
+-------+------------------+--------------+
|  count|              3108|          3108|
|   mean| 8.627413127413128|          null|
| stddev|26.371821677029203|          null|
|    min|               -24|     Australia|
|    max|               600|United Kingdom|
+-------+------------------+--------------+

  • Following shows how to add a unique ID to each row. The generated IDs start at 0; in the example below we add 1 so the IDs start at 1.

def monotonically_increasing_id(): Column
A column expression that generates monotonically increasing 64-bit integers.
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.
As an example, consider a DataFrame with two partitions, each with 3 records. This expression would return the following IDs:
0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.

scala> df.selectExpr("monotonically_increasing_id() + 1","*").show(5)
+-----------------------------------+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|(monotonically_increasing_id() + 1)|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+-----------------------------------+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|                                  1|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|                                  2|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|                                  3|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|                                  4|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|                                  5|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+-----------------------------------+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows