Search This Blog

Sunday, 14 April 2019

CH9.4 Reading and Writing Parquet Files



Parquet Files
 
  • Parquet is an open source column-oriented data store that provides a variety of storage optimizations, especially for analytics workloads.

  • It provides columnar compression, which saves storage space and allows for reading individual columns instead of entire files.

  • It is a file format that works exceptionally well with Apache Spark and is in fact the default file format.

  • Its recommended that we write data out to Parquet for long-term storage because reading from a Parquet file will always be more efficient than JSON or CSV. Another advantage of Parquet is that it supports complex types. This means that if your column is an array (which would fail with a CSV file, for example), map, or struct, you’ll still be able to read and write that file without issue. 

  • As shown below parquet has very few options because it enforces its own schema when storing data. Thus, all we need to set is the format and we are good to go. (schema is built into the file itself (so no inference needed).

  • Following are the options available with Parquet data source:

Read/Write
Key
Potential Values
Default
Description
Write
compressionor codec
None, uncompressed, bzip2, deflate, gzip, lz4, or snappy
None
Declares what compression codec Spark should use to read or write the file.
Read
mergeSchema
true, false
Value of the configuration spark.sql.parquet.mergeSchema
You can incrementally add columns to newly written Parquet files in the same table/folder. Use this option to enable or disable this feature.

Even though there are only two options, you can still encounter problems if you’re working with incompatible Parquet files. Be careful when you write out Parquet files with different versions of Spark 
 
  • Following example shows how to store data from csv files as parquet and then read the same parquet files to print the Schema

1    import org.apache.spark.sql._
2    import org.apache.spark.sql.types._
3   
4    object SparkDefinitiveTesting {
5   
6      def main(args: Array[String]): Unit = {
7        val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
8        spark.sparkContext.setLogLevel("FATAL")
9   
10       val manualSchema = StructType(Seq(StructField("InvoiceNo", StringType, true),
11         StructField("StockCode", StringType, true),
12         StructField("Description", StringType, true),
13         StructField("Quantity", IntegerType, true),
14         StructField("InvoiceDate", TimestampType, true),
15         StructField("UnitPrice", DoubleType, true),
16         StructField("CustomerID", DoubleType, true),
17         StructField("Country", StringType, true)))
18  
19       val df_csv_manual = spark.read.format("csv").option("header", "true").option("mode", "FAILFAST").option("inferSchema", "true").schema(manualSchema).load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day").repartition(5)
20       println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())
21       df_csv_manual.write.format("parquet").mode(SaveMode.Overwrite).save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\parquetop")
22     
23  
24       val df_parquet =   spark.read.format("parquet").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\parquetop")
25       df_parquet.printSchema()
26       println("Count of Dataframe df_parquet:" + df_parquet.count())
27  
28     }
29   }
30  
Count of Dataframe df_csv_manual:541909

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)

Count of Dataframe df_parquet:541909

 
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\parquetop>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\parquetop

04/15/2019  01:43 AM    <DIR>          .
04/15/2019  01:43 AM    <DIR>          ..
04/15/2019  01:43 AM             9,072 .part-00000-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM             9,096 .part-00001-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM             9,084 .part-00002-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM             9,092 .part-00003-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM             9,084 .part-00004-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM                 8 ._SUCCESS.crc
04/15/2019  01:43 AM         1,160,039 part-00000-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM         1,162,760 part-00001-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM         1,161,664 part-00002-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM         1,162,664 part-00003-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM         1,161,368 part-00004-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM                 0 _SUCCESS
              12 File(s)      5,853,931 bytes
               2 Dir(s)  77,645,365,248 bytes free

C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\parquetop>


CH9.3 Reading and Writing json files


JSON file:
 
  • In Spark, when we refer to JSON files, we refer to line-delimited JSON files. This contrasts with files that have a large JSON object or array per file.
     
  • The line-delimited versus multiline trade-off is controlled by a single option: multiLine. When you set this option to true, you can read an entire file as one json object and Spark will go through the work of parsing that into a DataFrame.
     
  • Line-delimited JSON is actually a much more stable format because it allows you to append to a file with a new record (rather than having to read in an entire file and then write it out), which is what we recommend that you use. Another key reason for the popularity of line-delimited JSON is because JSON objects have structure, and JavaScript (on which JSON is based) has at least basic types. This makes it easier to work with because Spark can make more assumptions on our behalf about the data.
     

Read/write
Key
Potential values
Default
Description
Both
compression or codec
None, uncompressed, bzip2, deflate, gzip, lz4, or snappy
none
Declares what compression codec Spark should use to read or write the file.
Both
dateFormat
Any string or character that conforms to Java’s SimpleDataFormat.
yyyy-MM-dd
Declares the date format for any columns that are date type.
Both
timestampFormat
Any string or character that conforms to Java’s SimpleDataFormat.
yyyy-MM-dd’T’HH:mm:ss.SSSZZ
Declares the timestamp format for any columns that are timestamp type.
Read
primitiveAsString
true, false
false
Infers all primitive values as string type.
Read
allowComments
true, false
false
Ignores Java/C++ style comment in JSON records.
Read
allowUnquotedFieldNames
true, false
false
Allows unquoted JSON field names.
Read
allowSingleQuotes
true, false
true
Allows single quotes in addition to double quotes.
Read
allowNumericLeadingZeros
true, false
false
Allows leading zeroes in numbers (e.g., 00012).
Read
allowBackslashEscapingAnyCharacter
true, false
false
Allows accepting quoting of all characters using backslash quoting mechanism.
Read
columnNameOfCorruptRecord
Any string
Value of spark.sql.column&NameOfCorruptRecord
Allows renaming the new field having a malformed string created by permissive mode. This will override the configuration value.
Read
multiLine
true, false
false
Allows for reading in non-line-delimited JSON files.

  • Following shows an example of reading from csv files and writing to JSON file

1    import org.apache.spark.sql._
2    import org.apache.spark.sql.types._
3   
4    object SparkDefinitiveTesting {
5   
6      def main(args: Array[String]): Unit = {
7        val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
8        spark.sparkContext.setLogLevel("FATAL")
9   
10       val manualSchema = StructType(Seq(StructField("InvoiceNo", StringType, true),
11         StructField("StockCode", StringType, true),
12         StructField("Description", StringType, true),
13         StructField("Quantity", IntegerType, true),
14         StructField("InvoiceDate", TimestampType, true),
15         StructField("UnitPrice", DoubleType, true),
16         StructField("CustomerID", DoubleType, true),
17         StructField("Country", StringType, true)))
18  
19       val df_csv_manual = spark.read.format("csv").option("header", "true").option("mode", "FAILFAST").option("inferSchema", "true").schema(manualSchema).load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day").repartition(5)
20       println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())
21       df_csv_manual.write.format("json").mode(SaveMode.Overwrite).save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\jsonop")
22  
23  
24     }
25   }
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\jsonop>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\jsonop

04/15/2019  01:16 AM    <DIR>          .
04/15/2019  01:16 AM    <DIR>          ..
04/15/2019  01:16 AM           173,680 .part-00000-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json.crc
04/15/2019  01:16 AM           173,668 .part-00001-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json.crc
04/15/2019  01:16 AM           173,684 .part-00002-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json.crc
04/15/2019  01:16 AM           173,700 .part-00003-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json.crc
04/15/2019  01:16 AM           173,676 .part-00004-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json.crc
04/15/2019  01:16 AM                 8 ._SUCCESS.crc
04/15/2019  01:16 AM        22,229,808 part-00000-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json
04/15/2019  01:16 AM        22,228,436 part-00001-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json
04/15/2019  01:16 AM        22,230,267 part-00002-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json
04/15/2019  01:16 AM        22,232,349 part-00003-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json
04/15/2019  01:16 AM        22,229,060 part-00004-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json
04/15/2019  01:16 AM                 0 _SUCCESS
              12 File(s)    112,018,336 bytes
               2 Dir(s)  77,656,604,672 bytes free

Sample output file:
 
{"InvoiceNo":"580538","StockCode":"22467","Description":"GUMBALL COAT RACK","Quantity":6,"InvoiceDate":"2011-12-05T08:38:00.000-05:00","UnitPrice":2.55,"CustomerID":14075.0,"Country":"United Kingdom"}
{"InvoiceNo":"580539","StockCode":"84030E","Description":"ENGLISH ROSE HOT WATER BOTTLE","Quantity":4,"InvoiceDate":"2011-12-05T08:39:00.000-05:00","UnitPrice":4.25,"CustomerID":18180.0,"Country":"United Kingdom"}
{"InvoiceNo":"580539","StockCode":"23235","Description":"STORAGE TIN VINTAGE LEAF","Quantity":12,"InvoiceDate":"2011-12-05T08:39:00.000-05:00","UnitPrice":1.25,"CustomerID":18180.0,"Country":"United Kingdom"}
{"InvoiceNo":"580539","StockCode":"22375","Description":"AIRLINE BAG VINTAGE JET SET BROWN","Quantity":4,"InvoiceDate":"2011-12-05T08:39:00.000-05:00","UnitPrice":4.25,"CustomerID":18180.0,"Country":"United Kingdom"}

Note that one file per partition is  written out, and the entire DataFrame will be written out as a folder. It will also have one JSON object per line.


CH9.2 Reading and Writing csv files



CSV files:
 
  • CSV stands for comma-separated values.  CSV reader has a large number of options. These options give you the ability to work around issues like certain characters needing to be escaped—for example, commas inside of columns when the file is also comma-delimited or null values labeled in an unconventional way.

  • Following is the list of important options for CSV reader:

Read/write
Key
Potential values
Default
Description
Both
sep
Any single string character
,
The single character that is used as separator for each field and value.
Both
header
true, false
false
A Boolean flag that declares whether the first line in the file(s) are the names of the columns.
Read
escape
Any string character
\
The character Spark should use to escape other characters in the file.
Read
inferSchema
true, false
false
Specifies whether Spark should infer column types when reading the file.
Read
ignoreLeadingWhiteSpace
true, false
false
Declares whether leading spaces from values being read should be skipped.
Read
ignoreTrailingWhiteSpace
true, false
false
Declares whether trailing spaces from values being read should be skipped.
Both
nullValue
Any string character
“”
Declares what character represents a null value in the file.
Both
nanValue
Any string character
NaN
Declares what character represents a NaN or missing character in the CSV file.
Both
positiveInf
Any string or character
Inf
Declares what character(s) represent a positive infinite value.
Both
negativeInf
Any string or character
-Inf
Declares what character(s) represent a negative infinite value.
Both
compression or codec
None, uncompressed, bzip2, deflate, gzip, lz4, or snappy
none
Declares what compression codec Spark should use to read or write the file.
Both
dateFormat
Any string or character that conforms to java’s SimpleDataFormat.
yyyy-MM-dd
Declares the date format for any columns that are date type.
Both
timestampFormat
Any string or character that conforms to java’s SimpleDataFormat.
yyyy-MM-dd’T’HH:mm:ss.SSSZZ
Declares the timestamp format for any columns that are timestamp type.
Read
maxColumns
Any integer
20480
Declares the maximum number of columns in the file.
Read
maxCharsPerColumn
Any integer
1000000
Declares the maximum number of characters in a column.
Read
escapeQuotes
true, false
true
Declares whether Spark should escape quotes that are found in lines.
Read
maxMalformedLogPerPartition
Any integer
10
Sets the maximum number of malformed rows Spark will log for each partition. Malformed records beyond this number will be ignored.
Write
quoteAll
true, false
false
Specifies whether all values should be enclosed in quotes, as opposed to just escaping values that have a quote character.
Read
multiLine
true, false
false
This option allows you to read multiline CSV files where each logical row in the CSV file might span multiple rows in the file itself.

Note we can specify the compression codec to read/write to compressed files.

  • Following shows how to read from csv files. First we first create a DataFrameReader for that specific format.
In below example, we specify that 1) the csv files contains "header". 2) Schema will be inferred from the files 3) set the read mode to FAILFAST which will fail the job if the data is malformed.
Next we also show how to specify the schema explicitly.

1    import org.apache.spark.sql._
2    import org.apache.spark.sql.types._
3   
4    object SparkDefinitiveTesting {
5   
6      def main(args: Array[String]): Unit = {
7        val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
8        spark.sparkContext.setLogLevel("FATAL")
9   
10       val df_csv = spark.read.format("csv").option("header", "true").option("mode", "FAILFAST").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")
11       println("Count of Dataframe df_csv:" + df_csv.count())
12  
13       val manualSchema = StructType(Seq(StructField("InvoiceNo", StringType, true),
14         StructField("StockCode", StringType, true),
15         StructField("Description", StringType, true),
16         StructField("Quantity", IntegerType, true),
17         StructField("InvoiceDate", TimestampType, true),
18         StructField("UnitPrice", DoubleType, true),
19         StructField("CustomerID", DoubleType, true),
20         StructField("Country", StringType, true)))
21  
22       val df_csv_manual = spark.read.format("csv").option("header", "true").option("mode", "FAILFAST").option("inferSchema", "true").schema(manualSchema).load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")
23       println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())
24     }
25   }
2
Count of Dataframe df_csv:541909
Count of Dataframe df_csv_manual:541909



If the data that we read does not match the schema, the job will fail when Spark actually reads the data.(Not when the code is written). As soon as we start our Spark job, it will immediately fail (after we execute a job) due to the data not conforming to the specified schema. In general, Spark will fail only at job execution time rather than DataFrame definition time—even if, for example, we point to a file that does not exist. This is due to lazy evaluation

  • Following shows how to write data to tab separated files in gzip compressed format. Note that the write method is not available on SparkSession. It's available on the dataframe itself.

SparkDefinitiveTesting.scala
1    import org.apache.spark.sql._
2    import org.apache.spark.sql.types._
3   
4    object SparkDefinitiveTesting {
5   
6      def main(args: Array[String]): Unit = {
7        val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
8        spark.sparkContext.setLogLevel("FATAL")
9   
10       val manualSchema = StructType(Seq(StructField("InvoiceNo", StringType, true),
11         StructField("StockCode", StringType, true),
12         StructField("Description", StringType, true),
13         StructField("Quantity", IntegerType, true),
14         StructField("InvoiceDate", TimestampType, true),
15         StructField("UnitPrice", DoubleType, true),
16         StructField("CustomerID", DoubleType, true),
17         StructField("Country", StringType, true)))
18  
19       val df_csv_manual = spark.read.format("csv").option("header", "true").option("mode", "FAILFAST").option("inferSchema", "true").schema(manualSchema).load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day").repartition(5)
20       println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())
21       df_csv_manual.write.format("csv").mode(SaveMode.Overwrite).option("sep", "\t").option("codec", "gzip").save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\tsv")
22  
23  
24     }
25   }
26  
C:\Users\sukulma>cd C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\tsv

C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\tsv>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\tsv

04/14/2019  02:00 PM    <DIR>          .
04/14/2019  02:00 PM    <DIR>          ..
04/14/2019  01:59 PM            15,328 .part-00000-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  01:59 PM            15,308 .part-00001-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  01:59 PM            15,308 .part-00002-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  01:59 PM            15,328 .part-00003-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  02:00 PM            15,332 .part-00004-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  02:00 PM                 8 ._SUCCESS.crc
04/14/2019  01:59 PM         1,960,940 part-00000-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  01:59 PM         1,958,236 part-00001-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  01:59 PM         1,958,342 part-00002-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  01:59 PM         1,960,794 part-00003-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  02:00 PM         1,961,057 part-00004-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  02:00 PM                 0 _SUCCESS
              12 File(s)      9,875,981 bytes
               2 Dir(s)  77,774,761,984 bytes free

Note that 5 .gz files are created inside the directory. This actually reflects the number of partitions in our DataFrame at the time we write it out.