Search This Blog

Sunday, 14 April 2019

CH9.1 Data Sources API structure


    Spark has six “core” data sources and hundreds of external data sources written by the community

    Following are Spark’s core data sources:
    • CSV
    • JSON
    • Parquet
    • ORC
    • JDBC/ODBC connections
    • Plain-text files

    Spark has numerous community-created data sources. Here’s just a small sample:


The Structure of the Data Sources API

Read API Structure

  •   The core structure for reading data is  DataFrameReader.format(...).option("key", "value").schema(...).load()

  • This format is used to read from all of our data sources.
    • The format method is optional because by default Spark will use the Parquet format.
    • The method option allows you to set key-value configurations to parameterize how you will read data.  (We can use options method to  pass a Map of all the options)
    • The method schema is optional if the data source provides a schema or if you intend to use schema inference.

  • The foundation for reading data in Spark is the DataFrameReader. We access this through the SparkSession via the read attribute.

Following is the definition of the "read" method on SparkSession (https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.SparkSession)

def read: DataFrameReader
Returns a DataFrameReader that can be used to read non-streaming data in as a DataFrame.

sparkSession.read.parquet("/path/to/file.parquet")
sparkSession.read.schema(schema).json("/path/to/file.json")

  • After we have a DataFrame reader, we specify several values:
    • The format
    • The schema
    • The read mode
    • A series of options

The format, options, and schema each return a DataFrameReader that can undergo further transformations and are all optional, except for one option. Each data source has a specific set of options that determine how the data is read into Spark. At a minimum, you must supply the DataFrameReader a path to from which to read.

  • Following are the definitions of some of the methods on DataFrameReader. Note that most of the methods return another DataframeReader allowing us to chain the method calls:

def option(key: String, value: Double): DataFrameReader
Adds an input option for the underlying data source.

def options(options: Map[String, String]): DataFrameReader
(Scala-specific) Adds input options for the underlying data source.

def format(source: String): DataFrameReader
Specifies the input data source format.

def schema(schema: StructType): DataFrameReader
Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema automatically from data. By specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading.

def load(paths: String*): DataFrame
Loads input in as a DataFrame, for data sources that support multiple paths. Only works if the source is a HadoopFsRelationProvider.

def load(path: String): DataFrame
Loads input in as a DataFrame, for data sources that require a path (e.g. data backed by a local or distributed file system).

def load(): DataFrame
Loads input in as a DataFrame, for data sources that don't require a path (e.g. external key-value stores).

  • Example:

spark.read.format("csv")
  .option("mode", "FAILFAST")
  .option("inferSchema", "true")
  .option("path", "path/to/file(s)")
  .schema(someSchema)
  .load()

  • Read modes: Read modes specify what will happen when Spark does come across malformed records. The mode is specified using the "mode" option.

Read mode
Description
Permissive (default)
Sets all fields to null when it encounters a corrupted record and places all corrupted records in a string column called _corrupt_record
dropMalformed
Drops the row that contains malformed records
failFast
Fails immediately upon encountering malformed records

Write API Structure


  • The core structure for writing data is as follows: DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(  ...).save()
The format is optional because by default, Spark will use the arquet format. option, again, allows us to configure how to write out our given data. 

  • The foundation for writing data is quite similar to that of reading data. Instead of the DataFrameReader, we have the DataFrameWriter.


def write: DataFrameWriter[T]
Interface for saving the content of the non-streaming Dataset out into external storage.

  • Once we have the DataFrameWriter we can specify the format, options, write mode. At a minimum, you must supply a path.

dataframe.write.format("csv")
  .option("mode", "OVERWRITE")
  .option("dateFormat", "yyyy-MM-dd")
  .option("path", "path/to/file(s)")
  .save()

Following shows some of the important methods available on DataFrameWriter:

def format(source: String): DataFrameWriter[T]
Specifies the underlying output data source. Built-in options include "parquet", "json", etc.

def option(key: String, value: Double): DataFrameWriter[T]
Adds an output option for the underlying data source.

def options(options: Map[String, String]): DataFrameWriter[T]
(Scala-specific) Adds output options for the underlying data source.

def save(path: String): Unit
Saves the content of the DataFrame at the specified path.

  • Save modes:

Save mode specifies what will happen if Spark finds data at the specified location.
Following is the list.

Save mode
Description
Append
Appends the output files to the list of files that already exist at that location
overwrite
Will completely overwrite any data that already exists there
errorIfExists (default)
Throws an error and fails the write if data or files already exist at the specified location
ignore
If data or files exist at the location, do nothing with the current DataFrame

We can specify the mode using option OR using the overloaded method "mode" available on the DataframeWriter.

def mode(saveMode: String): DataFrameWriter[T]
Specifies the behavior when data or table already exists. Options include:

overwrite: overwrite the existing data.
append: append the data.
ignore: ignore the operation (i.e. no-op).
error: default option, throw an exception at runtime.


def mode(saveMode: SaveMode): DataFrameWriter[T]
Specifies the behavior when data or table already exists. Options include:

SaveMode.Overwrite: overwrite the existing data.
SaveMode.Append: append the data.
SaveMode.Ignore: ignore the operation (i.e. no-op).
SaveMode.ErrorIfExists: default option, throw an exception at runtime.



CH7.5 Pivots



Pivots:
 
  • Pivots make it possible for you to convert a row into a column.  The pivot method is available on RelationalGroupedDataset. Note that it returns another RelationalGroupedDataset on which we can invoke aggregate functions.

def pivot(pivotColumn: String, values: List[Any]): RelationalGroupedDataset
def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset
def pivot(pivotColumn: String): RelationalGroupedDataset

Pivots a column of the current DataFrame and perform the specified aggregation. There are two versions of pivot function: one that requires the caller to specify the list of distinct values to pivot on, and one that does not. The latter is more concise but less efficient, because Spark needs to first compute the list of distinct values internally.

// Compute the sum of earnings for each year by course with each course as a separate column
df.groupBy("year").pivot("course", Arrays.<Object>asList("dotNET", "Java")).sum("earnings");
// Or without specifying column values (less efficient)
df.groupBy("year").pivot("course").sum("earnings");

pivotColumn - Name of the column to pivot.
values - List of values that will be translated to columns in the output DataFrame.


scala> df.groupBy().pivot("Country")
res69: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: [grouping expressions: [], value: [InvoiceNo: string, StockCode: string ... 6 more fields], type: Pivot]

scala> df.groupBy().pivot("Country").count()
res70: org.apache.spark.sql.DataFrame = [Australia: bigint, Austria: bigint ... 36 more fields]

scala> df.groupBy().pivot("Country").count().printSchema
root
 |-- Australia: long (nullable = true)
 |-- Austria: long (nullable = true)
 |-- Bahrain: long (nullable = true)
 |-- Belgium: long (nullable = true)
 |-- Brazil: long (nullable = true)
 |-- Canada: long (nullable = true)
 |-- Channel Islands: long (nullable = true)
 |-- Cyprus: long (nullable = true)
 |-- Czech Republic: long (nullable = true)
 |-- Denmark: long (nullable = true)
 |-- EIRE: long (nullable = true)
 |-- European Community: long (nullable = true)
 |-- Finland: long (nullable = true)
 |-- France: long (nullable = true)
 |-- Germany: long (nullable = true)
 |-- Greece: long (nullable = true)
 |-- Hong Kong: long (nullable = true)
 |-- Iceland: long (nullable = true)
 |-- Israel: long (nullable = true)
 |-- Italy: long (nullable = true)
 |-- Japan: long (nullable = true)
 |-- Lebanon: long (nullable = true)
 |-- Lithuania: long (nullable = true)
 |-- Malta: long (nullable = true)
 |-- Netherlands: long (nullable = true)
 |-- Norway: long (nullable = true)
 |-- Poland: long (nullable = true)
 |-- Portugal: long (nullable = true)
 |-- RSA: long (nullable = true)
 |-- Saudi Arabia: long (nullable = true)
 |-- Singapore: long (nullable = true)
 |-- Spain: long (nullable = true)
 |-- Sweden: long (nullable = true)
 |-- Switzerland: long (nullable = true)
 |-- USA: long (nullable = true)
 |-- United Arab Emirates: long (nullable = true)
 |-- United Kingdom: long (nullable = true)
 |-- Unspecified: long (nullable = true)

 
scala> df.groupBy().pivot("Country").count().show
+---------+-------+-------+-------+------+------+---------------+------+--------------+-------+----+------------------+-------+------+-------+------+---------+-------+------+-----+-----+-------+---------+-----+-----------+------+------+--------+---+------------+---------+-----+------+-----------+---+--------------------+--------------+-----------+
|Australia|Austria|Bahrain|Belgium|Brazil|Canada|Channel Islands|Cyprus|Czech Republic|Denmark|EIRE|European Community|Finland|France|Germany|Greece|Hong Kong|Iceland|Israel|Italy|Japan|Lebanon|Lithuania|Malta|Netherlands|Norway|Poland|Portugal|RSA|Saudi Arabia|Singapore|Spain|Sweden|Switzerland|USA|United Arab Emirates|United Kingdom|Unspecified|
+---------+-------+-------+-------+------+------+---------------+------+--------------+-------+----+------------------+-------+------+-------+------+---------+-------+------+-----+-----+-------+---------+-----+-----------+------+------+--------+---+------------+---------+-----+------+-----------+---+--------------------+--------------+-----------+
|     1259|    401|     19|   2069|    32|   151|            758|   622|            30|    389|8196|                61|    695|  8557|   9495|   146|      288|    182|   297|  803|  358|     45|       35|  127|       2371|  1086|   341|    1519| 58|          10|      229| 2533|   462|       2002|291|                  68|        495478|        446|
+---------+-------+-------+-------+------+------+---------------+------+--------------+-------+----+------------------+-------+------+-------+------+---------+-------+------+-----+-----+-------+---------+-----+-----------+------+------+--------+---+------------+---------+-----+------+-----------+---+--------------------+--------------+-----------+

 
scala> df.groupBy().pivot("Country",Seq("Canada","Germany")).count().show
+------+-------+
|Canada|Germany|
+------+-------+
|   151|   9495|
+------+-------+

Note that distinct values in the country columns become the column names.


    • Note: Simply using df.groupBy() also gives us entire dataset as a RelationalGroupedDataset on which we can invoke aggregate methods or pivot methods.

CH7.4 Grouping Set, Rollup, Cube and grouping_id()



Grouping Set:
 
  • Grouping Sets: Used to perform aggregations across multiple groups. Grouping sets are a low-level tool for combining sets of aggregations together. They give you the ability to create arbitrary aggregation in their group-by statements.

  • Grouping sets depend on null values for aggregation levels. If you do not filter-out null values, you will get incorrect results. This applies to cubes, rollups, and grouping sets.

  • The GROUPING SETS operator is only available in SQL. To perform the same in DataFrames, you use the rollup and cube operators—which allow us to get the same results.
     
  • To include the total number of items, regardless of customer or stock code, we simply specify that we would like to aggregate at that level, as well, in our grouping set. (as shown in next example) . This is, effectively, the union of several different groupings together

val dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")
 
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode
GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC

SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode
GROUPING SETS((customerId, stockCode),())
ORDER BY CustomerId DESC, stockCode DESC

Rollups:
 
  • Grouping Set and group by are for explicit groupings. A rollup is a multidimensional aggregation that performs a variety of group-by style calculations for us.

  • Following example create a rollup that looks across time (with our new Date column) and space (with the Country column) and creates a new DataFrame that includes the grand total over all dates, the grand total for each date in the DataFrame, and the subtotal for each country on each date in the DataFrame. :

scala>     val dfNoNull = dfWithDate.drop()
dfNoNull: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 7 more fields]

scala>     val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity")).selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity").orderBy("Date")
rolledUpDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Date: date, Country: string ... 1 more field]

scala>     rolledUpDF.show()
+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       5176450|
|2010-12-01|        Norway|          1852|
|2010-12-01|        France|           449|
|2010-12-01|          null|         26814|
|2010-12-01|     Australia|           107|
|2010-12-01|   Netherlands|            97|
|2010-12-01|       Germany|           117|
|2010-12-01|          EIRE|           243|
|2010-12-01|United Kingdom|         23949|
|2010-12-02|       Germany|           146|
|2010-12-02|          null|         21023|
|2010-12-02|          EIRE|             4|
|2010-12-02|United Kingdom|         20873|
|2010-12-03|        Poland|           140|
|2010-12-03|   Switzerland|           110|
|2010-12-03|        France|           239|
|2010-12-03|          null|         14830|
|2010-12-03|         Italy|           164|
|2010-12-03|      Portugal|            65|
|2010-12-03|         Spain|           400|
+----------+--------------+--------------+
only showing top 20 rows

Now where you see the null values is where you’ll find the grand totals. A null in both rollup columns specifies the grand total across both of those columns

  • Following is the definition of the rollup method on Dataframe. Note that this also returns  a RelationalGroupedDataset allowing us to use all the aggregate functions available:

def rollup(col1: String, cols: String*): RelationalGroupedDataset
Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.
This is a variant of rollup that can only group by existing columns using column names (i.e. cannot construct expressions).
// Compute the average for all numeric columns rolluped by department and group.
ds.rollup("department", "group").avg()
// Compute the max age and average salary, rolluped by department and gender.
ds.rollup($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))

def rollup(cols: Column*): RelationalGroupedDataset
Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.
// Compute the average for all numeric columns rolluped by department and group.
ds.rollup($"department", $"group").avg()
// Compute the max age and average salary, rolluped by department and gender.
ds.rollup($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))



 
Cube :
 
  • A cube takes the rollup to a level deeper. Rather than treating elements hierarchically, a cube does the same thing across all dimensions. The method call is quite similar, but instead of calling rollup, we call cube. Note that the definition shows that this also returns a RelationalGroupedDataset.

def cube(col1: String, cols: String*): RelationalGroupedDataset
Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.
This is a variant of cube that can only group by existing columns using column names (i.e. cannot construct expressions).
// Compute the average for all numeric columns cubed by department and group.
ds.cube("department", "group").avg()
// Compute the max age and average salary, cubed by department and gender.
ds.cube($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))

def cube(cols: Column*): RelationalGroupedDataset
Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them. See RelationalGroupedDataset for all the available aggregate functions.
// Compute the average for all numeric columns cubed by department and group.
ds.cube($"department", $"group").avg()
// Compute the max age and average salary, cubed by department and gender.
ds.cube($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))

scala>     val dfNoNull = dfWithDate.drop()
dfNoNull: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 7 more fields]

scala>     val rolledUpDF = dfNoNull.cube("Date", "Country").agg(sum("Quantity")).selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity").orderBy("Date")
rolledUpDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Date: date, Country: string ... 1 more field]

scala>     rolledUpDF.show()
+----+--------------------+--------------+
|Date|             Country|total_quantity|
+----+--------------------+--------------+
|null|               Japan|         25218|
|null|            Portugal|         16180|
|null|             Germany|        117448|
|null|                null|       5176450|
|null|           Australia|         83653|
|null|              Cyprus|          6317|
|null|           Singapore|          5234|
|null|                 RSA|           352|
|null|         Unspecified|          3300|
|null|United Arab Emirates|           982|
|null|                 USA|          1034|

Grouping Metadata:
 
  • When using cubes, rollup  the grouping_id() function allows us to query the aggregation levels so that you can easily filter them down accordingly.  This gives us a column specifying the level of aggregation that we have in our result set.
     
scala> dfNoNull.rollup("customerId", "stockCode").agg(grouping_id().as("gid"), sum("Quantity")).where("gid > 1").show
+----------+---------+---+-------------+
|customerId|stockCode|gid|sum(Quantity)|
+----------+---------+---+-------------+
|      null|     null|  3|      5176450|
+----------+---------+---+-------------+


scala> dfNoNull.rollup("customerId", "stockCode").agg(grouping_id().as("gid"), sum("Quantity")).where("gid >= 1").show
+----------+---------+---+-------------+
|customerId|stockCode|gid|sum(Quantity)|
+----------+---------+---+-------------+
|     18245|     null|  1|         1781|
|     14995|     null|  1|          103|
|     13934|     null|  1|          390|
|     12670|     null|  1|         1391|
|     14947|     null|  1|          179|
|     16400|     null|  1|          172|