Search This Blog

Thursday, 11 April 2019

CH3/2 Structured Streaming, RDD intro



    Structured Streaming:
     
    • Structured Streaming is a high-level API for stream processing that became production-ready in Spark 2.2.  (There exists a separate low level API for stream processing)
       
    • With Structured Streaming, we can take the same operations that we perform in batch mode using Spark’s structured APIs and run them in a streaming fashion.
       
    • It's easy to conceptualize because you can write your batch job as a way to prototype it and then you can convert it to a streaming job. The way all of this works is by incrementally processing that data.

    • Basic Example:
       
    Following example shows how we first analyze the data as a static dataset and create a DataFrame to do so.
    Note that we have saved the Schema in a variable staticSchema (whose type is StructType)
     
    scala>     val staticDataFrame = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day\\*.csv")
    staticDataFrame: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]

    scala>     staticDataFrame.createOrReplaceTempView("retail_data")

    scala>     val staticSchema = staticDataFrame.schema
    staticSchema: org.apache.spark.sql.types.StructType = StructType(StructField(InvoiceNo,StringType,true), StructField(StockCode,StringType,true), StructField(Description,StringType,true), StructField(Quantity,IntegerType,true), StructField(InvoiceDate,TimestampType,true), StructField(UnitPrice,DoubleType,true), StructField(CustomerID,DoubleType,true), StructField(Country,StringType,true))

    Following window function will include all data from each day in the aggregation. It’s simply a window over the time–series column in our data. This is a helpful tool for manipulating date and timestamps because we can specify our requirements in a more human form (via intervals), and Spark will group all of them together for us

    scala> staticDataFrame.selectExpr("CustomerId","(UnitPrice * Quantity) as total_cost","InvoiceDate").groupBy(col("CustomerId"),window(col("InvoiceDate"), "1 day")).sum("total_cost").show(5,false)
    +----------+------------------------------------------+-----------------+
    |CustomerId|window                                    |sum(total_cost)  |
    +----------+------------------------------------------+-----------------+
    |16057.0   |[2011-12-04 19:00:00, 2011-12-05 19:00:00]|-37.6            |
    |14126.0   |[2011-11-28 19:00:00, 2011-11-29 19:00:00]|643.6300000000001|
    |13500.0   |[2011-11-15 19:00:00, 2011-11-16 19:00:00]|497.9700000000001|
    |17160.0   |[2011-11-07 19:00:00, 2011-11-08 19:00:00]|516.8499999999999|
    |15608.0   |[2011-11-10 19:00:00, 2011-11-11 19:00:00]|122.4            |
    +----------+------------------------------------------+-----------------+
    We could have run this as a SQL code as well.
     
    Now that we’ve seen how that works, let’s take a look at the streaming code!. Very little actually changes about the code.
    The biggest change is that we
    used readStream instead of read.
    Also notice the maxFilesPerTrigger option, which simply specifies the number of files we should read in at once.
    Also notice that the
    schema we created during our static data analysis, can be directly applied when read stream of data.
    Note that streamingDataFrame is returned.

    scala> val streamingDataFrame = spark.readStream.schema(staticSchema).option("maxFilesPerTrigger", 1).format("csv").option("header", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day\\*.csv")
    streamingDataFrame: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]
     

    Now we can see whether our DataFrame is streaming

    scala> streamingDataFrame.isStreaming
    res4: Boolean = true

    Following we set up the same business logic as the previous DataFrame manipulations

    scala> val purchaseByCustomerPerHour = streamingDataFrame.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate").groupBy($"CustomerId", window($"InvoiceDate", "1 day")).sum("total_cost")
    purchaseByCustomerPerHour: org.apache.spark.sql.DataFrame = [CustomerId: double, window: struct<start: timestamp, end: timestamp> ... 1 more field]

     
    Remember that Streaming is still a lazy operation, so we will need to call a streaming action to start the execution of this data flow.
    Streaming actions are a bit different from our conventional static action because we’re going to be populating data somewhere instead of just calling something like count (which doesn’t make any sense on a stream anyways). The action we will use will output to an in-memory table that we will update after each trigger. In this case, each trigger is based on an individual file (the read option that we set). Spark will mutate the data in the in-memory table such that we will always have the highest value as specified in our previous aggregation

    purchaseByCustomerPerHour.writeStream.format("memory").queryName("customer_purchases").outputMode("complete").start()

    When we start the stream, we can run queries against it to debug what our result will look like if we were to write this out to a production sink:

    // in Scala
    spark.sql("""
      SELECT *
      FROM customer_purchases
      ORDER BY `sum(total_cost)` DESC
      """)
      .show(5)

    Another option you can use is to write the results out to the console:

    purchaseByCustomerPerHour.writeStream.format("console").queryName("customer_purchases_2").outputMode("complete").start()

    You shouldn’t use either of these streaming methods in production, but they do make for convenient demonstration of Structured Streaming’s power.
    Notice how this window is built on event time, as well, not the time at which Spark processes the data. This was one of the shortcomings of Spark Streaming that Structured Streaming has resolved. 

Low Level API:

 
  • Spark includes a number of lower-level primitives to allow for arbitrary Java and Python object manipulation via Resilient Distributed Datasets (RDDs). Virtually everything in Spark is built on top of RDDs.

  • DataFrame operations are built on top of RDDs and compile down to these lower-level tools for convenient and extremely efficient distributed execution. 

  • RDDs are lower level than DataFrames because they reveal physical execution characteristics (like partitions) to end users.

  • One thing that you might use RDDs for is to parallelize raw data that you have stored in memory on the driver machine. 

spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF()

  • RDDs are available in Scala as well as Python. However, they’re not equivalent. This differs from the DataFrame API (where the execution characteristics are the same) due to some underlying implementation details. 

  • You shouldn’t need to use RDDs much in order to perform many tasks unless you’re maintaining older Spark code. There are basically no instances in modern Spark, for which you should be using RDDs instead of the structured APIs beyond manipulating some very raw unprocessed and unstructured data.


No comments:

Post a Comment