
Wednesday, 10 April 2019

CH2/3 DataFrames and SQL Example, Execution plans


  • Spark runs the same transformations in the exact same way regardless of the language. We can express our business logic in SQL or in DataFrames (in R, Python, Scala, or Java), and Spark will compile that logic down to an underlying plan (visible in the explain plan) before actually executing the code.
     
  • With Spark SQL, we can register any DataFrame as a table or view (a temporary table) and query it using pure SQL. There is no performance difference between writing SQL queries and writing DataFrame code; both "compile" to the same underlying plan.

  • We can convert any DataFrame into a table or view with one simple method call.

scala> flightData2015.createOrReplaceTempView("flight_data_2015")

To query the table we use the spark.sql function, which returns a new DataFrame.

A SQL query against a DataFrame therefore returns another DataFrame. This makes it possible to specify transformations in whichever manner is most convenient at any given point in time, without sacrificing any efficiency.
 
Have a look at the explain plans below; they are identical.


scala>     val flightData2015 = spark.read.option("inferSchema", "true").option("header", "true").csv("data/flight-data/csv/2015-summary.csv")
flightData2015: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala>     flightData2015.createOrReplaceTempView("flight_data_2015")

scala>     val sqlWay = spark.sql(s"""SELECT DEST_COUNTRY_NAME, count(1)
     |                            FROM flight_data_2015
     |                            GROUP BY DEST_COUNTRY_NAME""")
sqlWay: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, count(1): bigint]

scala>     val dataFrameWay = flightData2015.groupBy("DEST_COUNTRY_NAME").count()
dataFrameWay: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, count: bigint]

scala>     sqlWay.explain
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#27, 3000)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#27] Batched: false, Format: CSV, Location: InMemoryFileIndex[xxxxxxx/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>

scala>     dataFrameWay.explain
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#27, 3000)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#27] Batched: false, Format: CSV, Location: InMemoryFileIndex[xxxxxxxdata/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>

  • The following shows a few more examples of using DataFrame functions and SQL.
    The underlying execution plans are the same:

scala> spark.sql("SELECT max(count) from flight_data_2015").take(1)
res5: Array[org.apache.spark.sql.Row] = Array([370002])

scala> flightData2015.select(max("count")).take(1)
res6: Array[org.apache.spark.sql.Row] = Array([370002])
scala> val maxSql = spark.sql("""
     | SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
     | FROM flight_data_2015
     | GROUP BY DEST_COUNTRY_NAME
     | ORDER BY sum(count) DESC
     | LIMIT 5
     | """)
maxSql: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, destination_total: bigint]

scala>

scala> maxSql.show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+
scala> flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort(desc("destination_total")).limit(5).show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+

The following is what the explain plan looks like:

scala> flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort(desc("destination_total")).limit(5).explain
== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#125L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#27,destination_total#125L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[sum(cast(count#29 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#27, 3000)
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[partial_sum(cast(count#29 as bigint))])
         +- *(1) FileScan csv [DEST_COUNTRY_NAME#27,count#29] Batched: false, Format: CSV, Location: InMemoryFileIndex[xxxxxxxdata/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>

The execution plan is a directed acyclic graph (DAG) of transformations, each resulting in a new immutable DataFrame, on which we call an action to generate a result.
 
  • The first step is to read in the data. Spark does not actually read it in until an action is called on that DataFrame or one derived from the original DataFrame.
  • The second step is our grouping (groupBy). After grouping we have a RelationalGroupedDataset, which is a fancy name for a DataFrame that has a grouping specified but needs the user to specify an aggregation before it can be queried further.

scala> flightData2015.groupBy("DEST_COUNTRY_NAME")
res13: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: [grouping expressions: [DEST_COUNTRY_NAME: string], value: [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field], type: GroupBy]

  • The third step is to specify the aggregation. The result of the sum method call is a new DataFrame.

scala> flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count")
res15: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, sum(count): bigint]

  • The fourth step is a simple renaming. We use the withColumnRenamed method that takes two arguments, the original column name and the new column name.
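
In isolation, the renaming step can be sketched like this (reusing the grouped-and-summed DataFrame from above):

// rename the generated "sum(count)" column to "destination_total"
val renamed = flightData2015
  .groupBy("DEST_COUNTRY_NAME")
  .sum("count")
  .withColumnRenamed("sum(count)", "destination_total")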

  • The fifth step sorts the data such that if we were to take results off the top of the DataFrame, they would have the largest values in the destination_total column. We use a function called desc. The desc function does not return a string but a Column. In general, many DataFrame methods accept strings (as column names), Column types, or expressions. Columns and expressions are actually the exact same thing.

scala> desc("destination_total")
res16: org.apache.spark.sql.Column = destination_total DESC NULLS LAST

  • Penultimately, we’ll specify a limit. This just specifies that we only want to return the first five values in our final DataFrame instead of all the data.
     
  • The last step is our action! Now we actually begin the process of collecting the results of our DataFrame, and Spark will give us back a list or array in the language that we’re executing.
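
For example, collecting the maxSql result defined above brings those five rows back to the driver as a native Scala array (a minimal sketch):

// action: returns the five result rows as Array[Row] on the driver
val topFive = maxSql.collect()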

Although this explain plan doesn’t match our exact “conceptual plan,” all of the pieces are there. You can see the limit statement as well as the orderBy (in the first line). You can also see how our aggregation happens in two phases, in the partial_sum calls. This is because summing a list of numbers is commutative, and Spark can perform the sum, partition by partition. Of course we can see how we read in the DataFrame, as well.
Naturally, we don’t always need to collect the data. We can also write it out to any data source that Spark supports.
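
For instance, a minimal sketch of writing the DataFrame out (the Parquet format and output path are just illustrative choices):

// write the flight data back out as Parquet; the path is hypothetical
flightData2015.write
  .mode("overwrite")                          // replace any previous output
  .parquet("output/flight-data-2015-parquet")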

CH2/2 End to End Example



First End to End Example:

  • In this example we use Spark to analyze some flight data (CSV format, semi-structured) from the United States Bureau of Transportation Statistics. Each row in the file represents a row in our future DataFrame.

  • Spark has the ability to read from and write to a large number of data sources. To read this data, we use a DataFrameReader that is associated with our SparkSession (the read method on SparkSession returns a DataFrameReader).

  • We specify the file format as well as any options we want. Here we are using schema inference, which means Spark will make a best guess at what the schema of our DataFrame should be. We also specify that the first row of the file is the header.
    To get the schema information, Spark reads in a little bit of the data and then attempts to parse the types in those rows according to the types available in Spark.
    You also have the option of strictly specifying a schema when you read in data (a sketch follows the example below).

scala> :paste
// Entering paste mode (ctrl-D to finish)

    val flightData2015 = spark
      .read
      .option("inferSchema", "true")
      .option("header", "true")
      .csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv\\2015-summary.csv")

// Exiting paste mode, now interpreting.

flightData2015: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
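
As noted above, instead of inferring the schema we could specify it strictly when reading; a minimal sketch (the column types mirror what inference produced for this file):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// hand-written schema matching the CSV: two string columns and one integer column
val flightSchema = StructType(Seq(
  StructField("DEST_COUNTRY_NAME", StringType, nullable = true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, nullable = true),
  StructField("count", IntegerType, nullable = true)
))

val flightData2015Explicit = spark.read
  .schema(flightSchema)            // no inference pass over the data
  .option("header", "true")
  .csv("data/flight-data/csv/2015-summary.csv")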

  • DataFrames have a set of columns with an unspecified number of rows. The number of rows is unspecified because reading data is a transformation, and is therefore a lazy operation. Spark peeked at only a couple of rows of data to try to guess what types each column should be.

scala> flightData2015.take(4)
res0: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Croatia,1], [United States,Ireland,344], [Egypt,United States,15])

  • Next we sort our data (in the DataFrame) according to the count column. Remember, sort does not modify the DataFrame. We use sort as a transformation that returns a new DataFrame by transforming the previous DataFrame.

scala> flightData2015.sort("count")
res1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

Nothing happens to the data when we call sort because it’s just a transformation. However, we can see that Spark is building up a plan for how it will execute this across the cluster by looking at the explain plan. We can call explain on any DataFrame object to see the DataFrame’s lineage

scala> flightData2015.sort("count").explain
== Physical Plan ==
*(2) Sort [count#12 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/sukulma/Downloads/Spark-Data/Spark-data/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>

We can read explain plans from top to bottom, the top being the end result, and the bottom being the source(s) of data.

  • By default, when we perform a shuffle, Spark outputs 200 shuffle partitions.
     
scala> spark.conf
res3: org.apache.spark.sql.RuntimeConfig = org.apache.spark.sql.RuntimeConfig@31857c80

scala> spark.conf.get("spark.sql.shuffle.partitions")
res4: String = 200

We can reduce that value as follows:

scala> spark.conf.set("spark.sql.shuffle.partitions", "5")

scala> flightData2015.sort("count").explain
== Physical Plan ==
*(2) Sort [count#12 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 5)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/sukulma/Downloads/Spark-Data/Spark-data/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>

Note that the number of partitions has changed to 5 now.

Remember that you can monitor the job progress by navigating to the Spark UI on port 4040 to see the physical and logical execution characteristics of your jobs.

CH2/1 Spark Architecture, Applications, SparkSession, Partitions, Transformations, Actions


Spark's Basic Architecture:

  • In Spark (and any distributed processing engine), a cluster, or group, of computers pools the resources of many machines together, giving us the ability to use all the cumulative resources as if they were a single computer.
     
  • A group of machines alone is not powerful; we need a framework to coordinate work across them. Spark does just that, managing and coordinating the execution of tasks on data across a cluster of computers.

  • The cluster of machines that Spark will use to execute tasks is managed by a cluster manager such as Spark’s standalone cluster manager, YARN, or Mesos. We then submit Spark Applications to these cluster managers, which grant resources to our application so that we can complete our work.



Spark Applications:

  • Spark Applications consist of a driver process and a set of executor processes.
     
  • The driver process runs our main() function, sits on a node in the cluster, and is responsible for three things:
    • maintaining information about the Spark Application;
    • responding to a user’s program or input;
    • and analyzing, distributing, and scheduling work across the executors
The driver process is absolutely essential—it’s the heart of a Spark Application and maintains all relevant information during the lifetime of the application.
The driver also negotiates resources with the cluster manager.

  • The driver can be “driven” from a number of different languages through Spark’s language APIs.

  • The executors are responsible for actually carrying out the work that the driver assigns them.
    Each executor is responsible for only two things:
    • executing code assigned to it by the driver, and
    • reporting the state of the computation on that executor back to the driver node.
The user can specify how many executors should fall on each node through configurations.
 
  • The cluster manager controls physical machines and allocates resources to Spark Applications.
There are three core cluster managers:
  • Spark’s standalone cluster manager,
  • YARN, or
  • Mesos.
Because the cluster manager keeps track of the resources available and allocates them to applications, there can be multiple Spark Applications running on a cluster at the same time.



Spark in Local Mode:

In addition to its cluster mode, Spark also has a local mode. The driver and executors are simply processes, which means that they can live on the same machine or on different machines. In local mode, the driver and executors run (as threads) on our individual computer instead of on a cluster.


Spark Language API:
Spark’s language APIs make it possible for us to run Spark code using various programming languages.
 
  • Scala - Spark is primarily written in Scala, making it Spark’s “default” language.
  • Java - Even though Spark is written in Scala, Spark’s authors have been careful to ensure that you can write Spark code in Java.
  • Python - Python supports nearly all constructs that Scala supports.
  • SQL - Spark supports a subset of the ANSI SQL 2003 standard. This makes it easy for analysts and non-programmers to take advantage of the big data powers of Spark.
  • R - Spark has two commonly used R libraries: one as a part of Spark core (SparkR) and another as an R community-driven package (sparklyr).

There is a SparkSession object available to the user, which is the entrance point to running Spark code.

When using Spark from Python or R, you don’t write explicit JVM instructions; instead, you write Python and R code that
Spark translates into code that it then can run on the executor JVMs.



Spark APIs:
  • Spark’s APIs allow us to drive Spark from a variety of languages, and what Spark makes available in those languages is worth mentioning.
     
  • Spark has two fundamental sets of APIs:
    • the low-level “unstructured” APIs, and
    • the higher-level structured APIs.



Starting Spark:
  • When we start writing applications, we need a way to send user commands and data to Spark. This is done by first creating a SparkSession.
     
  • When you start Spark in this interactive mode, you implicitly create a SparkSession that manages the Spark Application.
     
  • When you start it through a standalone application, you must create the SparkSession object yourself in your application code.
We can submit standalone applications to Spark using spark-submit, which lets us submit a precompiled application to the cluster.
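
A standalone application creates its own SparkSession explicitly; a minimal sketch (the application name and master setting are illustrative; in a real spark-submit deployment the master is usually supplied on the command line):

import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    // create (or reuse) the SparkSession that drives this application
    val spark = SparkSession.builder()
      .appName("SimpleApp")   // illustrative application name
      .master("local[*]")     // local mode; typically omitted when spark-submit supplies --master
      .getOrCreate()

    val df = spark.range(1000).toDF("number")
    println(df.count())       // 1000

    spark.stop()
  }
}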



SparkSession:
  • We control our Spark Application through a driver process called the SparkSession.
     
  • The SparkSession instance is the way Spark executes user-defined manipulations across the cluster.
     
  • There is a one-to-one correspondence between a SparkSession and a Spark Application.
     
  • In Scala and Python, the variable is available as "spark" when we start the console.

scala> spark
res13: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@604d0523

scala> val myRange = spark.range(1000).toDF("number")
myRange: org.apache.spark.sql.DataFrame = [number: bigint]

scala> myRange.take(10)
res14: Array[org.apache.spark.sql.Row] = Array([0], [1], [2], [3], [4], [5], [6], [7], [8], [9])

  • Above we created a DataFrame with one column containing 1,000 rows with values from 0 to 999. This range of numbers represents a distributed collection. When run on a cluster, each part of this range of numbers exists on a different executor.



DataFrames:
  • A DataFrame is the most common Structured API and simply represents a table of data with rows and columns.
     
  • The list that defines the columns and the types within those columns is called the schema (a printSchema sketch follows this list).
     
  • You can think of a DataFrame as a spreadsheet with named columns. A spreadsheet sits on one computer in one specific location, whereas a Spark DataFrame can span thousands of computers. The reason for putting the data on more than one computer is straightforward: either the data is too large to fit on one machine, or it would simply take too long to perform the computation on one machine.
     
  • The DataFrame concept is not unique to Spark. R and Python both have similar concepts. However, Python/R DataFrames (with some exceptions) exist on one machine rather than multiple machines.  However, because Spark has language interfaces for both Python and R, it’s quite easy to convert Pandas (Python) DataFrames to Spark DataFrames, and R DataFrames to Spark DataFrames.
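
A quick way to see a DataFrame's schema is printSchema; a small sketch using the flight data DataFrame read in the example above:

// prints the column names and types as a tree
flightData2015.printSchema()
// for the flight data this shows two string columns (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME)
// and one integer column (count)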

Spark has several core abstractions: Datasets, DataFrames, SQL Tables, and Resilient Distributed Datasets (RDDs). These different abstractions all represent distributed collections of data. The easiest and most efficient are DataFrames, which are available in all languages.



Partitions:

  • To allow every executor to perform work in parallel, Spark breaks up the data into chunks called partitions.
     
  • A partition is a collection of rows that sit on one physical machine in your cluster.
     
  • A DataFrame’s partitions represent how the data is physically distributed across the cluster of machines during execution.
     
  • If you have one partition, Spark will have a parallelism of only one, even if you have thousands of executors.
    If you have many partitions but only one executor, Spark will still have a parallelism of only one because there is only one computation resource.
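
We can inspect and change a DataFrame's partitioning from the API; a small sketch (the partition count you see depends on the input files and your configuration):

// number of partitions the DataFrame currently has
flightData2015.rdd.getNumPartitions

// redistribute the data into 5 partitions (this triggers a shuffle)
val repartitioned = flightData2015.repartition(5)
repartitioned.rdd.getNumPartitions   // 5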



Transformations

  • To change a DataFrame, we need to instruct Spark how we would like to modify it. These instructions are called transformations. Transformations are the core of how we express our business logic using Spark.

The following shows how to perform a simple transformation to find all even numbers in our current DataFrame.

scala> val myRange = spark.range(1000).toDF("number")
myRange: org.apache.spark.sql.DataFrame = [number: bigint]

scala> val divisBy2 = myRange.where("number % 2 = 0")
divisBy2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: bigint]

  • Spark will not act on transformations until we call an action
     
  • There are two types of transformations:
    • those that specify narrow dependencies, and
    • those that specify wide dependencies.

Transformations with narrow dependencies (narrow transformations) are those for which each input partition contributes to at most one output partition. For example, the where statement specifies a narrow dependency. With narrow transformations, Spark automatically performs an operation called pipelining, meaning that if we specify multiple filters on DataFrames, they will all be performed in memory. The same cannot be said for shuffles.

A wide dependency (or wide transformation) has input partitions contributing to many output partitions. This is often referred to as a shuffle, whereby Spark exchanges partitions across the cluster. When we perform a shuffle, Spark writes the results to disk.
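
A small sketch of both dependency types on the flight data (the chained filters are pipelined into one stage, while the groupBy forces an Exchange, i.e. a shuffle, in the plan):

// narrow: both filters run in the same stage, in memory, partition by partition
val narrowExample = flightData2015
  .where("count > 10")
  .where("DEST_COUNTRY_NAME != 'Canada'")

// wide: grouping needs a shuffle to bring equal keys together
val wideExample = flightData2015.groupBy("ORIGIN_COUNTRY_NAME").count()

narrowExample.explain   // no Exchange in the plan
wideExample.explain     // contains an Exchange (shuffle)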




Lazy Evaluation:

  • Lazy evaluation means that Spark will wait until the very last moment to execute the graph of computation instructions.

  • In Spark, instead of modifying the data immediately when you express some operation, you build up a plan of transformations that you would like to apply to your source data.

  • By waiting until the last minute to execute the code, Spark compiles this plan from your raw DataFrame transformations to a streamlined physical plan that will run as efficiently as possible across the cluster. This provides immense benefits because Spark can optimize the entire data flow from end to end. An example of this is something called predicate pushdown on DataFrames. 




Actions:

  • An action instructs Spark to compute a result from a series of transformations. 
  • Transformations allow us to build up our logical transformation plan. To trigger the computation, we run an action
  • There are three kinds of actions:
    • Actions to view data in the console
    • Actions to collect data to native objects in the respective language
    • Actions to write to output data sources
Example:
scala> divisBy2.count
res0: Long = 500

In specifying this action, we started a Spark job that runs our filter transformation (a narrow transformation), then an aggregation (a wide transformation) that performs the counts on a per partition basis, and then a collect, which brings our result to a native object in the respective language. 
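
The three kinds of actions can be sketched on the same DataFrame (the output path is only an illustration):

// 1. view data in the console
divisBy2.show(5)

// 2. collect data to native Scala objects on the driver
val evens = divisBy2.collect()   // Array[Row]

// 3. write to an output data source (hypothetical path)
divisBy2.write.mode("overwrite").parquet("output/divisBy2-parquet")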




Spark UI:

  • You can monitor the progress of a job through the Spark web UI. The Spark UI is available on port 4040 of the driver node.
    If you are running in local mode, this will be 
    http://localhost:4040.
     
  • The Spark UI displays information on the state of your Spark jobs, their environment, and the cluster state. It’s very useful, especially for tuning and debugging.

CH1/0 What is Apache Spark


  • Apache Spark is a unified computing engine and a set of libraries for parallel data processing on computer clusters.
     
  • Spark supports multiple widely used programming languages (Python, Java, Scala, and R), includes libraries for diverse tasks ranging from SQL to streaming and machine learning, and runs anywhere from a laptop to a cluster of thousands of servers.
     
  • Components and libraries Spark offers to end users:
    • Low-level APIs - RDDs, distributed variables
    • Structured APIs - Datasets, DataFrames, SQL
    • Structured Streaming, advanced analytics, and the wider libraries and ecosystem




Apache Spark Philosophy:
Apache Spark—a unified computing engine and set of libraries for big data

A] Unified:
 
  • Spark’s key driving goal is to offer a unified platform for writing big data applications.
  • Spark is designed to support a wide range of data analytics tasks, ranging from simple data loading and SQL queries to machine learning and streaming computation, over the same computing engine and with a consistent set of APIs
  • Real-world data analytics tasks tend to combine many different processing types and libraries. Spark’s unified nature makes these tasks both easier and more efficient to write.
  • Spark’s APIs are also designed to enable high performance by optimizing across the different libraries and functions composed together in a user program. For example, if you load data using a SQL query and then evaluate a machine learning model over it using Spark’s ML library, the engine can combine these steps into one scan over the data. 

B] Computing Engine:

  • At the same time that Spark strives for unification, it carefully limits its scope to a computing engine.
     
  • This means Spark handles loading data from storage systems and performing computation on it, not permanent storage as the end itself. We can use Spark with a wide variety of persistent storage systems, including cloud storage systems such as Azure Storage and Amazon S3, distributed file systems such as Apache Hadoop, key-value stores such as Apache Cassandra, and message buses such as Apache Kafka. However, Spark neither stores data long term itself, nor favors one over another.
     
  • In user-facing APIs, Spark works hard to make these storage systems look largely similar so that applications do not need to worry about where their data is.

  • Spark’s focus on computation makes it different from earlier big data software platforms such as Apache Hadoop, which included both a storage system (HDFS) and a computing engine (MapReduce) that were closely integrated. That choice makes it difficult to run one of the systems without the other and, more important, a challenge to write applications that access data stored anywhere else. Although Spark runs well on Hadoop storage, today it is also used broadly in environments for which the Hadoop architecture does not make sense, such as the public cloud (Spark is not tied to HDFS).

C] Libraries:
 
  • Spark supports both standard libraries that ship with the engine and a wide array of external libraries published as third-party packages by the open source communities.

  • The Spark core engine itself has changed little since it was first released, but the libraries have grown to provide more and more types of functionality. Spark includes libraries for SQL and structured data (Spark SQL), machine learning (MLlib), stream processing (Spark Streaming and the newer Structured Streaming), and graph analytics (GraphX). Beyond these libraries, there are hundreds of open source external libraries ranging from connectors for various storage systems to machine learning algorithms.







History of Spark:


  • In 2013, the project had grown to widespread use, with more than 100 contributors from more than 30 organizations outside UC Berkeley. The AMPlab contributed Spark to the Apache Software Foundation 

  • The early AMPlab team also launched a company, Databricks, to harden the project, joining the community of other companies and organizations contributing to Spark. Since that time, the Apache Spark community released Spark 1.0 in 2014 and Spark 2.0 in 2016, and continues to make regular releases, bringing new features into the project.

  • Early versions of Spark (before 1.0) largely defined this API in terms of functional operations—parallel operations such as maps and reduces over collections of Java objects.
    Beginning with 1.0, the project added Spark SQL, a new API for working with structured data—tables with a fixed data format that is not tied to Java’s in-memory representation. Spark SQL enabled powerful new optimizations across libraries and APIs by understanding both the data format and the user code that runs on it in more detail. Over time, the project added a plethora of new APIs that build on this more powerful structured foundation, including DataFrames, machine learning pipelines, and Structured Streaming, a high-level, automatically optimized streaming API.

  • The new high-level streaming engine, Structured Streaming, was introduced in 2016.

Running Spark

  • We can use Spark from Python, Java, Scala, R, or SQL.
    Spark itself is written in Scala and runs on the Java Virtual Machine (JVM), so to run Spark either on your laptop or on a cluster, all you need is an installation of Java.
    If you want to use the Python API, you will also need a Python interpreter (version 2.7 or later).
    If you want to use R, you will need a version of R on your machine.

  • There are two options we recommend for getting started with Spark:
    • downloading and installing Apache Spark on your laptop, or
    • running a web-based version in Databricks Community Edition, a free cloud environment for learning Spark

  • If you want to download and run Spark locally, the first step is to make sure that you have Java installed on your machine (available as java), as well as a Python version if you would like to use Python.
    Next, visit 
    the project’s official download page, select the package type of “Pre-built for Hadoop 2.7 and later,” and click “Direct Download.” This downloads a compressed TAR file, or tarball, that you will then need to extract.

  • Spark can run locally without any distributed storage system, such as Apache Hadoop. However, if you would like to connect the Spark version on your laptop to a Hadoop cluster, make sure you download the right Spark version for that Hadoop version






Launching Spark's interactive consoles:

  • We can start an interactive shell in Spark for several different programming languages. 

  • Python:
We need Python 2 or 3 installed in order to launch the Python console. From Spark’s home directory, run the following command: ./bin/pyspark
After we have done that, type “spark” and press Enter. We will see the SparkSession object printed.

  • Scala:
To launch the Scala console we run the command:  ./bin/spark-shell
After we have done that, type “spark” and press Enter. We will see the SparkSession object printed

  • SQL Console:
To launch the SQL console we run the command: ./bin/spark-sql