Search This Blog

Thursday, 11 April 2019

CH5.5 Spark Literals,adding Columns, removing Columns, Renaming Columns, casting



Converting to Spark types(literals):


  • Literals are used to pass explicit values into Spark that are just a value (rather than a new column).This might be a constant value or something we’ll need to compare to later on. 

  • This is basically a translation from a given programming language’s literal value to one that Spark understands.
     
  • Literals are expressions and you can use them in the same way
     
scala> df.select(expr("*"), lit(1).as("One")).show(2)
+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows


def lit(literal: Any): Column
Creates a Column of literal value.

The passed in object is returned directly if it is already a Column. If the object is a Scala Symbol, it is converted into a Column also. Otherwise, a new Column is created to represent the literal value.

Adding Columns:

  • The withColumn method can be used on our dataframe to add a new column. Following is the method definition. Note that it returns a DataFrame. It takes in a name of the column and a Column type


def withColumn(colName: String, col: Column): DataFrame
Returns a new Dataset by adding a column or replacing the existing column that has the same name.

scala> df.withColumn("Double Count",$"count" * 2).show(2)
+-----------------+-------------------+-----+------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|Double Count|
+-----------------+-------------------+-----+------------+
|    United States|            Romania|   15|          30|
|    United States|            Croatia|    1|           2|
+-----------------+-------------------+-----+------------+
only showing top 2 rows

scala> df.withColumn("Same Country flat",expr("DEST_COUNTRY_NAME == ORIGIN_COUNTRY_NAME")).show(2)
+-----------------+-------------------+-----+-----------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|Same Country flat|
+-----------------+-------------------+-----+-----------------+
|    United States|            Romania|   15|            false|
|    United States|            Croatia|    1|            false|
+-----------------+-------------------+-----+-----------------+
only showing top 2 rows

  • We can also rename a column this way.

scala> df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns
res5: Array[String] = Array(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count, Destination)

Renaming Columns:

  • Another alternative to rename column is withColumnRenamed  method on Dataframe. This will rename the column with the name of the string in the first argument to the string in the second argument. Following is the method definition.

def withColumnRenamed(existingName: String, newName: String): DataFrame
Returns a new Dataset with a column renamed. This is a no-op if schema doesn't contain existingName.

scala> df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns
res5: Array[String] = Array(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count, Destination)

Reserved Characters and keywords:
 
  • If there are reserved characters like spaces or dashes in column names, we should escape the column names . In Spark, we do this by using backtick (`) characters. 

    dfWithLongColName.selectExpr(
      "`This Long Column-Name`",
      "`This Long Column-Name` as `new col`").show(2)

We can refer to columns with reserved characters (and not escape them) if we’re doing an explicit string-to-column reference, which is interpreted as a literal instead of an expression. We only need to escape expressions that use reserved characters or keywords.

dfWithLongColName.select(col("This Long Column-Name")).columns

Case Sensitivity:
 
  • By default Spark is case insensitive; however, you can make Spark case sensitive by setting the configuration.
     
-- in SQL
set spark.sql.caseSensitive true
Removing Columns:
 

  • We can drop columns from DataFrame by selecting the columns we are interested in. However there is also a dedicated method called drop

  • The method has overloaded versions - One take a Column type and other take String column names. Using Strings we can also specify multiple columns . Note that all return a Dataframe with column(s) dropped

def drop(col: Column): DataFrame
Returns a new Dataset with a column dropped. This version of drop accepts a Column rather than a name. This is a no-op if the Dataset doesn't have a column with an equivalent expression.
def drop(colNames: String*): DataFrame
Returns a new Dataset with columns dropped. This is a no-op if schema doesn't contain column name(s).

This method can only be used to drop top level columns. the colName string is treated literally without further interpretation.
def drop(colName: String): DataFrame
Returns a new Dataset with a column dropped. This is a no-op if schema doesn't contain column name.

This method can only be used to drop top level columns. the colName string is treated literally without further interpretation.

scala> df.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").show(2)
+-----+
|count|
+-----+
|   15|
|    1|
+-----+
only showing top 2 rows
Changing Column type(casting):

  • We can convert columns from one type to another by casting the column from one type to another.  The cast method available on Column type is used for this purpose.


def cast(to: String): Column
Casts the column to a different data type, using the canonical string representation of the type. The supported types are: string, boolean, byte, short, int, long, float, double, decimal, date, timestamp.

// Casts colA to integer.
df.select(df("colA").cast("int"))
def cast(to: DataType): Column
Casts the column to a different data type.

// Casts colA to IntegerType.
import org.apache.spark.sql.types.IntegerType
df.select(df("colA").cast(IntegerType))

// equivalent to
df.select(df("colA").cast("int"))

         Target Datatype can be specified using Strings or one of the Spark internal types.

scala> df.withColumn("count2", col("count").cast("int")).dtypes
res12: Array[(String, String)] = Array((DEST_COUNTRY_NAME,StringType), (ORIGIN_COUNTRY_NAME,StringType), (count,LongType), (count2,IntegerType))


CH5/4 Creating DF,select and selectExpr



DataFrame Transformations (Manipulating DataFrames)

When working with individual DataFrames there are some fundamental objectives. These break down into several core operations
 
  • We can add rows or columns
  • We can remove rows or columns
  • We can transform a row into a column (or vice versa)
  • We can change the order of rows based on the values in columns

Creating DataFrames:
 
  • We can create DataFrames from raw data sources as shown below

val df1 = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")

  • We can also create DataFrames on the fly by taking a set of rows and converting them to a DataFrame. Here we create a scala collection of Row objects and parallelize them using sparkContext to create RDD. We also build the schema manually. Using the RDD and the Schema we create a DataFrame using createDataFrame

scala> val myManualSchema = new StructType(Array(new StructField("some", StringType, true), new StructField("col", StringType, true), new StructField("names", LongType, false)))
myManualSchema: org.apache.spark.sql.types.StructType = StructType(StructField(some,StringType,true), StructField(col,StringType,true), StructField(names,LongType,false))

scala>     /* Collection containing Row Objects*/
     |     val myRows = Seq(Row("Hello", null, 1L))
myRows: Seq[org.apache.spark.sql.Row] = List([Hello,null,1])

scala>     /*Parallelize the collection using sparkContext.parallelize*/
     |     val myRDD = spark.sparkContext.parallelize(myRows)
myRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[17] at parallelize at <console>:31

scala>     /*The SparkSession Object has the createDataFrame method*/
     |     val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf: org.apache.spark.sql.DataFrame = [some: string, col: string ... 1 more field]

scala>     myDf.show()
+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+


def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Creates a DataFrame from an RDD containing Rows using the given schema. It is important to make sure that the structure of every Row of the provided RDD matches the provided schema. Otherwise, there will be runtime exception. Example:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
val sparkSession = new org.apache.spark.sql.SparkSession(sc)

val schema =
  StructType(
    StructField("name", StringType, false) ::
    StructField("age", IntegerType, true) :: Nil)

val people =
  sc.textFile("examples/src/main/resources/people.txt").map(
    _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val dataFrame = sparkSession.createDataFrame(people, schema)
dataFrame.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)

dataFrame.createOrReplaceTempView("people")
sparkSession.sql("select name from people").collect.foreach(println)

  • Following shows an example how we read data from files, create RDD of Row objects and then create a DataFrame using the createDataFrame method.

scala>     val schema=StructType(Array(
     |       StructField("DEST_COUNTRY_NAME", StringType, true),
     |       StructField("ORIGIN_COUNTRY_NAME", StringType, true),
     |       StructField("count", LongType, false,
     |         Metadata.fromJson("{\"hello\":\"world\"}"))
     |     ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,false))

scala>     val rdd1=spark.sparkContext.textFile("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv").filter(x => !x.contains("DEST"))
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[59] at filter at <console>:28

scala>     val rdd2=rdd1.map(x => x.split(",")).map(x => Row(x(0),x(1),x(2).toLong))
rdd2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[61] at map at <console>:30

scala>     val DF1 = spark.createDataFrame(rdd2,schema);
DF1: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala>     DF1.show(10,false)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

  • In Scala, we can also take advantage of Spark’s implicits in the console (and if you import them in your JAR code) by running toDF on a Seq type. This does not play well with null types, so it’s not necessarily recommended for production use cases.


val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")

scala> val f = List(1,2,3,4).toDF()
f: org.apache.spark.sql.DataFrame = [value: int]

scala> f
res41: org.apache.spark.sql.DataFrame = [value: int]

scala> f.collect
res42: Array[org.apache.spark.sql.Row] = Array([1], [2], [3], [4])

Select and selectExpr:
 
  • The Select and selectExpr allow you to do the DataFrame equivalent of SQL queries on a table of data. We can use them to manipulate columns in your DataFrames.
     
  • We use the select method when you’re working with columns or expressions, and the selectExpr method when you’re working with expressions in strings.(SQL like expression)
     
  • Following is the documentation for methods select and selectExpr

def select(col: String, cols: String*): DataFrame
Selects a set of columns. This is a variant of select that can only select existing columns using column names (i.e. cannot construct expressions).

// The following two are equivalent:
ds.select("colA", "colB")
ds.select($"colA", $"colB")
def select(cols: Column*): DataFrame
Selects a set of column based expressions.

ds.select($"colA", $"colB" + 1)
def selectExpr(exprs: String*): DataFrame
Selects a set of SQL expressions. This is a variant of select that accepts SQL expressions.

// The following are equivalent:
ds.selectExpr("colA", "colB as newName", "abs(colC)")
ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))


  • Naturally some transformations are not specified as methods on columns(Column type); therefore, there exists a group of functions found in the org.apache.spark.sql.functions package.
     
  • The easiest way is just to use the select method and pass in the column names as strings with which you would like to work

scala> df.select("DEST_COUNTRY_NAME").show(2)
+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows

  • You can select multiple columns by using the same style of query, just add more column name strings to your select method call

scala> df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows

  • We can refer to columns in a number of different ways. (We use them interchangeably)

scala>     df.select(
     |       df.col("DEST_COUNTRY_NAME"),   /*This is a method on DataFrame that returns a Column*/
     |       col("DEST_COUNTRY_NAME"),      /*col and column are functions under org.apache.spark.sql.functions. They return Column*/
     |       column("DEST_COUNTRY_NAME"),
     |       'DEST_COUNTRY_NAME,              /*$ and ' are two ways used in Scala*/
     |       $"DEST_COUNTRY_NAME",
     |       expr("DEST_COUNTRY_NAME")).show(2) /*expression is also a function and returns a Column type*/

+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|    United States|    United States|    United States|
|    United States|    United States|    United States|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
only showing top 2 rows

  • One common error is attempting to mix Column objects and strings. For example, the following code will result in a compiler error:

scala> df.select(col("DEST_COUNTRY_NAME"), "DEST_COUNTRY_NAME")
<console>:31: error: overloaded method value select with alternatives:

  • The expr is the most flexible reference that we can use. It can refer to a plain column or a string manipulation of a column.

scala> df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows


  • Because select followed by a series of expr is such a common pattern, Spark has a shorthand for doing this efficiently: selectExpr. This is probably the most convenient interface

scala> df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)
+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows

This allows to use SQL like expressions in the select

We can treat selectExpr as a simple way to build up complex expressions that create new DataFrames. In fact, we can add any valid non-aggregating SQL statement, and as long as the columns resolve, it will be valid!

Following adds  new column to our DataFrame. Similar to sql we are using * to represent all the columns.

scala> df.selectExpr(
     |     "*", // include all original columns
     |     "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2)
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows

  • With select expression, we can also specify aggregations over the entire DataFrame by taking advantage of the functions that we have
(Note that if we want to group by and then perform aggregation, the technique is different. With select we can only perform aggregations over entire DF.)

scala> df.selectExpr("avg(count) AS AVG", "sum(count) as SUM", "count(distinct(DEST_COUNTRY_NAME))").show(2)
+------------------+-------+---------------------------------+
|               AVG|    SUM|count(DISTINCT DEST_COUNTRY_NAME)|
+------------------+-------+---------------------------------+
|1718.3189081225032|2580915|                              167|
+------------------+-------+---------------------------------+