
Friday, 12 April 2019

CH5.7 Union of DataFrames, Sorting, Limit, Repartition and Coalesce



Concatenating and Appending Rows (Union)

  • DataFrames are immutable, so we cannot append rows to them directly, because that would mean changing them. To append to a DataFrame, you must union the original DataFrame with a new DataFrame; this just concatenates the two DataFrames.
     
  • To union two DataFrames, you must be sure that they have the same schema and number of columns; otherwise, the union will fail.

  • Unions are currently performed based on column position, not on the schema. This means that columns will not automatically line up the way you might expect (see the sketch after the union definition below).

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val schema = df.schema
schema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true))

scala> val newRows = Seq(
     |   Row("New Country", "Other Country", 5L),
     |   Row("New Country 2", "Other Country 3", 1L)
     | )
newRows: Seq[org.apache.spark.sql.Row] = List([New Country,Other Country,5], [New Country 2,Other Country 3,1])

scala> val parallelizedRows = spark.sparkContext.parallelize(newRows)
parallelizedRows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[150] at parallelize at <console>:27

scala> val newDF = spark.createDataFrame(parallelizedRows, schema)
newDF: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> val unionedDF=df.union(newDF)
unionedDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> unionedDF.count
res10: Long = 1504

scala> df.count
res11: Long = 1502

scala> newDF.count
res12: Long = 2

  • Following is the definition of the union method on DataFrames:

def union(other: Dataset[T]): Dataset[T]
Returns a new Dataset containing union of rows in this Dataset and another Dataset.

This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by a distinct.
Also as standard in SQL, this function resolves columns by position (not by name).
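
Because union resolves columns by position, reordering the columns of one side silently puts values in the wrong columns. The sketch below illustrates this with the newDF created above (the reordered value is just for illustration); unionByName, available since Spark 2.3, resolves columns by name instead.

import org.apache.spark.sql.functions.col

// Same data as newDF, but with the two country columns swapped in position
val reordered = newDF.select(col("ORIGIN_COUNTRY_NAME"), col("DEST_COUNTRY_NAME"), col("count"))

// union lines columns up by position: the origin values from reordered
// end up in df's DEST_COUNTRY_NAME column, with no error raised
df.union(reordered)
  .where(col("DEST_COUNTRY_NAME") === "Other Country")
  .show()

// unionByName (Spark 2.3+) matches columns by name and avoids the mix-up
df.unionByName(reordered).show(5)
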
Sorting Rows:

  • We can use the sort or orderBy methods on a DataFrame for sorting; they work exactly the same.
    They accept both column expressions and strings, as well as multiple columns. The default is to sort in ascending order.

  • Following are the method definitions. Here also we can see that there are two variants of each method: one that works with Strings and the other with the Column type.

def sort(sortExprs: Column*): Dataset[T]
Returns a new Dataset sorted by the given expressions. For example:

ds.sort($"col1", $"col2".desc)
def sort(sortCol: String, sortCols: String*): Dataset[T]
Returns a new Dataset sorted by the specified column, all in ascending order.

// The following 3 are equivalent
ds.sort("sortcol")
ds.sort($"sortcol")
ds.sort($"sortcol".asc)
def orderBy(sortExprs: Column*): Dataset[T]
Returns a new Dataset sorted by the given expressions. This is an alias of the sort function.
def orderBy(sortCol: String, sortCols: String*): Dataset[T]
Returns a new Dataset sorted by the given expressions. This is an alias of the sort function.

scala> df.sort("count").show(5)
+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows


scala> df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|       Azerbaijan|      United States|    1|
|          Belarus|      United States|    1|
|          Belarus|      United States|    1|
|           Brunei|      United States|    1|
|         Bulgaria|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows


scala> df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|       Azerbaijan|      United States|    1|
|          Belarus|      United States|    1|
|          Belarus|      United States|    1|
|           Brunei|      United States|    1|
|         Bulgaria|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

  • To specify the sort direction more explicitly, you need to use the asc and desc functions when operating on a column.
These functions are available in the package org.apache.spark.sql.functions.
Note that all of these functions take String arguments (not Column types).

def asc(columnName: String): Column
Returns a sort expression based on ascending order of the column.
df.sort(asc("dept"), desc("age"))
def asc_nulls_first(columnName: String): Column
Returns a sort expression based on ascending order of the column, and null values return before non-null values.
df.sort(asc_nulls_first("dept"), desc("age"))
def asc_nulls_last(columnName: String): Column
Returns a sort expression based on ascending order of the column, and null values appear after non-null values.
df.sort(asc_nulls_last("dept"), desc("age"))
def desc(columnName: String): Column
Returns a sort expression based on the descending order of the column.
df.sort(asc("dept"), desc("age"))
def desc_nulls_first(columnName: String): Column
Returns a sort expression based on the descending order of the column, and null values appear before non-null values.
df.sort(asc("dept"), desc_nulls_first("age"))
def desc_nulls_last(columnName: String): Column
Returns a sort expression based on the descending order of the column, and null values appear after non-null values.
df.sort(asc("dept"), desc_nulls_last("age"))
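
As a small, hedged sketch of the null-ordering variants (using a tiny hypothetical DataFrame rather than the flight data), the following shows where the null lands under asc_nulls_first and desc_nulls_last:

import org.apache.spark.sql.functions.{asc_nulls_first, desc_nulls_last}
import spark.implicits._   // already in scope in spark-shell

// Hypothetical DataFrame with a null in the age column
val people = Seq(("Alice", Some(30)), ("Bob", None), ("Cara", Some(25))).toDF("name", "age")

// Ascending with nulls first: Bob (null), then Cara (25), then Alice (30)
people.sort(asc_nulls_first("age")).show()

// Descending with nulls last: Alice (30), then Cara (25), then Bob (null)
people.sort(desc_nulls_last("age")).show()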

scala> df.sort(desc("count")).show(5)
+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|      United States|358354|
|    United States|      United States|352742|
|    United States|      United States|348113|
|    United States|      United States|347452|
+-----------------+-------------------+------+
only showing top 5 rows

  • Note that instead of using desc and asc, we can also use the desc and asc methods available on the Column type.

def asc: Column
Returns an ascending ordering used in sorting.

// Scala: sort a DataFrame by age column in ascending order.
df.sort(df("age").asc)
// Java
df.sort(df.col("age").asc());
def asc_nulls_first: Column
Returns an ascending ordering used in sorting, where null values appear before non-null values.

// Scala: sort a DataFrame by age column in ascending order and null values appearing first.
df.sort(df("age").asc_nulls_last)
// Java
df.sort(df.col("age").asc_nulls_last());

def asc_nulls_last: Column
Returns an ascending ordering used in sorting, where null values appear after non-null values.

// Scala: sort a DataFrame by age column in ascending order and null values appearing last.
df.sort(df("age").asc_nulls_last)
// Java
df.sort(df.col("age").asc_nulls_last());
def desc: Column
Returns a descending ordering used in sorting.

// Scala
df.sort(df("age").desc)
// Java
df.sort(df.col("age").desc());

def desc_nulls_first: Column
Returns a descending ordering used in sorting, where null values appear before non-null values.

// Scala: sort a DataFrame by age column in descending order and null values appearing first.
df.sort(df("age").desc_nulls_first)
// Java
df.sort(df.col("age").desc_nulls_first());

def desc_nulls_last: Column
Returns a descending ordering used in sorting, where null values appear after non-null values.

// Scala: sort a DataFrame by age column in descending order and null values appearing last.
df.sort(df("age").desc_nulls_last)
// Java
df.sort(df.col("age").desc_nulls_last());

scala> df.sort(df("count").desc).show(5)
+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|      United States|358354|
|    United States|      United States|352742|
|    United States|      United States|348113|
|    United States|      United States|347452|
+-----------------+-------------------+------+
only showing top 5 rows

  • Sorting Within Partitions:

    For optimization purposes, it’s sometimes advisable to sort within each partition before another set of transformations. You can use the sortWithinPartitions method to do this.

def sortWithinPartitions(sortExprs: Column*): Dataset[T]
Returns a new Dataset with each partition sorted by the given expressions.
This is the same operation as "SORT BY" in SQL (Hive QL).
def sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T]
Returns a new Dataset with each partition sorted by the given expressions.
This is the same operation as "SORT BY" in SQL (Hive QL).

scala> val df = spark.read.format("json").schema(myManualSchema).load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json").sortWithinPartitions("count")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
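
A hedged sketch of a more typical use: repartition by a column and then sort within each resulting partition, giving per-partition ordering (SORT BY) without the cost of a global ORDER BY. This assumes the same df as above.

import org.apache.spark.sql.functions.{col, desc}

// DISTRIBUTE BY DEST_COUNTRY_NAME, then SORT BY count DESC inside each partition;
// rows are ordered within a partition, but there is no ordering across partitions
val locallySorted = df
  .repartition(col("DEST_COUNTRY_NAME"))
  .sortWithinPartitions(desc("count"))

locallySorted.rdd.getNumPartitions   // spark.sql.shuffle.partitions by default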

Limit:

  • We can restrict the number of rows returned from a DataFrame using limit.

def limit(n: Int): Dataset[T]
Returns a new Dataset by taking the first n rows.
The difference between this function and head is that head is an action and returns an array (by triggering query execution) while limit returns a new Dataset.
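
A minimal usage sketch (on the same df): combined with a sort, limit gives a "top N" that is still a DataFrame, so further transformations can be chained before anything executes.

import org.apache.spark.sql.functions.desc

// Top 5 rows by count; still a DataFrame, nothing executes until an action runs
val top5 = df.sort(desc("count")).limit(5)
top5.show()

// head(5)/take(5), by contrast, trigger execution and return Array[Row] to the driver
val firstFive = df.take(5)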

Repartition and Coalesce:

  • An important optimization opportunity is to partition the data according to some frequently filtered columns, which controls the physical layout of data across the cluster, including the partitioning scheme and the number of partitions.

  • Repartition will incur a full shuffle of the data, regardless of whether one is necessary. This means that you should typically only repartition when the future number of partitions is greater than your current number of partitions, or when you are looking to partition by a set of columns.

scala> df.rdd.getNumPartitions
res28: Int = 3

scala> val df1 = df.repartition(5)
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> df1.rdd.getNumPartitions
res29: Int = 5

The rdd method on DataFrames gives the underlying RDD of Row objects:

lazy val rdd: RDD[T]
Represents the content of the Dataset as an RDD of T.

  • If you know that you’re going to be filtering by a certain column often, it can be worth repartitioning based on that column. We could also specify the number of partitions:

scala> val df2 = df.repartition(col("DEST_COUNTRY_NAME"))
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> df2.rdd.getNumPartitions
res30: Int = 5

scala> val df3  = df.repartition(10,col("DEST_COUNTRY_NAME"))
df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> df3.rdd.getNumPartitions
res31: Int = 10

Following are the method definitions:

def repartition(partitionExprs: Column*): Dataset[T]
Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as number of partitions. The resulting Dataset is hash partitioned.

This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
Returns a new Dataset partitioned by the given partitioning expressions into numPartitions. The resulting Dataset is hash partitioned.

This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
def repartition(numPartitions: Int): Dataset[T]
Returns a new Dataset that has exactly numPartitions partitions.


  • Coalesce, on the other hand, will not incur a full shuffle and will try to combine partitions. 

def coalesce(numPartitions: Int): Dataset[T]

Returns a new Dataset that has exactly numPartitions partitions, when fewer partitions are requested.
If a larger number of partitions is requested, it will stay at the current number of partitions. Similar to coalesce defined on an RDD, this operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
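
A short sketch contrasting the two on the same df (the partition counts in the comments are what these calls should produce, assuming no other configuration changes):

// Full shuffle up to 5 partitions (as shown above)
val five = df.repartition(5)

// coalesce merges existing partitions without a shuffle
val two = five.coalesce(2)
two.rdd.getNumPartitions             // 2

// coalesce cannot increase the partition count; this stays at 2
two.coalesce(8).rdd.getNumPartitions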



Collecting Rows to your Driver:

Spark maintains the state of the cluster in the driver. There are times when you’ll want to collect some of your data to the driver in order to manipulate it on your local machine.
The collect method gets all the data from the entire DataFrame, take selects the first N rows, and show prints out a number of rows nicely.

def collect(): Array[T]
Returns an array that contains all rows in this Dataset.
Running collect requires moving all the data into the application's driver process, and doing so on a very large dataset can crash the driver process with OutOfMemoryError.
def take(n: Int): Array[T]
Returns the first n rows in the Dataset.
Running take requires moving data into the application's driver process, and doing so with a very large n can crash the driver process with OutOfMemoryError.
def show(numRows: Int, truncate: Boolean): Unit

Displays the Dataset in a tabular form. For example:

year  month AVG('Adj Close) MAX('Adj Close)
1980  12    0.503218        0.595103
1981  01    0.523289        0.570307
1982  02    0.436504        0.475256
1983  03    0.410516        0.442194
1984  04    0.450090        0.483521
numRows - Number of rows to show (default 20).
truncate - Whether to truncate long strings. If true, strings longer than 20 characters will be truncated and all cells will be aligned right.
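
A brief usage sketch of the three methods on the flight-data df (results omitted):

import org.apache.spark.sql.Row

// take is an action: returns the first 3 rows as Array[Row] on the driver
val firstRows: Array[Row] = df.take(3)

// collect pulls every row of the result to the driver -- keep the result small
val someRows: Array[Row] = df.limit(10).collect()

// show prints a table; passing truncate = false disables the 20-character cut-off
df.show(5, false)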

CH5.6 Filtering DataFrame Rows, Getting Uniques, Random Samples, Random Splits


Filtering Rows:

  • There are two methods to perform this operation: you can use where or filter and they both will perform the same operation and accept the same argument types when used with DataFrames. 

  • Following are the definitions of these methods available on DataFrames. Note that there are two variants: one takes a Column type and the other a String. When using the Dataset API from either Scala or Java, filter also accepts an arbitrary function that Spark will apply to each record in the Dataset (a sketch of this variant follows the definitions below).

def where(conditionExpr: String): Dataset[T]
Filters rows using the given SQL expression.

peopleDs.where("age > 15")
def where(condition: Column): Dataset[T]
Filters rows using the given condition. This is an alias for filter.

// The following are equivalent:
peopleDs.filter($"age" > 15)
peopleDs.where($"age" > 15)
def filter(func: (T) => Boolean): Dataset[T]
(Scala-specific) Returns a new Dataset that only contains elements where func returns true.
def filter(conditionExpr: String): Dataset[T]
Filters rows using the given SQL expression.

peopleDs.filter("age > 15")
def filter(condition: Column): Dataset[T]
Filters rows using the given condition.

// The following are equivalent:
peopleDs.filter($"age" > 15)
peopleDs.where($"age" > 15)
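
The Scala-specific functional variant mentioned above takes a predicate over each Row; a hedged sketch on the flight-data df (note that such lambdas are opaque to the Catalyst optimizer, so Column/SQL expressions are usually preferred):

import org.apache.spark.sql.Row

// Predicate evaluated per Row on the deserialized object
df.filter((row: Row) => row.getAs[Long]("count") > 3).show(5)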

scala> df.count
res24: Long = 1502

scala> df.where("ORIGIN_COUNTRY_NAME == 'Romania'").collect
res25: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,1], [United States,Romania,3], [United States,Romania,12], [United States,Romania,12])

scala> df.where(col("ORIGIN_COUNTRY_NAME") === "Romania").collect
res26: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,1], [United States,Romania,3], [United States,Romania,12], [United States,Romania,12])

  • The following shows how to define a filter condition as a Column and use it directly in the where condition.

scala> val FilterCond=expr("ORIGIN_COUNTRY_NAME == 'Romania'")
FilterCond: org.apache.spark.sql.Column = (ORIGIN_COUNTRY_NAME = Romania)

scala> df.where(FilterCond).collect
res27: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,1], [United States,Romania,3], [United States,Romania,12], [United States,Romania,12])

  • We can also put multiple filters in the same expression, as shown below. Although this is possible, it is not always useful, because Spark automatically performs all filtering operations at the same time regardless of the filter ordering. This means that if you want to specify multiple AND filters, you can just chain them sequentially and let Spark handle the rest.

scala> val FilterCond=expr("ORIGIN_COUNTRY_NAME == 'Romania'").and(expr("count > 3"))
FilterCond: org.apache.spark.sql.Column = ((ORIGIN_COUNTRY_NAME = Romania) AND (count > 3))

scala> df.where(FilterCond).collect
res28: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,12], [United States,Romania,12])

The "and" and "or" are methods available on Column type:

def and(other: Column): Column
Boolean AND.

// Scala: The following selects people that are in school and employed at the same time.
people.select( people("inSchool") && people("isEmployed") )

// Java:
people.select( people("inSchool").and(people("isEmployed")) );
def or(other: Column): Column
Boolean OR.

// Scala: The following selects people that are in school or employed.
people.filter( people("inSchool") || people("isEmployed") )

// Java:
people.filter( people("inSchool").or(people("isEmployed")) );

The following shows how to chain multiple filters:

scala> val FilterCond=expr("ORIGIN_COUNTRY_NAME == 'Romania'")
FilterCond: org.apache.spark.sql.Column = (ORIGIN_COUNTRY_NAME = Romania)

scala> val Cond2=col("count") > 3
Cond2: org.apache.spark.sql.Column = (count > 3)

scala> df.where(FilterCond).where(Cond2).collect
res29: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,12], [United States,Romania,12])

Getting Unique Rows:


  • The distinct method on DataFrames allows us to deduplicate any rows that are in that DataFrame.

def distinct(): Dataset[T]
Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias for dropDuplicates.

scala> df.select("DEST_COUNTRY_NAME").distinct.show
+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|             Chad|
|         Anguilla|
|           Russia|
|         Paraguay|
|            Yemen|
|          Senegal|
|           Sweden|
|         Kiribati|
|           Guyana|
|      Philippines|
|         Djibouti|
|         Malaysia|
|        Singapore|
|             Fiji|
|           Turkey|
|           Malawi|
|             Iraq|
|          Germany|
|      Afghanistan|
|           Jordan|
+-----------------+
only showing top 20 rows

scala> df.select("DEST_COUNTRY_NAME").distinct.count
res0: Long = 167
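
distinct deduplicates on all columns together; the related dropDuplicates method can deduplicate on a subset of columns (a sketch that keeps one arbitrary row per destination):

// One (arbitrary) row per DEST_COUNTRY_NAME -- the count should match the 167 distinct destinations above
df.dropDuplicates("DEST_COUNTRY_NAME").count()

// With no arguments, dropDuplicates is equivalent to distinct
df.dropDuplicates().count()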


Random Samples:

  • The sample method on a DataFrame makes it possible to specify a fraction of rows to extract from a DataFrame, and whether you’d like to sample with or without replacement.
  • As shown below, there are two overloaded versions: one takes a seed value and the other uses a random seed.

def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
Returns a new Dataset by sampling a fraction of rows, using a random seed.

withReplacement - Sample with replacement or not.
fraction - Fraction of rows to generate.
def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
Returns a new Dataset by sampling a fraction of rows, using a user-supplied seed.

withReplacement - Sample with replacement or not.
fraction - Fraction of rows to generate.
seed - Seed for sampling.


scala> val seed = 5
seed: Int = 5

scala> val withReplacement = false
withReplacement: Boolean = false

scala> val fraction = 0.5
fraction: Double = 0.5

scala> df.sample(withReplacement, fraction, seed).show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
+-----------------+-------------------+-----+
only showing top 5 rows
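
Note that fraction is only an expected proportion, not an exact count; a quick check (the result will be roughly, but usually not exactly, half of the 1502 rows):

// Approximately half the rows; the exact number depends on the seed
df.sample(withReplacement = false, fraction = 0.5, seed = 5).count()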


Random Splits:

  • The randomSplit method can be used to break up your DataFrame into random “splits” of the original DataFrame.
  • This is often used with machine learning algorithms to create training, validation, and test sets.
  • As shown below, there are two overloaded versions: one takes a seed value and the other uses a random seed.
  • It’s important to note that if you don’t specify proportions that add up to one, they will be normalized so that they do.

def randomSplit(weights: Array[Double]): Array[Dataset[T]]
Randomly splits this Dataset with the provided weights.
weights - Weights for splits; will be normalized if they don't sum to 1.
def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
Randomly splits this Dataset with the provided weights.

weights - Weights for splits; will be normalized if they don't sum to 1.
seed - Seed for sampling.

scala>   val df = spark.read.format("json").schema(myManualSchema).load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json")
df: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> val weightsarray=Array(0.33,0.1,0.5)
weightsarray: Array[Double] = Array(0.33, 0.1, 0.5)

scala> spark.conf.set("spark.sql.shuffle.partitions",5)

scala>     df.randomSplit(weightsarray).foreach(x => { val perc= (x.count).toDouble/totalcount
     |        println(s"Counts in DF ${perc}")})
Counts in DF 0.35153129161118507
Counts in DF 0.1151797603195739
Counts in DF 0.533288948069241
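
In the transcript above, totalcount is assumed to hold df.count from earlier in the session. A self-contained sketch of the typical train/validation/test usage (the 0.6/0.2/0.2 weights and the seed are arbitrary):

// Weights are normalized if they do not sum to 1; the seed makes the split reproducible
val Array(train, validation, test) = df.randomSplit(Array(0.6, 0.2, 0.2), seed = 42)

// Each split's share of the rows is only approximately its weight
Seq(train, validation, test).foreach { split =>
  println(s"fraction = ${split.count.toDouble / df.count}")
}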