Monday, 15 April 2019

CH15.1 Spark Application architecture, Execution Modes


    • We know how the structured APIs take a logical operation, break it up into a logical plan, and convert that to a physical plan that actually consists of Resilient Distributed Dataset (RDD) operations that execute across the cluster of machines. Here we learn about what happens when Spark goes about executing that code.
Architecture of a Spark Application:

  • The Spark driver:
    • The driver process is the controller of the execution of a Spark Application and maintains all of the state of the Spark cluster (the state and tasks of the executors).
    • It must interface with the cluster manager in order to actually get physical resources and launch executors.
    • Simply put, the driver is just a process on a physical machine that is responsible for maintaining the state of the application running on the cluster.

  • The Spark Executors:
    • Spark executors are the processes that perform the tasks assigned by the Spark driver. 
    • Executors have one core responsibility: take the tasks assigned by the driver, run them, and report back their state (success or failure) and results.
    • Each Spark Application has its own separate executor processes.

  • The cluster Manager:
    • The cluster manager is responsible for maintaining a cluster of machines that will run your Spark Application(s). A cluster manager has its own “driver” (sometimes called master) and “worker” abstractions.
    • Very important: the core difference is that these are tied to physical machines rather than processes.
    • When it comes time to actually run a Spark Application, we request resources from the cluster manager to run it. Depending on how our application is configured, this can include a place to run the Spark driver or might be just resources for the executors for our Spark Application.
    • Spark currently supports three cluster managers: a simple built-in standalone cluster manager, Apache Mesos, and Hadoop YARN.
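
As a rough illustration of the driver asking the cluster manager for executor resources, the sketch below builds a SparkConf with a few standard executor settings. The application name and the values are assumptions chosen for illustration, and whether spark.executor.instances is honored depends on the cluster manager in use.

```scala
import org.apache.spark.SparkConf

object ExecutorResourcesSketch {
  // Settings the driver passes along when requesting executors.
  // The values are illustrative only, not recommendations.
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("Architecture Sketch")     // hypothetical application name
      .set("spark.executor.instances", "4")  // number of executor processes to request
      .set("spark.executor.cores", "2")      // cores available to each executor
      .set("spark.executor.memory", "2g")    // heap size of each executor

  def main(args: Array[String]): Unit =
    // Print the resulting key=value pairs; no cluster is contacted here.
    println(buildConf().toDebugString)
}
```

Nothing is launched by this code. The settings only take effect once a SparkSession or SparkContext is created from the configuration.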

Execution Modes:

  • An execution mode gives you the power to determine where the resources are physically located when you go to run your application. There are three options: cluster mode, client mode, and local mode.

  • Cluster mode:
    • In Cluster mode, when a user submits a pre-compiled JAR, Python script, or R script to a cluster manager, the cluster manager then launches the driver process on a worker node inside the cluster, in addition to the executor processes. 
    • This means that the cluster manager is responsible for maintaining all Spark Application–related processes.
       
  • Client Mode:
    • Client mode is nearly the same as cluster mode except that the Spark driver remains on the client machine that submitted the application. This means that the client machine is responsible for maintaining the Spark driver process, and the cluster manager maintains the executor processes.
    • In this mode we are running the Spark Application from a machine that is not colocated on the cluster. These machines are commonly referred to as gateway machines or edge nodes.
       
  • Local Mode:
    • This mode runs the entire Spark Application on a single machine. It achieves parallelism through threads on that single machine. 
    • Local mode is not recommended for running production applications.
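
Local mode is the easiest of the three to try out, so here is a minimal sketch of it. The application name and the thread count are assumptions. Cluster and client mode are normally chosen at submit time (for example with the --deploy-mode flag of spark-submit) rather than in application code, so they are not shown.

```scala
import org.apache.spark.sql.SparkSession

object LocalModeSketch {
  // Starts a local-mode session with two worker threads and returns
  // (master URL, whether the context is local, default parallelism).
  def run(): (String, Boolean, Int) = {
    val spark = SparkSession.builder()
      .master("local[2]")            // local mode: driver and executor threads share one JVM
      .appName("Local Mode Sketch")  // hypothetical application name
      .getOrCreate()
    val sc = spark.sparkContext
    val result = (sc.master, sc.isLocal, sc.defaultParallelism)
    spark.stop()
    result
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```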

CH9.7 Advanced IO Concepts



Advanced IO Concepts:

 
  • We know that we can control the parallelism of files that we write by controlling the partitions prior to writing. We can also control specific data layout by controlling two things: bucketing and partitioning.

  • Splittable File types and Compression: Certain file formats are fundamentally “splittable.” This can improve speed because it makes it possible for Spark to avoid reading an entire file, and access only the parts of the file necessary to satisfy your query. Additionally if you’re using something like Hadoop Distributed File System (HDFS), splitting a file can provide further optimization if that file spans multiple blocks. In conjunction with this is a need to manage compression. Not all compression schemes are splittable.
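
As a small sketch of choosing a codec at write time: bzip2 is a splittable codec for text-based formats, whereas gzip is not. The rows, column names, and temporary output directory below are made up for illustration.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object CompressionCodecSketch {
  // Writes a tiny DataFrame as bzip2-compressed CSV and returns the data file names.
  def run(): Seq[String] = {
    val spark = SparkSession.builder().master("local[*]").appName("Compression Sketch").getOrCreate()
    import spark.implicits._

    val out = Files.createTempDirectory("compression-sketch").resolve("csv-bzip2").toString
    // Made-up rows, only here to have something to write.
    val df = Seq(("Australia", 1), ("Aruba", 2)).toDF("country", "n")

    // The "compression" option selects the codec used for the output files.
    df.write.option("compression", "bzip2").csv(out)

    // Keep only the data files (checksum files start with a dot).
    new File(out).listFiles().map(_.getName).filter(_.startsWith("part-")).toSeq
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```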
     
  • Reading data in parallel: Multiple executors cannot necessarily read from the same file at the same time, but they can read different files at the same time. In general, this means that when you read from a folder with multiple files in it, each one of those files will become a partition in your DataFrame and be read in by available executors in parallel (with the remaining files queueing up behind the others).

  • Writing data in parallel: The number of files written depends on the number of partitions the DataFrame has at the time you write out the data. By default, one file is written per partition of the data.
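
The relationship between partitions and output files can be checked with a small sketch like the one below. It uses a generated range of numbers and a temporary directory (both assumptions, not part of the original notes), repartitions the data, and counts the CSV files that are written.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object WriteParallelismSketch {
  // Writes 1000 generated rows using the given number of partitions
  // and returns how many CSV data files ended up on disk.
  def writtenFileCount(numPartitions: Int): Int = {
    val spark = SparkSession.builder().master("local[*]").appName("Write Parallelism Sketch").getOrCreate()
    val out = Files.createTempDirectory("write-parallelism").resolve("csv").toString

    // repartition fixes the number of partitions just before the write.
    spark.range(0, 1000).repartition(numPartitions).write.csv(out)

    new File(out).listFiles().count(f => f.getName.startsWith("part-") && f.getName.endsWith(".csv"))
  }

  def main(args: Array[String]): Unit =
    println(writtenFileCount(5))
}
```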

  • Partitioning (similar to Hive partitioning, which stores data in separate directories): Partitioning is a tool that allows you to control what data is stored (and where) as you write it. When you write a file to a partitioned directory (or table), you basically encode a column as a folder. This lets you skip lots of data when you read it back later, reading only the data relevant to your problem instead of scanning the complete dataset. Partitioning is supported for all file-based data sources.
This is very similar to dynamic partitioning in Hive, and it works with target Hive tables as well.

The following example writes Parquet files with the data partitioned by destination country:

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Read the flight CSV files, taking column names from the header and inferring column types.
    val df_csv = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv")
    df_csv.printSchema()
    // Write Parquet with one sub-directory per distinct DEST_COUNTRY_NAME value.
    df_csv.write.mode(SaveMode.Overwrite).format("parquet").partitionBy("DEST_COUNTRY_NAME").save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\parquetoutput")
  }
}

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput

04/15/2019  02:41 PM    <DIR>          .
04/15/2019  02:41 PM    <DIR>          ..
04/15/2019  02:41 PM                 8 ._SUCCESS.crc
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Afghanistan
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Algeria
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Angola
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Anguilla
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Antigua%20and%20Barbuda
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Argentina
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Aruba
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Australia

 
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput>cd "DEST_COUNTRY_NAME=Australia"

C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput\DEST_COUNTRY_NAME=Australia>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput\DEST_COUNTRY_NAME=Australia

04/15/2019  02:41 PM    <DIR>          .
04/15/2019  02:41 PM    <DIR>          ..
04/15/2019  02:39 PM                16 .part-00000-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet.crc
04/15/2019  02:39 PM                16 .part-00001-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet.crc
04/15/2019  02:39 PM                16 .part-00002-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet.crc
04/15/2019  02:39 PM               649 part-00000-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet
04/15/2019  02:39 PM               649 part-00001-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet
04/15/2019  02:39 PM               649 part-00002-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet
               6 File(s)          1,995 bytes
               2 Dir(s)  75,312,287,744 bytes free
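
To see why this layout helps at read time, here is a minimal, self-contained sketch. It writes a few made-up rows (not the real flight data) to a temporary directory partitioned by DEST_COUNTRY_NAME, then reads them back with a filter on the partition column, which allows Spark to restrict the scan to the matching directory.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object PartitionPruningSketch {
  // Returns the partition directory names and the number of rows for one country.
  def run(): (Seq[String], Long) = {
    val spark = SparkSession.builder().master("local[*]").appName("Partition Pruning Sketch").getOrCreate()
    import spark.implicits._

    val out = Files.createTempDirectory("partition-sketch").resolve("flights").toString
    // Made-up rows that mimic the schema printed above.
    val flights = Seq(
      ("Australia", "United States", 10),
      ("Aruba", "United States", 20),
      ("Australia", "Fiji", 30)
    ).toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")

    flights.write.partitionBy("DEST_COUNTRY_NAME").parquet(out)

    // Each distinct value of the partition column becomes a directory.
    val dirs = new File(out).listFiles().filter(_.isDirectory).map(_.getName).sorted.toSeq

    // The partition column is reconstructed from the directory names on read.
    val australia = spark.read.parquet(out).where($"DEST_COUNTRY_NAME" === "Australia")
    (dirs, australia.count())
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```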

  • Bucketing: Bucketing is another file organization approach with which you can control the data that is specifically written to each file. This can help avoid shuffles later when you go to read the data because data with the same bucket ID will all be grouped together into one physical partition. This means that the data is prepartitioned according to how you expect to use that data later on, meaning you can avoid expensive shuffles when joining or aggregating.

Following is the definition of the bucketBy method on the DataFrameWriter

def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
Buckets the output by the given columns. If specified, the output is laid out on the file system similar to Hive's bucketing scheme.
This is applicable for all file-based data sources (e.g. Parquet, JSON).

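The following example attempts to write 10 buckets to a path. It fails with the exception shown after the code: bucketed output is supported only when saving as a table with saveAsTable, not by path-based writes such as save() or parquet().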

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
//    spark.sparkContext.setLogLevel("FATAL")

    val df_csv = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv")
    df_csv.printSchema()
    // Path-based bucketed write: this call throws the AnalysisException shown below.
    df_csv.write.mode(SaveMode.Overwrite).bucketBy(10,"DEST_COUNTRY_NAME").parquet("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\parquetoutput")

  }
}
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)

Exception in thread "main" org.apache.spark.sql.AnalysisException: 'save' does not support bucketing right now;
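
A variant that avoids this exception is to write the bucketed data as a table. The sketch below is a minimal version under stated assumptions: the table name flights_bucketed, the temporary warehouse directory, and the three rows are all made up for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

object BucketingSketch {
  // Writes a small bucketed table and returns its row count.
  def run(): Long = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Bucketing Sketch")
      // Keep the table files in a throwaway directory (assumption for this sketch).
      .config("spark.sql.warehouse.dir", Files.createTempDirectory("bucketing-warehouse").toString)
      .getOrCreate()
    import spark.implicits._

    // Made-up rows that mimic the flight data schema.
    val flights = Seq(
      ("Australia", "United States", 10),
      ("Aruba", "United States", 20),
      ("Australia", "Fiji", 30)
    ).toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")

    // bucketBy must be paired with saveAsTable rather than save/parquet.
    flights.write
      .mode(SaveMode.Overwrite)
      .format("parquet")
      .bucketBy(10, "DEST_COUNTRY_NAME")
      .saveAsTable("flights_bucketed") // hypothetical table name

    spark.table("flights_bucketed").count()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```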

  • Managing File Size:  Managing file sizes is an important factor not so much for writing data but reading it later on. When you’re writing lots of small files, there’s a significant metadata overhead that you incur managing all of those files.

We saw previously that the number of output files is derived from the number of partitions we had at write time. The maxRecordsPerFile option (introduced in Spark 2.2) additionally caps the number of records written to each file, so a large partition is split across several files.

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
//    spark.sparkContext.setLogLevel("FATAL")

    val df_csv = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\all")
    df_csv.printSchema()
    // Write CSV files holding at most 2001 records each.
    df_csv.write.mode(SaveMode.Overwrite).option("maxRecordsPerFile",2001).csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csvfilesized")

  }
}
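
The arithmetic behind the option can be sketched in plain Scala. The helper below is hypothetical (it is not part of Spark) and assumes that each non-empty partition is written by one task that starts a new file whenever the current one has reached the limit.

```scala
object MaxRecordsPerFileSketch {
  // Estimated number of files written for one non-empty partition.
  // A limit of zero or less is treated as "no limit", i.e. a single file.
  def estimatedFiles(recordsInPartition: Long, maxRecordsPerFile: Long): Long = {
    require(recordsInPartition > 0, "this sketch only covers non-empty partitions")
    if (maxRecordsPerFile <= 0) 1L
    else (recordsInPartition + maxRecordsPerFile - 1) / maxRecordsPerFile // ceiling division
  }

  def main(args: Array[String]): Unit = {
    // A partition of 5000 records with the limit of 2001 used above.
    println(estimatedFiles(5000L, 2001L))
  }
}
```

Under these assumptions, the total file count for a write is the sum of this estimate over all partitions.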





CH9.6 Reading and Writing Text Files


Text Files:

 
  • Spark also allows you to read in plain-text files. Each line in the file becomes a record in the DataFrame. It is then up to you to transform it accordingly. For example, you might parse Apache log files into a more structured format.

  • Reading text files is straightforward: you simply specify the type to be textFile. With textFile, partitioned directory names are ignored. To read and write text files according to partitions, you should use "text", which respects partitioning on reading and writing.
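
Since each line arrives as a plain string, the transformation step is ordinary string processing. The sketch below parses one line in the Apache Common Log Format with a regular expression. The case class, its field names, and the pattern are assumptions made for illustration, and a function like parse could be applied to every element of a Dataset[String].

```scala
object LogLineParsingSketch {
  // Hypothetical structured form of one access-log line.
  final case class AccessLogEntry(host: String, timestamp: String, request: String, status: Int)

  // host, identity, user, [timestamp], "request", status, size
  private val CommonLogLine = """^(\S+) \S+ \S+ \[([^\]]+)\] "([^"]*)" (\d{3}) \S+$""".r

  // Returns None for lines that do not match the expected layout.
  def parse(line: String): Option[AccessLogEntry] = line match {
    case CommonLogLine(host, timestamp, request, status) =>
      Some(AccessLogEntry(host, timestamp, request, status.toInt))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val sample = """127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326"""
    println(parse(sample))
  }
}
```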

Following are the method definitions available on the DataFrameReader:


def text(paths: String*): DataFrame
Loads text files and returns a DataFrame whose schema starts with a string column named "value", and followed by partitioned columns if there are any.
Each line in the text files is a new row in the resulting DataFrame. For example:
// Scala:
spark.read.text("/path/to/spark/README.md")

def text(path: String): DataFrame
Loads text files and returns a DataFrame whose schema starts with a string column named "value", and followed by partitioned columns if there are any. See the documentation on the other overloaded text() method for more details.

def textFile(paths: String*): Dataset[String]
Loads text files and returns a Dataset of String. The underlying schema of the Dataset contains a single string column named "value".
If the directory structure of the text files contains partitioning information, those are ignored in the resulting Dataset. To include partitioning information as columns, use text.
Each line in the text files is a new element in the resulting Dataset. For example:
// Scala:
spark.read.textFile("/path/to/spark/README.md")
// Java:
spark.read().textFile("/path/to/spark/README.md")

def textFile(path: String): Dataset[String]
Loads text files and returns a Dataset of String. See the documentation on the other overloaded textFile() method for more details
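
The difference between text and textFile shows up when the input directory is partitioned. The following sketch creates a tiny partitioned layout in a temporary directory (the directory name Country=France and the file contents are made up) and compares the columns returned by the two methods.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object TextVsTextFileSketch {
  // Returns the column names seen by text() and by textFile() for the same input.
  def run(): (Seq[String], Seq[String]) = {
    val spark = SparkSession.builder().master("local[*]").appName("Text Sketch").getOrCreate()

    // Build root/Country=France/lines.txt with two lines of text.
    val root = Files.createTempDirectory("text-sketch")
    val partitionDir = Files.createDirectories(root.resolve("Country=France"))
    Files.write(partitionDir.resolve("lines.txt"), "first line\nsecond line\n".getBytes(StandardCharsets.UTF_8))

    val viaText = spark.read.text(root.toString)         // DataFrame: value plus partition columns
    val viaTextFile = spark.read.textFile(root.toString) // Dataset[String]: value only

    (viaText.columns.toSeq, viaTextFile.columns.toSeq)
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```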


  • Writing to text files: When you write a text file, you need to be sure to have only one string column; otherwise, the write will fail.

  • If you partition the data as part of the write, you can write more columns. However, those columns will manifest as directories in the folder you are writing to, instead of as columns in every file.
     
Following is the definition of the methods "text" and "partitionBy" available on the DataFrameWriter objects:

def text(path: String): Unit
Saves the content of the DataFrame in a text file at the specified path. The DataFrame must have only one column that is of string type. Each row becomes a new line in the output file. For example:
// Scala:
df.write.text("/path/to/output")
// Java:
df.write().text("/path/to/output")
You can set the following option(s) for writing text files:
compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate).

def partitionBy(colNames: String*): DataFrameWriter[T]
Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:
year=2016/month=01/
year=2016/month=02/
Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.
This is applicable for all file-based data sources (e.g. Parquet, JSON).

  • The following example shows how to read data from text files and write to a directory with partitions. Pay attention to the field names.

import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Schema of the retail CSV files, kept for reference (it is not used below).
    val manualSchema = StructType(Seq(StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    // Read the CSV files as plain text and extract information from each line.
    val df_text=spark.read.textFile("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day").repartition(5)
    df_text.printSchema()
    df_text.show(5,false)
    // Split on commas: field 2 is the description, field 7 the country.
    // Header lines are not filtered out, so they yield a "Country=Country" directory.
    val df_text_split=df_text.selectExpr("split(value,',')[2] as Desc","split(value,',')[7] as Country")
    df_text_split.printSchema()
    df_text_split.show(5,false)

    // Write text files partitioned by Country; only the Desc column remains in the files.
    df_text_split.write.partitionBy("Country").text("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\textop")

  }
}
root
 |-- value: string (nullable = true)

+--------------------------------------------------------------------------------------------------+
|value                                                                                             |
+--------------------------------------------------------------------------------------------------+
|580538,21914,BLUE HARMONICA IN BOX ,24,2011-12-05 08:38:00,1.25,14075.0,United Kingdom            |
|580539,21479,WHITE SKULL HOT WATER BOTTLE ,4,2011-12-05 08:39:00,4.25,18180.0,United Kingdom      |
|580539,21411,GINGHAM HEART  DOORSTOP RED,8,2011-12-05 08:39:00,1.95,18180.0,United Kingdom        |
|580539,22372,AIRLINE BAG VINTAGE WORLD CHAMPION ,4,2011-12-05 08:39:00,4.25,18180.0,United Kingdom|
|580539,22389,PAPERWEIGHT SAVE THE PLANET,12,2011-12-05 08:39:00,0.39,18180.0,United Kingdom       |
+--------------------------------------------------------------------------------------------------+
only showing top 5 rows

root
 |-- Desc: string (nullable = true)
 |-- Country: string (nullable = true)

+-----------------------------------+--------------+
|Desc                               |Country       |
+-----------------------------------+--------------+
|BLUE HARMONICA IN BOX              |United Kingdom|
|WHITE SKULL HOT WATER BOTTLE       |United Kingdom|
|GINGHAM HEART  DOORSTOP RED        |United Kingdom|
|AIRLINE BAG VINTAGE WORLD CHAMPION |United Kingdom|
|PAPERWEIGHT SAVE THE PLANET        |United Kingdom|
+-----------------------------------+--------------+
only showing top 5 rows
04/15/2019  05:18 AM    <DIR>          Country=Australia
04/15/2019  05:18 AM    <DIR>          Country=Austria
04/15/2019  05:18 AM    <DIR>          Country=Bahrain
04/15/2019  05:18 AM    <DIR>          Country=Belgium
04/15/2019  05:18 AM    <DIR>          Country=Brazil
04/15/2019  05:18 AM    <DIR>          Country=Canada
04/15/2019  05:18 AM    <DIR>          Country=Channel%20Islands
04/15/2019  05:18 AM    <DIR>          Country=Country
04/15/2019  05:18 AM    <DIR>          Country=Cyprus
04/15/2019  05:18 AM    <DIR>          Country=Czech%20Republic
04/15/2019  05:18 AM    <DIR>          Country=Denmark
04/15/2019  05:18 AM    <DIR>          Country=EIRE
04/15/2019  05:18 AM    <DIR>          Country=European%20Community
04/15/2019  05:18 AM    <DIR>          Country=Finland
04/15/2019  05:18 AM    <DIR>          Country=France
04/15/2019  05:18 AM    <DIR>          Country=Germany
04/15/2019  05:18 AM    <DIR>          Country=Greece
04/15/2019  05:18 AM    <DIR>          Country=Hong%20Kong
04/15/2019  05:18 AM    <DIR>          Country=Iceland