Thursday, 11 April 2019

CH5/4 Creating DF, select and selectExpr



DataFrame Transformations (Manipulating DataFrames)

When working with individual DataFrames, there are some fundamental objectives. These break down into several core operations:
 
  • We can add rows or columns
  • We can remove rows or columns
  • We can transform a row into a column (or vice versa)
  • We can change the order of rows based on the values in columns

Creating DataFrames:
 
  • We can create DataFrames from raw data sources as shown below

val df1 = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
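
The same DataFrameReader API works for other formats. As a minimal sketch (the CSV path below is hypothetical), we could read the CSV version of the data and let Spark infer the schema:

/* header and inferSchema are standard CSV reader options */
val df2 = spark.read.format("csv")
  .option("header", "true")        // first line holds the column names
  .option("inferSchema", "true")   // sample the data to guess column types
  .load("/data/flight-data/csv/2015-summary.csv")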

  • We can also create DataFrames on the fly by taking a set of rows and converting them to a DataFrame. Here we create a Scala collection of Row objects and parallelize it with sparkContext to create an RDD. We also build the schema manually. Using the RDD and the schema, we create a DataFrame with createDataFrame:

scala> val myManualSchema = new StructType(Array(new StructField("some", StringType, true), new StructField("col", StringType, true), new StructField("names", LongType, false)))
myManualSchema: org.apache.spark.sql.types.StructType = StructType(StructField(some,StringType,true), StructField(col,StringType,true), StructField(names,LongType,false))

scala>     /* Collection containing Row Objects*/
     |     val myRows = Seq(Row("Hello", null, 1L))
myRows: Seq[org.apache.spark.sql.Row] = List([Hello,null,1])

scala>     /*Parallelize the collection using sparkContext.parallelize*/
     |     val myRDD = spark.sparkContext.parallelize(myRows)
myRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[17] at parallelize at <console>:31

scala>     /*The SparkSession Object has the createDataFrame method*/
     |     val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf: org.apache.spark.sql.DataFrame = [some: string, col: string ... 1 more field]

scala>     myDf.show()
+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+


def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Creates a DataFrame from an RDD containing Rows using the given schema. It is important to make sure that the structure of every Row of the provided RDD matches the provided schema; otherwise, a runtime exception will be thrown. Example:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
val sparkSession = new org.apache.spark.sql.SparkSession(sc)

val schema =
  StructType(
    StructField("name", StringType, false) ::
    StructField("age", IntegerType, true) :: Nil)

val people =
  sc.textFile("examples/src/main/resources/people.txt").map(
    _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val dataFrame = sparkSession.createDataFrame(people, schema)
dataFrame.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)

dataFrame.createOrReplaceTempView("people")
sparkSession.sql("select name from people").collect.foreach(println)

  • The following shows an example of how we read data from files, create an RDD of Row objects, and then create a DataFrame using the createDataFrame method.

scala>     val schema=StructType(Array(
     |       StructField("DEST_COUNTRY_NAME", StringType, true),
     |       StructField("ORIGIN_COUNTRY_NAME", StringType, true),
     |       StructField("count", LongType, false,
     |         Metadata.fromJson("{\"hello\":\"world\"}"))
     |     ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,false))

scala>     val rdd1=spark.sparkContext.textFile("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv").filter(x => !x.contains("DEST"))
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[59] at filter at <console>:28

scala>     val rdd2=rdd1.map(x => x.split(",")).map(x => Row(x(0),x(1),x(2).toLong))
rdd2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[61] at map at <console>:30

scala>     val DF1 = spark.createDataFrame(rdd2,schema);
DF1: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala>     DF1.show(10,false)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

  • In Scala, we can also take advantage of Spark’s implicits in the console (and in your JAR code, if you import them) by running toDF on a Seq type. This does not play well with null types, so it’s not necessarily recommended for production use cases (see the sketch below).


import spark.implicits._  // needed for toDF outside spark-shell; the shell pre-imports it
val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")

scala> val f = List(1,2,3,4).toDF()
f: org.apache.spark.sql.DataFrame = [value: int]

scala> f
res41: org.apache.spark.sql.DataFrame = [value: int]

scala> f.collect
res42: Array[org.apache.spark.sql.Row] = Array([1], [2], [3], [4])
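
To see why toDF does not play well with nulls, consider this sketch: for a bare null the Scala compiler infers the element type as Null, which Spark cannot map to a schema (it typically fails with an error like "Schema for type Null is not supported"). Wrapping the value in an Option is one workaround:

// val badDF = Seq(("Hello", null, 1L)).toDF("col1", "col2", "col3")  // fails: Null has no Spark SQL type
val okDF = Seq(("Hello", Option.empty[String], 1L)).toDF("col1", "col2", "col3")  // Option[String] becomes a nullable string column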

Select and selectExpr:
 
  • select and selectExpr allow you to do the DataFrame equivalent of SQL queries on a table of data. We can use them to manipulate columns in our DataFrames.

  • We use the select method when working with columns or expressions, and the selectExpr method when working with expressions in strings (SQL-like expressions).

  • Following is the documentation for the methods select and selectExpr:

def select(col: String, cols: String*): DataFrame
Selects a set of columns. This is a variant of select that can only select existing columns using column names (i.e. cannot construct expressions).

// The following two are equivalent:
ds.select("colA", "colB")
ds.select($"colA", $"colB")
def select(cols: Column*): DataFrame
Selects a set of column based expressions.

ds.select($"colA", $"colB" + 1)
def selectExpr(exprs: String*): DataFrame
Selects a set of SQL expressions. This is a variant of select that accepts SQL expressions.

// The following are equivalent:
ds.selectExpr("colA", "colB as newName", "abs(colC)")
ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))


  • Naturally, some transformations are not specified as methods on columns (the Column type); therefore, there exists a group of functions in the org.apache.spark.sql.functions package.

  • The easiest way is just to use the select method and pass in, as strings, the column names with which you would like to work:

scala> df.select("DEST_COUNTRY_NAME").show(2)
+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows

  • You can select multiple columns by using the same style of query; just add more column name strings to your select method call:

scala> df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows

  • We can refer to columns in a number of different ways. (We use them interchangeably)

scala>     df.select(
     |       df.col("DEST_COUNTRY_NAME"),   /*This is a method on DataFrame that returns a Column*/
     |       col("DEST_COUNTRY_NAME"),      /*col and column are functions under org.apache.spark.sql.functions. They return Column*/
     |       column("DEST_COUNTRY_NAME"),
     |       'DEST_COUNTRY_NAME,              /*$ and ' are two ways used in Scala*/
     |       $"DEST_COUNTRY_NAME",
     |       expr("DEST_COUNTRY_NAME")).show(2) /*expression is also a function and returns a Column type*/

+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|    United States|    United States|    United States|
|    United States|    United States|    United States|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
only showing top 2 rows

  • One common error is attempting to mix Column objects and strings. For example, the following code will result in a compiler error:

scala> df.select(col("DEST_COUNTRY_NAME"), "DEST_COUNTRY_NAME")
<console>:31: error: overloaded method value select with alternatives:
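
The fix is not to mix the two styles in a single call; a quick sketch of two working alternatives:

df.select(col("DEST_COUNTRY_NAME"), col("DEST_COUNTRY_NAME"))  // all Column objects
df.select("DEST_COUNTRY_NAME", "DEST_COUNTRY_NAME")            // all plain strings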

  • expr is the most flexible reference that we can use. It can refer to a plain column or to a string manipulation of a column.

scala> df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows


  • Because select followed by a series of expr is such a common pattern, Spark has a shorthand for doing this efficiently: selectExpr. This is probably the most convenient interface.

scala> df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)
+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows

This allows us to use SQL-like expressions in the select.

We can treat selectExpr as a simple way to build up complex expressions that create new DataFrames. In fact, we can add any valid non-aggregating SQL statement, and as long as the columns resolve, it will be valid!

The following adds a new column to our DataFrame. As in SQL, we use * to represent all the columns.

scala> df.selectExpr(
     |     "*", // include all original columns
     |     "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2)
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows

  • With select expressions, we can also specify aggregations over the entire DataFrame by taking advantage of the functions that we have.
(Note that if we want to group by and then perform an aggregation, the technique is different, as sketched after the example below. With select we can only perform aggregations over the entire DataFrame.)

scala> df.selectExpr("avg(count) AS AVG", "sum(count) as SUM", "count(distinct(DEST_COUNTRY_NAME))").show(2)
+------------------+-------+---------------------------------+
|               AVG|    SUM|count(DISTINCT DEST_COUNTRY_NAME)|
+------------------+-------+---------------------------------+
|1718.3189081225032|2580915|                              167|
+------------------+-------+---------------------------------+
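
For contrast, a minimal sketch of the per-group technique mentioned above, totalling count for each destination (just to show that the call pattern differs from a plain select):

df.groupBy("DEST_COUNTRY_NAME").sum("count").show(2)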


CH5/3 Records and Row type



Records and Rows:

scala> df.take(4)
res19: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Croatia,1], [United States,Ireland,344], [Egypt,United States,15])

As shown above, commands that return individual rows to the driver will always return one or more Row types when we are working with DataFrames.

  • We can create rows by manually instantiating a Row object with the values that belong in each column. It’s important to note that only DataFrames have schemas; Rows themselves do not. This means that if you create a Row manually, you must specify the values in the same order as the schema of the DataFrame to which they might be appended (we will see this when we discuss creating DataFrames).

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val myRow = Row("Hello", null, 1, false)
myRow: org.apache.spark.sql.Row = [Hello,null,1,false]

Note that the Row companion object has an apply method, which allows us to instantiate a Row object using Row(...).
Remember that Row is a trait and also has a companion object.

  • Following are important methods available on the Row companion object:

def apply(values: Any*): Row
This method can be used to construct a Row with the given values.
val empty: Row
Returns an empty row.
def fromSeq(values: Seq[Any]): Row
This method can be used to construct a Row from a Seq of values.
def fromTuple(tuple: Product): Row
def merge(rows: Row*): Row
Merge multiple rows into a single row, one after another.
def unapplySeq(row: Row): Some[Seq[Any]]
This method can be used to extract fields from a Row object in a pattern match. Example:
import org.apache.spark.sql._

val pairs = sql("SELECT key, value FROM src").rdd.map {
  case Row(key: Int, value: String) =>
    key -> value
}
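
The construction methods work as you would expect; a small sketch with made-up values:

val r1 = Row.fromSeq(Seq("Hello", 1L))   // Row built from a Seq of values
val r2 = Row.fromTuple(("World", 2L))    // Row built from a tuple
val r3 = Row.merge(r1, r2)               // fields of r1 followed by fields of r2: [Hello,1,World,2]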

  • Accessing data in a Row object is easy: we just specify the position we want. Following are some of the methods available on the Row trait to access elements:

def apply(i: Int): Any

Returns the value at position i. If the value is null, null is returned. The following is a mapping between Spark SQL types and return types:

BooleanType -> java.lang.Boolean
ByteType -> java.lang.Byte
ShortType -> java.lang.Short
IntegerType -> java.lang.Integer
FloatType -> java.lang.Float
DoubleType -> java.lang.Double
StringType -> String
DecimalType -> java.math.BigDecimal

DateType -> java.sql.Date
TimestampType -> java.sql.Timestamp

BinaryType -> byte array
ArrayType -> scala.collection.Seq (use getList for java.util.List)
MapType -> scala.collection.Map (use getJavaMap for java.util.Map)
StructType -> org.apache.spark.sql.Row
def getString(i: Int): String
Returns the value at position i as a String object.
def getLong(i: Int): Long
Returns the value at position i as a primitive long.
def getAs[T](i: Int): T
Returns the value at position i. For primitive types, if the value is null it returns the 'zero value' specific to that primitive, i.e. 0 for Int; use isNullAt to ensure that the value is not null (see the sketch after the example below).


scala> myRow(0)
res20: Any = Hello

scala> myRow(0).asInstanceOf[String]
res26: String = Hello

scala> myRow(1)
res21: Any = null

scala> myRow.getString(0)
res22: String = Hello

scala> myRow.getLong(2)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
  at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
  at org.apache.spark.sql.Row$class.getLong(Row.scala:231)
  at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:166)
  ... 51 elided

scala> myRow.getInt(2)
res24: Int = 1
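
getAs avoids the explicit asInstanceOf cast; a quick sketch against the same myRow:

myRow.getAs[String](0)   // "Hello" -- typed accessor, no manual cast
myRow.getAs[Int](2)      // 1
myRow.isNullAt(1)        // true -- check this before calling a primitive getter such as getLong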


CH5/2 Columns and Expressions



    Columns and Expressions:
     
    Columns:

    • Columns in Spark are similar to columns in a spreadsheet, R dataframe, or pandas DataFrame. You can select, manipulate, and remove columns from DataFrames and these operations are represented as expressions.

    • To Spark, columns are logical constructions that simply represent a value computed on a per-record basis by means of an expression. This means that to have a real value for a column, we need to have a row; and to have a row, we need to have a DataFrame. You cannot manipulate an individual column outside the context of a DataFrame.

    def col(colName: String): Column
    Returns a Column based on the given column name.
    def column(colName: String): Column
    Returns a Column based on the given column name. Alias of col.

    To use either of these functions, you pass in a column name:
     
    scala> import org.apache.spark.sql.functions.{col, column}
    import org.apache.spark.sql.functions.{col, column}

    scala> col("someColumnName")
    res6: org.apache.spark.sql.Column = someColumnName

    scala> column("someColumnName")
    res7: org.apache.spark.sql.Column = someColumnName


    • Important: Note that these columns might or might not exist in our DataFrames. Columns are not resolved until we compare the column names with those we are maintaining in the catalog. Column and table resolution happens in the analyzer phase.


    • Scala has some unique language features that allow for more shorthand ways of referring to columns. The following bits of syntactic sugar perform the exact same thing, namely creating a column, but provide no performance improvement. The $ allows us to designate a string as a special string that should refer to an expression. The tick mark (') is a special thing called a symbol; this is a Scala-specific construct for referring to some identifier. Both are shorthand ways of referring to columns by name. Note that the type is ColumnName, which is a subtype of Column.

    scala> $"someColumnName"
    res8: org.apache.spark.sql.ColumnName = someColumnName

    scala> 'someColumnName
    res9: Symbol = 'someColumnName


    • Explicit Column References: If you need to refer to a specific DataFrame’s column, you can use the col method on the specific DataFrame. This can be useful when you are performing a join and need to refer to a specific column in one DataFrame that might share a name with another column in the joined DataFrame.

    This col method is available on Dataset (and therefore DataFrame). It returns the same type, but is different from the col and column functions; a join sketch follows the example below.

    def col(colName: String): Column
    Selects column based on the column name and return it as a Column.

    scala> df.col("DEST_COUNTRY_NAME")
    res10: org.apache.spark.sql.Column = DEST_COUNTRY_NAME
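
    A minimal sketch of that join disambiguation, assuming two hypothetical DataFrames dfA and dfB that both contain an id column:

    /* Each side's own col method pins the reference to that DataFrame */
    val joined = dfA.join(dfB, dfA.col("id") === dfB.col("id"))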




    Expressions:

    • Columns are just expressions. An expression is a set of transformations on one or more values in a record in a DataFrame. Think of an expression as a function that takes as input one or more column names, resolves them, and then potentially applies more expressions to create a single value for each record in the dataset. Importantly, this “single value” can actually be a complex type like a Map or Array.

    • In the simplest case, an expression, created via the expr function, is just a DataFrame column reference: expr("someCol") is equivalent to col("someCol").

    scala> expr("DEST_COUNTRY_NAME")
    res13: org.apache.spark.sql.Column = DEST_COUNTRY_NAME

    scala> col("DEST_COUNTRY_NAME")
    res14: org.apache.spark.sql.Column = DEST_COUNTRY_NAME
     
    scala> expr("(((someCol + 5) * 200) - 6) < otherCol")
    res15: org.apache.spark.sql.Column = ((((someCol + 5) * 200) - 6) < otherCol)

    • expr is a function in the org.apache.spark.sql.functions object.

    def expr(expr: String): Column
    Parses the expression string into the column that it represents, similar to DataFrame.selectExpr

    • Columns provide a subset of expression functionality. If you use col() and want to perform transformations on that column, you must perform them on that column reference. When using an expression, the expr function can actually parse transformations and column references from a string, and the result can subsequently be passed into further transformations.

    expr("someCol - 5") is the same transformation as performing col("someCol") - 5, or even expr("someCol") - 5, as the sketch below shows.

    Think of expr as allowing us to use string SQL expressions that return a Column type.
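
    A quick sketch of that equivalence against the count column of our df; all three select the same computed column:

    df.select(expr("count - 5"), col("count") - 5, expr("count") - 5).show(2)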


    Column Type:

    • All the functions we saw earlier, i.e. col, column, df.col, and expr, return a Column type. Column is a class in Scala, and several methods are available on it, e.g. providing an alias, casting, etc., as sketched below.
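
    For example, a minimal sketch of two such Column methods, alias and cast, run against our df:

    df.select(col("count").alias("cnt"), col("count").cast("string")).show(2)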



    Accessing DataFrame Columns:

    • To see a DataFrame's columns, we can use printSchema (see the sketch after the example below). To programmatically access columns, you can use the columns property:

    def columns: Array[String]
    Returns all column names as an array.

    scala> df.columns
    res16: Array[String] = Array(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count)
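
    printSchema prints the schema as a tree. For the flight-data df loaded from JSON, the output looks like the following (JSON inference marks all columns nullable):

    df.printSchema()
    // root
    //  |-- DEST_COUNTRY_NAME: string (nullable = true)
    //  |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
    //  |-- count: long (nullable = true)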