
Wednesday, 10 April 2019

CH2/1 Spark Architecture, Applications, SparkSession, Partitions, Transformations, Actions


Spark's Basic Architecture:

  • In Spark (and any distributed processing engine), a cluster, or group, of computers pools the resources of many machines together, giving us the ability to use all of those cumulative resources as if they were a single computer.
     
  • A group of machines alone is not powerful; we need a framework to coordinate work across them. Spark does just that: managing and coordinating the execution of tasks on data across a cluster of computers.

  • The cluster of machines that Spark will use to execute tasks is managed by a cluster manager like Spark’s standalone cluster manager, YARN, or Mesos. We then submit Spark Applications to these cluster managers, which will grant resources to our application so that we can complete our work.
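
The cluster manager is selected when launching a console or submitting an application, via the --master option. A rough sketch (the host names and ports below are placeholders):

# Spark's standalone cluster manager
./bin/spark-shell --master spark://host:7077

# YARN (expects HADOOP_CONF_DIR or YARN_CONF_DIR to point at your Hadoop configuration)
./bin/spark-shell --master yarn

# Mesos
./bin/spark-shell --master mesos://host:5050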



Spark Applications:

  • Spark Applications consist of a driver process and a set of executor processes.
     
  • The driver process runs our main() function, sits on a node in the cluster, and is responsible for three things:
    • maintaining information about the Spark Application;
    • responding to a user’s program or input;
    • and analyzing, distributing, and scheduling work across the executors
The driver process is absolutely essential—it’s the heart of a Spark Application and maintains all relevant information during the lifetime of the application.
The driver also negotiates resources with the cluster manager.

  • The driver can be “driven” from a number of different languages through Spark’s language APIs.

  • The executors are responsible for actually carrying out the work that the driver assigns them.
    Each executor is responsible for only two things:
    • executing code assigned to it by the driver, and
    • reporting the state of the computation on that executor back to the driver node.
The user can specify how many executors run on each node through configuration (see the sketch at the end of this section).
 
  • The cluster manager controls physical machines and allocates resources to Spark Applications.
There are three core cluster managers:
  • Spark’s standalone cluster manager,
  • YARN, or
  • Mesos.
Spark relies on the cluster manager to keep track of the resources available; this also means that multiple Spark Applications can run on a cluster at the same time.
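
The executor settings mentioned above can be passed when submitting an application. A minimal sketch, assuming a YARN cluster (the values, class name, and jar name are placeholders; --num-executors is specific to YARN):

# com.example.MyApp and my-app.jar are hypothetical placeholders
./bin/spark-submit \
  --master yarn \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 4g \
  --class com.example.MyApp \
  my-app.jar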



Spark in Local Mode:

In addition to its cluster mode, Spark also has a local mode. The driver and executors are simply processes, which means that they can live on the same machine or on different machines. In local mode, the driver and executors run as threads on a single computer instead of a cluster.
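
A minimal sketch of starting Spark in local mode, where local[*] means "use as many worker threads as there are cores on the machine":

./bin/spark-shell --master "local[*]"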


Spark Language API:
Spark’s language APIs make it possible for us to run Spark code using various programming languages.
 
  • Scala - Spark is primarily written in Scala, making it Spark’s “default” language.
  • Java - Even though Spark is written in Scala, Spark’s authors have been careful to ensure that you can write Spark code in Java.
  • Python - Python supports nearly all constructs that Scala supports.
  • SQL - Spark supports a subset of the ANSI SQL 2003 standard. This makes it easy for analysts and non-programmers to take advantage of the big data powers of Spark.
  • R - Spark has two commonly used R libraries: one as a part of Spark core (SparkR) and another as an R community-driven package (sparklyr).

There is a SparkSession object available to the user, which is the entry point to running Spark code.

When using Spark from Python or R, you don’t write explicit JVM instructions; instead, you write Python and R code that
Spark translates into code that it then can run on the executor JVMs.
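
As a small example of the SQL language API (assuming the spark variable from an interactive session), a DataFrame can be registered as a temporary view and queried with plain SQL:

scala> spark.range(1000).toDF("number").createOrReplaceTempView("numbers")

scala> spark.sql("SELECT count(*) FROM numbers WHERE number % 2 = 0").show()

This should print a one-row table containing the value 500.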



Spark APIs:
  • Spark’s APIs allow us to drive Spark from a variety of languages, and what Spark makes available in those languages is worth mentioning.
     
  • Spark has two fundamental sets of APIs:
    • the low-level “unstructured” APIs, and
    • the higher-level structured APIs.
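
A rough illustration of the two levels (assuming an interactive session): the low-level API works with RDDs through the SparkContext, while the structured APIs work with DataFrames through the SparkSession:

scala> val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3))   // low-level "unstructured" API (RDD)

scala> val df = spark.range(3).toDF("number")                   // higher-level structured API (DataFrame)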



Starting Spark:
  • When we start writing applications, we need a way to send user commands and data to Spark. This is done by first creating a SparkSession.
     
  • When you start Spark in this interactive mode, you implicitly create a SparkSession that manages the Spark Application.
     
  • When you start it through a standalone application, you must create the SparkSession object yourself in your application code.
We can submit standalone applications to Spark using spark-submit, which submits a precompiled application to the cluster.
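
A minimal sketch of spark-submit using the SparkPi example that ships with Spark (the examples jar name depends on your Spark and Scala versions, so treat it as a placeholder):

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local \
  ./examples/jars/spark-examples_2.11-2.4.0.jar 10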



SparkSession:
  • We control our Spark Application through a driver process called the SparkSession.
     
  • The SparkSession instance is the way Spark executes user-defined manipulations across the cluster.
     
  • There is a one-to-one correspondence between a SparkSession and a Spark Application.
     
  • In Scala and Python, the variable is available as "spark" when we start the console.

scala> spark
res13: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@604d0523

scala> val myRange = spark.range(1000).toDF("number")
myRange: org.apache.spark.sql.DataFrame = [number: bigint]

scala> myRange.take(10)
res14: Array[org.apache.spark.sql.Row] = Array([0], [1], [2], [3], [4], [5], [6], [7], [8], [9])

  • Above we created a DataFrame with one column containing 1,000 rows with values from 0 to 999. This range of numbers represents a distributed collection. When run on a cluster, each part of this range of numbers exists on a different executor.
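
In a standalone application, the SparkSession is created explicitly rather than being provided by the console. A minimal sketch (the application name is just a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("MyExampleApp")   // placeholder application name
  .getOrCreate()             // the master is typically supplied via spark-submit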



DataFrames:
  • A DataFrame is the most common Structured API and simply represents a table of data with rows and columns.
     
  • The list that defines the columns and the types within those columns is called the schema.
     
  • You can think of a DataFrame as a spreadsheet with named columns. A spreadsheet sits on one computer in one specific location, whereas a Spark DataFrame can span thousands of computers. The reason for putting the data on more than one computer is that either the data is too large to fit on one machine or the computation would simply take too long on one machine.
     
  • The DataFrame concept is not unique to Spark. R and Python both have similar concepts. However, Python/R DataFrames (with some exceptions) exist on one machine rather than multiple machines.  However, because Spark has language interfaces for both Python and R, it’s quite easy to convert Pandas (Python) DataFrames to Spark DataFrames, and R DataFrames to Spark DataFrames.

Spark has several core abstractions: Datasets, DataFrames, SQL Tables, and Resilient Distributed Datasets (RDDs). These different abstractions all represent distributed collections of data. The easiest and most efficient are DataFrames, which are available in all languages.
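
Continuing with the range example, a DataFrame’s schema (column names and types) can be inspected with printSchema; for the DataFrame above the output should look roughly like this:

scala> myRange.printSchema()
root
 |-- number: long (nullable = false)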



Partitions:

  • To allow every executor to perform work in parallel, Spark breaks up the data into chunks called partitions.
     
  • A partition is a collection of rows that sit on one physical machine in your cluster.
     
  • A DataFrame’s partitions represent how the data is physically distributed across the cluster of machines during execution.
     
  • If you have one partition, Spark will have a parallelism of only one, even if you have thousands of executors.
    If you have many partitions but only one executor, Spark will still have a parallelism of only one because there is only one computation resource.
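
A rough sketch of inspecting and changing the number of partitions (the actual counts depend on your configuration, so the values will vary):

scala> myRange.rdd.getNumPartitions                 // current number of partitions

scala> val repartitioned = myRange.repartition(8)   // redistribute the data into 8 partitions

scala> repartitioned.rdd.getNumPartitions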



Transformations

  • To change a DataFrame, we need to instruct Spark how we would like to modify it to do what we want. These instructions are called transformations. Transformations are the core of how you express your business logic using Spark.

The following shows how to perform a simple transformation to find all even numbers in our current DataFrame.

scala> val myRange = spark.range(1000).toDF("number")
myRange: org.apache.spark.sql.DataFrame = [number: bigint]

scala> val divisBy2 = myRange.where("number % 2 = 0")
divisBy2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: bigint]

  • Spark will not act on transformations until we call an action.
     
  • There are two types of transformations:
    • those that specify narrow dependencies, and
    • those that specify wide dependencies.

Transformations consisting of narrow dependencies (narrow transformations) are those for which each input partition contributes to only one output partition. For example, the where statement above specifies a narrow dependency: each input partition contributes to at most one output partition.
With narrow transformations, Spark will automatically perform an operation called pipelining, meaning that if we specify multiple filters on DataFrames, they will all be performed in memory. The same cannot be said for shuffles.

A wide dependency (or wide transformation) has input partitions contributing to many output partitions. This is often referred to as a shuffle, whereby Spark exchanges partitions across the cluster.
When we perform a shuffle, Spark writes the results to disk.
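
Continuing the example, a sketch of a wide transformation: sorting requires comparing rows from every input partition, so it results in a shuffle once an action is run.

scala> import org.apache.spark.sql.functions.desc

scala> val sortedDesc = myRange.sort(desc("number"))   // wide transformation: the sort shuffles data across partitions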




Lazy Evaluation:

  • Lazy evaluation means that Spark will wait until the very last moment to execute the graph of computation instructions.

  • In Spark, instead of modifying the data immediately when you express some operation, you build up a plan of transformations that you would like to apply to your source data

  • By waiting until the last minute to execute the code, Spark compiles this plan from your raw DataFrame transformations to a streamlined physical plan that will run as efficiently as possible across the cluster. This provides immense benefits because Spark can optimize the entire data flow from end to end. An example of this is something called predicate pushdown on DataFrames. 
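
We can ask Spark to show the plan it has built, without executing it, by calling explain on a DataFrame:

scala> divisBy2.explain()

The output (abbreviated here, and version-dependent) is a physical plan with a Filter on top of a Range, i.e. the pipeline Spark will actually run once an action is called.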




Actions:

  • An action instructs Spark to compute a result from a series of transformations. 
  • Transformations allow us to build up our logical transformation plan. To trigger the computation, we run an action.
  • There are three kinds of actions:
    • Actions to view data in the console
    • Actions to collect data to native objects in the respective language
    • Actions to write to output data sources
Example:
scala> divisBy2.count
res0: Long = 500

In specifying this action, we started a Spark job that runs our filter transformation (a narrow transformation), then an aggregation (a wide transformation) that performs the counts on a per-partition basis, and then a collect, which brings our result to a native object in the respective language.
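
A few more actions on the same DataFrame, corresponding to the three kinds listed above (the output path is a placeholder):

scala> divisBy2.show(5)                      // view data in the console

scala> val localArray = divisBy2.collect()   // collect data to a native Scala Array on the driver

scala> divisBy2.write.csv("/tmp/divisBy2")   // write to an output data source (placeholder path)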




Spark UI:

  • You can monitor the progress of a job through the Spark web UI. The Spark UI is available on port 4040 of the driver node. If you are running in local mode, this will be http://localhost:4040.
     
  • The Spark UI displays information on the state of your Spark jobs, their environment, and the state of the cluster. It’s very useful, especially for tuning and debugging.
