Friday, 12 April 2019

CH5.7 Union of DataFrames, Sorting, Limit, Repartition and Coalesce



Concatenating and Appending Rows (Union)

  • DataFrames are immutable, so we cannot append rows to one in place. To add rows, union the original DataFrame with a new DataFrame that holds them; the result is a new DataFrame containing the rows of both.
     
  • To union two DataFrames, you must be sure that they have the same schema and number of columns; otherwise, the union will fail.

  • Unions are performed by column position, not by column name. If the two DataFrames list the same columns in a different order, the values will not line up the way you might expect.

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val schema = df.schema
schema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true))

scala> val newRows = Seq(
     |   Row("New Country", "Other Country", 5L),
     |   Row("New Country 2", "Other Country 3", 1L)
     | )
newRows: Seq[org.apache.spark.sql.Row] = List([New Country,Other Country,5], [New Country 2,Other Country 3,1])

scala> val parallelizedRows = spark.sparkContext.parallelize(newRows)
parallelizedRows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[150] at parallelize at <console>:27

scala> val newDF = spark.createDataFrame(parallelizedRows, schema)
newDF: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> val unionedDF=df.union(newDF)
unionedDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> unionedDF.count
res10: Long = 1504

scala> df.count
res11: Long = 1502

scala> newDF.count
res12: Long = 2

  • The union method is defined as follows:

def union(other: Dataset[T]): Dataset[T]
Returns a new Dataset containing union of rows in this Dataset and another Dataset.

This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by a distinct.
Also as standard in SQL, this function resolves columns by position (not by name).
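
As a minimal sketch of the position-based behavior described above, the following builds two small DataFrames whose columns are listed in a different order. The column names dest and origin and the sample rows are made up for illustration. unionByName (available from Spark 2.3 onward) is shown as one possible way to match columns by name instead; in spark-shell the existing spark session can be used rather than building one.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; getOrCreate() reuses a session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("union-sketch").getOrCreate()
import spark.implicits._

// Two hypothetical DataFrames with the same columns in a different order.
val a = Seq(("Malta", "United States")).toDF("dest", "origin")
val b = Seq(("United States", "Malta")).toDF("origin", "dest")

// union matches columns by position: b's "origin" values end up under "dest".
val byPosition = a.union(b)

// unionByName matches columns by name, so b's columns are reordered first.
val byName = a.unionByName(b)

// union keeps duplicates (UNION ALL); follow it with distinct for set semantics.
val deduplicated = a.union(a).distinct()

byPosition.show()
byName.show()
println(deduplicated.count())
```

When the column order of the two inputs is not under your control, selecting the columns explicitly in the same order before calling union is another option.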

Sorting Rows:

  • We can use the sort or orderBy methods on a DataFrame for sorting; orderBy is an alias of sort, so they behave identically.
    They accept both column expressions and strings, as well as multiple columns. The default is to sort in ascending order.

  • The method definitions follow. Each method has two variants: one that takes Strings and one that takes Columns.

def sort(sortExprs: Column*): Dataset[T]
Returns a new Dataset sorted by the given expressions. For example:

ds.sort($"col1", $"col2".desc)

def sort(sortCol: String, sortCols: String*): Dataset[T]
Returns a new Dataset sorted by the specified column, all in ascending order.

// The following 3 are equivalent
ds.sort("sortcol")
ds.sort($"sortcol")
ds.sort($"sortcol".asc)

def orderBy(sortExprs: Column*): Dataset[T]
Returns a new Dataset sorted by the given expressions. This is an alias of the sort function.

def orderBy(sortCol: String, sortCols: String*): Dataset[T]
Returns a new Dataset sorted by the given expressions. This is an alias of the sort function.

scala> df.sort("count").show(5)
+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows


scala> df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|       Azerbaijan|      United States|    1|
|          Belarus|      United States|    1|
|          Belarus|      United States|    1|
|           Brunei|      United States|    1|
|         Bulgaria|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows


scala> df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|       Azerbaijan|      United States|    1|
|          Belarus|      United States|    1|
|          Belarus|      United States|    1|
|           Brunei|      United States|    1|
|         Bulgaria|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

  • To specify the sort direction explicitly, use the asc and desc functions (or their nulls_first and nulls_last variants) when sorting by column name.
These functions are available in org.apache.spark.sql.functions.
Note that all of these functions take String arguments (not Column types).

def asc(columnName: String): Column
Returns a sort expression based on ascending order of the column.
df.sort(asc("dept"), desc("age"))

def asc_nulls_first(columnName: String): Column
Returns a sort expression based on ascending order of the column, and null values appear before non-null values.
df.sort(asc_nulls_first("dept"), desc("age"))

def asc_nulls_last(columnName: String): Column
Returns a sort expression based on ascending order of the column, and null values appear after non-null values.
df.sort(asc_nulls_last("dept"), desc("age"))

def desc(columnName: String): Column
Returns a sort expression based on the descending order of the column.
df.sort(asc("dept"), desc("age"))

def desc_nulls_first(columnName: String): Column
Returns a sort expression based on the descending order of the column, and null values appear before non-null values.
df.sort(asc("dept"), desc_nulls_first("age"))

def desc_nulls_last(columnName: String): Column
Returns a sort expression based on the descending order of the column, and null values appear after non-null values.
df.sort(asc("dept"), desc_nulls_last("age"))

scala> df.sort(desc("count")).show(5)
+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|      United States|358354|
|    United States|      United States|352742|
|    United States|      United States|348113|
|    United States|      United States|347452|
+-----------------+-------------------+------+
only showing top 5 rows
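
The flight data has no null counts, so the null-ordering variants are easier to see on a tiny DataFrame. The sketch below uses made-up names and ages (an Option[Int] column becomes a nullable integer column) and only functions listed above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{asc_nulls_last, desc_nulls_first}

// Local session for illustration; getOrCreate() reuses a session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("null-order-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data: person "b" has no age, which becomes a null in the column.
val data: Seq[(String, Option[Int])] = Seq(("a", Some(30)), ("b", None), ("c", Some(25)))
val people = data.toDF("name", "age")

// Ascending, nulls last: ages come out as 25, 30, null.
val nullsLast = people.sort(asc_nulls_last("age"))

// Descending, nulls first: ages come out as null, 30, 25.
val nullsFirst = people.sort(desc_nulls_first("age"))

nullsLast.show()
nullsFirst.show()
```

Without an explicit variant, Spark places nulls first for ascending sorts and last for descending sorts.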

  • Note that instead of the asc and desc functions, we can also use the equivalent methods available on the Column type.

def asc: Column
Returns an ascending ordering used in sorting.

// Scala: sort a DataFrame by age column in ascending order.
df.sort(df("age").asc)
// Java
df.sort(df.col("age").asc());

def asc_nulls_first: Column
Returns an ascending ordering used in sorting, where null values appear before non-null values.

// Scala: sort a DataFrame by age column in ascending order and null values appearing first.
df.sort(df("age").asc_nulls_first)
// Java
df.sort(df.col("age").asc_nulls_first());

def asc_nulls_last: Column
Returns an ascending ordering used in sorting, where null values appear after non-null values.

// Scala: sort a DataFrame by age column in ascending order and null values appearing last.
df.sort(df("age").asc_nulls_last)
// Java
df.sort(df.col("age").asc_nulls_last());

def desc: Column
Returns a descending ordering used in sorting.

// Scala
df.sort(df("age").desc)
// Java
df.sort(df.col("age").desc());

def desc_nulls_first: Column
Returns a descending ordering used in sorting, where null values appear before non-null values.

// Scala: sort a DataFrame by age column in descending order and null values appearing first.
df.sort(df("age").desc_nulls_first)
// Java
df.sort(df.col("age").desc_nulls_first());

def desc_nulls_last: Column
Returns a descending ordering used in sorting, where null values appear after non-null values.

// Scala: sort a DataFrame by age column in descending order and null values appearing last.
df.sort(df("age").desc_nulls_last)
// Java
df.sort(df.col("age").desc_nulls_last());

scala> df.sort(df("count").desc).show(5)
+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|      United States|358354|
|    United States|      United States|352742|
|    United States|      United States|348113|
|    United States|      United States|347452|
+-----------------+-------------------+------+
only showing top 5 rows

  • Sorting Within Partitions:

    For optimization purposes, it’s sometimes advisable to sort within each partition before another set of transformations. You can use the sortWithinPartitions method to do this. It orders the rows inside each partition only and does not produce a global ordering.

def sortWithinPartitions(sortExprs: Column*): Dataset[T]
Returns a new Dataset with each partition sorted by the given expressions.
This is the same operation as "SORT BY" in SQL (Hive QL).

def sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T]
Returns a new Dataset with each partition sorted by the given expressions.
This is the same operation as "SORT BY" in SQL (Hive QL).

scala> val df = spark.read.format("json").schema(myManualSchema).load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json").sortWithinPartitions("count")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
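
To make the per-partition behavior visible, here is a small sketch that does not depend on the flight data. The column name n and the values are made up; glom() on the underlying RDD is used only to group each partition's rows into an array so the order inside each partition can be printed.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; getOrCreate() reuses a session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("sort-within-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data spread over two partitions.
val numbers = Seq(5, 3, 8, 1, 9, 2).toDF("n").repartition(2)

// Each partition is sorted on its own; there is no ordering across partitions.
val sortedPerPartition = numbers.sortWithinPartitions("n")

// One Seq[Int] per partition, in the order the rows are stored.
val partitions = sortedPerPartition.rdd.glom().collect().map(_.map(_.getInt(0)).toSeq)

partitions.foreach(p => println(p.mkString(", ")))
```

Which values land in which partition depends on how repartition distributes the rows, so only the ordering inside each printed line is guaranteed.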

Limit:

  • We can restrict the number of rows returned from a DataFrame using limit.

def limit(n: Int): Dataset[T]
Returns a new Dataset by taking the first n rows.
The difference between this function and head is that head is an action and returns an array (by triggering query execution) while limit returns a new Dataset.
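
The difference between limit and head can be sketched as follows. The DataFrame of the numbers 1 to 100 and the column name n are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; getOrCreate() reuses a session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("limit-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data: the integers 1 to 100 in a column named "n".
val df = (1 to 100).toDF("n")

// limit is a transformation: it returns a new Dataset and runs nothing yet.
val firstFive = df.limit(5)

// head is an action: it runs the query and returns an Array[Row] on the driver.
val asArray = df.head(5)

// A common pattern: sort first, then limit, to get the top N rows.
val top3 = df.sort($"n".desc).limit(3)

println(firstFive.count())
println(asArray.length)
top3.show()
```

Because limit returns a Dataset, further transformations can be chained on it before any action triggers execution.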

Repartition and Coalesce:

  • An important optimization opportunity is to partition the data according to frequently filtered columns. Partitioning controls the physical layout of data across the cluster, including the partitioning scheme and the number of partitions.

  • Repartition will incur a full shuffle of the data, regardless of whether one is necessary. This means that you should typically only repartition when the future number of partitions is greater than your current number of partitions or when you are looking to partition by a set of columns.

scala> df.rdd.getNumPartitions
res28: Int = 3

scala> val df1 = df.repartition(5)
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> df1.rdd.getNumPartitions
res29: Int = 5

The rdd method on a DataFrame returns the underlying data as an RDD of Row objects:

lazy val rdd: RDD[T]
Represents the content of the Dataset as an RDD of T.

  • If you know that you’re going to be filtering by a certain column often, it can be worth repartitioning based on that column. We could also specify the number of partitions:

scala> val df2 = df.repartition(col("DEST_COUNTRY_NAME"))
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> df2.rdd.getNumPartitions
res30: Int = 5

scala> val df3  = df.repartition(10,col("DEST_COUNTRY_NAME"))
df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> df3.rdd.getNumPartitions
res31: Int = 10

Following are the method definitions:

def repartition(partitionExprs: Column*): Dataset[T]
Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as number of partitions. The resulting Dataset is hash partitioned.

This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).

def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
Returns a new Dataset partitioned by the given partitioning expressions into numPartitions. The resulting Dataset is hash partitioned.

This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).

def repartition(numPartitions: Int): Dataset[T]
Returns a new Dataset that has exactly numPartitions partitions.
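
Since repartitioning by a column produces a hash-partitioned Dataset, rows with the same column value end up in the same partition. The sketch below checks this on a few made-up rows; spark_partition_id from org.apache.spark.sql.functions is used only to tag each row with the partition it sits in, and the column name pid is hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, spark_partition_id}

// Local session for illustration; getOrCreate() reuses a session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("repartition-sketch").getOrCreate()
import spark.implicits._

// Hypothetical rows; several share the same destination.
val flights = Seq(
  ("Malta", 1L), ("Malta", 4L), ("Croatia", 2L), ("Croatia", 7L), ("Brunei", 3L)
).toDF("DEST_COUNTRY_NAME", "count")

// Hash-partition into 4 partitions by destination and record each row's partition id.
val byDest = flights
  .repartition(4, col("DEST_COUNTRY_NAME"))
  .withColumn("pid", spark_partition_id())

// For each destination, the set of partition ids that hold its rows.
val placement: Map[String, Set[Int]] = byDest.collect()
  .groupBy(_.getString(0))
  .map { case (dest, rows) => dest -> rows.map(_.getInt(2)).toSet }

placement.foreach { case (dest, pids) => println(s"$dest -> $pids") }
```

Each destination maps to a single partition id, although different destinations may share a partition.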


  • Coalesce, on the other hand, will not incur a full shuffle and will try to combine partitions. 

def coalesce(numPartitions: Int): Dataset[T]

Returns a new Dataset that has exactly numPartitions partitions, when fewer partitions are requested.
If a larger number of partitions is requested, it will stay at the current number of partitions. Similar to coalesce defined on an RDD, this operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
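
The partition counts described above can be checked with a short sketch. The DataFrame built from spark.range and the specific partition numbers are chosen only for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; getOrCreate() reuses a session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("coalesce-sketch").getOrCreate()

// Hypothetical data: 1000 ids, explicitly spread over 8 partitions.
val df = spark.range(0, 1000).toDF("id").repartition(8)

// coalesce merges existing partitions without a full shuffle: 8 -> 2.
val fewer = df.coalesce(2)

// coalesce cannot increase the number of partitions: it stays at 8.
val notMore = df.coalesce(16)

// repartition shuffles the data and can increase the count: 8 -> 16.
val more = df.repartition(16)

println(df.rdd.getNumPartitions)
println(fewer.rdd.getNumPartitions)
println(notMore.rdd.getNumPartitions)
println(more.rdd.getNumPartitions)
```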



Collecting Rows to your Driver:

Spark maintains the state of the cluster in the driver. There are times when you’ll want to collect some of your data to the driver in order to manipulate it on your local machine.
The collect method gets all data from the entire DataFrame, take selects the first N rows, and show prints a number of rows in a readable table.

def collect(): Array[T]
Returns an array that contains all rows in this Dataset.
Running collect requires moving all the data into the application's driver process, and doing so on a very large dataset can crash the driver process with OutOfMemoryError.

def take(n: Int): Array[T]
Returns the first n rows in the Dataset.
Running take requires moving data into the application's driver process, and doing so with a very large n can crash the driver process with OutOfMemoryError.

def show(numRows: Int, truncate: Boolean): Unit

Displays the Dataset in a tabular form. For example:

year  month AVG('Adj Close) MAX('Adj Close)
1980  12    0.503218        0.595103
1981  01    0.523289        0.570307
1982  02    0.436504        0.475256
1983  03    0.410516        0.442194
1984  04    0.450090        0.483521
numRows - Number of rows to show (default 20)
truncate - Whether to truncate long strings. If true, strings more than 20 characters will be truncated and all cells will be aligned right.
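
The three methods can be compared side by side on a small DataFrame. This is a sketch with made-up rows in the shape of the flight data; on real data, prefer take or show over collect unless the result is known to fit in driver memory.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; getOrCreate() reuses a session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("collect-sketch").getOrCreate()
import spark.implicits._

// Hypothetical rows shaped like the flight data used in this post.
val df = Seq(
  ("Malta", "United States", 1L),
  ("Saint Vincent and the Grenadines", "United States", 1L),
  ("United States", "Croatia", 1L)
).toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")

// collect brings every row to the driver as an Array[Row].
val all = df.collect()

// take brings only the first n rows to the driver.
val firstTwo = df.take(2)

// show prints rows as a table; truncate = false keeps long strings intact.
df.show(2, truncate = false)

println(all.length)
println(firstTwo.length)
```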
