
Saturday, 13 April 2019

CH6.1 Working with Booleans, Numbers


Where to look for API:

Following are the places where one should look for transformations and functions:



  • The package org.apache.spark.sql.functions contains a variety of functions for a range of different data types. Normally the entire package is imported because these functions are used so frequently (a minimal import sketch follows).
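
A minimal sketch of the typical imports in a spark-shell or application (only standard names from the functions package are assumed):

// Import the entire package -- lit, col, expr, round, corr, pow, etc.
import org.apache.spark.sql.functions._

// Or import just what is needed:
import org.apache.spark.sql.functions.{col, expr, lit}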


 
Converting to Spark types:

  • The lit function is used to convert a value in another language (here Scala) to its corresponding Spark representation.

scala> df.select(lit(5), lit("five"), lit(5.0)).take(3)
res34: Array[org.apache.spark.sql.Row] = Array([5,five,5.0], [5,five,5.0], [5,five,5.0])
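
As a hedged follow-up sketch, lit is also handy when you need a constant column alongside the existing data (the column name "One" is purely illustrative):

// Append a literal column to every row
df.select(expr("*"), lit(1).alias("One")).show(2)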

Working with Booleans:

  • Booleans are the foundation for all filtering. Boolean statements consist of four elements: and, or, true, and false. We use these simple structures to build logical statements that evaluate to either true or false. These statements are often used as conditional requirements: a row of data must either pass the test (evaluate to true) or it will be filtered out.

scala> val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day\\2010-12-01.csv")
df: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]

scala> spark.conf.set("spark.sql.shuffle.partitions",5)

scala>     df.where(col("InvoiceNo").equalTo(536365)).select("InvoiceNo", "Description").show(5, false)
+---------+-----------------------------------+
|InvoiceNo|Description                        |
+---------+-----------------------------------+
|536365   |WHITE HANGING HEART T-LIGHT HOLDER |
|536365   |WHITE METAL LANTERN                |
|536365   |CREAM CUPID HEARTS COAT HANGER     |
|536365   |KNITTED UNION FLAG HOT WATER BOTTLE|
|536365   |RED WOOLLY HOTTIE WHITE HEART.     |
+---------+-----------------------------------+
only showing top 5 rows

Following are the methods available on the Column type to perform tests.
Note that most of the methods have operator equivalents; the operators are preferred in Scala, while the verbose method names are preferred in Java.

def &&(other: Any): Column
Boolean AND.

def and(other: Column): Column
Boolean AND.
// Scala: The following selects people that are in school and employed at the same time.
people.select( people("inSchool") && people("isEmployed") )
// Java:
people.select( people("inSchool").and(people("isEmployed")) );
--------------------------------------------------------------------------------------------------------

def ||(other: Any): Column
Boolean OR.

def or(other: Column): Column
Boolean OR.
// Scala: The following selects people that are in school or employed.
people.filter( people("inSchool") || people("isEmployed") )
// Java:
people.filter( people("inSchool").or(people("isEmployed")) );
--------------------------------------------------------------------------------------------------------

def =!=(other: Any): Column
Inequality test.

def notEqual(other: Any): Column
Inequality test.
// Scala:
df.select( df("colA") !== df("colB") )
df.select( !(df("colA") === df("colB")) )
// Java:
import static org.apache.spark.sql.functions.*;
df.filter( col("colA").notEqual(col("colB")) );
--------------------------------------------------------------------------------------------------------

def <(other: Any): Column
Less than.

def lt(other: Any): Column
Less than.
// Scala: The following selects people younger than 21.
people.select( people("age") < 21 )
// Java:
people.select( people("age").lt(21) );
--------------------------------------------------------------------------------------------------------

def <=(other: Any): Column
Less than or equal to.

def leq(other: Any): Column
Less than or equal to.
// Scala: The following selects people age 21 or younger.
people.select( people("age") <= 21 )
// Java:
people.select( people("age").leq(21) );

--------------------------------------------------------------------------------------------------------
def <=>(other: Any): Column
Equality test that is safe for null values.

def eqNullSafe(other: Any): Column
Equality test that is safe for null values.
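(The scaladoc entry above has no example; the following is a hedged sketch in the style of the other entries, using placeholder columns colA and colB.)
// Scala:
df.where( df("colA") <=> df("colB") )
// Java:
df.where( col("colA").eqNullSafe(col("colB")) );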
--------------------------------------------------------------------------------------------------------

def ===(other: Any): Column
Equality test.

def equalTo(other: Any): Column
 Equality test.
// Scala:
df.filter( df("colA") === df("colB") )
// Java
import static org.apache.spark.sql.functions.*;
df.filter( col("colA").equalTo(col("colB")) );
--------------------------------------------------------------------------------------------------------

def >(other: Any): Column
Greater than.

def gt(other: Any): Column
Greater than.
// Scala: The following selects people older than 21.
people.select( people("age") > lit(21) )
// Java:
import static org.apache.spark.sql.functions.*;
people.select( people("age").gt(21) );
--------------------------------------------------------------------------------------------------------

def >=(other: Any): Column
Greater than or equal to an expression.

def geq(other: Any): Column
Greater than or equal to an expression.
// Scala: The following selects people age 21 or older.
people.select( people("age") >= 21 )
// Java:
people.select( people("age").geq(21) )

--------------------------------------------------------------------------------------------------------


  • Scala has some particular semantics regarding the use of == and ===. In Spark, if you want to filter by equality you should use === (equal) or =!= (not equal). You can also use the not function and the equalTo method. Note that this is needed only when we use the Column type; if we are writing string expressions, it is not required.

scala> df.where(col("InvoiceNo") === 536365).select("InvoiceNo", "Description").show(5, false)

Recall that === is a method that we are invoking on the Column type.
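
To illustrate the string-expression point, a hedged sketch of the same predicates written as SQL-style strings (no === or =!= needed):

// Equality / inequality expressed as string predicates
df.where("InvoiceNo = 536365").select("InvoiceNo", "Description").show(5, false)
df.where("InvoiceNo <> 536365").select("InvoiceNo", "Description").show(5, false)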


  • We know that we can specify multiple filters using the "and" and "or" methods.
In Spark, you should simply chain together "and" filters as sequential filters.
The reason is that even if Boolean statements are expressed serially (one after the other), Spark will flatten all of these filters into one statement and perform the filter at the same time, creating the "and" statement for us. Multiple "or" statements, however, need to be specified in the same statement.

Below, note how we define each condition as a Column. We can create such Column instances wherever we want; however, they are analyzed only in the context of a DataFrame.

scala>     val priceFilter = col("UnitPrice") > 600
priceFilter: org.apache.spark.sql.Column = (UnitPrice > 600)

scala>     val descripFilter = col("Description").contains("POSTAGE")
descripFilter: org.apache.spark.sql.Column = contains(Description, POSTAGE)

scala>     df.where(col("StockCode").isin("DOT")).where(priceFilter.or(descripFilter)).show()
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      null|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+


def isin(list: Any*): Column
A boolean expression that is evaluated to true if the value of this expression is contained by the evaluated values of the arguments.

def contains(other: Any): Column
Contains the other element.
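
A hedged sketch of isin with more than one value (both stock codes appear in this dataset):

// Keep rows whose StockCode is either DOT or POST
df.where(col("StockCode").isin("DOT", "POST")).show(2)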



  • Boolean expressions are not reserved just for filters. We can also add them as additional columns.

scala>     val DOTCodeFilter = col("StockCode") === "DOT"
DOTCodeFilter: org.apache.spark.sql.Column = (StockCode = DOT)

scala>     val priceFilter = col("UnitPrice") > 600
priceFilter: org.apache.spark.sql.Column = (UnitPrice > 600)

scala>     val descripFilter = col("Description").contains("POSTAGE")
descripFilter: org.apache.spark.sql.Column = contains(Description, POSTAGE)

scala>     df.withColumn("isExpensive", DOTCodeFilter.and(priceFilter.or(descripFilter))).where("isExpensive").select("unitPrice", "isExpensive").show(5)
+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+


scala> val pfilter =expr("UnitPrice > 600 AND StockCode ='DOT'")
pfilter: org.apache.spark.sql.Column = ((UnitPrice > 600) AND (StockCode = DOT))

scala> df.where(pfilter)
res45: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [InvoiceNo: string, StockCode: string ... 6 more fields]

scala> df.where(pfilter).show
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+




scala>     df.createOrReplaceTempView("dfTable")

scala> val resu=spark.sql(s"""
     | SELECT UnitPrice, (StockCode = 'DOT' AND
     |   (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive
     | FROM dfTable
     | WHERE (StockCode = 'DOT' AND
     |        (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))
     | """)


As shown above, it is often easier to express filters as SQL statements than to use the programmatic DataFrame interface, and Spark SQL allows us to do this without paying any performance penalty.
 
  • Be cautious when working with null data when creating Boolean expressions. If there is a null in your data, you can use eqNullSafe to perform a null-safe equivalence test (a sketch follows).
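
A hedged sketch of the null-safe test on this dataset (the literal "hello" is purely illustrative; the Description column contains nulls in this file):

// eqNullSafe evaluates to false instead of null when either side is null
df.where(col("Description").eqNullSafe("hello")).show()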


Working with Numbers:
 
  • Following shows an example of using the pow function on numeric values. Note how we can construct the Column first and then use it.
We show multiple ways of performing the same computation: using col, using expr, and using selectExpr.

scala>     /*Using col*/
     |     import org.apache.spark.sql.functions.{expr, pow}
import org.apache.spark.sql.functions.{expr, pow}

scala>     var fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
fabricatedQuantity: org.apache.spark.sql.Column = (POWER((Quantity * UnitPrice), 2.0) + 5)

scala>     df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)
+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows


scala>     /*Using Expr*/
     |     fabricatedQuantity = expr("(pow((Quantity * UnitPrice),2))+5")
fabricatedQuantity: org.apache.spark.sql.Column = (pow((Quantity * UnitPrice), 2) + 5)

scala>     df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)
+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows


scala>     /*Using selectExpr*/
     |     df.selectExpr("CustomerId","(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)
+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows

  • Following shows how to round numbers. To round to a whole number, we can cast the value to an integer (a cast sketch follows the round examples below). Spark also has more detailed functions for performing this explicitly and to a certain level of precision.

By default, the round function rounds up if you're exactly in between two numbers. You can round down by using the bround function.

scala>     df.select(round(col("UnitPrice"), 1).alias("rounded"), col("UnitPrice")).show(5)
+-------+---------+
|rounded|UnitPrice|
+-------+---------+
|    2.6|     2.55|
|    3.4|     3.39|
|    2.8|     2.75|
|    3.4|     3.39|
|    3.4|     3.39|
+-------+---------+
only showing top 5 rows


scala>     df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)
+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
|          3.0|           2.0|
+-------------+--------------+
only showing top 2 rows


scala>     df.selectExpr("round(2.5)","bround(2.5)").show(2)
+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|            3|             2|
|            3|             2|
+-------------+--------------+
only showing top 2 rows
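
As mentioned above, a hedged sketch of getting a whole number by casting instead of rounding (note that casting to int truncates the fractional part rather than rounding):

df.select(col("UnitPrice").cast("int").alias("wholePrice"), col("UnitPrice")).show(2)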


  • Following shows how to compute the correlation between two columns. Using the stat method we get an instance of the DataFrameStatFunctions class, on which we can run statistical methods.

Following shows the definition of the stat method on DataFrame:

def stat: DataFrameStatFunctions
Returns a DataFrameStatFunctions for working statistic functions support.
// Finding frequent items in column with name 'a'.
ds.stat.freqItems(Seq("a"))

Note that the same statistic is also available as a function (corr) that can be used directly inside select:

scala> import org.apache.spark.sql.functions.{corr}
import org.apache.spark.sql.functions.corr

scala>     df.stat.corr("Quantity", "UnitPrice")
res0: Double = -0.04112314436835551

scala>     df.select(corr("Quantity", "UnitPrice")).show()
+-------------------------+
|corr(Quantity, UnitPrice)|
+-------------------------+
|     -0.04112314436835551|
+-------------------------+
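
The stat object exposes other exploratory helpers as well; a hedged sketch using columns from this dataset (output omitted):

// Frequent item sets for a pair of columns
df.stat.freqItems(Seq("StockCode", "Quantity")).show()

// Cross-tabulation of two columns (can get very wide for high-cardinality columns)
df.stat.crosstab("StockCode", "Quantity").show(2)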

  • Following shows how to compute summary statistics for a column or a set of columns. The method describe is used to compute these statistics. It takes all numeric columns and calculates the count, mean, standard deviation, min, and max.

def describe(cols: String*): DataFrame
Computes statistics for numeric and string columns, including count, mean, stddev, min, and max.
If no columns are given, this function computes statistics for all numerical or string columns.
This function is meant for exploratory data analysis, as we make no guarantee about the backward compatibility of the schema of the resulting Dataset. If you want to programmatically compute summary statistics, use the agg function instead.
ds.describe("age", "height").show()
// output:
// summary age   height
// count   10.0  10.0
// mean    53.3  178.05
// stddev  11.6  15.7
// min     18.0  163.0
// max     92.0  192.0


scala> df.describe()
res3: org.apache.spark.sql.DataFrame = [summary: string, InvoiceNo: string ... 6 more fields]

scala> df.describe().show
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                null| 8.627413127413128| 4.151946589446603|15661.388719512195|          null|
| stddev|72.89447869788873|17407.897548583845|                null|26.371821677029203|15.638659854603892|1854.4496996893627|          null|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
|    max|          C536548|              POST|ZINC WILLIE WINKI...|               600|            607.49|           18229.0|United Kingdom|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+


scala> df.describe("Quantity","Country").show
+-------+------------------+--------------+
|summary|          Quantity|       Country|
+-------+------------------+--------------+
|  count|              3108|          3108|
|   mean| 8.627413127413128|          null|
| stddev|26.371821677029203|          null|
|    min|               -24|     Australia|
|    max|               600|United Kingdom|
+-------+------------------+--------------+
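
As the describe scaladoc above suggests, programmatic summaries go through aggregation functions instead; a hedged sketch using standard functions from org.apache.spark.sql.functions:

import org.apache.spark.sql.functions.{count, mean, stddev_pop, min, max}

// A global aggregation (no groupBy) over columns of this dataset
df.agg(count("StockCode"), mean("UnitPrice"), stddev_pop("UnitPrice"), min("Quantity"), max("Quantity")).show()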

  • Following shows how to add a unique ID to each row, starting from 0 (a withColumn variant is sketched after the example below).

def monotonically_increasing_id(): Column
A column expression that generates monotonically increasing 64-bit integers.
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.
As an example, consider a DataFrame with two partitions, each with 3 records. This expression would return the following IDs:
0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.

scala> df.selectExpr("monotonically_increasing_id() + 1","*").show(5)
+-----------------------------------+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|(monotonically_increasing_id() + 1)|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+-----------------------------------+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|                                  1|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|                                  2|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|                                  3|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|                                  4|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|                                  5|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+-----------------------------------+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows
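
A hedged variant that attaches the ID as a named column via withColumn instead of selectExpr (the column name "rowId" is just illustrative):

import org.apache.spark.sql.functions.monotonically_increasing_id

df.withColumn("rowId", monotonically_increasing_id()).select("rowId", "InvoiceNo", "Description").show(5)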