
Monday, 15 April 2019

CH9.7 Advanced IO Concepts



Advanced IO Concepts:

 
  • We know that we can control the parallelism of files that we write by controlling the partitions prior to writing. We can also control specific data layout by controlling two things: bucketing and partitioning.

  • Splittable File Types and Compression: Certain file formats are fundamentally “splittable.” This can improve speed because it makes it possible for Spark to avoid reading an entire file and to access only the parts of the file necessary to satisfy your query. Additionally, if you’re using something like the Hadoop Distributed File System (HDFS), splitting a file can provide further optimization if that file spans multiple blocks. In conjunction with this is a need to manage compression: not all compression schemes are splittable (see the sketch after this list).
     
  • Reading data in parallel: Multiple executors cannot necessarily read from the same file at the same time, but they can read different files at the same time. In general, this means that when you read from a folder with multiple files in it, each one of those files will become a partition in your DataFrame and be read in by available executors in parallel (with the remaining files queueing up behind the others).

  • Writing data in parallel: The number of files or data written is dependent on the number of partitions the DataFrame has at the time you write out the data. By default, one file is written per partition of the data (see the sketch after this list).

  • Partitioning (similar to Hive partitioning - allows storing data in separate directories): Partitioning is a tool that allows you to control what data is stored (and where) as you write it. When you write a file to a partitioned directory (or table), you basically encode a column as a folder. What this allows you to do is skip lots of data when you go to read it in later, allowing you to read in only the data relevant to your problem instead of having to scan the complete dataset. Partitioning is supported for all file-based data sources.
This is very much similar to dynamic partitioning in Hive, and it works with target Hive tables as well.
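
The following is a minimal sketch (not from the book) tying the points above together. It assumes a spark-shell session where spark is already defined, reuses the flight-data CSV folder from the examples below, and the gzip output path is made up for illustration:

// Reading a folder of files: getNumPartitions shows how many partitions
// (and hence parallel read tasks) Spark planned for the input.
val df = spark.read.option("header", "true").option("inferSchema", "true")
  .csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv")
println(df.rdd.getNumPartitions)

// Five partitions at write time means five part files in the output folder.
// gzip keeps the files small, but gzip output is not splittable; a splittable
// codec such as bzip2 (or a natively splittable format like Parquet) avoids that.
df.repartition(5)
  .write.mode("overwrite")
  .option("compression", "gzip")
  .csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv-gzip")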

The following shows an example of writing Parquet files with the data partitioned by destination country (DEST_COUNTRY_NAME):

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Read the flight-data CSV files with a header row and an inferred schema
    val df_csv = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv")
    df_csv.printSchema()

    // Write Parquet partitioned by DEST_COUNTRY_NAME: one folder per destination country
    df_csv.write.mode(SaveMode.Overwrite).format("parquet").partitionBy("DEST_COUNTRY_NAME").save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\parquetoutput")
  }
}

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput

04/15/2019  02:41 PM    <DIR>          .
04/15/2019  02:41 PM    <DIR>          ..
04/15/2019  02:41 PM                 8 ._SUCCESS.crc
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Afghanistan
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Algeria
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Angola
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Anguilla
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Antigua%20and%20Barbuda
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Argentina
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Aruba
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Australia

 
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput>cd "DEST_COUNTRY_NAME=Australia"

C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput\DEST_COUNTRY_NAME=Australia>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput\DEST_COUNTRY_NAME=Australia

04/15/2019  02:41 PM    <DIR>          .
04/15/2019  02:41 PM    <DIR>          ..
04/15/2019  02:39 PM                16 .part-00000-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet.crc
04/15/2019  02:39 PM                16 .part-00001-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet.crc
04/15/2019  02:39 PM                16 .part-00002-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet.crc
04/15/2019  02:39 PM               649 part-00000-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet
04/15/2019  02:39 PM               649 part-00001-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet
04/15/2019  02:39 PM               649 part-00002-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet
               6 File(s)          1,995 bytes
               2 Dir(s)  75,312,287,744 bytes free
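
Reading the partitioned output back, a filter on the partition column lets Spark prune directories and scan only the folders it needs. A quick sketch (spark-shell, same output path as above):

val flights = spark.read.parquet("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\parquetoutput")

// Only the DEST_COUNTRY_NAME=Australia directory is read; the other
// partition folders are skipped entirely (partition pruning).
flights.where("DEST_COUNTRY_NAME = 'Australia'").show(5, false)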

  • Bucketing: Bucketing is another file organization approach with which you can control the data that is specifically written to each file. This can help avoid shuffles later when you go to read the data because data with the same bucket ID will all be grouped together into one physical partition. This means that the data is prepartitioned according to how you expect to use that data later on, meaning you can avoid expensive shuffles when joining or aggregating.

The following is the definition of the bucketBy method on the DataFrameWriter:

def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
Buckets the output by the given columns. If specified, the output is laid out on the file system similar to Hive's bucketing scheme.
This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.

The following shows an example of writing to 10 buckets. (As the exception below shows, this fails because the path-based save/parquet writers do not support bucketing; bucketBy has to be used together with saveAsTable, as sketched after the output.)

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    // spark.sparkContext.setLogLevel("FATAL")

    val df_csv = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv")
    df_csv.printSchema()

    // Bucket the output into 10 buckets by DEST_COUNTRY_NAME while writing to a path;
    // this throws because bucketing is only supported through saveAsTable
    df_csv.write.mode(SaveMode.Overwrite).bucketBy(10, "DEST_COUNTRY_NAME").parquet("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\parquetoutput")
  }
}
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)

Exception in thread "main" org.apache.spark.sql.AnalysisException: 'save' does not support bucketing right now;
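
The variant that does work is writing to a table with saveAsTable instead of a path. The following is a minimal sketch (spark-shell; the table name flights_bucketed is just an example), not the book's code:

import org.apache.spark.sql.SaveMode

val df_csv = spark.read.option("header", "true").option("inferSchema", "true")
  .csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv")

// Bucketing (and optional sorting within buckets) is only supported through
// saveAsTable, which creates a managed table in the Spark warehouse directory.
df_csv.write
  .mode(SaveMode.Overwrite)
  .bucketBy(10, "DEST_COUNTRY_NAME")
  .sortBy("count")
  .saveAsTable("flights_bucketed")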

  • Managing File Size: Managing file sizes is an important factor not so much for writing data as for reading it later on. When you’re writing lots of small files, there’s a significant metadata overhead that you incur managing all of those files.

We saw previously that the number of output files is determined by the number of partitions the DataFrame had at write time. To also cap the size of individual files, we can use the maxRecordsPerFile option (introduced in Spark 2.2) to specify the maximum number of records written to each file.

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    // spark.sparkContext.setLogLevel("FATAL")

    val df_csv = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\all")
    df_csv.printSchema()

    // Cap each output file at 2001 records; a partition with more rows is split into multiple files
    df_csv.write.mode(SaveMode.Overwrite).option("maxRecordsPerFile", 2001).csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csvfilesized")
  }
}
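
The two controls compose: repartition sets how many partitions (and hence the minimum number of files) are written, while maxRecordsPerFile caps how large each file can grow. A quick sketch (spark-shell; the 10000-record cap and the output path are made up for illustration):

val df = spark.read.option("header", "true")
  .csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\all")

// Start from 5 partitions; any partition holding more than 10000 rows is
// split into additional files so that no single file exceeds the cap.
df.repartition(5)
  .write.mode("overwrite")
  .option("maxRecordsPerFile", 10000)
  .csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\csvcapped")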





CH9.6 Reading and Writing Text Files


Text Files:

 
  • Spark also allows you to read in plain-text files. Each line in the file becomes a record in the DataFrame. It is then up to you to transform it accordingly. For example, you might parse Apache log files into a more structured format.

  • Reading text files is straightforward: you simply specify the type to be textFile. With textFile, partitioned directory names are ignored. To read and write text files according to partitions, you should use "text", which respects partitioning on reading and writing.

Following are the method definitions available on the DataFrameReader:


def text(paths: String*): DataFrame
Loads text files and returns a DataFrame whose schema starts with a string column named "value", and followed by partitioned columns if there are any.
Each line in the text files is a new row in the resulting DataFrame. For example:
// Scala:
spark.read.text("/path/to/spark/README.md")

def text(path: String): DataFrame
Loads text files and returns a DataFrame whose schema starts with a string column named "value", and followed by partitioned columns if there are any. See the documentation on the other overloaded text() method for more details.

def textFile(paths: String*): Dataset[String]
Loads text files and returns a Dataset of String. The underlying schema of the Dataset contains a single string column named "value".
If the directory structure of the text files contains partitioning information, those are ignored in the resulting Dataset. To include partitioning information as columns, use text.
Each line in the text files is a new element in the resulting Dataset. For example:
// Scala:
spark.read.textFile("/path/to/spark/README.md")
// Java:
spark.read().textFile("/path/to/spark/README.md")

def textFile(path: String): Dataset[String]
Loads text files and returns a Dataset of String. See the documentation on the other overloaded textFile() method for more details
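
A quick sketch of the difference between the two readers (spark-shell), pointing both at the partitioned "textop" directory written by the example further down:

// read.text returns a DataFrame; the partition directories surface as a column.
val asDataFrame = spark.read.text("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\textop")
asDataFrame.printSchema()   // value: string, Country: string

// read.textFile returns a Dataset[String]; the Country directories are ignored.
val asDataset = spark.read.textFile("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\textop")
asDataset.printSchema()     // value: string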


  • Writing to text files: When you write a text file, you need to be sure to have only one string column; otherwise, the write will fail.

  • If you perform partitioning when performing your write, you can write more columns. However, those columns will manifest as directories in the folder to which you’re writing, rather than as columns in every single file.
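
If you do not want partition directories, the alternative is to collapse the columns into a single string column yourself before calling text. A minimal sketch (spark-shell), assuming a DataFrame like df_text_split from the example below; the output path is made up:

import org.apache.spark.sql.functions.{col, concat_ws}

// Join Desc and Country into one comma-separated "value" column so that
// write.text() sees exactly one string column.
val oneColumn = df_text_split.select(concat_ws(",", col("Desc"), col("Country")).as("value"))
oneColumn.write.mode("overwrite").text("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\textop-single")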
     
The following are the definitions of the methods "text" and "partitionBy" available on the DataFrameWriter object:

def text(path: String): Unit
Saves the content of the DataFrame in a text file at the specified path. The DataFrame must have only one column that is of string type. Each row becomes a new line in the output file. For example:
// Scala:
df.write.text("/path/to/output")
// Java:
df.write().text("/path/to/output")
You can set the following option(s) for writing text files:
compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive shortened names (none, bzip2, gzip, lz4, snappy and deflate).

def partitionBy(colNames: String*): DataFrameWriter[T]
Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:
year=2016/month=01/
year=2016/month=02/
Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.
This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.

  • The following example shows how to read data from text files and write to a directory with partitions. Pay attention to the field names.

import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // (This schema is not actually used below; the files are read as plain text instead.)
    val manualSchema = StructType(Seq(StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    // Reading csv files as plain text files and extracting information.
    // Note that header lines are read as ordinary rows too, which is why a
    // Country=Country folder shows up in the partitioned output below.
    val df_text = spark.read.textFile("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day").repartition(5)
    df_text.printSchema()
    df_text.show(5, false)

    // Split each line on commas and pull out the description and country fields
    val df_text_split = df_text.selectExpr("split(value,',')[2] as Desc", "split(value,',')[7] as Country")
    df_text_split.printSchema()
    df_text_split.show(5, false)

    // Writing files using partitioning: Desc goes into the files, Country becomes directories.
    df_text_split.write.partitionBy("Country").text("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\textop")
  }
}
root
 |-- value: string (nullable = true)

+--------------------------------------------------------------------------------------------------+
|value                                                                                             |
+--------------------------------------------------------------------------------------------------+
|580538,21914,BLUE HARMONICA IN BOX ,24,2011-12-05 08:38:00,1.25,14075.0,United Kingdom            |
|580539,21479,WHITE SKULL HOT WATER BOTTLE ,4,2011-12-05 08:39:00,4.25,18180.0,United Kingdom      |
|580539,21411,GINGHAM HEART  DOORSTOP RED,8,2011-12-05 08:39:00,1.95,18180.0,United Kingdom        |
|580539,22372,AIRLINE BAG VINTAGE WORLD CHAMPION ,4,2011-12-05 08:39:00,4.25,18180.0,United Kingdom|
|580539,22389,PAPERWEIGHT SAVE THE PLANET,12,2011-12-05 08:39:00,0.39,18180.0,United Kingdom       |
+--------------------------------------------------------------------------------------------------+
only showing top 5 rows

root
 |-- Desc: string (nullable = true)
 |-- Country: string (nullable = true)

+-----------------------------------+--------------+
|Desc                               |Country       |
+-----------------------------------+--------------+
|BLUE HARMONICA IN BOX              |United Kingdom|
|WHITE SKULL HOT WATER BOTTLE       |United Kingdom|
|GINGHAM HEART  DOORSTOP RED        |United Kingdom|
|AIRLINE BAG VINTAGE WORLD CHAMPION |United Kingdom|
|PAPERWEIGHT SAVE THE PLANET        |United Kingdom|
+-----------------------------------+--------------+
only showing top 5 rows
04/15/2019  05:18 AM    <DIR>          Country=Australia
04/15/2019  05:18 AM    <DIR>          Country=Austria
04/15/2019  05:18 AM    <DIR>          Country=Bahrain
04/15/2019  05:18 AM    <DIR>          Country=Belgium
04/15/2019  05:18 AM    <DIR>          Country=Brazil
04/15/2019  05:18 AM    <DIR>          Country=Canada
04/15/2019  05:18 AM    <DIR>          Country=Channel%20Islands
04/15/2019  05:18 AM    <DIR>          Country=Country
04/15/2019  05:18 AM    <DIR>          Country=Cyprus
04/15/2019  05:18 AM    <DIR>          Country=Czech%20Republic
04/15/2019  05:18 AM    <DIR>          Country=Denmark
04/15/2019  05:18 AM    <DIR>          Country=EIRE
04/15/2019  05:18 AM    <DIR>          Country=European%20Community
04/15/2019  05:18 AM    <DIR>          Country=Finland
04/15/2019  05:18 AM    <DIR>          Country=France
04/15/2019  05:18 AM    <DIR>          Country=Germany
04/15/2019  05:18 AM    <DIR>          Country=Greece
04/15/2019  05:18 AM    <DIR>          Country=Hong%20Kong
04/15/2019  05:18 AM    <DIR>          Country=Iceland


CH9.5 Reading and Writing ORC files



ORC Files:

 
  • ORC is a self-describing, type-aware columnar file format designed for Hadoop workloads. It is optimized for large streaming reads, but with integrated support for finding required rows quickly. ORC actually has no options for reading in data because Spark understands the file format quite well.

  • The fundamental difference is that Parquet is further optimized for use with Spark, whereas ORC is further optimized for Hive.
     
  • The following shows an example of writing ORC files and reading them back:

import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Explicit schema for the retail CSV files
    val manualSchema = StructType(Seq(StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    // Read the CSVs with the manual schema (the explicit schema takes precedence over inferSchema)
    // and spread the data over 5 partitions
    val df_csv_manual = spark.read.format("csv").option("header", "true").option("mode", "FAILFAST").option("inferSchema", "true").schema(manualSchema).load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day").repartition(5)
    println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())

    // Write ORC (snappy-compressed by default), one file per partition
    df_csv_manual.write.format("orc").mode(SaveMode.Overwrite).save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\orcop")

    // Read the ORC files back; no extra options are needed
    val df_orc = spark.read.format("orc").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\orcop")
    df_orc.printSchema()
  }
}
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\orcop>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\orcop

04/15/2019  02:19 AM    <DIR>          .
04/15/2019  02:19 AM    <DIR>          ..
04/15/2019  02:19 AM             9,776 .part-00000-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc.crc
04/15/2019  02:19 AM             9,784 .part-00001-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc.crc
04/15/2019  02:19 AM             9,788 .part-00002-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc.crc
04/15/2019  02:19 AM             9,792 .part-00003-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc.crc
04/15/2019  02:19 AM             9,788 .part-00004-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc.crc
04/15/2019  02:19 AM                 8 ._SUCCESS.crc
04/15/2019  02:19 AM         1,250,159 part-00000-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc
04/15/2019  02:19 AM         1,251,208 part-00001-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc
04/15/2019  02:19 AM         1,251,806 part-00002-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc
04/15/2019  02:19 AM         1,252,097 part-00003-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc
04/15/2019  02:19 AM         1,251,355 part-00004-ae4f177b-fc07-42f3-a8c6-0ce6a859c281.snappy.orc
04/15/2019  02:19 AM                 0 _SUCCESS
              12 File(s)      6,305,561 bytes
               2 Dir(s)  77,677,350,912 bytes free

Result:
Count of Dataframe df_csv_manual:541909
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)