
Sunday, 14 April 2019

CH9.1 Data Sources API structure


    Spark has six “core” data sources and hundreds of external data sources written by the community

    Following are Spark’s core data sources:
    • CSV
    • JSON
    • Parquet
    • ORC
    • JDBC/ODBC connections
    • Plain-text files

    Spark also has numerous community-created data sources, such as connectors for Cassandra, HBase, MongoDB, and XML.


The Structure of the Data Sources API

Read API Structure

  • The core structure for reading data is: DataFrameReader.format(...).option("key", "value").schema(...).load()

  • This format is used to read from all of our data sources.
    • The format method is optional because by default Spark will use the Parquet format.
    • The option method allows you to set key-value configurations to parameterize how you will read data. (We can also pass a Map of all the options via the options method; see the sketch after the examples below.)
    • The method schema is optional if the data source provides a schema or if you intend to use schema inference.

  • The foundation for reading data in Spark is the DataFrameReader. We access this through the SparkSession via the read attribute.

Following is the definition of the "read" method on SparkSession (https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.SparkSession)

def read: DataFrameReader
Returns a DataFrameReader that can be used to read non-streaming data in as a DataFrame.

sparkSession.read.parquet("/path/to/file.parquet")
sparkSession.read.schema(schema).json("/path/to/file.json")
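
As noted above, the options method also accepts a Map of key-value pairs. The following is a minimal sketch (the path and option values are only illustrative), assuming a SparkSession named spark:

val csvDF = spark.read
  .format("csv")
  .options(Map(
    "header" -> "true",       // treat the first line as column names
    "inferSchema" -> "true",  // let Spark infer column types
    "mode" -> "FAILFAST"      // fail as soon as a malformed record is found
  ))
  .load("/path/to/file.csv")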

  • After we have a DataFrame reader, we specify several values:
    • The format
    • The schema
    • The read mode
    • A series of options

The format, options, and schema each return a DataFrameReader that can undergo further transformations, and all are optional except for one option. Each data source has a specific set of options that determine how the data is read into Spark. At a minimum, you must supply the DataFrameReader a path from which to read.

  • Following are the definitions of some of the methods on DataFrameReader. Note that most of the methods return another DataFrameReader, allowing us to chain the method calls:

def option(key: String, value: Double): DataFrameReader
Adds an input option for the underlying data source.

def options(options: Map[String, String]): DataFrameReader
(Scala-specific) Adds input options for the underlying data source.

def format(source: String): DataFrameReader
Specifies the input data source format.

def schema(schema: StructType): DataFrameReader
Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema automatically from data. By specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading.

def load(paths: String*): DataFrame
Loads input in as a DataFrame, for data sources that support multiple paths. Only works if the source is a HadoopFsRelationProvider.

def load(path: String): DataFrame
Loads input in as a DataFrame, for data sources that require a path (e.g. data backed by a local or distributed file system).

def load(): DataFrame
Loads input in as a DataFrame, for data sources that don't require a path (e.g. external key-value stores).
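
For instance, the varargs overload of load lets us read several directories into a single DataFrame in one call (the paths below are hypothetical):

// Hypothetical paths; each directory is read by the same JSON source.
val eventsDF = spark.read
  .format("json")
  .load("/data/events/2019/01", "/data/events/2019/02")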

  • Example:

spark.read.format("csv")
  .option("mode", "FAILFAST")
  .option("inferSchema", "true")
  .option("path", "path/to/file(s)")
  .schema(someSchema)
  .load()
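
The someSchema value in the example above is not defined; the following sketch shows how such a schema could be built with StructType (the column names and types are hypothetical):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical columns; adjust the names and types to match the actual file.
val someSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("count", LongType, nullable = false)
))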

  • Read modes: Read modes specify what will happen when Spark comes across malformed records. The mode is specified using the "mode" option.

    • permissive (default): Sets all fields to null when it encounters a corrupted record and places all corrupted records in a string column called _corrupt_record
    • dropMalformed: Drops the row that contains malformed records
    • failFast: Fails immediately upon encountering malformed records
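
To make the difference concrete, the following sketch reads the same (hypothetical) CSV file three times, changing only the "mode" option:

val path = "/path/to/possibly-corrupt.csv"  // hypothetical file containing some bad rows

val permissiveDF = spark.read.format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")     // default: keep every row, null out unparsable fields
  .load(path)

val dropMalformedDF = spark.read.format("csv")
  .option("header", "true")
  .option("mode", "DROPMALFORMED")  // silently drop malformed rows
  .load(path)

val failFastDF = spark.read.format("csv")
  .option("header", "true")
  .option("mode", "FAILFAST")       // fail the read as soon as a malformed row is encountered
  .load(path)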

Write API Structure


  • The core structure for writing data is as follows: DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
The format is optional because, by default, Spark will use the Parquet format. option, again, allows us to configure how to write out our given data.

  • The foundation for writing data is quite similar to that of reading data. Instead of the DataFrameReader, we have the DataFrameWriter.


def write: DataFrameWriter[T]
Interface for saving the content of the non-streaming Dataset out into external storage.

  • Once we have the DataFrameWriter, we can specify the format, options, and write mode. At a minimum, you must supply a path.

dataframe.write.format("csv")
  .option("mode", "OVERWRITE")
  .option("dateFormat", "yyyy-MM-dd")
  .option("path", "path/to/file(s)")
  .save()
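
The partitionBy, bucketBy, and sortBy calls from the write structure above are not shown in this example. Here is a minimal sketch of how they could be used (the DataFrame someDF, the column names, and the paths are hypothetical). Note that bucketBy and sortBy are only supported together with saveAsTable:

// Partition the output by column values: one directory per (year, month) pair.
someDF.write
  .format("parquet")
  .mode("overwrite")
  .partitionBy("year", "month")
  .save("/path/to/partitioned-output")

// Bucket and sort the output; bucketBy requires writing to a table.
someDF.write
  .format("parquet")
  .mode("overwrite")
  .bucketBy(10, "customerId")
  .sortBy("customerId")
  .saveAsTable("bucketed_customers")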

Following are some of the important methods available on DataFrameWriter:

def format(source: String): DataFrameWriter[T]
Specifies the underlying output data source. Built-in options include "parquet", "json", etc.

def option(key: String, value: Double): DataFrameWriter[T]
Adds an output option for the underlying data source.

def options(options: Map[String, String]): DataFrameWriter[T]
(Scala-specific) Adds output options for the underlying data source.

def save(path: String): Unit
Saves the content of the DataFrame at the specified path.

  • Save modes:

The save mode specifies what will happen if Spark finds data at the specified location. Following is the list:

  • append: Appends the output files to the list of files that already exist at that location
  • overwrite: Will completely overwrite any data that already exists there
  • errorIfExists (default): Throws an error and fails the write if data or files already exist at the specified location
  • ignore: If data or files exist at the location, do nothing with the current DataFrame

We can specify the mode using the "mode" option or using the overloaded "mode" method available on the DataFrameWriter.

def mode(saveMode: String): DataFrameWriter[T]
Specifies the behavior when data or table already exists. Options include:

overwrite: overwrite the existing data.
append: append the data.
ignore: ignore the operation (i.e. no-op).
error: default option, throw an exception at runtime.


def mode(saveMode: SaveMode): DataFrameWriter[T]
Specifies the behavior when data or table already exists. Options include:

SaveMode.Overwrite: overwrite the existing data.
SaveMode.Append: append the data.
SaveMode.Ignore: ignore the operation (i.e. no-op).
SaveMode.ErrorIfExists: default option, throw an exception at runtime.
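
For example, the two forms below are equivalent (the DataFrame someDF and the output path are hypothetical):

import org.apache.spark.sql.SaveMode

// Setting the save mode with a string...
someDF.write.format("parquet").mode("overwrite").save("/path/to/output")

// ...or with the SaveMode enum.
someDF.write.format("parquet").mode(SaveMode.Overwrite).save("/path/to/output")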


