Sunday, 14 April 2019

CH9.4 Reading and Writing Parquet Files



Parquet Files
 
  • Parquet is an open source column-oriented data store that provides a variety of storage optimizations, especially for analytics workloads.

  • It provides columnar compression, which saves storage space and allows Spark to read individual columns instead of entire files (a short column-pruning sketch appears after the example output below).

  • It is a file format that works exceptionally well with Apache Spark and is in fact the default file format.

  • It's recommended that we write data out to Parquet for long-term storage because reading from a Parquet file will generally be more efficient than reading from JSON or CSV. Another advantage of Parquet is that it supports complex types: if your column is an array (which would fail with a CSV file, for example), map, or struct, you'll still be able to read and write that file without issue (see the complex-types sketch at the end of this post).

  • As shown below, Parquet has very few options because it enforces its own schema when storing data. Thus, all we need to set is the format and we are good to go (the schema is built into the file itself, so no inference is needed).

  • The following options are available with the Parquet data source:

Write option "compression" (or "codec")
    Potential values: None, uncompressed, bzip2, deflate, gzip, lz4, or snappy
    Default: None (if left unset, Spark falls back to spark.sql.parquet.compression.codec, which defaults to snappy; hence the .snappy.parquet files in the output below)
    Description: Declares what compression codec Spark should use when writing the file.

Read option "mergeSchema"
    Potential values: true, false
    Default: the value of the configuration spark.sql.parquet.mergeSchema
    Description: You can incrementally add columns to newly written Parquet files in the same table/folder. Use this option to enable or disable this feature.

Even though there are only two options, you can still encounter problems if you're working with incompatible Parquet files. Be careful when you write out Parquet files with different versions of Spark, as format differences between versions can make the files difficult to read back.
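A minimal sketch (not from the book) of how these two options might be set; the DataFrame, SparkSession, and paths below are placeholders for illustration:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("Parquet Options").getOrCreate()

// Placeholder input; substitute any existing DataFrame.
val someDf = spark.read.format("parquet").load("C:\\tmp\\some-existing-parquet")

// Write: choose an explicit compression codec instead of relying on the default.
someDf.write
  .format("parquet")
  .option("compression", "gzip")
  .mode(SaveMode.Overwrite)
  .save("C:\\tmp\\parquet-gzip")

// Read: merge the schemas of files written with different (but compatible) column sets.
val merged = spark.read
  .format("parquet")
  .option("mergeSchema", "true")
  .load("C:\\tmp\\parquet-gzip")
merged.printSchema()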
 
  • The following example shows how to store data from CSV files as Parquet and then read the same Parquet files back to print the schema:

import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Explicit schema for the retail CSV data, so no inference pass is needed.
    val manualSchema = StructType(Seq(
      StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    // Read the CSV files with the manual schema and repartition into 5 partitions.
    val df_csv_manual = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .schema(manualSchema)
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")
      .repartition(5)
    println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())

    // Write the data out as Parquet; only the format needs to be set.
    df_csv_manual.write.format("parquet").mode(SaveMode.Overwrite)
      .save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\parquetop")

    // Read the Parquet files back; the schema comes from the files themselves.
    val df_parquet = spark.read.format("parquet")
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\parquetop")
    df_parquet.printSchema()
    println("Count of Dataframe df_parquet:" + df_parquet.count())
  }
}
Count of Dataframe df_csv_manual:541909

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)

Count of Dataframe df_parquet:541909
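As noted in the columnar-storage point at the top of this post, a query that selects only a few columns never has to read the other columns from disk. A minimal sketch, continuing from df_parquet above:

// Column pruning: Parquet stores data by column, so selecting a subset of columns
// means Spark only scans those columns.
val df_pruned = df_parquet.select("InvoiceNo", "Quantity", "UnitPrice")
df_pruned.show(5)

Running df_pruned.explain() should show a ReadSchema containing only these three columns, confirming that the selection is pushed down to the Parquet reader.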

 
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\parquetop>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\parquetop

04/15/2019  01:43 AM    <DIR>          .
04/15/2019  01:43 AM    <DIR>          ..
04/15/2019  01:43 AM             9,072 .part-00000-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM             9,096 .part-00001-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM             9,084 .part-00002-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM             9,092 .part-00003-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM             9,084 .part-00004-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet.crc
04/15/2019  01:43 AM                 8 ._SUCCESS.crc
04/15/2019  01:43 AM         1,160,039 part-00000-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM         1,162,760 part-00001-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM         1,161,664 part-00002-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM         1,162,664 part-00003-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM         1,161,368 part-00004-49b4a405-89a4-466b-811e-75baf8a300d2.snappy.parquet
04/15/2019  01:43 AM                 0 _SUCCESS
              12 File(s)      5,853,931 bytes
               2 Dir(s)  77,645,365,248 bytes free

C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\parquetop>
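As mentioned earlier, Parquet also handles complex types such as arrays, maps, and structs, which plain CSV cannot represent. A minimal sketch (not from the book; the DataFrame, column names, and output path are made up for illustration, and spark is the session from the example above):

import spark.implicits._

// Hypothetical DataFrame with an array column and a map column.
val complexDf = Seq(
  ("536365", Seq("85123A", "71053"), Map("country" -> "United Kingdom")),
  ("536366", Seq("22633"), Map("country" -> "France"))
).toDF("InvoiceNo", "StockCodes", "Attributes")

// Writing this to CSV would fail, but Parquet stores the complex types natively.
complexDf.write.format("parquet").mode("overwrite").save("C:\\tmp\\parquet-complex")

val readBack = spark.read.format("parquet").load("C:\\tmp\\parquet-complex")
readBack.printSchema()  // StockCodes comes back as array<string>, Attributes as map<string,string>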

