Tuesday, 16 April 2019

CH15.2 Life Cycle of Spark Application (SparkSession and SparkContext)


The Life Cycle of a Spark Application (Outside Spark)

  • Client Request:
    • First, we submit an actual application (a pre-compiled JAR or library). At this point, you are executing code on your local machine and making a request to the cluster manager's driver node.
    • We are explicitly asking for resources for the Spark driver process only. We assume that the cluster manager accepts this offer and places the driver onto a node in the cluster. The client process that submitted the original job exits and the application is off and running on the cluster.
    • To submit the application we would need to use spark-submit as shown below:

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode cluster \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]


  • Launch :
    • Once the driver process is running, it begins running the user code. This code must include a SparkSession that initializes a Spark cluster (e.g., driver + executors). (Note: a Spark cluster exists for one application only; Spark processes are temporary.)
    • The SparkSession will subsequently communicate with the cluster manager asking it to launch Spark executor processes across the cluster. The number of executors and their relevant configurations are set by the user via the command-line arguments in the original spark-submit call.
    • The cluster manager responds by launching the executor processes (assuming all goes well) and sends the relevant information about their locations to the driver process. After everything is hooked up correctly, we have a “Spark Cluster”.

  • Execution:
    • The driver and the workers communicate among themselves, executing code and moving data around. The driver schedules tasks onto each worker, and each worker responds with the status of those tasks and success or failure. 

  • Completion:
    • After a Spark Application completes, the driver process exits with either success or failure. The cluster manager then shuts down the executors in that Spark cluster for the driver.
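
The launch, execution and completion steps above can be sketched from the application's point of view. This is a minimal sketch, not the book's code: the object name LifecycleSketch and the helper countEvens are made up for illustration, and the master URL is deliberately left out of the code on the assumption that spark-submit supplies it.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver program illustrating the three phases described above.
object LifecycleSketch {

  // Execution: count() is an action, so the driver schedules tasks on the executors.
  def countEvens(spark: SparkSession, n: Long): Long =
    spark.range(0, n).where("id % 2 = 0").count()

  def main(args: Array[String]): Unit = {
    // Launch: the master and resources are assumed to come from spark-submit,
    // so nothing cluster-specific is hard-coded here.
    val spark = SparkSession.builder().appName("Lifecycle Sketch").getOrCreate()
    try {
      println(s"even ids: ${countEvens(spark, 1000L)}")
    } finally {
      // Completion: stopping the session lets the driver exit and the executors be released.
      spark.stop()
    }
  }
}
```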

The Life Cycle of a Spark Application (Inside Spark)
 

Each application is made up of one or more Spark jobs. Spark jobs within an application are executed serially (unless you use threading to launch multiple actions in parallel).
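
As a rough illustration of the threading remark, the sketch below submits two actions from two threads using Scala Futures, so the two jobs can run at the same time. The object and method names are hypothetical, and whether the jobs actually overlap depends on the resources available.

```scala
import org.apache.spark.sql.SparkSession
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper: launches two Spark jobs from separate threads.
object ParallelJobsSketch {

  def countsInParallel(spark: SparkSession): (Long, Long) = {
    val evens = spark.range(0, 1000).where("id % 2 = 0")
    val odds  = spark.range(0, 1000).where("id % 2 = 1")

    // Each count() is an action; wrapping it in a Future submits the job from its own thread.
    val evenJob = Future(evens.count())
    val oddJob  = Future(odds.count())

    // Wait for both jobs and return their results as a pair.
    Await.result(evenJob.zip(oddJob), 5.minutes)
  }
}
```

Without the Futures, the second count() would only start after the first one had returned.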

  • SparkSession:
    • The first step of any Spark Application is creating a SparkSession. In the Spark REPL it's done for us, but in our own code we need to create one explicitly.
    • Legacy code might use the SparkContext pattern. This should be avoided in favor of the builder method on the SparkSession, which more robustly instantiates the Spark and SQL Contexts and ensures that there is no context conflict.
    • The SparkSession companion object has a "builder" method, which returns a Builder with several methods for setting the app name, master, and so on.
  • The following methods are available on the Builder class (an inner class of SparkSession):

def appName(name: String): Builder
Sets a name for the application, which will be shown in the Spark web UI. If no application name is set, a randomly generated name will be used.

def config(conf: SparkConf): Builder
Sets a list of config options based on the given SparkConf.

def config(key: String, value: Boolean): Builder
Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession's own configuration.

def config(key: String, value: Double): Builder
Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession's own configuration.

def config(key: String, value: Long): Builder
Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession's own configuration.

def config(key: String, value: String): Builder
Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession's own configuration.

def enableHiveSupport(): Builder
Enables Hive support, including connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined functions.

def getOrCreate(): SparkSession
Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder.
This method first checks whether there is a valid thread-local SparkSession, and if yes, return that one. It then checks whether there is a valid global default SparkSession, and if yes, return that one. If no valid global default SparkSession exists, the method creates a new SparkSession and assigns the newly created SparkSession as the global default.
In case an existing SparkSession is returned, the config options specified in this builder will be applied to the existing SparkSession.

def master(master: String): Builder
Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.

 val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()

  • Once we have the SparkSession, we should be able to run our Spark code. From the SparkSession, you can also access all of the low-level and legacy contexts and configurations.

  • Note that the SparkSession class was only added in Spark 2.X. Older code you might find would instead directly create a SparkContext and a SQLContext for the structured APIs.
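
The sketch below puts a few of the Builder methods listed above together and then reaches the lower-level contexts from the resulting session. It is only an illustration: the object name SessionSketch, the app name and the chosen config value are assumptions, and local[*] is used so that it can run without a cluster.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example of building a session and reaching the older contexts through it.
object SessionSketch {

  def build(): SparkSession =
    SparkSession.builder()
      .master("local[*]")
      .appName("Session Sketch")
      .config("spark.sql.shuffle.partitions", 50L) // uses the config(key: String, value: Long) overload
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = build()
    val sc = spark.sparkContext        // entry point to the low-level APIs (RDDs, accumulators, ...)
    val sqlContext = spark.sqlContext  // legacy context, kept for older code
    println(sc.master)
    println(sqlContext.sparkSession eq spark) // the legacy context wraps the same session
    println(spark.conf.get("spark.sql.shuffle.partitions"))
    spark.stop()
  }
}
```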

  • SparkContext:

  • A SparkContext object within the SparkSession represents the connection to the Spark cluster. This class is how you communicate with some of Spark’s lower-level APIs, such as RDDs.
  • Through a SparkContext, you can create RDDs, accumulators, and broadcast variables, and you can run code on the cluster.
  • In previous versions of Spark, the SQLContext and HiveContext provided the ability to work with DataFrames and Spark SQL and were commonly stored as the variable sqlContext in examples, documentation, and legacy code. 
  • As a historical point, Spark 1.X effectively had two contexts: the SparkContext and the SQLContext. Each did different things. The former focused on more fine-grained control of Spark’s central abstractions, whereas the latter focused on the higher-level tools like Spark SQL.
  • In Spark 2.X, the community combined the two APIs into the centralized SparkSession that we have today. However, both of these APIs still exist and you can access them via the SparkSession. It is important to note that you should never need to use the SQLContext and rarely need to use the SparkContext.
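
To make the SparkContext bullets concrete, here is a small sketch that creates an RDD, a broadcast variable and an accumulator through spark.sparkContext. The object name, the lookup map and the accumulator name are invented for the example.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example of the low-level objects a SparkContext can create.
object ContextSketch {

  // Returns (sum of the even numbers in 1..100, number of even numbers seen).
  def sumAndCountEvens(spark: SparkSession): (Long, Long) = {
    val sc = spark.sparkContext

    // Broadcast variable: a read-only value shipped once to each executor.
    val labels = sc.broadcast(Map(0 -> "even", 1 -> "odd"))
    // Accumulator: a counter that tasks add to and the driver reads.
    val evensSeen = sc.longAccumulator("evensSeen")

    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    val evenSum = rdd.filter { n =>
      val isEven = labels.value(n % 2) == "even"
      // Note: accumulator updates made inside transformations can be repeated if a task is retried.
      if (isEven) evensSeen.add(1)
      isEven
    }.map(_.toLong).reduce(_ + _)

    (evenSum, evensSeen.value.longValue())
  }
}
```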


  • Spark job:
    • When we have an "action" in our code, it triggers a complete Spark job. We can use "explain" to understand the physical execution plan. The same information is available on the SQL tab of the Spark UI (after we actually run a query). If you are running on your local machine, the Spark UI is at localhost:4040.
    • Each Spark job consists of stages and tasks.
    • Each job breaks down into a series of stages, the number of which depends on how many shuffle operations need to take place.
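
A small sketch of the point about actions: the transformations below only describe the computation, explain prints the physical plan, and the final count is the action that actually starts a job. The object name and the "bucket" column are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical example: transformations are lazy, the action triggers the job.
object JobSketch {

  def run(spark: SparkSession): Long = {
    // Transformations only build up a plan; nothing is executed yet.
    val grouped = spark.range(0, 1000)
      .withColumn("bucket", col("id") % 10)
      .groupBy("bucket")
      .count()

    // Prints the physical plan; the groupBy shows up as an Exchange (a shuffle).
    grouped.explain()

    // Action: this is what triggers a Spark job. It returns the number of groups.
    grouped.count()
  }
}
```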

  • Stages:
    • Stages in Spark represent groups of tasks that can be executed together to compute the same operation on multiple machines. In general, Spark will try to pack as much work as possible (i.e., as many transformations as possible inside your job) into the same stage, but the engine starts new stages after operations called shuffles. 
    • A shuffle represents a physical repartitioning of the data. Spark starts a new stage after each shuffle, and keeps track of what order the stages must run in to compute the final result.
    • The spark.sql.shuffle.partitions default value is 200, which means that when there is a shuffle performed during execution, it outputs 200 shuffle partitions by default. You can change this value, and the number of output partitions will change. This value should be set according to the number of cores in your cluster to ensure efficient execution.
spark.conf.set("spark.sql.shuffle.partitions", 50)
  • A good rule of thumb is that the number of partitions should be larger than the number of executors on your cluster, potentially by multiple factors depending on the workload. 
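
The effect of spark.sql.shuffle.partitions can be checked with a sketch like the following, which sets the value and then looks at the number of partitions after a groupBy. The names are hypothetical. It also assumes adaptive query execution is switched off, because newer Spark versions may otherwise merge small shuffle partitions and report a lower number.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical example: the shuffle partition setting controls the output of a shuffle.
object ShufflePartitionsSketch {

  def partitionsAfterShuffle(spark: SparkSession, n: Int): Int = {
    spark.conf.set("spark.sql.shuffle.partitions", n.toLong)
    // Assumption: adaptive execution is disabled so shuffle partitions are not coalesced.
    spark.conf.set("spark.sql.adaptive.enabled", "false")

    spark.range(0, 1000)
      .groupBy((col("id") % 10).as("k")) // groupBy forces a shuffle
      .count()
      .rdd
      .getNumPartitions
  }
}
```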

  • Tasks: 
    • Stages in Spark consist of tasks.
    • If there is one big partition in our dataset, we will have one task. If there are 1,000 little partitions, we will have 1,000 tasks that can be executed in parallel.
    • A task is just a unit of computation applied to a unit of data (the partition). Partitioning your data into a greater number of partitions means that more can be executed in parallel. 
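
The one-task-per-partition relationship can be observed with the sketch below, which asks every task for the ID of the partition it is processing. The object and method names are made up for the example.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

// Hypothetical example: each partition of a stage is processed by exactly one task.
object TasksSketch {

  def partitionIdsSeen(spark: SparkSession, numPartitions: Int): List[Int] = {
    val rdd = spark.sparkContext.parallelize(1 to 1000, numPartitions)
    // The function passed to mapPartitions runs once per partition, that is, once per task.
    rdd.mapPartitions(_ => Iterator(TaskContext.getPartitionId()))
      .collect()
      .sorted
      .toList
  }
}
```

With numPartitions set to 8, eight tasks run and each reports a different partition ID.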

  • Execution Details: Two things are worth noting about how Spark executes stages and tasks. First, Spark automatically pipelines stages and tasks that can be done together, such as a map operation followed by another map operation. Second, for all shuffle operations, Spark writes the data to stable storage (e.g., disk), and can reuse it across multiple jobs.
    • Unlike the tools that came before it (e.g., MapReduce), Spark performs as many steps as it can at one point in time before writing data to memory or disk. Pipelining is a key optimization which occurs at and below the RDD level. With pipelining, any sequence of operations that feed data directly into each other, without needing to move it across nodes, is collapsed into a single stage of tasks that do all the operations together.
    • For example, if you write an RDD-based program that does a map, then a filter, then another map, these will result in a single stage of tasks that immediately read each input record, pass it through the first map, pass it through the filter, and pass it through the last map function if needed. This pipelined version of the computation is much faster than writing the intermediate results to memory or disk after each step. From a practical point of view, pipelining will be transparent to you as you write an application—the Spark runtime will automatically do it.
    • When Spark needs to run an operation that has to move data across nodes, such as a reduce-by-key operation (where input data for each key needs to first be brought together from many nodes), the engine can’t perform pipelining anymore, and instead it performs a cross-network shuffle. Spark always executes shuffles by first having the “source” tasks (those sending data) write shuffle files to their local disks during their execution stage. Then, the stage that does the grouping and reduction launches and runs tasks that fetch their corresponding records from each shuffle file and perform that computation (e.g., fetch and process the data for a specific range of keys).
    • Saving the shuffle files to disk lets Spark run this stage later in time than the source stage (e.g., if there are not enough executors to run both at the same time), and also lets the engine re-launch reduce tasks on failure without rerunning all the input tasks. One side effect you’ll see for shuffle persistence is that running a new job over data that’s already been shuffled does not rerun the “source” side of the shuffle. Because the shuffle files were already written to disk earlier, Spark knows that it can use them to run the later stages of the job, and it need not redo the earlier ones.
    • This automatic optimization can save time in a workload that runs multiple jobs over the same data, but of course, for even better performance you can perform your own caching with the DataFrame or RDD cache method, which lets you control exactly which data is saved and where. 
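
The difference between pipelined steps and a shuffle is visible in an RDD's dependencies. The sketch below builds a map, filter, map chain (narrow dependencies, so one stage) followed by reduceByKey (a shuffle dependency, so a new stage) and caches the result. All names in it are illustrative.

```scala
import org.apache.spark.{NarrowDependency, ShuffleDependency}
import org.apache.spark.sql.SparkSession

// Hypothetical example: narrow dependencies are pipelined, a shuffle starts a new stage.
object PipelineSketch {

  // Returns (last map step is narrow?, reduceByKey needs a shuffle?, sum of all reduced values).
  def describe(spark: SparkSession): (Boolean, Boolean, Int) = {
    val sc = spark.sparkContext

    // map -> filter -> map: each step feeds the next directly, so they share one stage.
    val pipelined = sc.parallelize(1 to 100, 4)
      .map(_ * 2)
      .filter(_ % 3 == 0)
      .map(n => (n % 5, n))

    // reduceByKey has to bring equal keys together, which requires a shuffle.
    val reduced = pipelined.reduceByKey(_ + _)

    val narrow  = pipelined.dependencies.forall(_.isInstanceOf[NarrowDependency[_]])
    val shuffle = reduced.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])

    // Explicit caching keeps the reduced data around for later jobs.
    reduced.cache()
    (narrow, shuffle, reduced.values.collect().sum)
  }
}
```

Printing reduced.toDebugString shows the same boundary as an indentation step in the lineage.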




Monday, 15 April 2019

CH15.1 Spark Application architecture, Execution Modes


    • We know how the structured APIs take a logical operation, break it up into a logical plan, and convert that to a physical plan that actually consists of Resilient Distributed Dataset (RDD) operations that execute across the cluster of machines. Here we learn about what happens when Spark goes about executing that code.
Architecture of a Spark Application:

  • The Spark driver:
    • The driver process is the controller of the execution of a Spark Application and maintains all of the state of the Spark cluster (the state and tasks of the executors).
    • It must interface with the cluster manager in order to actually get physical resources and launch executors.
    • Simply put, this is just a process on a physical machine that is responsible for maintaining the state of the application running on the cluster.

  • The Spark Executors:
    • Spark executors are the processes that perform the tasks assigned by the Spark driver. 
    • Executors have one core responsibility: take the tasks assigned by the driver, run them, and report back their state (success or failure) and results.
    • Each Spark Application has its own separate executor processes.

  • The cluster Manager:
    • The cluster manager is responsible for maintaining a cluster of machines that will run your Spark Application(s). A cluster manager has its own “driver” (sometimes called master) and “worker” abstractions.
    • Very important: the core difference is that these are tied to physical machines rather than to processes (as they are in Spark).
    • When it comes time to actually run a Spark Application, we request resources from the cluster manager to run it. Depending on how our application is configured, this can include a place to run the Spark driver or might be just resources for the executors for our Spark Application.
    • Spark has long supported three cluster managers: a simple built-in standalone cluster manager, Apache Mesos, and Hadoop YARN. Spark 2.3 and later can also run on Kubernetes.

Execution Modes:

  • An execution mode gives you the power to determine where the resources are physically located when you go to run your application. There are three options: cluster mode, client mode, and local mode.

  • Cluster mode:
    • In Cluster mode, when a user submits a pre-compiled JAR, Python script, or R script to a cluster manager, the cluster manager then launches the driver process on a worker node inside the cluster, in addition to the executor processes. 
    • This means that the cluster manager is responsible for maintaining all Spark Application–related processes.
       
  • Client Mode:
    • Client mode is nearly the same as cluster mode except that the Spark driver remains on the client machine that submitted the application. This means that the client machine is responsible for maintaining the Spark driver process, and the cluster manager maintains the executor processes.
    • In this mode we are running the Spark Application from a machine that is not colocated on the cluster. These machines are commonly referred to as gateway machines or edge nodes.
       
  • Local Mode:
    • This mode runs the entire Spark Application on a single machine. It achieves parallelism through threads on that single machine. 
    • Local mode is not recommended for running production applications.
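
The three modes differ mainly in where the driver and the executors live. The following plain Scala sketch (no Spark dependency) simply restates the descriptions above as a lookup; the type and function names are invented for illustration and are not part of any Spark API.

```scala
// Hypothetical model of the three execution modes described above.
object ExecutionModeSketch {

  sealed trait ExecutionMode
  case object ClusterMode extends ExecutionMode
  case object ClientMode  extends ExecutionMode
  case object LocalMode   extends ExecutionMode

  // Where the Spark driver process runs in each mode.
  def driverLocation(mode: ExecutionMode): String = mode match {
    case ClusterMode => "worker node inside the cluster"
    case ClientMode  => "client machine that submitted the application"
    case LocalMode   => "the single local machine"
  }

  // Who is responsible for keeping the driver process alive.
  def driverMaintainedBy(mode: ExecutionMode): String = mode match {
    case ClusterMode => "cluster manager"
    case ClientMode  => "client machine"
    case LocalMode   => "the local machine"
  }
}
```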

CH9.7 Advanced IO Concepts



Advanced IO Concepts:

 
  • We know that we can control the parallelism of files that we write by controlling the partitions prior to writing. We can also control specific data layout by controlling two things: bucketing and partitioning.

  • Splittable File types and Compression: Certain file formats are fundamentally “splittable.” This can improve speed because it makes it possible for Spark to avoid reading an entire file, and access only the parts of the file necessary to satisfy your query. Additionally if you’re using something like Hadoop Distributed File System (HDFS), splitting a file can provide further optimization if that file spans multiple blocks. In conjunction with this is a need to manage compression. Not all compression schemes are splittable.
     
  • Reading data in parallel: Multiple executors cannot necessarily read from the same file at the same time, but they can read different files at the same time. In general, this means that when you read from a folder with multiple files in it, each of those files becomes a partition in your DataFrame and is read by the available executors in parallel (with the remaining files queueing up behind the others).

  • Writing data in parallel: The number of files written depends on the number of partitions the DataFrame has at the time you write out the data. By default, one file is written per partition of the data.
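
The one-file-per-partition behaviour can be checked with a sketch like this one, which repartitions a small dataset, writes it as CSV to a temporary directory and counts the part files. The names and the temporary path are assumptions made for the example, and it relies on every partition holding at least one row.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Hypothetical example: the number of output files follows the number of partitions.
object WriteParallelismSketch {

  def partFilesWritten(spark: SparkSession, numPartitions: Int): Int = {
    // Write into a fresh sub-directory of a temporary folder.
    val out = Files.createTempDirectory("write-sketch").resolve("csv").toString

    spark.range(0, 1000)
      .repartition(numPartitions) // controls the number of partitions, and so the number of files
      .write
      .csv(out)

    // Count only the data files; checksum and marker files do not start with "part-".
    new File(out).listFiles().count(_.getName.startsWith("part-"))
  }
}
```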

  • Partitioning (similar to Hive partitioning; it stores data in separate directories): Partitioning is a tool that allows you to control what data is stored (and where) as you write it. When you write a file to a partitioned directory (or table), you basically encode a column as a folder. This lets you skip lots of data when you read it back later, reading only the data relevant to your problem instead of scanning the complete dataset. Partitioning is supported for all file-based data sources.
This is very similar to dynamic partitioning in Hive, and it works with target Hive tables as well.

The following example writes Parquet files partitioned by destination country:

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Read the flight-data CSV files, taking column names from the header and inferring types
    val df_csv = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv")
    df_csv.printSchema()
    // Write Parquet output with one sub-directory per DEST_COUNTRY_NAME value
    df_csv.write.mode(SaveMode.Overwrite).format("parquet").partitionBy("DEST_COUNTRY_NAME").save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\parquetoutput")
  }
}

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput

04/15/2019  02:41 PM    <DIR>          .
04/15/2019  02:41 PM    <DIR>          ..
04/15/2019  02:41 PM                 8 ._SUCCESS.crc
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Afghanistan
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Algeria
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Angola
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Anguilla
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Antigua%20and%20Barbuda
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Argentina
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Aruba
04/15/2019  02:41 PM    <DIR>          DEST_COUNTRY_NAME=Australia

 
C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput>cd "DEST_COUNTRY_NAME=Australia"

C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput\DEST_COUNTRY_NAME=Australia>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\flight-data\parquetoutput\DEST_COUNTRY_NAME=Australia

04/15/2019  02:41 PM    <DIR>          .
04/15/2019  02:41 PM    <DIR>          ..
04/15/2019  02:39 PM                16 .part-00000-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet.crc
04/15/2019  02:39 PM                16 .part-00001-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet.crc
04/15/2019  02:39 PM                16 .part-00002-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet.crc
04/15/2019  02:39 PM               649 part-00000-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet
04/15/2019  02:39 PM               649 part-00001-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet
04/15/2019  02:39 PM               649 part-00002-396c9f1b-6f72-47f2-a1b0-cbd428c5e038.snappy.parquet
               6 File(s)          1,995 bytes
               2 Dir(s)  75,312,287,744 bytes free
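
The benefit of this layout shows up when reading: a filter on the partition column only needs the matching directories. The sketch below is a self-contained illustration with a tiny made-up dataset and a temporary output path (both assumptions, not the flight data above); it writes partitioned Parquet, lists the directories and reads back a single country.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Hypothetical example: write partitioned Parquet, then read back only one partition.
object PartitionedReadSketch {

  def writeAndRead(spark: SparkSession): (List[String], Long) = {
    import spark.implicits._
    val out = Files.createTempDirectory("partition-sketch").resolve("parquet").toString

    // Made-up rows with the same column names as the flight data.
    Seq(("Australia", "United States", 5), ("Angola", "United States", 2), ("Australia", "Japan", 7))
      .toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")
      .write
      .partitionBy("DEST_COUNTRY_NAME")
      .parquet(out)

    // One directory per distinct value of the partition column.
    val dirs = new File(out).listFiles().filter(_.isDirectory).map(_.getName).sorted.toList

    // Filtering on the partition column lets Spark skip the other directories.
    val australiaRows = spark.read.parquet(out).where($"DEST_COUNTRY_NAME" === "Australia").count()

    (dirs, australiaRows)
  }
}
```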

  • Bucketing: Bucketing is another file organization approach with which you can control the data that is specifically written to each file. This can help avoid shuffles later when you go to read the data because data with the same bucket ID will all be grouped together into one physical partition. This means that the data is prepartitioned according to how you expect to use that data later on, meaning you can avoid expensive shuffles when joining or aggregating.

Following is the definition of the bucketBy method on the DataFrameWriter

def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
Buckets the output by the given columns. If specified, the output is laid out on the file system similar to Hive's bucketing scheme.
This is applicable for all file-based data sources (e.g. Parquet, JSON) in sufficiently recent Spark versions.

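The following example attempts to write to 10 buckets. (This code does not work: as the exception after it shows, path-based writes such as save and parquet do not support bucketing. In Spark 2.x and 3.x, bucketed output has to be written to a table with saveAsTable instead.)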

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
//    spark.sparkContext.setLogLevel("FATAL")

    val df_csv = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv")
    df_csv.printSchema()
    // Throws AnalysisException: bucketBy cannot be combined with a path-based write such as parquet(...)
    df_csv.write.mode(SaveMode.Overwrite).bucketBy(10,"DEST_COUNTRY_NAME").parquet("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\parquetoutput")

  }
}
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)

Exception in thread "main" org.apache.spark.sql.AnalysisException: 'save' does not support bucketing right now;
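
A bucketed write is accepted when the output goes to a table rather than to a path. The sketch below is one way this could look, using saveAsTable with a tiny made-up dataset. The table name bucketed_flights_sketch is hypothetical, and the table is assumed to be created in Spark's default warehouse location.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical example: bucketBy combined with saveAsTable instead of a path-based write.
object BucketingSketch {

  def writeBucketed(spark: SparkSession): Long = {
    import spark.implicits._

    // Made-up rows with the same column names as the flight data.
    val df = Seq(("Australia", "United States", 5), ("Angola", "United States", 2), ("Australia", "Japan", 7))
      .toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")

    df.write
      .mode(SaveMode.Overwrite)
      .format("parquet")
      .bucketBy(10, "DEST_COUNTRY_NAME")      // 10 buckets, hashed on the destination country
      .saveAsTable("bucketed_flights_sketch") // table name is an assumption for this sketch

    // Read the table back through the catalog to confirm that the write succeeded.
    spark.table("bucketed_flights_sketch").count()
  }
}
```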

  • Managing File Size:  Managing file sizes is an important factor not so much for writing data but reading it later on. When you’re writing lots of small files, there’s a significant metadata overhead that you incur managing all of those files.

We saw previously that the number of output files is derived from the number of partitions we had at write time. The maxRecordsPerFile option (introduced in Spark 2.2) lets us cap the number of records written to each file, which gives better control over file sizes.

import org.apache.spark.sql._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
//    spark.sparkContext.setLogLevel("FATAL")

    val df_csv = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\all")
    df_csv.printSchema()
    // Write CSV output with at most 2001 records in any single file
    df_csv.write.mode(SaveMode.Overwrite).option("maxRecordsPerFile",2001).csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csvfilesized")

  }
}
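
As a back-of-the-envelope check, the number of files produced for one partition is the record count divided by the cap, rounded up. The plain Scala sketch below (no Spark dependency, hypothetical names) spells out that arithmetic. It assumes a non-empty partition and treats a cap of zero or less as "no limit".

```scala
// Hypothetical helper: how many files one non-empty partition produces under maxRecordsPerFile.
object FileCountSketch {

  def filesForPartition(records: Long, maxRecordsPerFile: Long): Long = {
    require(records > 0, "this sketch only covers non-empty partitions")
    if (maxRecordsPerFile <= 0) 1L                            // no limit: one file per partition
    else (records + maxRecordsPerFile - 1) / maxRecordsPerFile // ceiling division
  }
}
```

For example, a partition holding 5,000 records with a cap of 2,001 is split into three files: two with 2,001 records and one with the remaining 998.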