
Sunday, 14 April 2019

CH9.2 Reading and Writing CSV Files



CSV files:
 
  • CSV stands for comma-separated values. The CSV reader has a large number of options. These options give you the ability to work around issues like certain characters needing to be escaped (for example, commas inside of columns when the file is also comma-delimited) or null values labeled in an unconventional way.

  • The following is a list of the important options for the CSV reader:

Read/write | Key | Potential values | Default | Description
Both | sep | Any single string character | , | The single character used as the separator for each field and value.
Both | header | true, false | false | A Boolean flag declaring whether the first line in the file(s) contains the names of the columns.
Read | escape | Any string character | \ | The character Spark should use to escape other characters in the file.
Read | inferSchema | true, false | false | Specifies whether Spark should infer column types when reading the file.
Read | ignoreLeadingWhiteSpace | true, false | false | Declares whether leading spaces in values being read should be skipped.
Read | ignoreTrailingWhiteSpace | true, false | false | Declares whether trailing spaces in values being read should be skipped.
Both | nullValue | Any string character | "" | Declares what character represents a null value in the file.
Both | nanValue | Any string character | NaN | Declares what character represents a NaN or missing value in the CSV file.
Both | positiveInf | Any string or character | Inf | Declares what character(s) represent a positive infinite value.
Both | negativeInf | Any string or character | -Inf | Declares what character(s) represent a negative infinite value.
Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy | none | Declares what compression codec Spark should use to read or write the file.
Both | dateFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd | Declares the date format for any columns that are of date type.
Both | timestampFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd'T'HH:mm:ss.SSSZZ | Declares the timestamp format for any columns that are of timestamp type.
Read | maxColumns | Any integer | 20480 | Declares the maximum number of columns in the file.
Read | maxCharsPerColumn | Any integer | 1000000 | Declares the maximum number of characters in a column.
Read | escapeQuotes | true, false | true | Declares whether Spark should escape quotes that are found in lines.
Read | maxMalformedLogPerPartition | Any integer | 10 | Sets the maximum number of malformed rows Spark will log for each partition; malformed records beyond this number are ignored.
Write | quoteAll | true, false | false | Specifies whether all values should be enclosed in quotes, as opposed to just escaping values that contain a quote character.
Read | multiLine | true, false | false | Allows reading multiline CSV files where each logical row in the CSV file might span multiple lines in the file itself.

Note that we can specify a compression codec to read from or write to compressed files.
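
A rough sketch of how several of these options can be combined when reading (the separator, null marker, date format, and path below are hypothetical placeholders, chosen only to illustrate the option keys):

// Hypothetical sketch: combining several of the CSV reader options listed above.
// The separator, nullValue, dateFormat, and path are illustrative placeholders.
val dfOptions = spark.read.format("csv")
  .option("header", "true")
  .option("sep", ";")                  // semicolon-delimited file
  .option("nullValue", "NULL")         // treat the literal string NULL as a null value
  .option("dateFormat", "yyyy/MM/dd")  // custom format for date-typed columns
  .option("inferSchema", "true")
  .load("path\\to\\some\\csv\\directory")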

  • The following shows how to read from CSV files. First we create a DataFrameReader for that specific format.
In the example below, we specify that 1) the CSV files contain a header, 2) the schema should be inferred from the files, and 3) the read mode is FAILFAST, which fails the job if any data is malformed.
We also show how to specify the schema explicitly.

import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Read with header, inferred schema, and FAILFAST read mode.
    val df_csv = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .option("inferSchema", "true")
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")
    println("Count of Dataframe df_csv:" + df_csv.count())

    // Define the schema explicitly instead of relying on inference.
    val manualSchema = StructType(Seq(StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    // Read again, this time supplying the schema explicitly.
    val df_csv_manual = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .option("inferSchema", "true")
      .schema(manualSchema)
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")
    println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())
  }
}
Count of Dataframe df_csv:541909
Count of Dataframe df_csv_manual:541909



If the data we read does not match the specified schema, the job fails when Spark actually reads the data, not when the code is written. As soon as we start our Spark job, it fails immediately (after we execute an action) because the data does not conform to the specified schema. In general, Spark fails only at job execution time rather than at DataFrame definition time, even if, for example, we point to a file that does not exist. This is due to lazy evaluation.
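
A minimal sketch of this behaviour (the deliberately mismatched schema below is hypothetical, not part of the original example): defining the DataFrame raises no error, and the failure only shows up once an action runs.

// Hypothetical sketch: a schema that deliberately does not match the retail data.
val wrongSchema = StructType(Seq(
  StructField("InvoiceNo", IntegerType, true),   // InvoiceNo is really a string column, so integer parsing will fail
  StructField("StockCode", IntegerType, true)))

// No error at this point: the read is only declared, not executed.
val df_bad = spark.read.format("csv")
  .option("header", "true")
  .option("mode", "FAILFAST")
  .schema(wrongSchema)
  .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")

// The job fails only when an action forces Spark to read the rows:
// df_bad.count()   // fails at execution time because the records do not conform to the schema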

  • The following shows how to write data to tab-separated files in gzip-compressed format. Note that the write method is not available on SparkSession; it is available on the DataFrame itself.

SparkDefinitiveTesting.scala
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    val manualSchema = StructType(Seq(StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    // Read with an explicit schema and repartition into 5 partitions before writing.
    val df_csv_manual = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .option("inferSchema", "true")
      .schema(manualSchema)
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")
      .repartition(5)
    println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())

    // Write tab-separated, gzip-compressed output; write is a method on the DataFrame, not on SparkSession.
    df_csv_manual.write.format("csv")
      .mode(SaveMode.Overwrite)
      .option("sep", "\t")
      .option("codec", "gzip")
      .save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\tsv")
  }
}
C:\Users\sukulma>cd C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\tsv

C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\tsv>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\tsv

04/14/2019  02:00 PM    <DIR>          .
04/14/2019  02:00 PM    <DIR>          ..
04/14/2019  01:59 PM            15,328 .part-00000-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  01:59 PM            15,308 .part-00001-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  01:59 PM            15,308 .part-00002-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  01:59 PM            15,328 .part-00003-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  02:00 PM            15,332 .part-00004-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  02:00 PM                 8 ._SUCCESS.crc
04/14/2019  01:59 PM         1,960,940 part-00000-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  01:59 PM         1,958,236 part-00001-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  01:59 PM         1,958,342 part-00002-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  01:59 PM         1,960,794 part-00003-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  02:00 PM         1,961,057 part-00004-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  02:00 PM                 0 _SUCCESS
              12 File(s)      9,875,981 bytes
               2 Dir(s)  77,774,761,984 bytes free

Note that 5 .gz files are created inside the directory. This reflects the number of partitions in our DataFrame at the time we write it out; we repartitioned the DataFrame into 5 partitions before writing, so we get 5 output files.
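
If a single output file is preferred, the DataFrame can be coalesced to one partition before writing; a minimal sketch (the output path below is just an illustrative placeholder):

// Sketch: coalesce to a single partition so the write produces one part file.
// The output path is an illustrative placeholder.
df_csv_manual.coalesce(1)
  .write.format("csv")
  .mode(SaveMode.Overwrite)
  .option("sep", "\t")
  .option("codec", "gzip")
  .save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\tsv-single")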


 
