
Tuesday, 23 April 2019

CH16.1 Developing Spark Applications


Spark Applications are the combination of two things: a Spark cluster and your code. 
The following shows how to write and build a sample application in Scala.
Scala Based App:
Scala is Spark's "native" language and naturally makes for a great way to write applications. Writing a Spark application in Scala is really no different from writing any other Scala application.

You can build applications using sbt or Apache Maven, two Java Virtual Machine (JVM)–based build tools. 
To configure an sbt build for our Scala application, we specify a build.sbt file to manage the package information. Inside the build.sbt file, there are a few key things to include:
  • Project metadata (package name, package versioning information, etc.)
  • Where to resolve dependencies
  • Dependencies needed for your library

The following shows how a sample build.sbt file looks. Notice how we must specify the Scala version as well as the Spark version:

name := "example"
organization := "com.databricks"
version := "0.1-SNAPSHOT"
scalaVersion := "2.11.8"

// Spark Information
val sparkVersion = "2.2.0"

// allows us to include spark packages
resolvers += "bintray-spark-packages" at
  "https://dl.bintray.com/spark-packages/maven/"
resolvers += "Typesafe Simple Repository" at
  "http://repo.typesafe.com/typesafe/simple/maven-releases/"
resolvers += "MavenRepository" at
  "https://mvnrepository.com/"

libraryDependencies ++= Seq(
  // spark core
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  // the rest of the file is omitted for brevity
)

Once we have defined the build file, we can start adding code to our project.
The sbt directory structure is the same as that for Maven projects:

src/
  main/
    resources/
       <files to include in main jar here>
    scala/
       <main Scala sources>
    java/
       <main Java sources>
  test/
    resources/
       <files to include in test jar here>
    scala/
       <test Scala sources>
    java/
       <test Java sources>

We put the source code in the scala and java directories. The following shows an example of Spark code in Scala that initializes the SparkSession, runs the application, and then exits.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

object DataFrameExample extends Serializable {
  // placeholder UDF: the original defines someUDF elsewhere in the project
  def someUDF(value: String): String = value

  def main(args: Array[String]): Unit = {
    val pathToDataFolder = args(0)

    // start up the SparkSession, along with explicitly setting a given config
    val spark = SparkSession.builder().appName("Spark Example")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .getOrCreate()

    // udf registration
    spark.udf.register("myUDF", someUDF(_: String): String)

    val df = spark.read.json(pathToDataFolder + "data.json")
    val manipulated = df.groupBy(expr("myUDF(group)")).sum().collect()
    manipulated.foreach(x => println(x))
  }
}
This main class is the one we reference with --class when we use spark-submit to submit the application to our cluster for execution.

For compilation we have the following options:
  • We can use sbt assembly (the task provided by the sbt-assembly plugin) to build an "uber-jar" or "fat-jar" that contains all of the dependencies in one JAR. This can be simple for some deployments but cause complications (especially dependency conflicts) for others. (See the plugins.sbt sketch after this list.)
  • A lighter-weight approach is to run sbt package, which will gather all of your dependencies into the target folder but will not package all of them into one big JAR.
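
For reference, here is a minimal sketch of wiring in the sbt-assembly plugin that provides the assembly task (the plugin version below is an assumption, not taken from the original):

// project/plugins.sbt -- makes the sbt assembly task available for building the fat JAR
// (the version shown is an assumption; pick one compatible with your sbt release)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")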

After compilation, the target folder contains the JAR, which can be passed as an argument to spark-submit.
Example:
$SPARK_HOME/bin/spark-submit \
   --class com.databricks.example.DataFrameExample \
   --master local \
   target/scala-2.11/example_2.11-0.1-SNAPSHOT.jar "hello"

Thursday, 18 April 2019

CH17.1 Deploying Spark


    • Spark has three officially supported cluster managers:
      • Standalone mode
      • Hadoop YARN
      • Apache Mesos
    These cluster managers maintain a set of machines onto which you can deploy Spark Applications. They all run Spark applications the same way.
Where to deploy your cluster to run Spark Applications:

Two high-level options: deploy on an on-premises cluster or in the public cloud.

On-Premises Cluster:

  • Reasonable option for organizations that already manage their own datacenters.
  • Advantage: Gives you full control over the hardware used, meaning you can optimize performance for your specific workload. 
  • Disadvantage: an on-premises cluster will be fixed in size, whereas the resource demands of data analytics workloads are often elastic.
    • If you make your cluster too small, it will be hard to launch the occasional very large analytics query or training job for a new machine learning model, whereas if you make it too large, you will have resources sitting idle. In public clouds, it's easy to give each application its own cluster of exactly the required size for just the duration of that job.
    • Second, for on-premises clusters, you need to select and operate your own storage system, such as a Hadoop file system or scalable key-value store. This includes setting up georeplication and disaster recovery if required.

  • On an on-premises cluster, the best way to combat the resource utilization problem is to use a cluster manager that allows you to run many Spark applications and dynamically reassign resources between them, or even allows non-Spark applications on the same cluster. All of Spark's supported cluster managers allow multiple concurrent applications, but YARN and Mesos have better support for dynamic sharing and additionally support non-Spark workloads.
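
As a rough sketch of what this looks like from the application side (the two configuration keys are standard Spark settings, but the surrounding setup is an assumption, not taken from the original):

import org.apache.spark.sql.SparkSession

// dynamic allocation lets the cluster manager grow and shrink this application's executors;
// it relies on the external shuffle service (discussed later) so shuffle output survives
// when an executor is removed
val spark = SparkSession.builder()
  .appName("Dynamic Sharing Example")   // hypothetical app name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")
  .getOrCreate()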


Spark in the Cloud:

  • Advantages:
    • Resources can be launched and shut down elastically, so you can run that occasional "monster" job that takes hundreds of machines for a few hours without having to pay for them all the time. Even for normal operation, you can choose a different type of machine and cluster size for each application to optimize its cost performance (e.g., launch machines with Graphics Processing Units (GPUs) just for your deep learning jobs).
    • Public clouds include low-cost, georeplicated storage (e.g., S3) that makes it easier to manage large amounts of data.

  • All the major cloud providers (Amazon Web Services [AWS], Microsoft Azure, Google Cloud Platform [GCP], and IBM Bluemix) include managed Hadoop clusters for their customers, which provide HDFS for storage as well as Apache Spark. This is actually not a great way to run Spark in the cloud, however, because by using a fixed-size cluster and file system, you are not going to be able to take advantage of elasticity. Instead, it is generally a better idea to use global storage systems that are decoupled from a specific cluster, such as Amazon S3, Azure Blob Storage, or Google Cloud Storage and spin up machines dynamically for each Spark workload. With decoupled compute and storage, you will be able to pay for computing resources only when needed, scale them up dynamically, and mix different hardware types.
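
A hedged sketch of this decoupled pattern (it assumes a SparkSession named spark, the hadoop-aws/s3a connector on the classpath, and placeholder bucket, path, and column names):

// read directly from cloud storage instead of a cluster-local HDFS
val events = spark.read.json("s3a://my-bucket/events/2019/04/")   // placeholder bucket/path
events.groupBy("eventType").count().show()                        // placeholder column name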

  • Basically, running Spark in the cloud need not mean migrating an on-premises installation to virtual machines: you can run Spark natively against cloud storage to take full advantage of the cloud's elasticity, cost-saving benefits, and management tools without having to manage an on-premises computing stack within your cloud environment.

  • Databricks (The company started by the Spark team from UC Berkeley) is one example of a service provider built specifically for Spark in the cloud. Databricks provides a simple way to run Spark workloads without the heavy baggage of a Hadoop installation. The company provides a number of features for running Spark more efficiently in the cloud, such as auto-scaling, auto-termination of clusters, and optimized connectors to cloud storage, as well as a collaborative environment for working on notebooks and standalone jobs.

  • If you run Spark in the cloud, you will most likely create a separate, short-lived Spark cluster for each job you execute. In this case, the standalone cluster manager is the easiest to use.


Cluster Managers:
 
Spark supports the three aforementioned cluster managers: standalone clusters, Hadoop YARN, and Mesos.

Standalone Mode:
  • A lightweight platform built specifically for Apache Spark workloads.
  • Disadvantage: more limited than the other cluster managers; it can only run Spark applications.
  • Best starting point if you just want to quickly get Spark running on a cluster (and you do not have experience using YARN or Mesos).
  • Starting Standalone cluster:
    • First, it requires provisioning the machines: starting them up, ensuring that they can talk to one another over the network, and getting the version of Spark you would like to run onto that set of machines.
    • After provisioning, there are two ways to start the cluster: by hand or using built-in launch scripts.
      • By Hand process:
        • 1st step: Start the master process on the machine we want it to run on, using the following command: $SPARK_HOME/sbin/start-master.sh.
        • After we run this command, the cluster manager process will start up on that machine. Once started, the master prints out a spark://HOST:PORT URI. You use this when you start each of the worker nodes of the cluster, and you can use it as the master argument to your SparkSession on application initialization. You can also find this URI on the master’s web UI, which is http://master-ip-address:8080 by default.
        • 2nd Step: With that URI, start the worker nodes by logging in to each machine and running the following script using the URI you just received from the master node. The master machine must be available on the network of the worker nodes you are using, and the port must be open on the master node, as well:
$SPARK_HOME/sbin/start-slave.sh <master-spark-URI>

  • Cluster launch Scripts:
    • Launch scripts can be used to automate the launch of a standalone cluster.
    • First, create a file called conf/slaves in your Spark directory that will contain the hostnames of all the machines on which you intend to start Spark workers, one per line. If this file does not exist, everything will launch locally.
    • Note that when you actually start the cluster, the master machine will access each of the worker machines via Secure Shell (SSH). By default, SSH is run in parallel and requires that you configure password-less (using a private key) access. If you do not have a password-less setup, you can set the environment variable SPARK_SSH_FOREGROUND and serially provide a password for each worker.
    • Once the conf/slaves file is set up, you can launch or stop your cluster by using the following shell scripts, based on Hadoop's deploy scripts and available in $SPARK_HOME/sbin:

$SPARK_HOME/sbin/start-master.sh: Starts a master instance on the machine on which the script is executed.
$SPARK_HOME/sbin/start-slaves.sh: Starts a slave instance on each machine specified in the conf/slaves file.
$SPARK_HOME/sbin/start-slave.sh: Starts a slave instance on the machine on which the script is executed.
$SPARK_HOME/sbin/start-all.sh: Starts both a master and a number of slaves as described earlier.
$SPARK_HOME/sbin/stop-master.sh: Stops the master that was started via the sbin/start-master.sh script.
$SPARK_HOME/sbin/stop-slaves.sh: Stops all slave instances on the machines specified in the conf/slaves file.
$SPARK_HOME/sbin/stop-all.sh: Stops both the master and the slaves as described earlier.
  • After you create the cluster, you can submit applications to it using the spark:// URI of the master. You can do this either on the master node itself or another machine using spark-submit.
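
As a small illustrative sketch (the host name below is a placeholder; 7077 is the default standalone master port), the same spark:// URI can also be set as the master when building a SparkSession inside the application:

import org.apache.spark.sql.SparkSession

// "standalone-master" is a placeholder host name for the machine running start-master.sh
val spark = SparkSession.builder()
  .master("spark://standalone-master:7077")
  .appName("Standalone Example")
  .getOrCreate()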

YARN Mode:
  • Hadoop YARN is a framework for job scheduling and cluster resource management
  • Spark has little to do with Hadoop. Spark does natively support the Hadoop YARN cluster manager but it requires nothing from Hadoop itself.
  • When submitting applications to YARN, the core difference from other deployments is that --master becomes yarn as opposed to the master node IP, as it is in standalone mode.
  • Instead, Spark will find the YARN configuration files using the environment variable HADOOP_CONF_DIR or YARN_CONF_DIR. Once you have set those environment variables to your Hadoop installation’s configuration directory, you can just run spark-submit.
  • Two deployment modes that you can use to launch Spark on YARN:
    • Cluster mode has the Spark driver run as a process managed by the YARN cluster, and the client can exit after creating the application. In cluster mode, Spark doesn't necessarily run on the same machine on which you're executing, so libraries and external JARs must be distributed manually or through the --jars command-line argument.
    • In client mode, the driver runs in the client process, and therefore YARN is responsible only for granting executor resources to the application, not maintaining the master node.

  • To read and write from HDFS using Spark, you need to include two Hadoop configuration files on Spark’s classpath
    • hdfs-site.xml, which provides default behaviors for the HDFS client; and 
    • core-site.xml, which sets the default file system name.
The location of these configuration files varies across Hadoop versions, but a common location is inside of /etc/hadoop/conf. To make these files visible to Spark, set HADOOP_CONF_DIR in $SPARK_HOME/conf/spark-env.sh to a location containing the configuration files, or set it as an environment variable when you go to spark-submit your application.
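
A hedged sketch of what this looks like from application code, assuming HADOOP_CONF_DIR already points at your Hadoop configuration directory (the HDFS path below is a placeholder):

import org.apache.spark.sql.SparkSession

// with HADOOP_CONF_DIR set, "yarn" is enough for Spark to find the ResourceManager
val spark = SparkSession.builder()
  .master("yarn")
  .appName("YARN Example")
  .getOrCreate()

// a path without a scheme resolves against the default file system from core-site.xml (HDFS here)
val df = spark.read.text("/data/example.txt")   // placeholder path
df.show()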

Mesos:
  • Mesos intends to be a datacenter-scale cluster manager that manages not just short-lived applications like Spark, but long-running applications like web applications or other resource interfaces.
  • Mesos is the heaviest-weight cluster manager.
  • Choose this cluster manager only if your organization already has a large-scale deployment of Mesos; it makes for a good cluster manager nonetheless.
  • Submitting applications to a Mesos cluster is similar to doing so for Spark’s other cluster managers. For the most part you should favor cluster mode when using Mesos. Client mode requires some extra configuration on your part, especially with regard to distributing resources around the cluster.



Miscellaneous Considerations:


  • Number and type of applications: YARN is great for HDFS-based applications but is not commonly used for much else. Additionally, it's not well designed to support the cloud, because it expects information to be available on HDFS. Also, compute and storage are largely coupled together, meaning that scaling your cluster involves scaling both storage and compute instead of just one or the other. Mesos does improve on this a bit conceptually, and it supports a wide range of application types, but it still requires pre-provisioning machines and, in some sense, requires buy-in at a much larger scale. For instance, it doesn't really make sense to have a Mesos cluster for only running Spark Applications. Spark standalone mode is the lightest-weight cluster manager and is relatively simple to understand and take advantage of, but then you're going to be building more application management infrastructure that you could get much more easily by using YARN or Mesos.

  • Managing different Spark versions: Your hands are largely tied if you want to try to run a variety of different applications running different Spark versions, and unless you use a well-managed service, you’re going to need to spend a fair amount of time either managing different setup scripts for different Spark services or removing the ability for your users to use a variety of different Spark applications.

  • Logging: You need to consider how you’re going to set up logging, store logs for future reference, and allow end users to debug their applications. These are more “out of the box” for YARN or Mesos and might need some tweaking if you’re using standalone.

  •  Spark’s external shuffle service: Typically Spark stores shuffle blocks (shuffle output) on a local disk on that particular node. An external shuffle service allows for storing those shuffle blocks so that they are available to all executors, meaning that you can arbitrarily kill executors and still have their shuffle outputs available to other applications.


Tuesday, 16 April 2019

CH15.2 Life Cycle of Spark Application (SparkSession and SparkContext)


The Life Cycle of a Spark Application (Outside Spark)

  • Client Request:
    • First, we submit an actual application (a pre-compiled JAR or library). At this point, you are executing code on your local machine, and you're going to make a request to the cluster manager driver node.
    • We are explicitly asking for resources for the Spark driver process only. We assume that the cluster manager accepts this offer and places the driver onto a node in the cluster. The client process that submitted the original job exits and the application is off and running on the cluster.
    • To submit the application we would need to use spark-submit as shown below:

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode cluster \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]


  • Launch :
    • Once the driver process is running, it begins executing Spark user code. This code must include a SparkSession that initializes a Spark cluster (e.g., driver + executors). (Note that a Spark cluster serves exactly one application, and Spark processes are temporary processes.)
    • The SparkSession will subsequently communicate with the cluster manager asking it to launch Spark executor processes across the cluster. The number of executors and their relevant configurations are set by the user via the command-line arguments in the original spark-submit call.
    • The cluster manager responds by launching the executor processes (assuming all goes well) and sends the relevant information about their locations to the driver process. After everything is hooked up correctly, we have a "Spark Cluster".

  • Execution:
    • The driver and the workers communicate among themselves, executing code and moving data around. The driver schedules tasks onto each worker, and each worker responds with the status of those tasks and success or failure. 

  • Completion:
    • After a Spark Application completes, the driver process exits with either success or failure. The cluster manager then shuts down the executors in that Spark cluster for the driver.

The Life Cycle of a Spark Application (Inside Spark)
 

Each application is made up of one or more Spark jobs. Spark jobs within an application are executed serially (unless you use threading to launch multiple actions in parallel).

  • SparkSession:
    • The first step of any Spark Application is creating a SparkSession. In the Spark REPL it's done for us, but in our own code we need to create one explicitly.
    • Legacy code might use the SparkContext pattern. This should be avoided in favor of the builder method on the SparkSession, which more robustly instantiates the Spark and SQL Contexts and ensures that there is no context conflict.
    • The SparkSession companion object has a builder method, which gives us several methods to set the app name, the master, and so on.
  • Following are the methods available on the Builder class (an inner class of SparkSession):

def appName(name: String): Builder
Sets a name for the application, which will be shown in the Spark web UI. If no application name is set, a randomly generated name will be used.

def config(conf: SparkConf): Builder
Sets a list of config options based on the given SparkConf.

def config(key: String, value: Boolean): Builder
Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession's own configuration.

def config(key: String, value: Double): Builder
Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession's own configuration.

def config(key: String, value: Long): Builder
Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession's own configuration.

def config(key: String, value: String): Builder
Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession's own configuration.

def enableHiveSupport(): Builder
Enables Hive support, including connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined functions.

def getOrCreate(): SparkSession
Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder.
This method first checks whether there is a valid thread-local SparkSession, and if yes, return that one. It then checks whether there is a valid global default SparkSession, and if yes, return that one. If no valid global default SparkSession exists, the method creates a new SparkSession and assigns the newly created SparkSession as the global default.
In case an existing SparkSession is returned, the config options specified in this builder will be applied to the existing SparkSession.

def master(master: String): Builder
Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.

 val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()

  • Once we have the SparkSession, we should be able to run our Spark code. From the SparkSession, you can access all of the low-level and legacy contexts and configurations as well.

  • Note that the SparkSession class was only added in Spark 2.X. Older code you might find would instead directly create a SparkContext and a SQLContext for the structured APIs.

  • SparkContext:

  • A SparkContext object within the SparkSession represents the connection to the Spark cluster. This class is how you communicate with some of Spark’s lower-level APIs, such as RDDs.
  • Through a SparkContext, you can create RDDs, accumulators, and broadcast variables, and you can run code on the cluster.
  • In previous versions of Spark, the SQLContext and HiveContext provided the ability to work with DataFrames and Spark SQL and were commonly stored as the variable sqlContext in examples, documentation, and legacy code. 
  • As a historical point, Spark 1.X had effectively two contexts, the SparkContext and the SQLContext. These two each performed different things. The former focused on more fine-grained control of Spark's central abstractions, whereas the latter focused on the higher-level tools like Spark SQL.
  • In Spark 2.X, the community combined the two APIs into the centralized SparkSession that we have today. However, both of these APIs still exist and you can access them via the SparkSession. It is important to note that you should never need to use the SQLContext and rarely need to use the SparkContext.
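
A brief hedged sketch (assuming a SparkSession named spark is already in scope) of reaching the SparkContext and the lower-level constructs mentioned above:

// the SparkContext is reachable from the SparkSession
val sc = spark.sparkContext

// lower-level constructs: an RDD, a broadcast variable, and an accumulator
val rdd = sc.parallelize(Seq("a", "b", "a", "c"))
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2, "c" -> 3))
val counter = sc.longAccumulator("example-counter")
rdd.foreach(word => counter.add(lookup.value.getOrElse(word, 0)))
println(counter.value)   // 1 + 2 + 1 + 3 = 7 once the action above has run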


  • Spark job:
    • When we have an "action" in our code, it triggers a complete Spark job. We can use explain to understand the physical execution plan, as shown in the sketch after this list. We can also access this information on the SQL tab in the Spark UI (after we actually run a query); if you are running locally, the UI is at localhost:4040.
    • Spark jobs individually consist of stages and tasks.
    • Each job breaks down into a series of stages, the number of which depends on how many shuffle operations need to take place.
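
A minimal hedged sketch of using explain (it assumes a SparkSession named spark; the data and column names are synthetic):

import org.apache.spark.sql.functions.expr

val df = spark.range(1000).toDF("id")
val grouped = df.groupBy(expr("id % 5")).count()

grouped.explain()   // prints the physical plan; no job runs yet
grouped.collect()   // the action triggers the actual Spark job (also visible in the UI's SQL tab)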

  • Stages:
    • Stages in Spark represent groups of tasks that can be executed together to compute the same operation on multiple machines. In general, Spark will try to pack as much work as possible (i.e., as many transformations as possible inside your job) into the same stage, but the engine starts new stages after operations called shuffles. 
    • A shuffle represents a physical repartitioning of the data. Spark starts a new stage after each shuffle, and keeps track of what order the stages must run in to compute the final result.
    • The spark.sql.shuffle.partitions default value is 200, which means that when there is a shuffle performed during execution, it outputs 200 shuffle partitions by default. You can change this value, and the number of output partitions will change. This value should be set according to the number of cores in your cluster to ensure efficient execution.
spark.conf.set("spark.sql.shuffle.partitions", 50)
  • A good rule of thumb is that the number of partitions should be larger than the number of executors on your cluster, potentially by multiple factors depending on the workload. 
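
A hedged sketch of the effect of this setting (assuming a SparkSession named spark; the value 50 simply mirrors the line above):

spark.conf.set("spark.sql.shuffle.partitions", 50)

// the groupBy below forces a shuffle, so its output has 50 partitions
val counted = spark.range(1000).toDF("id").groupBy("id").count()
println(counted.rdd.getNumPartitions)   // 50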

  • Tasks: 
    • Stages in Spark consist of tasks.
    • If there is one big partition in our dataset, we will have one task. If there are 1,000 little partitions, we will have 1,000 tasks that can be executed in parallel.
    • A task is just a unit of computation applied to a unit of data (the partition). Partitioning your data into a greater number of partitions means that more can be executed in parallel. 
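
A hedged sketch (again assuming a SparkSession named spark) showing how the partition count drives the number of tasks:

val df = spark.range(0, 1000000)
println(df.rdd.getNumPartitions)           // current number of partitions (= tasks per stage)

val repartitioned = df.repartition(100)    // 100 partitions -> 100 tasks that can run in parallel
println(repartitioned.rdd.getNumPartitions)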

  • Execution Details: First, Spark automatically pipelines stages and tasks that can be done together, such as a map operation followed by another map operation. Second, for all shuffle operations, Spark writes the data to stable storage (e.g., disk), and can reuse it across multiple jobs.
    • Unlike the tools that came before it (e.g., MapReduce), Spark performs as many steps as it can at one point in time before writing data to memory or disk. Pipelining is a key optimization which occurs at and below the RDD level. With pipelining, any sequence of operations that feed data directly into each other, without needing to move it across nodes, is collapsed into a single stage of tasks that do all the operations together.
    • For example, if you write an RDD-based program that does a map, then a filter, then another map, these will result in a single stage of tasks that immediately read each input record, pass it through the first map, pass it through the filter, and pass it through the last map function if needed. This pipelined version of the computation is much faster than writing the intermediate results to memory or disk after each step. From a practical point of view, pipelining will be transparent to you as you write an application; the Spark runtime will automatically do it. (A small sketch of such a pipeline appears at the end of this section.)
    • When Spark needs to run an operation that has to move data across nodes, such as a reduce-by-key operation (where input data for each key needs to first be brought together from many nodes), the engine can’t perform pipelining anymore, and instead it performs a cross-network shuffle. Spark always executes shuffles by first having the “source” tasks (those sending data) write shuffle files to their local disks during their execution stage. Then, the stage that does the grouping and reduction launches and runs tasks that fetch their corresponding records from each shuffle file and performs that computation (e.g., fetches and processes the data for a specific range of keys).
    • Saving the shuffle files to disk lets Spark run this stage later in time than the source stage (e.g., if there are not enough executors to run both at the same time), and also lets the engine re-launch reduce tasks on failure without rerunning all the input tasks. One side effect you’ll see for shuffle persistence is that running a new job over data that’s already been shuffled does not rerun the “source” side of the shuffle. Because the shuffle files were already written to disk earlier, Spark knows that it can use them to run the later stages of the job, and it need not redo the earlier ones.
    • This automatic optimization can save time in a workload that runs multiple jobs over the same data, but of course, for even better performance you can perform your own caching with the DataFrame or RDD cache method, which lets you control exactly which data is saved and where.
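
To make the pipelining and the shuffle boundary concrete, here is a hedged RDD-based sketch (it assumes a SparkContext named sc, available as spark.sparkContext; the data is synthetic):

// map -> filter -> map feed directly into one another, so Spark pipelines them
// into a single stage of tasks
val words = sc.parallelize(Seq("spark", "pipelines", "stages", "spark", "shuffles"))
val pairs = words
  .map(w => (w, 1))
  .filter { case (w, _) => w.length > 5 }
  .map { case (w, n) => (w.toUpperCase, n) }

// reduceByKey needs records for the same key brought together, which forces a shuffle:
// the pipelined stage writes shuffle files to local disk, and a new stage reads them
val counts = pairs.reduceByKey(_ + _)
counts.collect().foreach(println)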