CSV files:
- CSV stands for comma-separated values. The CSV reader has a large number of options. These options give you the ability to work around issues like certain characters needing to be escaped (for example, commas inside of columns when the file is also comma-delimited) or null values labeled in an unconventional way.
- Following is the list of important options for the CSV reader:
| Read/write | Key | Potential values | Default | Description |
|---|---|---|---|---|
| Both | sep | Any single string character | , | The single character used as the separator for each field and value. |
| Both | header | true, false | false | A Boolean flag that declares whether the first line in the file(s) is the names of the columns. |
| Read | escape | Any string character | \ | The character Spark should use to escape other characters in the file. |
| Read | inferSchema | true, false | false | Specifies whether Spark should infer column types when reading the file. |
| Read | ignoreLeadingWhiteSpace | true, false | false | Declares whether leading spaces from values being read should be skipped. |
| Read | ignoreTrailingWhiteSpace | true, false | false | Declares whether trailing spaces from values being read should be skipped. |
| Both | nullValue | Any string character | "" | Declares what character represents a null value in the file. |
| Both | nanValue | Any string character | NaN | Declares what character represents a NaN or missing character in the CSV file. |
| Both | positiveInf | Any string or character | Inf | Declares what character(s) represent a positive infinite value. |
| Both | negativeInf | Any string or character | -Inf | Declares what character(s) represent a negative infinite value. |
| Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy | none | Declares what compression codec Spark should use to read or write the file. |
| Both | dateFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd | Declares the date format for any columns that are date type. |
| Both | timestampFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd'T'HH:mm:ss.SSSZZ | Declares the timestamp format for any columns that are timestamp type. |
| Read | maxColumns | Any integer | 20480 | Declares the maximum number of columns in the file. |
| Read | maxCharsPerColumn | Any integer | 1000000 | Declares the maximum number of characters in a column. |
| Read | escapeQuotes | true, false | true | Declares whether Spark should escape quotes that are found in lines. |
| Read | maxMalformedLogPerPartition | Any integer | 10 | Sets the maximum number of malformed rows Spark will log for each partition; malformed records beyond this number will be ignored. |
| Write | quoteAll | true, false | false | Specifies whether all values should be enclosed in quotes, as opposed to just escaping values that have a quote character. |
| Read | multiLine | true, false | false | Allows you to read multiline CSV files, where each logical row in the CSV file might span multiple rows in the file itself. |
Note that we can specify a compression codec when reading from or writing to compressed files. A short sketch showing how a few of these options are passed through option() follows.
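As a rough illustration (the file paths, separator, and null marker below are made up for this sketch and are not from the example data), options from the table are simply chained onto the reader or writer with option():
|
import org.apache.spark.sql.SparkSession

object CsvOptionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("CSV Options Sketch").getOrCreate()

    // Hypothetical semicolon-delimited file with "NULL" used for missing values.
    val df = spark.read.format("csv")
      .option("header", "true")            // first line holds the column names
      .option("sep", ";")                  // separator other than the default comma
      .option("nullValue", "NULL")         // string that represents null in the file
      .option("dateFormat", "dd/MM/yyyy")  // format used for date-typed columns
      .option("inferSchema", "true")       // let Spark infer the column types
      .load("C:\\data\\example-semicolon-delimited")

    // Write the same data back out pipe-delimited and gzip-compressed.
    df.write.format("csv")
      .option("sep", "|")
      .option("codec", "gzip")
      .save("C:\\data\\example-output")
  }
}
|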
- Following shows how to read from CSV files. First we create a DataFrameReader for that specific format. In the example below, we specify that 1) the CSV files contain a header, 2) the schema will be inferred from the files, and 3) the read mode is FAILFAST, which will fail the job if the data is malformed. Next we also show how to specify the schema explicitly.
|
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Read with header and inferred schema; FAILFAST aborts on malformed records.
    val df_csv = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .option("inferSchema", "true")
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")
    println("Count of Dataframe df_csv:" + df_csv.count())

    // Define the schema manually instead of relying on inference.
    val manualSchema = StructType(Seq(
      StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    // Read the same files again, this time with the explicit schema.
    val df_csv_manual = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .option("inferSchema", "true")
      .schema(manualSchema)
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")
    println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())
  }
}
|
|
Count of Dataframe df_csv:541909
Count of Dataframe df_csv_manual:541909
|
If the data that we read does not match the specified schema, the job will fail when Spark actually reads the data, not when the code is written. As soon as we execute a job, it immediately fails because the data does not conform to the specified schema. In general, Spark will fail only at job execution time rather than at DataFrame definition time, even if, for example, we point to a file that does not exist. This is due to lazy evaluation; a small sketch of this behavior follows.
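A minimal sketch of this lazy behavior, assuming a hypothetical path and a deliberately wrong schema (Country declared as an integer even though the data holds strings): the read definition itself succeeds, and the failure only surfaces when an action forces Spark to read the data.
|
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object FailFastSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("FAILFAST Sketch").getOrCreate()

    // Deliberately wrong schema: Country is declared as an integer.
    val badSchema = StructType(Seq(
      StructField("InvoiceNo", StringType, true),
      StructField("Country", IntegerType, true)))

    // This only defines the read; no data is touched here.
    val df = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST") // abort on the first record that does not fit the schema
      .schema(badSchema)
      .load("C:\\data\\some-retail-data") // hypothetical path

    // Only this action forces Spark to read the files, so the job fails here,
    // not at the definition above.
    println(df.count())
  }
}
|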
- Following shows how to write data to tab-separated files in gzip-compressed format. Note that the write method is not available on the SparkSession; it is available on the DataFrame itself.
|
SparkDefinitiveTesting.scala
|
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    val manualSchema = StructType(Seq(
      StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    // Read the CSV files with the explicit schema, then repartition to 5 partitions.
    val df_csv_manual = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .option("inferSchema", "true")
      .schema(manualSchema)
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")
      .repartition(5)
    println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())

    // Write the DataFrame out as tab-separated, gzip-compressed files.
    df_csv_manual.write.format("csv")
      .mode(SaveMode.Overwrite)
      .option("sep", "\t")
      .option("codec", "gzip")
      .save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\tsv")
  }
}
|
|
C:\Users\sukulma>cd C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\tsv

C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\tsv>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\tsv

04/14/2019  02:00 PM    <DIR>          .
04/14/2019  02:00 PM    <DIR>          ..
04/14/2019  01:59 PM            15,328 .part-00000-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  01:59 PM            15,308 .part-00001-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  01:59 PM            15,308 .part-00002-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  01:59 PM            15,328 .part-00003-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  02:00 PM            15,332 .part-00004-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  02:00 PM                 8 ._SUCCESS.crc
04/14/2019  01:59 PM         1,960,940 part-00000-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  01:59 PM         1,958,236 part-00001-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  01:59 PM         1,958,342 part-00002-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  01:59 PM         1,960,794 part-00003-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  02:00 PM         1,961,057 part-00004-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  02:00 PM                 0 _SUCCESS
              12 File(s)      9,875,981 bytes
               2 Dir(s)  77,774,761,984 bytes free
|
Note that five .gz files are created inside the directory. This reflects the number of partitions in our DataFrame at the time we write it out (we repartitioned to 5 before writing); a short sketch of how to control this follows.
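A minimal sketch, assuming df_csv_manual and the imports from the example above are still in scope and using a placeholder output path: coalescing to a single partition before writing produces one part file instead of five.
|
// coalesce(1) collapses the DataFrame into a single partition,
// so the write below produces a single gzip-compressed part file.
df_csv_manual
  .coalesce(1)
  .write.format("csv")
  .mode(SaveMode.Overwrite)
  .option("sep", "\t")
  .option("codec", "gzip")
  .save("C:\\data\\retail-data\\tsv-single-file") // placeholder path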
|