
Monday, 15 April 2019

CH9.7 Advanced IO Concepts



Advanced IO Concepts:

 
  • We know that we can control the parallelism of the files we write by controlling the number of partitions prior to writing. We can also control specific data layout by controlling two things: bucketing and partitioning.

  • Splittable file types and compression: Certain file formats are fundamentally “splittable.” This can improve speed because it makes it possible for Spark to avoid reading an entire file and access only the parts of the file necessary to satisfy your query. Additionally, if you’re using something like the Hadoop Distributed File System (HDFS), splitting a file can provide further optimization if that file spans multiple blocks. In conjunction with this there is a need to manage compression: not all compression schemes are splittable, as illustrated below.
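For illustration, here is a minimal sketch (the output paths are placeholders, not from the original run) of choosing the codec through the compression option, assuming df_csv is the flight-data DataFrame read in the examples later in this post. Parquet compresses data inside the file, so it stays splittable with the default snappy codec, whereas a gzip-compressed CSV is one compressed stream that only a single task can read back:

// Parquet applies compression per column chunk inside the file,
// so the file remains splittable even when compressed (snappy is the default codec).
df_csv.write.mode(SaveMode.Overwrite)
  .option("compression", "snappy")
  .parquet("C:\\tmp\\flight-parquet-snappy")      // placeholder path

// A gzip-compressed CSV file is a single compressed stream and is not splittable:
// each output file can only be read back by one task.
df_csv.write.mode(SaveMode.Overwrite)
  .option("compression", "gzip")
  .csv("C:\\tmp\\flight-csv-gzip")                // placeholder path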
     
  • Reading data in parallel: Multiple executors cannot necessarily read from the same file at the same time, but they can read different files at the same time. In general, this means that when you read from a folder with multiple files in it, each one of those files will become a partition in your DataFrame and be read in by available executors in parallel (with the remaining files queueing up behind the others), as the quick check below demonstrates.
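A quick way to see this (a sketch reusing the flight-data CSV folder from the examples below) is to check how many partitions the DataFrame has right after the read; with multiple input files you should see more than one:

// Each file in the folder generally becomes (at least) one partition,
// and available executors read those files in parallel.
val df_csv = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv")
println(df_csv.rdd.getNumPartitions)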

  • Writing data in parallel: The number of files (and the amount of data) written depends on the number of partitions the DataFrame has at the time you write out the data. By default, one file is written per partition of the data, as the sketch below shows.
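For example (a sketch reusing df_csv from the examples below and a placeholder output path), repartitioning to five partitions before the write produces five part files in the output folder:

// Five partitions at write time => five part-xxxxx files (plus _SUCCESS) in the folder.
df_csv.repartition(5)
  .write.mode(SaveMode.Overwrite)
  .csv("C:\\tmp\\flight-csv-5files")   // placeholder path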

  • Partitioning (similar to Hive partitioning, which stores data in separate directories): Partitioning is a tool that allows you to control what data is stored, and where, as you write it. When you write a file to a partitioned directory (or table), you basically encode a column as a folder. This allows you to skip lots of data when you go to read it in later, reading only the data relevant to your problem instead of scanning the complete dataset. Partitioning is supported for all file-based data sources.
This is very similar to dynamic partitioning in Hive, and it works with target Hive tables as well.

The following shows an example of writing Parquet files with the data partitioned by destination country (DEST_COUNTRY_NAME):

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Read the flight-data CSV files, inferring the schema from the header row
    val df_csv = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv")
    df_csv.printSchema()
    // Write Parquet output partitioned by destination country: each distinct value becomes its own directory
    df_csv.write.mode(SaveMode.Overwrite).format("parquet").partitionBy("DEST_COUNTRY_NAME").save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\parquetoutput")
  }
}
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput

04/15/2019  02:41 PM    <DIR>          .
04/15/2019  02:41 PM    <DIR>          ..
04/15/2019  02:41 PM                 8 ._SUCCESS.crc
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Afghanistan
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Algeria
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Angola
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Anguilla
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Antigua%20and%20Barbuda
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Argentina
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Aruba
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Australia

 
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput>cd "DEST_COUNTRY_NAME=Australia"

C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput\DEST_COUNTRY_NAME=Australia>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput\DEST_COUNTRY_NAME=Australia

04/15/2019  02:41 PM    <DIR>          .
04/15/2019  02:41 PM    <DIR>          ..
04/15/2019  02:39 PM                16 .part-00000-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet.crc
04/15/2019  02:39 PM                16 .part-00001-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet.crc
04/15/2019  02:39 PM                16 .part-00002-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet.crc
04/15/2019  02:39 PM               649 part-00000-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet
04/15/2019  02:39 PM               649 part-00001-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet
04/15/2019  02:39 PM               649 part-00002-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet
               6 File(s)          1,995 bytes
               2 Dir(s)  75,312,287,744 bytes free

  • Bucketing: Bucketing is another file organization approach with which you can control the data that is written to each file. Data with the same bucket ID is grouped together into one physical partition, which helps you avoid shuffles later when you go to read the data. In other words, the data is prepartitioned according to how you expect to use it later, so you can avoid expensive shuffles when joining or aggregating.

The following is the definition of the bucketBy method on DataFrameWriter:

def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
Buckets the output by the given columns. If specified, the output is laid out on the file system similar to Hive's bucketing scheme.
This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.

The following shows an example of writing to 10 buckets. (This code fails with the exception shown below: bucketing is only supported when writing to a table with saveAsTable, not with a path-based save such as parquet(...).)

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    //    spark.sparkContext.setLogLevel("FATAL")

    val df_csv = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv")
    df_csv.printSchema()
    // Attempt to bucket the output into 10 buckets by destination country and write to a path
    df_csv.write.mode(SaveMode.Overwrite).bucketBy(10,"DEST_COUNTRY_NAME").parquet("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\parquetoutput")
  }
}
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)

Exception in thread "main" org.apache.spark.sql.AnalysisException: 'save' does not support bucketing right now;
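For reference, here is a minimal sketch of how the same write can succeed. Bucketing requires a table-based write, so replacing the path-based save with saveAsTable avoids the exception above; the table name below is hypothetical, and this is my assumption about the fix rather than code from the original post.

import org.apache.spark.sql._

object BucketingSketch {
  def main(args: Array[String]): Unit = {
    // The built-in catalog can manage bucketed tables; Hive support is not required.
    val spark = SparkSession.builder().master("local[*]").appName("Bucketing Sketch").getOrCreate()

    val df_csv = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv")

    // bucketBy only works together with saveAsTable; the bucketed files are written
    // under the Spark warehouse directory (spark-warehouse by default).
    df_csv.write
      .mode(SaveMode.Overwrite)
      .format("parquet")
      .bucketBy(10, "DEST_COUNTRY_NAME")
      .saveAsTable("flights_bucketed")   // hypothetical table name
  }
}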

  • Managing file size: Managing file sizes is important not so much for writing data as for reading it later on. When you write lots of small files, there is a significant metadata overhead incurred in managing all of those files.

We saw previously that the number of output files depends on the number of partitions we had at write time. To cap file size independently of partitioning, we can use the maxRecordsPerFile option (introduced in Spark 2.2) to specify the maximum number of records written to each file.

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    //    spark.sparkContext.setLogLevel("FATAL")

    val df_csv = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\all")
    df_csv.printSchema()
    // Limit each output file to at most 2,001 records
    df_csv.write.mode(SaveMode.Overwrite).option("maxRecordsPerFile",2001).csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csvfilesized")
  }
}




