The Life Cycle of a Spark Application
(Inside Spark)
Each application is made up of one or more Spark jobs. Spark jobs within an application are executed
serially (unless you use threading to launch multiple actions in parallel).
- SparkSession:
- The first step of any Spark application is creating a SparkSession. In the Spark REPL it's created for us, but in our own code we need to create one explicitly.
- Legacy code might use the SparkContext pattern. This should be avoided in favor of the builder method on the SparkSession, which more robustly instantiates the Spark and SQL contexts and ensures that there is no context conflict.
- The SparkSession companion object has a "builder" method, which returns a Builder with several methods to set the app name, master, etc.
- The following methods are available on the Builder class (an inner class of SparkSession):
def appName(name: String): Builder
Sets a name for the application, which will be shown in the Spark web UI. If no application name is set, a randomly generated name will be used.

def config(conf: SparkConf): Builder
Sets a list of config options based on the given SparkConf.

def config(key: String, value: Boolean): Builder
def config(key: String, value: Double): Builder
def config(key: String, value: Long): Builder
def config(key: String, value: String): Builder
Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession's own configuration.

def enableHiveSupport(): Builder
Enables Hive support, including connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined functions.

def getOrCreate(): SparkSession
Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder. This method first checks whether there is a valid thread-local SparkSession, and if yes, returns that one. It then checks whether there is a valid global default SparkSession, and if yes, returns that one. If no valid global default SparkSession exists, the method creates a new SparkSession and assigns it as the global default. In case an existing SparkSession is returned, the config options specified in this builder will be applied to the existing SparkSession.

def master(master: String): Builder
Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
- Once we have the SparkSession, we can run our Spark code. From the SparkSession, you can also access all of the low-level and legacy contexts and configurations.
- Note that the SparkSession class was only added in Spark 2.X. Older code you might find would instead directly create a SparkContext and a SQLContext for the structured APIs.
- A SparkContext object within the SparkSession represents the connection to the Spark cluster. This class is how you communicate with some of Spark’s lower-level APIs, such as RDDs.
- Through a SparkContext, you can create RDDs, accumulators, and broadcast variables, and you can run code on the cluster.
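As a minimal sketch (reusing the spark session created above; the data and names are purely illustrative):

// Get the SparkContext from an existing SparkSession
val sc = spark.sparkContext
// Create an RDD from a local Scala collection
val numbers = sc.parallelize(1 to 100)
// A broadcast variable: a read-only value shared with every executor
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
// An accumulator: a counter that tasks running on the cluster can add to
val multiplesOfTen = sc.longAccumulator("multiples-of-ten")
numbers.foreach(n => if (n % 10 == 0) multiplesOfTen.add(1))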
- In previous versions of Spark, the SQLContext and HiveContext provided the ability to work with DataFrames and Spark SQL and were commonly stored as the variable sqlContext in examples, documentation, and legacy code.
- As a historical point, Spark 1.X had effectively two contexts, the SparkContext and the SQLContext, which served different purposes: the former focused on more fine-grained control of Spark’s central abstractions, whereas the latter focused on the higher-level tools like Spark SQL.
- In Spark 2.X, the community combined the two APIs into the centralized SparkSession that we have today. However, both of these APIs still exist and you can access them via the SparkSession. It is important to note that you should never need to use the SQLContext and rarely need to use the SparkContext.
- Spark job:
- When we have an "action" in our code, it triggers a complete Spark job. We can use "explain" to understand the physical execution plan, and we can see this information on the SQL tab of the Spark UI as well (after we actually run a query). If you are running Spark on your local machine, the UI is available at localhost:4040.
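As a minimal sketch of triggering a job and inspecting its plan (reusing the spark session from above; the column names are illustrative):

import org.apache.spark.sql.functions.col
val df = spark.range(1000).toDF("id")
val grouped = df.groupBy(col("id") % 10).count()
grouped.explain()   // prints the physical plan; the Exchange operator marks a shuffle
grouped.collect()   // collect is an action, so this line triggers a complete Spark job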
- Each Spark job individually consists of stages and tasks.
- Each job breaks down into a series of stages, the number of which depends on how many shuffle operations need to take place.
- Stages:
- Stages in Spark represent groups of tasks that can be executed together to compute the same operation on multiple machines. In general, Spark will try to pack as much work as possible (i.e., as many transformations as possible inside your job) into the same stage, but the engine starts new stages after operations called shuffles.
- A shuffle represents a physical repartitioning of the data. Spark starts a new stage after each shuffle, and keeps track of what order the stages must run in to compute the final result.
- The spark.sql.shuffle.partitions default value is 200, which means that when there is a shuffle performed during execution, it outputs 200 shuffle partitions by default. You can change this value, and the number of output partitions will change. This value should be set according to the number of cores in your cluster to ensure efficient execution.
spark.conf.set("spark.sql.shuffle.partitions", 50)
- A good rule of thumb is that the number of partitions should be larger than the number of executors on your cluster, potentially by multiple factors depending on the workload.
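A minimal sketch of the effect (reusing the spark session from above; note that adaptive query execution in newer Spark versions may coalesce shuffle partitions further):

import org.apache.spark.sql.functions.col
spark.conf.set("spark.sql.shuffle.partitions", 50)
val shuffled = spark.range(1000).groupBy((col("id") % 7).as("bucket")).count()
println(shuffled.rdd.getNumPartitions)   // typically 50, the configured number of shuffle partitions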
- Tasks:
- Stages in Spark consist of tasks.
- If there is one big partition in our dataset, we will have one task. If there are 1,000 little partitions, we will have 1,000 tasks that can be executed in parallel.
- A task is just a unit of computation applied to a unit of data (the partition). Partitioning your data into a greater number of partitions means that more can be executed in parallel.
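A minimal sketch of the relationship between partitions and tasks (reusing the spark session from above; the numbers are illustrative):

val data = spark.range(1000000)
println(data.rdd.getNumPartitions)            // current number of partitions = tasks per stage for this data
val repartitioned = data.repartition(100)
println(repartitioned.rdd.getNumPartitions)   // 100 partitions, so up to 100 tasks can run in parallel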
- Execution Details: First, Spark automatically pipelines stages and tasks that can be done together, such as a map operation followed by another map operation. Second, for all shuffle operations, Spark writes the data to stable storage (e.g., disk), and can reuse it across multiple jobs.
- Unlike the tools that came before it (e.g., MapReduce), Spark performs as many steps as it can at one point in time before writing data to memory or disk. Pipelining is a key optimization which occurs at and below the RDD level. With pipelining, any sequence of operations that feed data directly into each other, without needing to move it across nodes, is collapsed into a single stage of tasks that do all the operations together.
- For example, if you write an RDD-based program that does a map, then a filter, then another map, these will result in a single stage of tasks that immediately read each input record, pass it through the first map, pass it through the filter, and pass it through the last map function if needed. This pipelined version of the computation is much faster than writing the intermediate results to memory or disk after each step. From a practical point of view, pipelining will be transparent to you as you write an application; the Spark runtime will automatically do it.
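A minimal sketch of that exact sequence (reusing the spark session from above):

val source = spark.sparkContext.parallelize(1 to 1000000)
val pipelined = source
  .map(_ * 2)           // first map
  .filter(_ % 3 == 0)   // filter
  .map(_ + 1)           // second map
pipelined.count()       // all three operations run together in a single stage of tasks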
- When Spark needs to run an operation that has to move data across nodes, such as a reduce-by-key operation (where input data for each key needs to first be brought together from many nodes), the engine can’t perform pipelining anymore, and instead it performs a cross-network shuffle. Spark always executes shuffles by first having the “source” tasks (those sending data) write shuffle files to their local disks during their execution stage. Then, the stage that does the grouping and reduction launches and runs tasks that fetch their corresponding records from each shuffle file and perform that computation (e.g., fetch and process the data for a specific range of keys).
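A minimal sketch of such an operation (reusing the spark session from above; the key/value data is purely illustrative):

val pairs = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
val reduced = pairs.reduceByKey(_ + _)   // needs a shuffle: all values for a key must be brought together
reduced.collect()                        // runs as two stages: "source" tasks write shuffle files, then reduce tasks fetch them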
- Saving the shuffle files to disk lets Spark run this stage later in time than the source stage (e.g., if there are not enough executors to run both at the same time), and also lets the engine re-launch reduce tasks on failure without rerunning all the input tasks. One side effect you’ll see for shuffle persistence is that running a new job over data that’s already been shuffled does not rerun the “source” side of the shuffle. Because the shuffle files were already written to disk earlier, Spark knows that it can use them to run the later stages of the job, and it need not redo the earlier ones.
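Continuing the reduceByKey sketch above, running another action over the same shuffled data illustrates this:

reduced.count()   // a new job, but the "source" stage shows as skipped in the Spark UI,
                  // because the shuffle files written by the earlier job are reused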
- This automatic optimization can save time in a workload that runs multiple jobs over the same data, but of course, for even better performance you can perform your own caching with the DataFrame or RDD cache method, which lets you control exactly which data is saved and where.
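As a minimal sketch of explicit caching (reusing the spark session from above; the column names are purely illustrative):

import org.apache.spark.sql.functions.col
val byBucket = spark.range(1000).groupBy((col("id") % 10).as("bucket")).count()
byBucket.cache()                             // keep the (shuffled) result around explicitly
byBucket.where(col("count") > 50).show()     // the first action materializes and caches the data
byBucket.orderBy(col("count").desc).show()   // later jobs reuse the cached result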