First End-to-End Example:
- In this example we use Spark to analyze some flight data (CSV format, semi-structured) from the United States Bureau of Transportation Statistics. Each row in the file represents a row in our future DataFrame.
- Spark has the ability to read from and write to a large number of data sources. To read this data, we use a DataFrameReader that is associated with our SparkSession (the read method on SparkSession returns a DataFrameReader).
We will specify the file format as well as any options we want. Here we are using schema inference, which means Spark will make a best guess at what the schema of our DataFrame should be. We also specify that the first row of the file is the header.
To get the schema information, Spark reads in a little bit of the data and then attempts to parse the types in those rows according to the types available in Spark. You also have the option of strictly specifying a schema when you read in data (see the sketch after the read example below).
|
scala> :paste
// Entering paste mode (ctrl-D to finish)
val flightData2015 = spark
.read
.option("inferSchema", "true")
.option("header", "true")
.csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv\\2015-summary.csv")
// Exiting paste mode, now interpreting.
flightData2015: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
|
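As mentioned above, instead of relying on inference you can strictly specify the schema yourself. The sketch below shows one way to do that for this same CSV, reusing the local path from the example above; the names flightSchema and flightData2015WithSchema are just illustrative, and the types match what inference produced here.
|
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Declare the schema explicitly so Spark skips the inference scan entirely.
val flightSchema = StructType(Array(
  StructField("DEST_COUNTRY_NAME", StringType, true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, true),
  StructField("count", IntegerType, true)
))

val flightData2015WithSchema = spark
  .read
  .schema(flightSchema)   // use our schema instead of inferSchema
  .option("header", "true")
  .csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv\\2015-summary.csv")
|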
- DataFrames have a set of columns with an unspecified number of rows. The reason the number of rows is unspecified is that reading data is a transformation, and is therefore a lazy operation. Spark peeked at only a couple of rows of data to try to guess what types each column should be.
|
scala> flightData2015.take(4)
res0: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Croatia,1], [United States,Ireland,344], [Egypt,United States,15])
|
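If you want to confirm what inference decided, printSchema is a quick check. The commented output below is what we would expect given the ReadSchema shown in the explain plan later in this example:
|
// Print the inferred schema. Based on the ReadSchema in the explain plan
// below, we expect roughly:
//   root
//    |-- DEST_COUNTRY_NAME: string (nullable = true)
//    |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
//    |-- count: integer (nullable = true)
flightData2015.printSchema()
|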
- Next, we sort our DataFrame according to the count column. Remember, sort does not modify the DataFrame; it is a transformation that returns a new DataFrame by transforming the previous DataFrame (see the sketch after the example below).
|
scala> flightData2015.sort("count")
res1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
|
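Because sort leaves flightData2015 untouched, you typically bind the returned DataFrame to its own name if you want to keep working with it; sortedFlights below is just an illustrative name:
|
// sort does not mutate flightData2015; it returns a new (still lazy) DataFrame
// that we can keep transforming or trigger with an action later.
val sortedFlights = flightData2015.sort("count")
|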
Nothing happens to the data when we call sort because it's just a transformation. However, we can see that Spark is building up a plan for how it will execute this across the cluster by looking at the explain plan. We can call explain on any DataFrame object to see the DataFrame's lineage.
|
scala> flightData2015.sort("count").explain
== Physical Plan ==
*(2) Sort [count#12 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/sukulma/Downloads/Spark-Data/Spark-data/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
|
We can read explain plans from top to bottom, the top being the end result and the bottom being the source(s) of data.
- By default, when we perform a shuffle, Spark outputs 200 shuffle partitions.
|
scala> spark.conf
res3: org.apache.spark.sql.RuntimeConfig = org.apache.spark.sql.RuntimeConfig@31857c80
scala> spark.conf.get("spark.sql.shuffle.partitions")
res4: String = 200
|
We can reduce that value as follows:
|
scala> spark.conf.set("spark.sql.shuffle.partitions", "5")
scala> flightData2015.sort("count").explain
== Physical Plan ==
*(2) Sort [count#12 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 5)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/sukulma/Downloads/Spark-Data/Spark-data/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
|
Note that the number of shuffle partitions has now changed to 5.
Remember that you can monitor job progress by navigating to the Spark UI on port 4040 to see the physical and logical execution characteristics of your jobs.
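To actually run the plan above we need an action. The call below is a small sketch that triggers the scan, the (now 5-partition) shuffle, and the sort, and returns the rows with the smallest counts; the resulting job shows up in the Spark UI mentioned above.
|
// take is an action: it forces Spark to execute the lazy sort plan and
// return the first two rows of the sorted result to the driver.
flightData2015.sort("count").take(2)
|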