The Structure of the Data Sources API
Read API Structure
- The core structure for reading data is DataFrameReader.format(...).option("key", "value").schema(...).load()
- This format is used to read from all of our data sources.
- The format method is optional because by default Spark will use the Parquet format.
- The option method allows you to set key-value configurations to parameterize how you will read data. (We can also use the options method to pass a Map of all the options at once; a sketch follows the read definition below.)
- The schema method is optional if the data source provides a schema or if you intend to use schema inference.
- The foundation for reading data in Spark is the DataFrameReader. We access it through the SparkSession via the read attribute.
def read: DataFrameReader
    Returns a DataFrameReader that can be used to read non-streaming data in as a DataFrame.
    sparkSession.read.parquet("/path/to/file.parquet")
    sparkSession.read.schema(schema).json("/path/to/file.json")
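As a minimal sketch of the options variant mentioned above, the snippet below bundles several CSV options into a Map; the option values and the file path are assumptions for illustration, not taken from the original example.

// A minimal sketch, assuming an existing SparkSession named spark.
val csvOptions = Map(
  "header" -> "true",        // treat the first line as column names
  "inferSchema" -> "true",   // ask Spark to infer column types
  "mode" -> "FAILFAST"       // fail on malformed records
)
val df = spark.read
  .format("csv")
  .options(csvOptions)       // pass all options in one call
  .load("path/to/file.csv")  // hypothetical path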
- After we have a DataFrameReader, we specify several values:
  - The format
  - The schema
  - The read mode
  - A series of options
The format, options, and schema each return a DataFrameReader that can undergo further transformations, and all are optional except for one option. Each data source has a specific set of options that determine how the data is read into Spark. At a minimum, you must supply the DataFrameReader a path from which to read.
- Following are the definitions of some of the methods on DataFrameReader. Note that most of the methods return another DataFrameReader, allowing us to chain the method calls:
def option(key: String, value: Double): DataFrameReader
    Adds an input option for the underlying data source.
def options(options: Map[String, String]): DataFrameReader
    (Scala-specific) Adds input options for the underlying data source.
def format(source: String): DataFrameReader
    Specifies the input data source format.
def schema(schema: StructType): DataFrameReader
    Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema automatically from data. By specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading.
def load(paths: String*): DataFrame
    Loads input in as a DataFrame, for data sources that support multiple paths. Only works if the source is a HadoopFsRelationProvider.
def load(path: String): DataFrame
    Loads input in as a DataFrame, for data sources that require a path (e.g. data backed by a local or distributed file system).
def load(): DataFrame
    Loads input in as a DataFrame, for data sources that don't require a path (e.g. external key-value stores).
spark.read.format("csv")
  .option("mode", "FAILFAST")
  .option("inferSchema", "true")
  .option("path", "path/to/file(s)")
  .schema(someSchema)
  .load()
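The someSchema value in the snippet above is not defined there; the following is a minimal sketch of how such a schema could be built with StructType. The column names are assumptions, not part of the original example.

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical schema for the CSV read above; the column names are assumptions.
val someSchema = StructType(Array(
  StructField("DEST_COUNTRY_NAME", StringType, nullable = true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, nullable = true),
  StructField("count", LongType, nullable = true)
))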
- Read modes: Read modes specify what will happen when Spark comes across malformed records. The mode is specified using the "mode" option (a sketch follows the table below).
Read mode               Description
permissive (default)    Sets all fields to null when it encounters a corrupted record and places all corrupted records in a string column called _corrupt_record
dropMalformed           Drops the row that contains malformed records
failFast                Fails immediately upon encountering malformed records
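As a sketch of the default permissive mode, the snippet below reads JSON with a user-supplied schema that includes the _corrupt_record column so malformed rows can be inspected afterwards. The path and column names are assumptions, and the exact corrupt-record behavior can vary between Spark versions.

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical schema; _corrupt_record is included so malformed rows are kept.
val jsonSchema = StructType(Array(
  StructField("id", StringType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)
))

val parsed = spark.read
  .format("json")
  .schema(jsonSchema)
  .option("mode", "PERMISSIVE")    // default mode; malformed rows are not dropped
  .load("path/to/records.json")    // hypothetical path

// Rows with a non-null _corrupt_record were malformed in the input.
parsed.filter("_corrupt_record IS NOT NULL").show(false)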
Write API Structure
- The core structure for writing data is as follows: DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save(). The format is optional because by default, Spark will use the Parquet format. option, again, allows us to configure how to write out our given data. (A sketch using partitionBy and bucketBy follows the save() example below.)
- The foundation for writing data is quite similar to that of reading data. Instead of the DataFrameReader, we have the DataFrameWriter.
def write: DataFrameWriter[T]
    Interface for saving the content of the non-streaming Dataset out into external storage.
- Once we have the DataFrameWriter, we can specify the format, the options, and the save mode. At a minimum, you must supply a path.
dataframe.write.format("csv")
  .option("mode", "OVERWRITE")
  .option("dateFormat", "yyyy-MM-dd")
  .option("path", "path/to/file(s)")
  .save()
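The write structure above also mentions partitionBy, bucketBy, and sortBy; the following is a sketch of how they might be used. The DataFrame, column names, output path, and table name are assumptions, and note that bucketBy and sortBy are only supported when writing to a table via saveAsTable.

// Hypothetical examples; "year" and "customer_id" are assumed column names.
dataframe.write
  .format("parquet")
  .mode("overwrite")
  .partitionBy("year")              // one output directory per distinct year
  .save("path/to/partitioned-output")

dataframe.write
  .format("parquet")
  .mode("overwrite")
  .bucketBy(10, "customer_id")      // hash rows into 10 buckets by customer_id
  .sortBy("customer_id")            // sort rows within each bucket
  .saveAsTable("bucketed_table")    // bucketing requires a table, not save(path)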
The following shows some of the important methods available on DataFrameWriter:
def format(source: String): DataFrameWriter[T]
    Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
def option(key: String, value: Double): DataFrameWriter[T]
    Adds an output option for the underlying data source.
def options(options: Map[String, String]): DataFrameWriter[T]
    (Scala-specific) Adds output options for the underlying data source.
def save(path: String): Unit
    Saves the content of the DataFrame at the specified path.
Save mode specifies what will happen if Spark finds data at the specified location. The following is the list of save modes.
Save mode                 Description
append                    Appends the output files to the list of files that already exist at that location
overwrite                 Will completely overwrite any data that already exists there
errorIfExists (default)   Throws an error and fails the write if data or files already exist at the specified location
ignore                    If data or files exist at the location, do nothing with the current DataFrame
We can specify the mode using the "mode" option OR using the overloaded mode method available on the DataFrameWriter (both forms are sketched after the definitions below).
def mode(saveMode: String): DataFrameWriter[T]
    Specifies the behavior when data or table already exists. Options include:
    overwrite: overwrite the existing data.
    append: append the data.
    ignore: ignore the operation (i.e. no-op).
    error: default option, throw an exception at runtime.
def mode(saveMode: SaveMode): DataFrameWriter[T]
    Specifies the behavior when data or table already exists. Options include:
    SaveMode.Overwrite: overwrite the existing data.
    SaveMode.Append: append the data.
    SaveMode.Ignore: ignore the operation (i.e. no-op).
    SaveMode.ErrorIfExists: default option, throw an exception at runtime.
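As a minimal sketch of the two forms described above (the output path is hypothetical), both writes below request append behavior:

import org.apache.spark.sql.SaveMode

dataframe.write
  .format("parquet")
  .mode("append")                   // string form of the save mode
  .save("path/to/output")           // hypothetical path

dataframe.write
  .format("parquet")
  .mode(SaveMode.Append)            // typed SaveMode form
  .save("path/to/output")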