
Thursday, 11 April 2019

CH5.5 Spark Literals, Adding Columns, Removing Columns, Renaming Columns, Casting



Converting to Spark types (literals):


  • Literals are used to pass explicit values into Spark that are just a value (rather than a new column). This might be a constant value or something we’ll need to compare to later on.

  • This is basically a translation from a given programming language’s literal value to one that Spark understands.

  • Literals are expressions, so you can use them in the same way as any other expression.
     
scala> df.select(expr("*"), lit(1).as("One")).show(2)
+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows


def lit(literal: Any): Column
Creates a Column of literal value.

The passed in object is returned directly if it is already a Column. If the object is a Scala Symbol, it is converted into a Column also. Otherwise, a new Column is created to represent the literal value.
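Since a literal is just another Column expression, it can also sit on one side of a comparison; a minimal sketch, assuming the same flights df used above:

import org.apache.spark.sql.functions.{col, lit}

// keep only the rows whose count is greater than the literal 10
df.where(col("count") > lit(10)).show(2)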

Adding Columns:

  • The withColumn method can be used on our DataFrame to add a new column. It takes the name of the new column and a Column expression, and returns a new DataFrame. Following is the method definition.


def withColumn(colName: String, col: Column): DataFrame
Returns a new Dataset by adding a column or replacing the existing column that has the same name.

scala> df.withColumn("Double Count",$"count" * 2).show(2)
+-----------------+-------------------+-----+------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|Double Count|
+-----------------+-------------------+-----+------------+
|    United States|            Romania|   15|          30|
|    United States|            Croatia|    1|           2|
+-----------------+-------------------+-----+------------+
only showing top 2 rows

scala> df.withColumn("Same Country flat",expr("DEST_COUNTRY_NAME == ORIGIN_COUNTRY_NAME")).show(2)
+-----------------+-------------------+-----+-----------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|Same Country flat|
+-----------------+-------------------+-----+-----------------+
|    United States|            Romania|   15|            false|
|    United States|            Croatia|    1|            false|
+-----------------+-------------------+-----+-----------------+
only showing top 2 rows
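The lit function from the previous section pairs naturally with withColumn; as a sketch, adding a constant column to the same df:

import org.apache.spark.sql.functions.lit

// add a column whose value is the constant 1 for every row
df.withColumn("numberOne", lit(1)).show(2)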

  • We can also rename a column this way.

scala> df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns
res5: Array[String] = Array(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count, Destination)

Renaming Columns:

  • Another way to rename a column is the withColumnRenamed method on DataFrame. This renames the column with the name given in the first argument to the name given in the second argument. Following is the method definition.

def withColumnRenamed(existingName: String, newName: String): DataFrame
Returns a new Dataset with a column renamed. This is a no-op if schema doesn't contain existingName.

scala> df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
res6: Array[String] = Array(dest, ORIGIN_COUNTRY_NAME, count)

Reserved Characters and keywords:
 
  • If there are reserved characters like spaces or dashes in column names, we need to escape the column names. In Spark, we do this with backtick (`) characters.
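The dfWithLongColName DataFrame used in the next snippet is assumed to have been built with a column name containing spaces and a dash, along these lines:

import org.apache.spark.sql.functions.expr

// no backticks needed here: the first argument to withColumn is a plain string, not an expression
val dfWithLongColName = df.withColumn(
  "This Long Column-Name",
  expr("ORIGIN_COUNTRY_NAME"))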

    dfWithLongColName.selectExpr(
      "`This Long Column-Name`",
      "`This Long Column-Name` as `new col`").show(2)

We can refer to columns with reserved characters (and not escape them) if we’re doing an explicit string-to-column reference, which is interpreted as a literal instead of an expression. We only need to escape expressions that use reserved characters or keywords.

dfWithLongColName.select(col("This Long Column-Name")).columns

Case Sensitivity:
 
  • By default, Spark is case insensitive when resolving column names; however, you can make it case sensitive by setting the configuration spark.sql.caseSensitive.
     
-- in SQL
SET spark.sql.caseSensitive=true
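The same configuration can also be set from Scala at runtime; a minimal sketch, assuming an active SparkSession named spark:

// make column-name resolution case sensitive for this session
spark.conf.set("spark.sql.caseSensitive", "true")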
Removing Columns:
 

  • We can drop columns from a DataFrame simply by selecting only the columns we are interested in. However, there is also a dedicated method called drop.

  • The method has overloaded versions: one takes a Column, and the others take String column names (with Strings we can also specify multiple columns at once). Note that all of them return a new DataFrame with the column(s) dropped; a Column-based example is sketched after the String example below.

def drop(col: Column): DataFrame
Returns a new Dataset with a column dropped. This version of drop accepts a Column rather than a name. This is a no-op if the Dataset doesn't have a column with an equivalent expression.

def drop(colNames: String*): DataFrame
Returns a new Dataset with columns dropped. This is a no-op if schema doesn't contain column name(s). This method can only be used to drop top level columns; the colName strings are treated literally without further interpretation.

def drop(colName: String): DataFrame
Returns a new Dataset with a column dropped. This is a no-op if schema doesn't contain column name. This method can only be used to drop top level columns; the colName string is treated literally without further interpretation.

scala> df.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").show(2)
+-----+
|count|
+-----+
|   15|
|    1|
+-----+
only showing top 2 rows
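As a sketch of the Column-based overload mentioned above, we can also drop by passing a Column rather than a name:

import org.apache.spark.sql.functions.col

// drop a single column by Column reference instead of by name
df.drop(col("ORIGIN_COUNTRY_NAME")).show(2)
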
Changing Column Type (casting):

  • We can convert a column from one type to another by casting it. The cast method available on the Column type is used for this purpose.


def cast(to: String): Column
Casts the column to a different data type, using the canonical string representation of the type. The supported types are: string, boolean, byte, short, int, long, float, double, decimal, date, timestamp.

// Casts colA to integer.
df.select(df("colA").cast("int"))
def cast(to: DataType): Column
Casts the column to a different data type.

// Casts colA to IntegerType.
import org.apache.spark.sql.types.IntegerType
df.select(df("colA").cast(IntegerType))

// equivalent to
df.select(df("colA").cast("int"))

The target data type can be specified as a String or as one of the Spark internal types.

scala> df.withColumn("count2", col("count").cast("int")).dtypes
res12: Array[(String, String)] = Array((DEST_COUNTRY_NAME,StringType), (ORIGIN_COUNTRY_NAME,StringType), (count,LongType), (count2,IntegerType))
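The same cast can also be written as a SQL expression; a minimal sketch using selectExpr on the same df, which should yield the same count2 column of IntegerType:

// equivalent cast expressed in SQL, keeping all original columns
df.selectExpr("*", "cast(count as int) as count2").dtypes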

