
Thursday, 11 April 2019

CH3/1 Spark-Submit, Datasets Intro


    • Spark is composed of two sets of primitives, the lower-level APIs and the Structured APIs, plus a series of standard libraries for additional functionality. Spark's libraries support a variety of different tasks, from graph analysis and machine learning to streaming and integrations with a host of computing and storage systems.

    Running Production Applications (spark-submit):
    • The spark-submit command (a built-in command-line tool) sends your application code to a cluster and launches it for execution there.

    • Upon submission, the application will run until it exits (completes the task) or encounters an error. You can do this with all of Spark's supported cluster managers, including Standalone, Mesos, and YARN.

    • spark-submit offers several controls with which you can specify the resources your application needs as well as how it should be run and its command-line arguments.

    The following shows the simplest example, which runs the application on the local machine:

    # run the bundled SparkPi example locally; the trailing 10 is the
    # application's own argument (the number of partitions to use)
    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master local \
      ./examples/jars/spark-examples_2.11-2.2.0.jar 10

    Python Version:

    ./bin/spark-submit \
      --master local \
      ./examples/src/main/python/pi.py 10

    By changing the master argument of spark-submit, we can also submit the same application to a cluster running Spark's standalone cluster manager, Mesos, or YARN, as sketched below.
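    As a rough sketch, the same example could be submitted to a YARN cluster with explicit resource settings; the flags below are standard spark-submit options, but the specific values and cluster setup are illustrative, not from the original post:

    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master yarn \
      --deploy-mode cluster \
      --executor-memory 2g \
      --num-executors 4 \
      ./examples/jars/spark-examples_2.11-2.2.0.jar 10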

Datasets: Type-Safe Structured APIs

  • The type-safe version of Spark's structured API is called Datasets, used for writing statically typed code in Java and Scala. The Dataset API is not available in Python or R, because those languages are dynamically typed.

  • DataFrames are a distributed collection of objects of type Row that can hold various types of tabular data. The Dataset API gives users the ability to assign a Java/Scala class to the records within a DataFrame and manipulate them as a collection of typed objects, similar to a Java ArrayList or Scala Seq.

  • The APIs available on Datasets are type-safe, meaning that you cannot accidentally view the objects in a Dataset as being of a class other than the one you put in initially.

  • The Dataset class is parameterized with the type of object contained inside: Dataset<T> in Java and Dataset[T] in Scala. For example, a Dataset[Person] will be guaranteed to contain objects of class Person.

  • As of Spark 2.0, the supported types are classes following the JavaBean pattern in Java and case classes in Scala. These types are restricted because Spark needs to be able to automatically analyze the type T and create an appropriate schema for the tabular data within your Dataset.

scala> case class Flight(DEST_COUNTRY_NAME: String,ORIGIN_COUNTRY_NAME: String,count: BigInt)
defined class Flight

scala> val flightsDF = spark.read.parquet("data/flight-data/parquet/2010-summary.parquet/")
flightsDF: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> val flights = flightsDF.as[Flight]
flights: org.apache.spark.sql.Dataset[Flight] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

  • One final advantage is that when you call collect or take on a Dataset, it will collect objects of the proper type in your Dataset, not DataFrame Rows. This makes it easy to get type safety and safely perform manipulations in a distributed and a local manner without code changes.

scala> flights.count
res0: Long = 255

scala> flights.take(1)
res1: Array[Flight] = Array(Flight(United States,Romania,1))

scala> flights.take(1).foreach(x => println(x.DEST_COUNTRY_NAME))
United States

scala> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada").map(flight_row => flight_row).take(5)
res6: Array[Flight] = Array(Flight(United States,Romania,1), Flight(United States,Ireland,264), Flight(United States,India,69), Flight(Egypt,United States,24), Flight(Equatorial Guinea,United States,1))

scala> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada").map(fr => Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count + 5))
res7: Array[Flight] = Array(Flight(United States,Romania,6), Flight(United States,Ireland,269), Flight(United States,India,74), Flight(Egypt,United States,29), Flight(Equatorial Guinea,United States,6))
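Note that the last example calls take(5) first, so the subsequent filter and map run on the driver over plain Flight objects rather than on the cluster. Typed aggregations are also available on Datasets; as a minimal sketch in the same spark-shell session (this particular grouping is illustrative, not from the original post, though groupByKey is a standard Dataset method):

// Group the typed Flight records by destination country and count them.
// groupByKey takes a typed key function; count() yields a Dataset[(String, Long)].
val countsByDest = flights.groupByKey(flight => flight.DEST_COUNTRY_NAME).count()
countsByDest.show(3) // print a few destination counts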


 
