
Wednesday, 10 April 2019

CH2/2 End to End Example



First End-to-End Example:

  • In this example we use Spark to analyze some flight data (CSV format, semi-structured) from the United States Bureau of Transportation Statistics. Each row in the file represents a row in our future DataFrame.

  • Spark has the ability to read from and write to a large number of data sources. To read this data, we use a DataFrameReader that is associated with our SparkSession (the read method on SparkSession returns a DataFrameReader).

  • We specify the file format as well as any options we want. Here we are using schema inference, which means Spark will make a best guess at what the schema of our DataFrame should be. We also specify that the first row of the file is the header.
    To get the schema information, Spark reads in a little bit of the data and then attempts to parse the types in those rows according to the types available in Spark.
    You also have the option of strictly specifying a schema when you read in data (a sketch of this follows the read example below).

scala> :paste
// Entering paste mode (ctrl-D to finish)

    val flightData2015 = spark
      .read
      .option("inferSchema", "true")
      .option("header", "true")
      .csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv\\2015-summary.csv")

// Exiting paste mode, now interpreting.

flightData2015: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
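
As mentioned above, instead of relying on inference you can strictly specify a schema when reading. The following is a minimal sketch (assuming the same three columns; IntegerType for count matches the ReadSchema shown in the explain plans below):

    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

    // Hand-written schema matching the three columns of the CSV file.
    val flightSchema = StructType(Array(
      StructField("DEST_COUNTRY_NAME", StringType, true),
      StructField("ORIGIN_COUNTRY_NAME", StringType, true),
      StructField("count", IntegerType, true)
    ))

    // Pass the schema explicitly instead of using the inferSchema option.
    val flightData2015WithSchema = spark
      .read
      .schema(flightSchema)
      .option("header", "true")
      .csv("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\csv\\2015-summary.csv")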

  • DataFrames have a set of columns with an unspecified number of rows. The number of rows is unspecified because reading data is a transformation, and therefore a lazy operation. Spark peeked at only a couple of rows of data to try to guess what type each column should be.

scala> flightData2015.take(4)
res0: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Croatia,1], [United States,Ireland,344], [Egypt,United States,15])
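
To confirm what schema inference came up with, we can also print the schema. A minimal sketch (the expected types match the ReadSchema shown in the explain plans below):

    // Print the inferred schema: two string columns and an integer count column.
    flightData2015.printSchema()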

  • Next we sort our data (in the DataFrame) according to the count column. Remember, sort does not modify the DataFrame; it is a transformation that returns a new DataFrame by transforming the previous one.

scala> flightData2015.sort("count")
res1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

Nothing happens to the data when we call sort because it's just a transformation. However, we can see that Spark is building up a plan for how it will execute this across the cluster by looking at the explain plan. We can call explain on any DataFrame object to see the DataFrame's lineage.

scala> flightData2015.sort("count").explain
== Physical Plan ==
*(2) Sort [count#12 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/sukulma/Downloads/Spark-Data/Spark-data/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>

We can read explain plans from top to bottom, the top being the end result, and the bottom being the source(s) of data.

  • By default, when we perform a shuffle, Spark outputs 200 shuffle partitions.
     
scala> spark.conf
res3: org.apache.spark.sql.RuntimeConfig = org.apache.spark.sql.RuntimeConfig@31857c80

scala> spark.conf.get("spark.sql.shuffle.partitions")
res4: String = 200

We can reduce that value as follows:

scala> spark.conf.set("spark.sql.shuffle.partitions", "5")

scala> flightData2015.sort("count").explain
== Physical Plan ==
*(2) Sort [count#12 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 5)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/sukulma/Downloads/Spark-Data/Spark-data/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>

Note that the number of partitions has changed to 5 now.
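
The explain output above is still only a plan; to actually execute the sort we have to call an action. A minimal sketch (take(2) returns the two rows with the smallest counts; the exact rows depend on the data):

    // Calling an action triggers the job, which now runs with 5 shuffle partitions
    // and will show up in the Spark UI mentioned below.
    flightData2015.sort("count").take(2)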

Remember that you can monitor the job progress by navigating to the Spark UI on port 4040 to see the physical and logical execution characteristics of your jobs.
