Thursday, 11 April 2019

CH5/1 Recap of DF, Schemas


Quick Recap:

  • A DataFrame consists of a series of records (like rows in a table) of type Row, and a number of columns (like columns in a spreadsheet), each of which represents a computation expression that can be performed on every individual record in the Dataset.
     
  • Schemas define the name as well as the type of data in each column.

  • Partitioning of the DataFrame defines the layout of the DataFrame or Dataset’s physical distribution across the cluster. The partitioning scheme defines how that is allocated. You can set this to be based on values in a certain column or nondeterministically.

  • The following DataFrame is used in the examples:

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    // Local session that uses all available cores
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")
    // Read the JSON files under the flight-data directory; the schema is inferred from the data
    val df = spark.read.format("json").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json")
    // Show the first 10 rows without truncating column values
    df.show(10, false)
  }
}

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
|United States    |Singapore          |1    |
|United States    |Grenada            |62   |
|Costa Rica       |United States      |588  |
|Senegal          |United States      |40   |
|Moldova          |United States      |1    |
+-----------------+-------------------+-----+
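
The recap point about partitioning can be explored with a few calls. The following is a minimal sketch, not part of the original notes: the object name and the small in-memory data set (a handful of rows copied from the table above) are assumptions chosen so the example runs without the JSON files.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PartitioningSketch {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Partitioning Sketch").getOrCreate()
    import spark.implicits._

    // Small in-memory stand-in for the flight data (illustrative rows only)
    val df = Seq(
      ("United States", "Romania", 15L),
      ("United States", "Ireland", 344L),
      ("Egypt", "United States", 15L),
      ("Costa Rica", "United States", 588L)
    ).toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")

    // Number of partitions in the initial physical layout
    println(df.rdd.getNumPartitions)

    // Shuffle the rows into a fixed number of partitions
    val byNumber = df.repartition(3)
    println(byNumber.rdd.getNumPartitions)

    // Shuffle so that rows with the same destination land in the same partition
    val byColumn = df.repartition(col("DEST_COUNTRY_NAME"))
    println(byColumn.rdd.getNumPartitions)

    spark.stop()
  }
}
```

Repartitioning by a column is the "based on values in a certain column" case from the recap; repartitioning by a number alone leaves the placement of individual rows up to Spark.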

Finding Documentation for DataFrame methods:

https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset

DataFrames are nothing but Datasets of type Row, so the documentation for DataFrame methods is found under Dataset itself.

    Schemas:
     
    • The printSchema method prints a tree-like representation of the DataFrame's schema.
       
    def printSchema(): Unit
    Prints the schema to the console in a nice tree format.

    • A schema defines the column names and types of a DataFrame. We can either let a data source define the schema (called schema-on-read) or we can define it explicitly ourselves.

    • Caution: For ad hoc analysis, schema-on-read usually works just fine (although at times it can be a bit slow with plain-text file formats like CSV or JSON). However, this can also lead to precision issues like a long type incorrectly set as an integer when reading in a file. When using Spark for production Extract, Transform, and Load (ETL), it is often a good idea to define your schemas manually, especially when working with untyped data sources like CSV and JSON because schema inference can vary depending on the type of data that you read in.

    • The schema method can be used to fetch the schema into a variable. Note that the method returns a StructType.
       
    def schema: StructType
    Returns the schema of this Dataset.

    scala>     val df = spark.read.format("json").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json")
    df: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

    scala> df.schema
    res5: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true))

    • A schema is a StructType made up of a number of fields, StructFields, that have a name, type, a Boolean flag which specifies whether that column can contain missing or null values, and, finally, users can optionally specify associated metadata with that column. The metadata is a way of storing information about this column (Spark uses this in its machine learning library).

    • Schemas can contain other StructTypes (Spark’s complex types).  

    • Important: If the types in the data (at runtime) do not match the schema, Spark will throw an error.

    • The following shows how to create and enforce a schema manually using the schema method available on DataFrameReader. A DataFrameReader object is returned by the read method on SparkSession.
       
    import org.apache.spark.sql._
    import org.apache.spark.sql.types.{LongType, StructField, StructType, StringType, Metadata}

    object SparkDefinitiveTesting {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
        spark.sparkContext.setLogLevel("FATAL")

        // Each StructField takes a name, a type, a nullable flag, and optional metadata
        val myManualSchema = StructType(Array(
          StructField("DEST_COUNTRY_NAME", StringType, true),
          StructField("ORIGIN_COUNTRY_NAME", StringType, true),
          StructField("count", LongType, false,
            Metadata.fromJson("{\"hello\":\"world\"}"))
        ))

        // Pass the manual schema to the reader instead of relying on inference
        val df = spark.read.format("json").schema(myManualSchema).load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json")
        println("=" * 35)
        df.show(10, false)
        println("=" * 35)
        df.printSchema()
        println("=" * 35)
        println(df.schema)

      }
    }
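    Results (note that printSchema reports count as nullable = true even though the manual schema declared it with nullable = false):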
    ===================================
    +-----------------+-------------------+-----+
    |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
    +-----------------+-------------------+-----+
    |United States    |Romania            |15   |
    |United States    |Croatia            |1    |
    |United States    |Ireland            |344  |
    |Egypt            |United States      |15   |
    |United States    |India              |62   |
    |United States    |Singapore          |1    |
    |United States    |Grenada            |62   |
    |Costa Rica       |United States      |588  |
    |Senegal          |United States      |40   |
    |Moldova          |United States      |1    |
    +-----------------+-------------------+-----+
    only showing top 10 rows

    ===================================
    root
     |-- DEST_COUNTRY_NAME: string (nullable = true)
     |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
     |-- count: long (nullable = true)

    ===================================
    StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true))
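
The points above about StructFields (name, type, nullable flag, metadata) and about schemas containing other StructTypes can be tried without reading any data. The following is a minimal sketch under assumptions: the nested "route" field and the object name are hypothetical and do not come from the flight data set.

```scala
import org.apache.spark.sql.types.{LongType, Metadata, StringType, StructField, StructType}

object SchemaInspectionSketch {

  def main(args: Array[String]): Unit = {
    // Hypothetical nested schema: the "route" column is itself a StructType
    val route = StructType(Array(
      StructField("DEST_COUNTRY_NAME", StringType, true),
      StructField("ORIGIN_COUNTRY_NAME", StringType, true)
    ))
    val nested = StructType(Array(
      StructField("route", route, true),
      StructField("count", LongType, false, Metadata.fromJson("{\"hello\":\"world\"}"))
    ))

    // Prints the schema in the same tree format that printSchema uses
    nested.printTreeString()

    // Each StructField exposes its name, type, nullability, and metadata
    nested.fields.foreach { f =>
      println(s"${f.name}: ${f.dataType.simpleString}, nullable=${f.nullable}, metadata=${f.metadata.json}")
    }
  }
}
```

Because a StructType is just a collection of StructFields, the same loop works on df.schema for any DataFrame.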



CH4/0 Structured API Overview


    DataFrames vs Datasets:

    • Within the Structured APIs, there are two more APIs, the “untyped” DataFrames and the “typed” Datasets.

    • To say that DataFrames are untyped is slightly inaccurate; they have types, but Spark maintains them completely and only checks whether those types line up with those specified in the schema at runtime.

    • Datasets, on the other hand, check whether types conform to the specification at compile time. 

    • Datasets are only available to Java Virtual Machine (JVM)–based languages (Scala and Java) and we specify types with case classes or Java beans.

    • To Spark (in Scala), DataFrames are simply Datasets of Type Row. The “Row” type is Spark’s internal representation of its optimized in-memory format for computation. This format makes for highly specialized and efficient computation because rather than using JVM types, which can cause high garbage-collection and object instantiation costs, Spark can operate on its own internal format without incurring any of those costs. 

    • To Spark (in Python or R), there is no such thing as a Dataset: everything is a DataFrame and therefore we always operate on that optimized format.
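
The runtime versus compile-time distinction described above can be seen in a short example. This is a minimal sketch, assuming a hypothetical case class FlightRecord and a small in-memory data set; neither comes from the original notes.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

// Hypothetical case class; it is defined outside the method that calls .as[...]
case class FlightRecord(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: Long)

object TypedVsUntypedSketch {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Typed vs Untyped").getOrCreate()
    import spark.implicits._

    val df = Seq(("United States", "Romania", 15L), ("Egypt", "United States", 15L))
      .toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")

    // DataFrame: a misspelled column name compiles, and fails only when Spark analyzes the query
    try {
      df.select("DEST_COUNTRY").show()
    } catch {
      case e: AnalysisException => println(s"Rejected at runtime: ${e.getMessage}")
    }

    // Dataset: field access is checked by the Scala compiler
    val ds = df.as[FlightRecord]
    ds.map(f => f.count + 1).show()
    // ds.map(f => f.cnt + 1)   // would not compile: cnt is not a member of FlightRecord

    spark.stop()
  }
}
```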
Columns and Rows:


  • Columns represent a simple type like an integer or string, a complex type like an array or map, or a null value. Spark tracks all of this type information for you and offers a variety of ways, with which you can transform columns. Think about Spark Column types as columns in a table.
     
  • A row is nothing more than a record of data. Each record in a DataFrame must be of type Row. We can create these rows from SQL, from Resilient Distributed Datasets (RDDs), from data sources, or manually from scratch.
The API documentation describes Row as representing one row of output from a relational operator.
To create a new Row, use RowFactory.create() in Java or Row.apply() in Scala.

  • Important methods available on these types are listed in the API documentation for Column and Row.
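
As a rough illustration of the two types, the following sketch builds a Row by hand and uses a Column expression in a select. The object name, the column alias, and the sample values are assumptions made for the example.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col

object RowAndColumnSketch {

  def main(args: Array[String]): Unit = {
    // Row.apply builds a row from positional values; a Row created this way carries no schema
    val myRow = Row("Hello", null, 1, false)
    println(myRow(0))            // positional access returns Any
    println(myRow.getString(0))  // typed accessor
    println(myRow.getInt(2))
    println(myRow.isNullAt(1))

    val spark = SparkSession.builder().master("local[*]").appName("Row and Column").getOrCreate()
    import spark.implicits._

    val df = Seq(("United States", 15L), ("Egypt", 15L)).toDF("DEST_COUNTRY_NAME", "count")

    // A Column is an expression; it is only evaluated when used against a DataFrame
    df.select(col("DEST_COUNTRY_NAME"), (col("count") + 5).as("count_plus_5")).show()

    // Records collected from a DataFrame come back as Row objects
    val first: Row = df.first()
    println(first.getLong(1))

    spark.stop()
  }
}
```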

Spark Types:

  • Spark has a large number of internal type representations. 
  • Below is a reference table of how Scala types line up with the types in Spark.
     
Data type | Value type in Scala | API to access or create a data type
ByteType | Byte | ByteType
ShortType | Short | ShortType
IntegerType | Int | IntegerType
LongType | Long | LongType
FloatType | Float | FloatType
DoubleType | Double | DoubleType
DecimalType | java.math.BigDecimal | DecimalType
StringType | String | StringType
BinaryType | Array[Byte] | BinaryType
BooleanType | Boolean | BooleanType
TimestampType | java.sql.Timestamp | TimestampType
DateType | java.sql.Date | DateType
ArrayType | scala.collection.Seq | ArrayType(elementType, [containsNull]). Note: The default value of containsNull is true.
MapType | scala.collection.Map | MapType(keyType, valueType, [valueContainsNull]). Note: The default value of valueContainsNull is true.
StructType | org.apache.spark.sql.Row | StructType(fields). Note: fields is an Array of StructFields. Also, fields with the same name are not allowed.
StructField | The value type in Scala of the data type of this field (for example, Int for a StructField with the data type IntegerType) | StructField(name, dataType, [nullable]). Note: The default value of nullable is true.

For Scala API, these datatypes are under the package org.apache.spark.sql.types

These Spark types are classes, and each has a companion object.
Check out the API documentation for the ByteType class and its companion object.

The following shows that ByteType refers to the companion object:

scala> ByteType
res2: org.apache.spark.sql.types.ByteType.type = ByteType
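
The defaults noted in the reference table can be checked directly on the type objects. This is a small sketch; the object name and field names are illustrative only.

```scala
import org.apache.spark.sql.types._

object SparkTypesSketch {

  def main(args: Array[String]): Unit = {
    // Simple types are singleton objects, so they are referenced directly
    val byteType: DataType = ByteType
    println(byteType)

    // Complex types are built through the apply methods of their companion objects
    val tags = ArrayType(StringType)
    val counts = MapType(StringType, LongType)
    val field = StructField("count", LongType)

    println(tags.containsNull)        // true (default)
    println(counts.valueContainsNull) // true (default)
    println(field.nullable)           // true (default)

    // Types compose: a StructType whose fields use the types above
    val schema = StructType(Array(StructField("tags", tags), StructField("counts", counts), field))
    println(schema.simpleString)
  }
}
```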


 
Overview of Structured API execution:

The following walks through the execution of a single structured API query from user code to executed code.
The steps are:
  • Write DataFrame/Dataset/SQL Code.
  • If valid code, Spark converts this to a Logical Plan.
  • Spark transforms this Logical Plan to a Physical Plan, checking for optimizations along the way.
  • Spark then executes this Physical Plan (RDD manipulations) on the cluster.

Logical Planning:
The first phase of execution takes user code and converts it into a logical plan.
This logical plan only represents a set of abstract transformations that do not refer to executors or drivers; its purpose is purely to convert the user’s set of expressions into the most optimized version. Spark does this by first converting user code into an unresolved logical plan. This plan is unresolved because although your code might be valid, the tables or columns that it refers to might or might not exist. Spark uses the catalog, a repository of all table and DataFrame information, to resolve columns and tables in the analyzer. The analyzer might reject the unresolved logical plan if the required table or column name does not exist in the catalog. If the analyzer can resolve it, the result is passed through the Catalyst Optimizer, a collection of rules that attempt to optimize the logical plan by pushing down predicates or selections.

Physical Planning:
The physical plan, often called a Spark plan, specifies how the logical plan will execute on the cluster by generating different physical execution strategies and comparing them through a cost model.
Physical planning results in a series of RDDs and transformations. This result is why you might have heard Spark referred to as a compiler—it takes queries in DataFrames, Datasets, and SQL and compiles them into RDD transformations for you.

Execution:
Upon selecting a physical plan, Spark runs all of this code over RDDs, the lower-level programming interface of Spark.
Spark performs further optimizations at runtime, generating native Java bytecode that can remove entire tasks or stages during execution. Finally, the result is returned to the user.
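
The planning phases described above can be inspected for any query. The following is a minimal sketch with an assumed in-memory data set and object name; queryExecution is a developer-facing handle, so its printed output may differ between Spark versions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PlanInspectionSketch {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Plan Inspection").getOrCreate()
    import spark.implicits._

    val df = Seq(("United States", "Romania", 15L), ("Egypt", "United States", 15L))
      .toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")

    // Nothing runs yet: this only builds up a logical plan
    val query = df.where(col("count") > 10).groupBy("DEST_COUNTRY_NAME").sum("count")

    // Prints the parsed, analyzed, and optimized logical plans, followed by the physical plan
    query.explain(true)

    // The same stages are reachable programmatically
    val qe = query.queryExecution
    println(qe.analyzed)      // logical plan after the analyzer resolved columns
    println(qe.optimizedPlan) // logical plan after the optimizer rules
    println(qe.executedPlan)  // physical plan selected for execution

    spark.stop()
  }
}
```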

 

CH3/2 Structured Streaming, RDD intro



    Structured Streaming:
     
    • Structured Streaming is a high-level API for stream processing that became production-ready in Spark 2.2.  (There exists a separate low level API for stream processing)
       
    • With Structured Streaming, we can take the same operations that we perform in batch mode using Spark’s structured APIs and run them in a streaming fashion.
       
    • It's easy to conceptualize because you can write your batch job as a way to prototype it and then you can convert it to a streaming job. The way all of this works is by incrementally processing that data.

    • Basic Example:
       
    Following example shows how we first analyze the data as a static dataset and create a DataFrame to do so.
    Note that we have saved the Schema in a variable staticSchema (whose type is StructType)
     
    scala>     val staticDataFrame = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day\\*.csv")
    staticDataFrame: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]

    scala>     staticDataFrame.createOrReplaceTempView("retail_data")

    scala>     val staticSchema = staticDataFrame.schema
    staticSchema: org.apache.spark.sql.types.StructType = StructType(StructField(InvoiceNo,StringType,true), StructField(StockCode,StringType,true), StructField(Description,StringType,true), StructField(Quantity,IntegerType,true), StructField(InvoiceDate,TimestampType,true), StructField(UnitPrice,DoubleType,true), StructField(CustomerID,DoubleType,true), StructField(Country,StringType,true))

    Following window function will include all data from each day in the aggregation. It’s simply a window over the time–series column in our data. This is a helpful tool for manipulating date and timestamps because we can specify our requirements in a more human form (via intervals), and Spark will group all of them together for us

    scala> staticDataFrame.selectExpr("CustomerId","(UnitPrice * Quantity) as total_cost","InvoiceDate").groupBy(col("CustomerId"),window(col("InvoiceDate"), "1 day")).sum("total_cost").show(5,false)
    +----------+------------------------------------------+-----------------+
    |CustomerId|window                                    |sum(total_cost)  |
    +----------+------------------------------------------+-----------------+
    |16057.0   |[2011-12-04 19:00:00, 2011-12-05 19:00:00]|-37.6            |
    |14126.0   |[2011-11-28 19:00:00, 2011-11-29 19:00:00]|643.6300000000001|
    |13500.0   |[2011-11-15 19:00:00, 2011-11-16 19:00:00]|497.9700000000001|
    |17160.0   |[2011-11-07 19:00:00, 2011-11-08 19:00:00]|516.8499999999999|
    |15608.0   |[2011-11-10 19:00:00, 2011-11-11 19:00:00]|122.4            |
    +----------+------------------------------------------+-----------------+
    We could have run this as SQL code as well.
     
    Now that we’ve seen how that works, let’s take a look at the streaming code. Very little actually changes.
    The biggest change is that we use readStream instead of read.
    Also notice the maxFilesPerTrigger option, which simply specifies the number of files we should read in at once.
    Also notice that the schema we created during our static data analysis can be applied directly when reading the stream of data.
    Note that a streaming DataFrame is returned.

    scala> val streamingDataFrame = spark.readStream.schema(staticSchema).option("maxFilesPerTrigger", 1).format("csv").option("header", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day\\*.csv")
    streamingDataFrame: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]
     

    Now we can see whether our DataFrame is streaming

    scala> streamingDataFrame.isStreaming
    res4: Boolean = true

    Following we set up the same business logic as the previous DataFrame manipulations

    scala> val purchaseByCustomerPerHour = streamingDataFrame.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate").groupBy($"CustomerId", window($"InvoiceDate", "1 day")).sum("total_cost")
    purchaseByCustomerPerHour: org.apache.spark.sql.DataFrame = [CustomerId: double, window: struct<start: timestamp, end: timestamp> ... 1 more field]

     
    Remember that Streaming is still a lazy operation, so we will need to call a streaming action to start the execution of this data flow.
    Streaming actions are a bit different from our conventional static actions because we’re going to be populating data somewhere instead of just calling something like count (which doesn’t make any sense on a stream anyway). The action we will use will output to an in-memory table that we will update after each trigger. In this case, each trigger is based on an individual file (the read option that we set). Spark will mutate the data in the in-memory table such that we will always have the highest value as specified in our previous aggregation.

    purchaseByCustomerPerHour.writeStream.format("memory").queryName("customer_purchases").outputMode("complete").start()

    When we start the stream, we can run queries against it to debug what our result will look like if we were to write this out to a production sink:

    // in Scala
    spark.sql("""
      SELECT *
      FROM customer_purchases
      ORDER BY `sum(total_cost)` DESC
      """)
      .show(5)

    Another option you can use is to write the results out to the console:

    purchaseByCustomerPerHour.writeStream.format("console").queryName("customer_purchases_2").outputMode("complete").start()

    You shouldn’t use either of these streaming methods in production, but they do make for convenient demonstration of Structured Streaming’s power.
    Notice how this window is built on event time, as well, not the time at which Spark processes the data. This was one of the shortcomings of Spark Streaming that Structured Streaming has resolved. 
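
The steps above can be put together in a standalone application that also keeps the handle returned by start(). This is a sketch under assumptions: the relative input path, the query name, and the object name are hypothetical, and processAllAvailable is a method intended for testing, used here only so that the in-memory table has data before it is queried.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.types._

object StreamingQuerySketch {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Streaming Sketch").getOrCreate()

    // Hypothetical location of the by-day retail CSV files; pass the real path as the first argument
    val inputPath = if (args.nonEmpty) args(0) else "data/retail-data/by-day/*.csv"

    // Same columns and types as the staticSchema printed earlier
    val schema = StructType(Array(
      StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)
    ))

    val streaming = spark.readStream.schema(schema)
      .option("maxFilesPerTrigger", 1)
      .format("csv").option("header", "true")
      .load(inputPath)

    val perDay = streaming
      .selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")

    // start() returns a StreamingQuery handle that controls the running stream
    val query = perDay.writeStream.format("memory")
      .queryName("customer_purchases_sketch")
      .outputMode("complete")
      .start()

    // Block until the files currently available have been processed, then query the table
    query.processAllAvailable()
    spark.sql("SELECT * FROM customer_purchases_sketch ORDER BY `sum(total_cost)` DESC").show(5, false)

    query.stop()
    spark.stop()
  }
}
```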

Low Level API:

 
  • Spark includes a number of lower-level primitives to allow for arbitrary Java and Python object manipulation via Resilient Distributed Datasets (RDDs). Virtually everything in Spark is built on top of RDDs.

  • DataFrame operations are built on top of RDDs and compile down to these lower-level tools for convenient and extremely efficient distributed execution. 

  • RDDs are lower level than DataFrames because they reveal physical execution characteristics (like partitions) to end users.

  • One thing that you might use RDDs for is to parallelize raw data that you have stored in memory on the driver machine. 

// Outside spark-shell, toDF() requires: import spark.implicits._
spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF()

  • RDDs are available in Scala as well as Python. However, they’re not equivalent. This differs from the DataFrame API (where the execution characteristics are the same) due to some underlying implementation details. 

  • You shouldn’t need to use RDDs much in order to perform many tasks unless you’re maintaining older Spark code. There are basically no instances in modern Spark for which you should be using RDDs instead of the structured APIs, beyond manipulating some very raw, unprocessed, and unstructured data.
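
The point that RDDs reveal physical characteristics such as partitions can be seen in a few lines. This is a minimal sketch; the object name, the sample values, and the column name "value" are assumptions.

```scala
import org.apache.spark.sql.SparkSession

object RddSketch {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("RDD Sketch").getOrCreate()
    import spark.implicits._

    // Parallelize driver-side data and ask for two partitions explicitly
    val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6), 2)
    println(rdd.getNumPartitions) // 2

    // glom gathers the contents of each partition into an array, exposing the physical layout
    rdd.glom().collect().foreach(p => println(p.mkString("[", ", ", "]")))

    // Convert to a DataFrame to continue with the structured APIs
    val df = rdd.toDF("value")
    df.show()

    spark.stop()
  }
}
```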


CH3/1 Spark-Submit, DataSets intro


    • Spark is composed of these primitives—the lower-level APIs and the Structured APIs—and then a series of standard libraries for additional functionality. Spark’s libraries support a variety of different tasks, from graph analysis and machine learning to streaming and integrations with a host of computing and storage systems

    Running Production Applications(spark-submit):
    • The spark-submit command (a built-in command-line tool) sends your application code to a cluster and launches it to execute there.

    • Upon submission, the application will run until it exits (completes the task) or encounters an error. You can do this with all of Spark’s supported cluster managers, including Standalone, Mesos, and YARN.

    • spark-submit offers several controls with which you can specify the resources your application needs as well as how it should be run and its command-line arguments.

    The following is the simplest example, running an application on the local machine:

    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master local \
      ./examples/jars/spark-examples_2.11-2.2.0.jar 10

    Python Version:

    ./bin/spark-submit \
      --master local \
      ./examples/src/main/python/pi.py 10

    By changing the master argument of spark-submit, we can also submit the same application to a cluster running Spark’s standalone cluster manager, Mesos or YARN.
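
For comparison with the bundled SparkPi example, the following is a minimal sketch of an application written to be launched through spark-submit. The object name and the argument handling are hypothetical; the key design choice is that the master is not set in code, so the value given with --master on the command line decides where it runs.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical application; the object name would be passed to spark-submit via --class
object SubmittableApp {

  def main(args: Array[String]): Unit = {
    // No .master(...) call here: the master supplied to spark-submit is used
    val spark = SparkSession.builder().appName("Submittable App").getOrCreate()

    // First command-line argument after the jar path, defaulting to 10
    val n = if (args.nonEmpty) args(0).toInt else 10

    // Sum the numbers 1..n as a stand-in for real work
    val total = spark.range(1, n + 1).selectExpr("sum(id)").first().getLong(0)
    println(s"Sum of 1..$n = $total")

    spark.stop()
  }
}
```

Leaving the master out of the code is what allows the same jar to be submitted locally or to a cluster manager without recompiling.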

Datasets: Type-Safe Structured APIs

  • The Type-safe version of Spark’s structured API is called Datasets, for writing statically typed code in Java and Scala.
    The Dataset API is not available in Python and R, because those languages are dynamically typed.

  • DataFrames are a distributed collection of objects of type Row that can hold various types of tabular data.
    The Dataset API gives users the ability to assign a Java/Scala class to the records within a DataFrame and manipulate it as a collection of typed objects, similar to a Java ArrayList or Scala Seq.

  • The APIs available on Datasets are type-safe, meaning that you cannot accidentally view the objects in a Dataset as being of another class than the class you put in initially. 

  • The Dataset class is parameterized with the type of object contained inside: Dataset<T> in Java and Dataset[T] in Scala. For example, a Dataset[Person] will be guaranteed to contain objects of class Person.

  • As of Spark 2.0, the supported types are classes following the JavaBean pattern in Java and case classes in Scala. These types are restricted because Spark needs to be able to automatically analyze the type T and create an appropriate schema for the tabular data within your Dataset.

scala> case class Flight(DEST_COUNTRY_NAME: String,ORIGIN_COUNTRY_NAME: String,count: BigInt)
defined class Flight

scala>     val flightsDF = spark.read.parquet("data/flight-data/parquet/2010-summary.parquet/")
flightsDF: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala>     val flights = flightsDF.as[Flight]
flights: org.apache.spark.sql.Dataset[Flight] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

  • One final advantage is that when you call collect or take on a Dataset, it will collect objects of the proper type in your Dataset, not DataFrame Rows. This makes it easy to get type safety and securely perform manipulation in a distributed and a local manner without code changes.

scala> flights.count
res0: Long = 255

scala> flights.take(1)
res1: Array[Flight] = Array(Flight(United States,Romania,1))

scala> flights.take(1).foreach(x => println(x.DEST_COUNTRY_NAME))
United States

scala> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada").map(flight_row => flight_row).take(5)
res6: Array[Flight] = Array(Flight(United States,Romania,1), Flight(United States,Ireland,264), Flight(United States,India,69), Flight(Egypt,United States,24), Flight(Equatorial Guinea,United States,1))

scala> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada").map(fr => Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count + 5))
res7: Array[Flight] = Array(Flight(United States,Romania,6), Flight(United States,Ireland,269), Flight(United States,India,74), Flight(Egypt,United States,29), Flight(Equatorial Guinea,United States,6))
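
The earlier point that Spark must analyze the type T and create a schema from it can be observed through the encoder for a case class. This is a small sketch; the case class FlightSummary is hypothetical and uses Long for the count field rather than the BigInt used in the Flight class above.

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical case class, defined at the top level so that Spark can derive an encoder for it
case class FlightSummary(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: Long)

object EncoderSchemaSketch {

  def main(args: Array[String]): Unit = {
    // Encoders.product derives an encoder, and with it a schema, from the case class definition
    val encoder: Encoder[FlightSummary] = Encoders.product[FlightSummary]

    // Prints the derived schema in the same tree format as printSchema
    encoder.schema.printTreeString()
  }
}
```

Inside spark-shell or after import spark.implicits._, this encoder is supplied implicitly, which is why flightsDF.as[Flight] needs no explicit encoder argument.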