
Monday, 15 April 2019

CH9.5 Reading and Writing ORC Files



ORC Files:

 
  • ORC is a self-describing, type-aware columnar file format designed for Hadoop workloads. It is optimized for large streaming reads, with integrated support for finding required rows quickly. ORC has no options for reading in data because Spark understands the file format quite well.

  • The fundamental difference between the two columnar formats is that Parquet is further optimized for use with Spark, whereas ORC is further optimized for Hive.

  • The following example writes a DataFrame out to ORC files and then reads them back:

import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Explicit schema for the retail CSV data; with an explicit schema,
    // inferSchema is unnecessary.
    val manualSchema = StructType(Seq(
      StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    val df_csv_manual = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .schema(manualSchema)
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")
      .repartition(5)
    println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())

    // Write out as ORC ...
    df_csv_manual.write.format("orc").mode(SaveMode.Overwrite)
      .save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\orcop")

    // ... and read it back; no options are needed for ORC reads.
    val df_orc = spark.read.format("orc")
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\orcop")
    df_orc.printSchema()
  }
}
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\orcop>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\orcop

04/15/2019  02:19 AM    <DIR>          .
04/15/2019  02:19 AM    <DIR>          ..
04/15/2019  02:19 AM             9,776 .part-00000-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc.crc
04/15/2019  02:19 AM             9,784 .part-00001-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc.crc
04/15/2019  02:19 AM             9,788 .part-00002-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc.crc
04/15/2019  02:19 AM             9,792 .part-00003-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc.crc
04/15/2019  02:19 AM             9,788 .part-00004-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc.crc
04/15/2019  02:19 AM                 8 ._SUCCESS.crc
04/15/2019  02:19 AM         1,250,159 part-00000-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc
04/15/2019  02:19 AM         1,251,208 part-00001-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc
04/15/2019  02:19 AM         1,251,806 part-00002-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc
04/15/2019  02:19 AM         1,252,097 part-00003-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc
04/15/2019  02:19 AM         1,251,355 part-00004-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc
04/15/2019  02:19 AM                 0 _SUCCESS
              12 File(s)      6,305,561 bytes
               2 Dir(s)  77,677,350,912 bytes free

Result:
Count of Dataframe df_csv_manual:541909
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)


 


Sunday, 14 April 2019

CH9.4 Reading and Writing Parquet Files



Parquet Files
 
  • Parquet is an open source column-oriented data store that provides a variety of storage optimizations, especially for analytics workloads.

  • It provides columnar compression, which saves storage space and allows for reading individual columns instead of entire files.

  • It is a file format that works exceptionally well with Apache Spark and is in fact the default file format.

  • It's recommended to write data out to Parquet for long-term storage because reading from a Parquet file will always be more efficient than from JSON or CSV. Another advantage of Parquet is that it supports complex types: if a column is an array (which would fail with a CSV file, for example), map, or struct, you'll still be able to read and write that file without issue.

  • As shown below, Parquet has very few options because it enforces its own schema when storing data; the schema is built into the file itself, so no inference is needed. Thus, all we need to set is the format and we are good to go.

  • The following options are available with the Parquet data source:

Read/Write | Key                  | Potential Values                                      | Default                                | Description
Write      | compression or codec | none, uncompressed, bzip2, deflate, gzip, lz4, snappy | none                                   | Declares what compression codec Spark should use to read or write the file.
Read       | mergeSchema          | true, false                                           | Value of spark.sql.parquet.mergeSchema | You can incrementally add columns to newly written Parquet files in the same table/folder. Use this option to enable or disable this feature.

Even though there are only two options, you can still encounter problems if you're working with incompatible Parquet files. Be careful when you write out Parquet files with different versions of Spark (especially older ones), because this can cause significant headaches.
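As a sketch of the two options in practice (assuming an existing SparkSession `spark` and DataFrame `df`; the paths are hypothetical, not from this post):

```scala
import org.apache.spark.sql.SaveMode

// Write with an explicit compression codec instead of relying on the default:
df.write.format("parquet")
  .option("compression", "gzip")
  .mode(SaveMode.Overwrite)
  .save("/tmp/retail-parquet") // hypothetical path

// Read back, merging schemas across files in the same folder
// (useful when later files gained extra columns):
val merged = spark.read.format("parquet")
  .option("mergeSchema", "true")
  .load("/tmp/retail-parquet")
```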
 
  • The following example stores data from CSV files as Parquet, then reads the same Parquet files back and prints the schema:

import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Explicit schema for the retail CSV data; with an explicit schema,
    // inferSchema is unnecessary.
    val manualSchema = StructType(Seq(
      StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    val df_csv_manual = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .schema(manualSchema)
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")
      .repartition(5)
    println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())

    // Write out as Parquet ...
    df_csv_manual.write.format("parquet").mode(SaveMode.Overwrite)
      .save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\parquetop")

    // ... and read it back; the schema travels with the files.
    val df_parquet = spark.read.format("parquet")
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\parquetop")
    df_parquet.printSchema()
    println("Count of Dataframe df_parquet:" + df_parquet.count())
  }
}
Count of Dataframe df_csv_manual:541909

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)

Count of Dataframe df_parquet:541909

 
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\parquetop>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\parquetop

04/15/2019  01:43 AM    <DIR>          .
04/15/2019  01:43 AM    <DIR>          ..
04/15/2019  01:43 AM             9,072 .part-00000-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM             9,096 .part-00001-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM             9,084 .part-00002-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM             9,092 .part-00003-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM             9,084 .part-00004-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM                 8 ._SUCCESS.crc
04/15/2019  01:43 AM         1,160,039 part-00000-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM         1,162,760 part-00001-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM         1,161,664 part-00002-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM         1,162,664 part-00003-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM         1,161,368 part-00004-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM                 0 _SUCCESS
              12 File(s)      5,853,931 bytes
               2 Dir(s)  77,645,365,248 bytes free

C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\parquetop>


CH9.3 Reading and Writing JSON Files


JSON Files:
 
  • In Spark, when we refer to JSON files, we refer to line-delimited JSON files. This contrasts with files that have a large JSON object or array per file.
     
  • The line-delimited versus multiline trade-off is controlled by a single option: multiLine. When you set this option to true, you can read an entire file as one JSON object, and Spark will go through the work of parsing that into a DataFrame.

  • Line-delimited JSON is actually a much more stable format because it allows you to append new records to a file (rather than having to read in the entire file and then write it out), and it is what we recommend you use. Another key reason for the popularity of line-delimited JSON is that JSON objects have structure, and JavaScript (on which JSON is based) has at least basic types. This makes it easier to work with because Spark can make more assumptions on our behalf about the data.
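The multiLine trade-off can be sketched as follows (assuming an existing SparkSession `spark`; both paths are hypothetical):

```scala
// Default (multiLine = false): each line of the file is one JSON record.
val lineDelimited = spark.read.format("json")
  .load("/tmp/records.json") // hypothetical path

// multiLine = true: the whole file is parsed as a single JSON document
// (an object or an array), which Spark then turns into a DataFrame.
val wholeFile = spark.read.format("json")
  .option("multiLine", "true")
  .load("/tmp/records-pretty.json") // hypothetical path
```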
     

Read/Write | Key | Potential Values | Default | Description
Both | compression or codec | none, uncompressed, bzip2, deflate, gzip, lz4, snappy | none | Declares what compression codec Spark should use to read or write the file.
Both | dateFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd | Declares the date format for any columns that are date type.
Both | timestampFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd'T'HH:mm:ss.SSSZZ | Declares the timestamp format for any columns that are timestamp type.
Read | primitiveAsString | true, false | false | Infers all primitive values as string type.
Read | allowComments | true, false | false | Ignores Java/C++-style comments in JSON records.
Read | allowUnquotedFieldNames | true, false | false | Allows unquoted JSON field names.
Read | allowSingleQuotes | true, false | true | Allows single quotes in addition to double quotes.
Read | allowNumericLeadingZeros | true, false | false | Allows leading zeros in numbers (e.g., 00012).
Read | allowBackslashEscapingAnyCharacter | true, false | false | Allows accepting quoting of all characters using the backslash quoting mechanism.
Read | columnNameOfCorruptRecord | Any string | Value of spark.sql.columnNameOfCorruptRecord | Allows renaming the new field that holds a malformed string, created by permissive mode. This will override the configuration value.
Read | multiLine | true, false | false | Allows reading non-line-delimited JSON files.
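Several of the read options above can be combined on a single read. A minimal sketch, assuming an existing SparkSession `spark` and a hypothetical input path:

```scala
val df_json = spark.read.format("json")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ") // how timestamp columns are parsed
  .option("allowComments", "true")                          // tolerate // and /* */ comments
  .option("columnNameOfCorruptRecord", "_bad_record")       // in permissive mode, malformed rows land here
  .load("/tmp/json-input") // hypothetical path
```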

  • The following example reads from CSV files and writes the data out to JSON files:

import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Explicit schema for the retail CSV data; with an explicit schema,
    // inferSchema is unnecessary.
    val manualSchema = StructType(Seq(
      StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    val df_csv_manual = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .schema(manualSchema)
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")
      .repartition(5)
    println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())

    // Write out as line-delimited JSON, one file per partition.
    df_csv_manual.write.format("json").mode(SaveMode.Overwrite)
      .save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\jsonop")
  }
}
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\jsonop>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\jsonop

04/15/2019  01:16 AM    <DIR>          .
04/15/2019  01:16 AM    <DIR>          ..
04/15/2019  01:16 AM           173,680 .part-00000-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json.crc
04/15/2019  01:16 AM           173,668 .part-00001-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json.crc
04/15/2019  01:16 AM           173,684 .part-00002-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json.crc
04/15/2019  01:16 AM           173,700 .part-00003-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json.crc
04/15/2019  01:16 AM           173,676 .part-00004-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json.crc
04/15/2019  01:16 AM                 8 ._SUCCESS.crc
04/15/2019  01:16 AM        22,229,808 part-00000-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json
04/15/2019  01:16 AM        22,228,436 part-00001-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json
04/15/2019  01:16 AM        22,230,267 part-00002-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json
04/15/2019  01:16 AM        22,232,349 part-00003-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json
04/15/2019  01:16 AM        22,229,060 part-00004-f95a066d-d8a1-46fe-b07d-6cde56b4d2b8.json
04/15/2019  01:16 AM                 0 _SUCCESS
              12 File(s)    112,018,336 bytes
               2 Dir(s)  77,656,604,672 bytes free

Sample output file:
 
{"InvoiceNo":"580538","StockCode":"22467","Description":"GUMBALL COAT RACK","Quantity":6,"InvoiceDate":"2011-12-05T08:38:00.000-05:00","UnitPrice":2.55,"CustomerID":14075.0,"Country":"United Kingdom"}
{"InvoiceNo":"580539","StockCode":"84030E","Description":"ENGLISH ROSE HOT WATER BOTTLE","Quantity":4,"InvoiceDate":"2011-12-05T08:39:00.000-05:00","UnitPrice":4.25,"CustomerID":18180.0,"Country":"United Kingdom"}
{"InvoiceNo":"580539","StockCode":"23235","Description":"STORAGE TIN VINTAGE LEAF","Quantity":12,"InvoiceDate":"2011-12-05T08:39:00.000-05:00","UnitPrice":1.25,"CustomerID":18180.0,"Country":"United Kingdom"}
{"InvoiceNo":"580539","StockCode":"22375","Description":"AIRLINE BAG VINTAGE JET SET BROWN","Quantity":4,"InvoiceDate":"2011-12-05T08:39:00.000-05:00","UnitPrice":4.25,"CustomerID":18180.0,"Country":"United Kingdom"}

Note that one file is written out per partition, and the entire DataFrame is written out as a folder; each file contains one JSON object per line.
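Because the number of output files follows the number of partitions, the DataFrame can be repartitioned before the write when a particular file count is wanted. A sketch (the output path is hypothetical):

```scala
// The example above used repartition(5), hence five part-* files.
// Coalescing to one partition yields a single JSON output file:
df_csv_manual.coalesce(1)
  .write.format("json")
  .mode(SaveMode.Overwrite)
  .save("/tmp/jsonop-single") // hypothetical path
```

Note that coalesce(1) funnels all data through a single task, so this is only sensible for small outputs.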