Search This Blog

Thursday, 11 April 2019

CH5/3 Records and Row type



Records and Rows:

scala> df.take(4)
res19: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Croatia,1], [United States,Ireland,344], [Egypt,United States,15])

As shown below commands that return individual rows to the driver will always return one or more Row types when we are working with DataFrames.

  • We can create rows by manually instantiating a Row object with the values that belong in each column. It’s important to note that only DataFrames have schemas. Rows themselves do not have schemas. This means that if you create a Row manually, you must specify the values in the same order as the schema of the DataFrame to which they might be appended (we will see this when we discuss creating DataFrames)

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val myRow = Row("Hello", null, 1, false)
myRow: org.apache.spark.sql.Row = [Hello,null,1,false]

Note that the Row companion object has apply method, which allows us to instantiate Row object using Row (…..)
Remember Row is Trait and also has a companion Object.

  • Following are imp methods available on the Row Companion Object:

def apply(values: Any*): Row
This method can be used to construct a Row with the given values.
val empty: Row
Returns an empty row.
def fromSeq(values: Seq[Any]): Row
This method can be used to construct a Row from a Seq of values.
def fromTuple(tuple: Product): Row
def merge(rows: Row*): Row
Merge multiple rows into a single row, one after another.
def unapplySeq(row: Row): Some[Seq[Any]]
This method can be used to extract fields from a Row object in a pattern match. Example:
import org.apache.spark.sql._

val pairs = sql("SELECT key, value FROM src").rdd.map {
  case Row(key: Int, value: String) =>
    key -> value
}

  • To access data in Row object is easy. We just need to specify the position that we would like. Following are some of the methods available on Row trait to access elements:

def apply(i: Int): Any

Returns the value at position i. If the value is null, null is returned. The following is a mapping between Spark SQL types and return types:

BooleanType -> java.lang.Boolean
ByteType -> java.lang.Byte
ShortType -> java.lang.Short
IntegerType -> java.lang.Integer
FloatType -> java.lang.Float
DoubleType -> java.lang.Double
StringType -> String
DecimalType -> java.math.BigDecimal

DateType -> java.sql.Date
TimestampType -> java.sql.Timestamp

BinaryType -> byte array
ArrayType -> scala.collection.Seq (use getList for java.util.List)
MapType -> scala.collection.Map (use getJavaMap for java.util.Map)
StructType -> org.apache.spark.sql.Row
def getString(i: Int): String
Returns the value at position i as a String object.
def getLong(i: Int): Long
Returns the value at position i as a primitive long.
def getAs[T](i: Int): T
Returns the value at position i. For primitive types if value is null it returns 'zero value' specific for primitive ie. 0 for Int - use isNullAt to ensure that value is not null


scala> myRow(0)
res20: Any = Hello

scala> myRow(0).asInstanceOf[String]
res26: String = Hello

scala> myRow(1)
res21: Any = null

scala> myRow.getString(0)
res22: String = Hello

scala> myRow.getLong(2)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
  at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
  at org.apache.spark.sql.Row$class.getLong(Row.scala:231)
  at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:166)
  ... 51 elided

scala> myRow.getInt(2)
res24: Int = 1


CH5/2 Columns and Expressions



    Columns and Expressions:
     
    Columns:

    • Columns in Spark are similar to columns in a spreadsheet, R dataframe, or pandas DataFrame. You can select, manipulate, and remove columns from DataFrames and these operations are represented as expressions.

    • To Spark, columns are logical constructions that simply represent a value computed on a per-record basis by means of an expression. This means that to have a real value for a column, we need to have a row; and to have a row, we need to have a DataFrame. You cannot manipulate an individual column outside the context of a DataFrame

    def col(colName: String): Column
    Returns a Column based on the given column name.
    def column(colName: String): Column
    Returns a Column based on the given column name. Alias of col.

    To use either of these functions, you
    pass in a column name:
     
    scala> import org.apache.spark.sql.functions.{col, column}
    import org.apache.spark.sql.functions.{col, column}

    scala> col("someColumnName")
    res6: org.apache.spark.sql.Column = someColumnName

    scala> column("someColumnName")
    res7: org.apache.spark.sql.Column = someColumnName


    • Imp: Note that these columns might or might not exist in our DataFrames. Columns are not resolved until we compare the column names with those we are maintaining in the catalog. Column and table resolution happens in the analyzer phase.


    • Scala has some unique language features that allow for more shorthand ways of referring to columns.
      The following bits of
      syntactic sugar perform the exact same thing, namely creating a column, but provide no performance improvement. The $ allows us to designate a string as a special string that should refer to an expression. The tick mark (') is a special thing called a symbol; this is a Scala-specific construct of referring to some identifier. They both perform the same thing and are shorthand ways of referring to columns by name. Note that the type is ColumnName, which is a subtype of Column

    scala> $"someColumnName"
    res8: org.apache.spark.sql.ColumnName = someColumnName

    scala> 'someColumnName
    res9: Symbol = 'someColumnName


    • Explicit Column References: If you need to refer to a specific DataFrame’s column, you can use the col method on the specific DataFrame. This can be useful when you are performing a join and need to refer to a specific column in one DataFrame that might share a name with another column in the joined DataFrame.

    This col method is available on Dataset(Dataframe). This returns the same type, but is different than the col and column functions

    def col(colName: String): Column
    Selects column based on the column name and return it as a Column.

    scala> df.col("DEST_COUNTRY_NAME")
    res10: org.apache.spark.sql.Column = DEST_COUNTRY_NAME




    Expressions:

    • Columns are just expressions. An expression is a set of transformations on one or more values in a record in a DataFrame. Think of expression as a function that takes as input one or more column names, resolves them, and then potentially applies more expressions to create a single value for each record in the dataset. Importantly, this “single value” can actually be a complex type like a Map or Array.

    • In the simplest case, an expression, created via the expr function, is just a DataFrame column reference. In the simplest case, expr("someCol") is equivalent to col("someCol").

    scala> expr("DEST_COUNTRY_NAME")
    res13: org.apache.spark.sql.Column = DEST_COUNTRY_NAME

    scala> col("DEST_COUNTRY_NAME")
    res14: org.apache.spark.sql.Column = DEST_COUNTRY_NAME
     
    scala> expr("(((someCol + 5) * 200) - 6) < otherCol")
    res15: org.apache.spark.sql.Column = ((((someCol + 5) * 200) - 6) < otherCol)

    • The expr is a function under org.apache.spark.sql.functions Object.

    def expr(expr: String): Column
    Parses the expression string into the column that it represents, similar to DataFrame.selectExpr

    • Columns provide a subset of expression functionality. If you use col() and want to perform transformations on that column, you must perform those on that column reference. When using an expression, the expr function can actually parse transformations and column references from a string and can subsequently be passed into further transformations.

    expr("someCol - 5") is the same transformation as performing col("someCol") - 5, or even expr("someCol") - 5.

    Think as expr allows us to use string SQL expressions and returns a Column type.


    Column Type:

    • All the functions we saw earlier .i.e col,column,df.col,expr return a Column type. This is a class in Scala and we have several methods available on this Ex: providing alias, casting etc



    Accessing DataFrame Columns:

    • To see we can use printSchema . To programmatically access columns, you can use the columns property to see all columns on a DataFrame

    def columns: Array[String]
    Returns all column names as an array.

    scala> df.columns
    res16: Array[String] = Array(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count)



CH5/1 Recap of DF, Schemas


Quick Recap:

  • DataFrame consists of a series of records (like rows in a table), that are of type Row, and a number of columns (like columns in a spreadsheet) that represent a computation expression that can be performed on each individual record in the Dataset.
     
  • Schemas define the name as well as the type of data in each column.

  • Partitioning of the DataFrame defines the layout of the DataFrame or Dataset’s physical distribution across the cluster. The partitioning scheme defines how that is allocated. You can set this to be based on values in a certain column or nondeterministically.

  • Following is the dataframe we will use for our examples:

1   
2    import org.apache.spark.sql._
3   
4    object SparkDefinitiveTesting {
5   
6      def main(args: Array[String]): Unit = {
7        val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
8        spark.sparkContext.setLogLevel("FATAL")
9        val df = spark.read.format("json").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json")
10       df.show(10,false)
11     }
12   }
13  
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
|United States    |Singapore          |1    |
|United States    |Grenada            |62   |
|Costa Rica       |United States      |588  |
|Senegal          |United States      |40   |
|Moldova          |United States      |1    |
+-----------------+-------------------+-----+

Finding Documentation for DataFrame methods:

https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset

DataFrames are nothing by Datasets for Row type. So documentation for DataFrame is found under Dataset itself.

    Schemas:
     
    • The method printSchema can be used to provide a nice tree like representation of the Dataframes Schema.
       
    def printSchema(): Unit
    Prints the schema to the console in a nice tree format.

    • A schema defines the column names and types of a DataFrame. We can either let a data source define the schema (called schema-on-read) or we can define it explicitly ourselves.

    • Caution: For ad hoc analysis, schema-on-read usually works just fine (although at times it can be a bit slow with plain-text file formats like CSV or JSON). However, this can also lead to precision issues like a long type incorrectly set as an integer when reading in a file. When using Spark for production Extract, Transform, and Load (ETL), it is often a good idea to define your schemas manually, especially when working with untyped data sources like CSV and JSON because schema inference can vary depending on the type of data that you read in.

    • The method "schema" can be used to fetch the schema in a variable. Note that the method returns a StructType
       
    def schema: StructType
    Returns the schema of this Dataset.

    scala>     val df = spark.read.format("json").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json")
    df: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

    scala> df.schema
    res5: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true))

    • A schema is a StructType made up of a number of fields, StructFields, that have a name, type, a Boolean flag which specifies whether that column can contain missing or null values, and, finally, users can optionally specify associated metadata with that column. The metadata is a way of storing information about this column (Spark uses this in its machine learning library).

    • Schemas can contain other StructTypes (Spark’s complex types).  

    • Imp: If the types in the data (at runtime) do not match the schema, Spark will throw an error.

    • Following Shows how to create and enforce schema manually using the "schema" method available on DataFrameReader. A DataFrame Reader object is returned by read method on SparkSession.
       
    2    import org.apache.spark.sql._
    3    import org.apache.spark.sql.types.{LongType, StructField, StructType, StringType, Metadata}
    4   
    5    object SparkDefinitiveTesting {
    6   
    7      def main(args: Array[String]): Unit = {
    8        val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    9        spark.sparkContext.setLogLevel("FATAL")
    10  
    11       val myManualSchema = StructType(Array(
    12         StructField("DEST_COUNTRY_NAME", StringType, true),
    13         StructField("ORIGIN_COUNTRY_NAME", StringType, true),
    14         StructField("count", LongType, false,
    15           Metadata.fromJson("{\"hello\":\"world\"}"))
    16       ))
    17  
    18       val df = spark.read.format("json").schema(myManualSchema).load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json")
    19       println("=" * 35)
    20       df.show(10, false)
    21       println("=" * 35)
    22       df.printSchema()
    23       println("=" * 35)
    24       println(df.schema)
    25  
    26     }
    Results:
    ===================================
    +-----------------+-------------------+-----+
    |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
    +-----------------+-------------------+-----+
    |United States    |Romania            |15   |
    |United States    |Croatia            |1    |
    |United States    |Ireland            |344  |
    |Egypt            |United States      |15   |
    |United States    |India              |62   |
    |United States    |Singapore          |1    |
    |United States    |Grenada            |62   |
    |Costa Rica       |United States      |588  |
    |Senegal          |United States      |40   |
    |Moldova          |United States      |1    |
    +-----------------+-------------------+-----+
    only showing top 10 rows

    ===================================
    root
     |-- DEST_COUNTRY_NAME: string (nullable = true)
     |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
     |-- count: long (nullable = true)

    ===================================
    StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true))



CH4/0 Structured API Overview


    DataFrames vs Datasets:

    • Within the Structured APIs, there are two more APIs, the “untyped” DataFrames and the “typed” Datasets.

    • To say that DataFrames are untyped is a slightly inaccurate; they have types, but Spark maintains them completely and only checks whether those types line up to those specified in the schema at runtime. 

    • Datasets, on the other hand, check whether types conform to the specification at compile time. 

    • Datasets are only available to Java Virtual Machine (JVM)–based languages (Scala and Java) and we specify types with case classes or Java beans.

    • To Spark (in Scala), DataFrames are simply Datasets of Type Row. The “Row” type is Spark’s internal representation of its optimized in-memory format for computation. This format makes for highly specialized and efficient computation because rather than using JVM types, which can cause high garbage-collection and object instantiation costs, Spark can operate on its own internal format without incurring any of those costs. 

    • To Spark (in Python or R), there is no such thing as a Dataset: everything is a DataFrame and therefore we always operate on that optimized format.
Columns and Rows:


  • Columns represent a simple type like an integer or string, a complex type like an array or map, or a null value. Spark tracks all of this type information for you and offers a variety of ways, with which you can transform columns. Think about Spark Column types as columns in a table.
     
  • A row is nothing more than a record of data. Each record in a DataFrame must be of type Row. We can create these rows manually from SQL, from Resilient Distributed Datasets (RDDs), from data sources, or manually from scratch. 
Represents one row of output from a relational operator.
To create a new Row, use RowFactory.create() in Java or Row.apply() in Scala.

  • Important methods available on these types include:
     
Column
Row

Spark Types:

  • Spark has a large number of internal type representations. 
  • Below is a reference table of how Scala types lines up with the type in Spark.
     
Data type
Value type in Scala
API to access or create a data type
ByteType
Byte
ByteType
ShortType
Short
ShortType
IntegerType
Int
IntegerType
LongType
Long
LongType
FloatType
Float
FloatType
DoubleType
Double
DoubleType
DecimalType
java.math.BigDecimal
DecimalType
StringType
String
StringType
BinaryType
Array[Byte]
BinaryType
BooleanType
Boolean
BooleanType
TimestampType
java.sql.Timestamp
TimestampType
DateType
java.sql.Date
DateType
ArrayType
scala.collection.Seq
ArrayType(elementType, [containsNull]). Note: The default value of containsNull is true.
MapType
scala.collection.Map
MapType(keyType, valueType, [valueContainsNull]). Note: The default value of valueContainsNull is true.
StructType
org.apache.spark.sql.Row
StructType(fields). Note: fields is an Array of StructFields. Also, fields with the same name are not allowed.
StructField
The value type in Scala of the data type of this field (for example, Int for a StructField with the data type IntegerType)
StructField(name, dataType, [nullable]). Note: The default value of nullable is true.

For Scala API, these datatypes are under the package org.apache.spark.sql.types

These Spark types are classes and they have their companion objects.
Checkout the below links for ByteType class and Companion object:

Following shows that ByteType is a Companion Object:

scala> ByteType
res2: org.apache.spark.sql.types.ByteType.type = ByteType


 
Overview of Structured API execution:

Following walks us through the execution of a single structured API query from user code to executed code.
Overview of Steps are
  • Write DataFrame/Dataset/SQL Code.
  • If valid code, Spark converts this to a Logical Plan.
  • Spark transforms this Logical Plan to a Physical Plan, checking for optimizations along the way.
  • Spark then executes this Physical Plan (RDD manipulations) on the cluster.

Logical Planning:
The first phase of execution is meant to take user code and convert it into a logical plan.
This logical plan only
represents a set of abstract transformations that do not refer to executors or drivers, it’s purely to convert the user’s set of expressions into the most optimized version. It does this by converting user code into an unresolved logical plan. This plan is unresolved because although your code might be valid, the tables or columns that it refers to might or might not exist. Spark uses the catalog, a repository of all table and DataFrame information, to resolve columns and tables in the analyzer. The analyzer might reject the unresolved logical plan if the required table or column name does not exist in the catalog. If the analyzer can resolve it, the result is passed through the Catalyst Optimizer, a collection of rules that attempt to optimize the logical plan by pushing down predicates or selections.

Physical Planning:
The physical plan, often called a Spark plan, specifies how the logical plan will execute on the cluster by generating different physical execution strategies and comparing them through a cost model.
Physical planning results in a series of RDDs and transformations. This result is why you might have heard Spark referred to as a compiler—it takes queries in DataFrames, Datasets, and SQL and compiles them into RDD transformations for you.

Execution
Upon selecting a physical plan, Spark runs all of this code over RDDs, the lower-level programming interface of Spark
Spark performs further optimizations at runtime, generating native Java bytecode that can remove entire tasks or stages during execution. Finally the result is returned to the user.