Friday, 12 April 2019

CH5.6 Filtering DataFrame Rows, Getting Uniques, Random Samples, Random Splits


Filtering Rows:

  • There are two methods to perform this operation: where and filter. Both perform the same operation and accept the same argument types when used with DataFrames.

  • The following are the definitions of these methods available on DataFrames. Note that there are two variants: one that takes a Column and another that takes a String expression. When using the Dataset API from either Scala or Java, filter also accepts an arbitrary function that Spark will apply to each record in the Dataset (see the sketch after the definitions below).

def where(conditionExpr: String): Dataset[T]
Filters rows using the given SQL expression.

peopleDs.where("age > 15")
def where(condition: Column): Dataset[T]
Filters rows using the given condition. This is an alias for filter.

// The following are equivalent:
peopleDs.filter($"age" > 15)
peopleDs.where($"age" > 15)
def filter(func: (T) => Boolean): Dataset[T]
(Scala-specific) Returns a new Dataset that only contains elements where func returns true.
def filter(conditionExpr: String): Dataset[T]
Filters rows using the given SQL expression.

peopleDs.filter("age > 15")
def filter(condition: Column): Dataset[T]
Filters rows using the given condition.

// The following are equivalent:
peopleDs.filter($"age" > 15)
peopleDs.where($"age" > 15)

scala> df.count
res24: Long = 1502

scala> df.where("ORIGIN_COUNTRY_NAME == 'Romania'").collect
res25: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,1], [United States,Romania,3], [United States,Romania,12], [United States,Romania,12])

scala> df.where(col("ORIGIN_COUNTRY_NAME") === "Romania").collect
res26: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,1], [United States,Romania,3], [United States,Romania,12], [United States,Romania,12])

  • The following shows how to define a Column and use it directly as the where condition.

scala> val FilterCond=expr("ORIGIN_COUNTRY_NAME == 'Romania'")
FilterCond: org.apache.spark.sql.Column = (ORIGIN_COUNTRY_NAME = Romania)

scala> df.where(FilterCond).collect
res27: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,1], [United States,Romania,3], [United States,Romania,12], [United States,Romania,12])

  • We can also put multiple filters in the same expression, as shown below. Although this is possible, it is not always useful, because Spark automatically performs all filtering operations at the same time regardless of the filter ordering. This means that if you want to specify multiple AND filters, just chain them sequentially and let Spark handle the rest.

scala> val FilterCond=expr("ORIGIN_COUNTRY_NAME == 'Romania'").and(expr("count > 3"))
FilterCond: org.apache.spark.sql.Column = ((ORIGIN_COUNTRY_NAME = Romania) AND (count > 3))

scala> df.where(FilterCond).collect
res28: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,12], [United States,Romania,12])
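
The same combined condition can also be written as a single SQL expression string passed to where; the line below is a sketch of that equivalent form and is not part of the original transcript.

// Equivalent filter expressed as one SQL string (sketch)
df.where("ORIGIN_COUNTRY_NAME = 'Romania' AND count > 3").show()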

The "and" and "or" are methods available on Column type:

def and(other: Column): Column
Boolean AND.

// Scala: The following selects people that are in school and employed at the same time.
people.select( people("inSchool") && people("isEmployed") )

// Java:
people.select( people("inSchool").and(people("isEmployed")) );
def or(other: Column): Column
Boolean OR.

// Scala: The following selects people that are in school or employed.
people.filter( people("inSchool") || people("isEmployed") )

// Java:
people.filter( people("inSchool").or(people("isEmployed")) );

The following shows how to chain multiple filters:

scala> val FilterCond=expr("ORIGIN_COUNTRY_NAME == 'Romania'")
FilterCond: org.apache.spark.sql.Column = (ORIGIN_COUNTRY_NAME = Romania)

scala> val Cond2=col("count") > 3
Cond2: org.apache.spark.sql.Column = (count > 3)

scala> df.where(FilterCond).where(Cond2).collect
res29: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,12], [United States,Romania,12])

Getting Unique Rows:


  • The distinct method on DataFrames allows us to deduplicate any rows that are in that DataFrame.

def distinct(): Dataset[T]
Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias for dropDuplicates.

scala> df.select("DEST_COUNTRY_NAME").distinct.show
+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|             Chad|
|         Anguilla|
|           Russia|
|         Paraguay|
|            Yemen|
|          Senegal|
|           Sweden|
|         Kiribati|
|           Guyana|
|      Philippines|
|         Djibouti|
|         Malaysia|
|        Singapore|
|             Fiji|
|           Turkey|
|           Malawi|
|             Iraq|
|          Germany|
|      Afghanistan|
|           Jordan|
+-----------------+
only showing top 20 rows

scala> df.select("DEST_COUNTRY_NAME").distinct.count
res0: Long = 167
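
Because distinct is an alias for dropDuplicates, you can also deduplicate on a subset of columns. The line below is a sketch of that variant, not part of the original transcript.

// Sketch: deduplicate on specific columns instead of whole rows
df.dropDuplicates("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").count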


Random Samples:

  • The sample method on a DataFrame makes it possible to specify a fraction of rows to extract from a DataFrame and whether you'd like to sample with or without replacement.
  • The definitions below show that there are two overloaded versions: one takes a user-supplied seed and the other uses a random seed.

def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
Returns a new Dataset by sampling a fraction of rows, using a random seed.

withReplacement - Sample with replacement or not.
fraction - Fraction of rows to generate.
def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
Returns a new Dataset by sampling a fraction of rows, using a user-supplied seed.

withReplacement - Sample with replacement or not.
fraction - Fraction of rows to generate.
seed - Seed for sampling.


scala> val seed = 5
seed: Int = 5

scala> val withReplacement = false
withReplacement: Boolean = false

scala> val fraction = 0.5
fraction: Double = 0.5

scala> df.sample(withReplacement, fraction, seed).show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
+-----------------+-------------------+-----+
only showing top 5 rows
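
For completeness, the overload without a seed (and sampling with replacement) looks like the sketch below; it is not part of the original transcript, and the result will vary between runs because Spark picks a random seed.

// Sketch: sample roughly 10% of rows with replacement, using a random seed
df.sample(true, 0.1).count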


Random Splits:

  • The randomSplit method can be used to break up your DataFrame into random "splits" of the original DataFrame.
  • This is often used with machine learning algorithms to create training, validation, and test sets.
  • The definitions below show that there are two overloaded versions: one takes a user-supplied seed and the other uses a random seed.
  • It's important to note that if the proportions you specify don't add up to one, they will be normalized so that they do.

def randomSplit(weights: Array[Double]): Array[Dataset[T]]
Randomly splits this Dataset with the provided weights.

weights - Weights for splits; will be normalized if they don't sum to 1.
def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
Randomly splits this Dataset with the provided weights.

weights - Weights for splits; will be normalized if they don't sum to 1.
seed - Seed for sampling.

scala>   val df = spark.read.format("json").schema(myManualSchema).load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json")
df: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> val weightsarray=Array(0.33,0.1,0.5)
weightsarray: Array[Double] = Array(0.33, 0.1, 0.5)

scala> spark.conf.set("spark.sql.shuffle.partitions",5)

scala>     df.randomSplit(weightsarray).foreach(x => { val perc= (x.count).toDouble/totalcount
     |        println(s"Counts in DF ${perc}")})
Counts in DF 0.35153129161118507
Counts in DF 0.1151797603195739
Counts in DF 0.533288948069241
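
In the transcript above, totalcount refers to the total row count of df computed beforehand (df.count). A self-contained sketch of the same idea, written as a typical train/validation/test split with a fixed seed, is shown below; the variable names and the seed value are assumptions, not from the original transcript.

// Sketch: train/validation/test split with a user-supplied seed
val totalcount = df.count.toDouble
val Array(trainDf, validationDf, testDf) = df.randomSplit(Array(0.33, 0.1, 0.5), seed = 5L)

// Print the fraction of rows that landed in each split
Seq(trainDf, validationDf, testDf).foreach { split =>
  println(s"Counts in DF ${split.count / totalcount}")
}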


