Thursday, 11 April 2019

CH5/1 Recap of DF, Schemas


Quick Recap:

  • A DataFrame consists of a series of records (like rows in a table) that are of type Row, and a number of columns (like columns in a spreadsheet) that represent a computation expression that can be performed on each individual record in the Dataset.
     
  • Schemas define the name as well as the type of data in each column.

  • Partitioning defines the physical distribution of the DataFrame or Dataset across the cluster; the partitioning scheme defines how that data is allocated. You can set this to be based on values in a certain column or nondeterministically.

  • Following is the DataFrame we will use for our examples:

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")
    val df = spark.read.format("json").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json")
    df.show(10, false)
  }
}
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
|United States    |Singapore          |1    |
|United States    |Grenada            |62   |
|Costa Rica       |United States      |588  |
|Senegal          |United States      |40   |
|Moldova          |United States      |1    |
+-----------------+-------------------+-----+
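The partitioning point above can be sketched in code. This is a minimal, self-contained example (it builds a tiny DataFrame inline rather than reading the flight-data files, and the object name is made up): `repartition` with a column hash-partitions by that column's values, while `repartition(n)` spreads rows nondeterministically across a fixed number of partitions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PartitioningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Partitioning Sketch").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")
    import spark.implicits._

    // A tiny stand-in for the flight-data DataFrame above.
    val df = Seq(
      ("United States", "Romania", 15L),
      ("Egypt", "United States", 15L)
    ).toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")

    // Partition based on the values in a column...
    val byColumn = df.repartition(col("DEST_COUNTRY_NAME"))
    // ...or nondeterministically, into a fixed number of partitions.
    val byCount = df.repartition(4)
    println(byCount.rdd.getNumPartitions) // 4

    spark.stop()
  }
}
```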

Finding Documentation for DataFrame methods:

https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset

DataFrames are nothing but Datasets of Row type, so the documentation for DataFrame is found under Dataset itself.
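In the Scala API, DataFrame is a type alias for Dataset[Row], which is why a DataFrame can be assigned to a Dataset[Row] without any conversion. A minimal sketch (the object name and inline data are made up for illustration):

```scala
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object DataFrameIsDataset {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Alias Check").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")
    import spark.implicits._

    val df = Seq(("United States", "Romania", 15L)).toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")
    // DataFrame is just Dataset[Row], so this assignment compiles as-is.
    val ds: Dataset[Row] = df
    println(ds.count()) // 1

    spark.stop()
  }
}
```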

    Schemas:
     
    • The method printSchema can be used to print a nice tree-like representation of the DataFrame's schema.
       
    def printSchema(): Unit
    Prints the schema to the console in a nice tree format.

    • A schema defines the column names and types of a DataFrame. We can either let a data source define the schema (called schema-on-read) or we can define it explicitly ourselves.

    • Caution: For ad hoc analysis, schema-on-read usually works just fine (although at times it can be a bit slow with plain-text file formats like CSV or JSON). However, this can also lead to precision issues like a long type incorrectly set as an integer when reading in a file. When using Spark for production Extract, Transform, and Load (ETL), it is often a good idea to define your schemas manually, especially when working with untyped data sources like CSV and JSON because schema inference can vary depending on the type of data that you read in.

    • The method "schema" can be used to fetch the schema into a variable. Note that the method returns a StructType.
       
    def schema: StructType
    Returns the schema of this Dataset.

    scala>     val df = spark.read.format("json").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json")
    df: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

    scala> df.schema
    res5: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true))

    • A schema is a StructType made up of a number of fields (StructFields) that have a name, a type, and a Boolean flag specifying whether that column can contain missing or null values. Users can also optionally associate metadata with a column; metadata is a way of storing information about that column (Spark uses this in its machine learning library).

    • Schemas can contain other StructTypes (Spark’s complex types).  
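A minimal sketch of such nesting: a StructType field placed inside another StructType. The "location", "lat", and "lon" field names here are made-up examples, not part of the flight data.

```scala
import org.apache.spark.sql.types._

object NestedSchemaSketch {
  def main(args: Array[String]): Unit = {
    // A schema whose "location" column is itself a struct of two doubles.
    val nested = StructType(Array(
      StructField("DEST_COUNTRY_NAME", StringType, true),
      StructField("location", StructType(Array(
        StructField("lat", DoubleType, true),
        StructField("lon", DoubleType, true)
      )), true)
    ))
    // Prints the schema tree, including the nested struct.
    nested.printTreeString()
  }
}
```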

    • Imp: If the types in the data (at runtime) do not match the schema, Spark will throw an error.

    • Following shows how to create and enforce a schema manually using the "schema" method available on DataFrameReader. A DataFrameReader object is returned by the read method on SparkSession.
       
    import org.apache.spark.sql._
    import org.apache.spark.sql.types.{LongType, StructField, StructType, StringType, Metadata}

    object SparkDefinitiveTesting {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
        spark.sparkContext.setLogLevel("FATAL")

        val myManualSchema = StructType(Array(
          StructField("DEST_COUNTRY_NAME", StringType, true),
          StructField("ORIGIN_COUNTRY_NAME", StringType, true),
          StructField("count", LongType, false,
            Metadata.fromJson("{\"hello\":\"world\"}"))
        ))

        val df = spark.read.format("json").schema(myManualSchema).load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json")
        println("=" * 35)
        df.show(10, false)
        println("=" * 35)
        df.printSchema()
        println("=" * 35)
        println(df.schema)
      }
    }
    Results:
    ===================================
    +-----------------+-------------------+-----+
    |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
    +-----------------+-------------------+-----+
    |United States    |Romania            |15   |
    |United States    |Croatia            |1    |
    |United States    |Ireland            |344  |
    |Egypt            |United States      |15   |
    |United States    |India              |62   |
    |United States    |Singapore          |1    |
    |United States    |Grenada            |62   |
    |Costa Rica       |United States      |588  |
    |Senegal          |United States      |40   |
    |Moldova          |United States      |1    |
    +-----------------+-------------------+-----+
    only showing top 10 rows

    ===================================
    root
     |-- DEST_COUNTRY_NAME: string (nullable = true)
     |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
     |-- count: long (nullable = true)

    ===================================
    StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true))
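The StructType returned by the schema method can also be inspected programmatically rather than just printed. A minimal sketch using the same manual schema shape as above (the object name is made up):

```scala
import org.apache.spark.sql.types._

object SchemaInspection {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Array(
      StructField("DEST_COUNTRY_NAME", StringType, true),
      StructField("ORIGIN_COUNTRY_NAME", StringType, true),
      StructField("count", LongType, false,
        Metadata.fromJson("{\"hello\":\"world\"}"))
    ))
    // List all column names in order.
    println(schema.fieldNames.mkString(",")) // DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
    // Look up a single field by name and read its properties.
    println(schema("count").nullable)                    // false
    println(schema("count").metadata.getString("hello")) // world
  }
}
```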


