Search This Blog

Tuesday, 16 April 2019

CH15.2 Life Cycle of Spark Application (SparkSession and SparkContext)


The life Cycle of a Spark Application (outside Spark)

  • Client Request:
    • 1st we submit an actual application.(a pre-compiled JAR or library).At this point, you are executing code on your local machine and you’re going to make a request to the cluster manager driver node .
    • We are explicitly asking for resources for the Spark driver process only. We assume that the cluster manager accepts this offer and places the driver onto a node in the cluster. The client process that submitted the original job exits and the application is off and running on the cluster.
    • To submit the application we would need to use spark-submit as shown below:

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode cluster \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]


  • Launch :
    • Once the driver process is running it begins running Spark user Code. This code must include a SparkSession that initializes a Spark cluster (e.g., driver + executors).(Note Spark Cluster is one for one application. Spark processes are temporary processes.
    • The SparkSession will subsequently communicate with the cluster manager asking it to launch Spark executor processes across the cluster. The number of executors and their relevant configurations are set by the user via the command-line arguments in the original spark-submit call.
    • The cluster manager responds by launching the executor processes (assuming all goes well) and sends the relevant information about their locations to the driver process. After everything is hooked up correctly, we have a “Spark Cluster” 

  • Execution:
    • The driver and the workers communicate among themselves, executing code and moving data around. The driver schedules tasks onto each worker, and each worker responds with the status of those tasks and success or failure. 

  • Completion:
    • After a Spark Application completes, the driver process exits with either success or failure . The cluster manager then shuts down the executors in that Spark cluster for the driver

The Life Cycle of a Spark Application (Inside Spark)
 

Each application is made up of one or more Spark jobs. Spark jobs within an application are executed serially (unless you use threading to launch multiple actions in parallel).

  • SparkSession:
    • The first step of any Spark Application is creating a SparkSession. In Spark REPL its done for us. But in our code, we need to create one explicitly.
    • Legacy code might use the SparkContext pattern. This should be avoided in favor of the builder method on the SparkSession, which more robustly instantiates the Spark and SQL Contexts and ensures that there is no context conflict.
    • SparkSession Companion object has the method "builder" which gives us several methods to set app name, master etc.
  • Following are the method available on the Builder class. (inner class of SparkSession)

def appName(name: String): Builder
Sets a name for the application, which will be shown in the Spark web UI. If no application name is set, a randomly generated name will be used.

def config(conf: SparkConf): Builder
Sets a list of config options based on the given SparkConf.

def config(key: String, value: Boolean): Builder
Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession's own configuration.

def config(key: String, value: Double): Builder
Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession's own configuration.

def config(key: String, value: Long): Builder
Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession's own configuration.

def config(key: String, value: String): Builder
Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession's own configuration.

def enableHiveSupport(): Builder
Enables Hive support, including connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined functions.

def getOrCreate(): SparkSession
Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder.
This method first checks whether there is a valid thread-local SparkSession, and if yes, return that one. It then checks whether there is a valid global default SparkSession, and if yes, return that one. If no valid global default SparkSession exists, the method creates a new SparkSession and assigns the newly created SparkSession as the global default.
In case an existing SparkSession is returned, the config options specified in this builder will be applied to the existing SparkSession.

def master(master: String): Builder
Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.

 val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()

  • Once we have the SparkSession, we should be able to run our Spark Code. From the SparkSession, you can access all of low-level and legacy contexts and configurations accordingly, as well. 

  • Note that the SparkSession class was only added in Spark 2.X. Older code you might find would instead directly create a SparkContext and a SQLContext for the structured APIs.

  • SparkContext:

  • A SparkContext object within the SparkSession represents the connection to the Spark cluster. This class is how you communicate with some of Spark’s lower-level APIs, such as RDDs.
  • Through a SparkContext, you can create RDDs, accumulators, and broadcast variables, and you can run code on the cluster.
  • In previous versions of Spark, the SQLContext and HiveContext provided the ability to work with DataFrames and Spark SQL and were commonly stored as the variable sqlContext in examples, documentation, and legacy code. 
  • As a historical point, Spark 1.X had effectively two contexts. The SparkContext and theSQLContext. These two each performed different things. The former focused on more fine-grained control of Spark’s central abstractions, whereas the latter focused on the higher-level tools like Spark SQL.
  • In Spark 2.X, the community combined the two APIs into the centralized SparkSession that we have today. However, both of these APIs still exist and you can access them via the SparkSession. It is important to note that you should never need to use the SQLContext and rarely need to use the SparkContext.


  • Spark job:
    • When we have an "action" in our code, it triggers a complete Spark job. We can use "explain" to understand the physical execution plan. We can access this information on the SQL tab (after we actually run a query) in the Spark UI as well. We can go to localhost:4040 if you are running this on your local machine to see the Spark UI.
    • Spark job  individually consist of stages and tasks
    • Each job breaks down into a series of stages, the number of which depends on how many shuffle operations need to take place.

  • Stages:
    • Stages in Spark represent groups of tasks that can be executed together to compute the same operation on multiple machines. In general, Spark will try to pack as much work as possible (i.e., as many transformations as possible inside your job) into the same stage, but the engine starts new stages after operations called shuffles. 
    • A shuffle represents a physical repartitioning of the data. Spark starts a new stage after each shuffle, and keeps track of what order the stages must run in to compute the final result.
    • The spark.sql.shuffle.partitions default value is 200, which means that when there is a shuffle performed during execution, it outputs 200 shuffle partitions by default. You can change this value, and the number of output partitions will change. This value should be set according to the number of cores in your cluster to ensure efficient execution.
spark.conf.set("spark.sql.shuffle.partitions", 50)
  • A good rule of thumb is that the number of partitions should be larger than the number of executors on your cluster, potentially by multiple factors depending on the workload. 

  • Tasks: 
    • Stages in Spark consist of tasks.
    • If there is one big partition in our dataset, we will have one task. If there are 1,000 little partitions, we will have 1,000 tasks that can be executed in parallel.
    • A task is just a unit of computation applied to a unit of data (the partition). Partitioning your data into a greater number of partitions means that more can be executed in parallel. 

  • Execution Details: Spark automatically pipelines stages and tasks that can be done together, such as a map operation followed by another map operation. Second, for all shuffle operations, Spark writes the data to stable storage (e.g., disk), and can reuse it across multiple jobs.
    • Unlike the tools that came before it (e.g., MapReduce), Spark performs as many steps as it can at one point in time before writing data to memory or disk. Pipelining is a key optimization which occurs at and below the RDD level. With pipelining, any sequence of operations that feed data directly into each other, without needing to move it across nodes, is collapsed into a single stage of tasks that do all the operations together.
    • For example, if you write an RDD-based program that does a map, then a filter, then another map, these will result in a single stage of tasks that immediately read each input record, pass it through the first map, pass it through the filter, and pass it through the last map function if needed. This pipelined version of the computation is much faster than writing the intermediate results to memory or disk after each step. From a practical point of view, pipelining will be transparent to you as you write an application—the Spark runtime will automatically do it.
    • When Spark needs to run an operation that has to move data across nodes, such as a reduce-by-key operation (where input data for each key needs to first be brought together from many nodes), the engine can’t perform pipelining anymore, and instead it performs a cross-network shuffle. Spark always executes shuffles by first having the “source” tasks (those sending data) write shuffle files to their local disks during their execution stage. Then, the stage that does the grouping and reduction launches and runs tasks that fetch their corresponding records from each shuffle file and performs that computation (e.g., fetches and processes the data for a specific range of keys).
    • Saving the shuffle files to disk lets Spark run this stage later in time than the source stage (e.g., if there are not enough executors to run both at the same time), and also lets the engine re-launch reduce tasks on failure without rerunning all the input tasks. One side effect you’ll see for shuffle persistence is that running a new job over data that’s already been shuffled does not rerun the “source” side of the shuffle. Because the shuffle files were already written to disk earlier, Spark knows that it can use them to run the later stages of the job, and it need not redo the earlier ones.
    • This automatic optimization can save time in a workload that runs multiple jobs over the same data, but of course, for even better performance you can perform your own caching with the DataFrame or RDD cache method, which lets you control exactly which data is saved and where.