Structured Streaming:
- Structured Streaming is a high-level API for stream processing that became production-ready in Spark 2.2. (There is also a separate, lower-level API for stream processing: the older DStream-based Spark Streaming.)
- With Structured Streaming, we can take the same operations that we perform in batch mode using Spark's structured APIs and run them in a streaming fashion.
- It is easy to conceptualize: you can write your batch job as a way to prototype the logic and then convert it to a streaming job. Spark makes all of this work by processing the data incrementally.
The following example shows how we first analyze the data as a static dataset, creating a DataFrame to do so. Note that we save the schema in a variable named staticSchema (whose type is StructType).
|
scala> val staticDataFrame = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day\\*.csv")
staticDataFrame: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]

scala> staticDataFrame.createOrReplaceTempView("retail_data")

scala> val staticSchema = staticDataFrame.schema
staticSchema: org.apache.spark.sql.types.StructType = StructType(StructField(InvoiceNo,StringType,true),
  StructField(StockCode,StringType,true), StructField(Description,StringType,true),
  StructField(Quantity,IntegerType,true), StructField(InvoiceDate,TimestampType,true),
  StructField(UnitPrice,DoubleType,true), StructField(CustomerID,DoubleType,true),
  StructField(Country,StringType,true))
|
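Instead of inferring the schema, we could also write out an equivalent StructType by hand (a sketch based on the inferred types above; manualSchema is just an illustrative name). This matters later because, by default, file-based streaming sources require us to supply a schema rather than infer one.
|
// Sketch: hand-written equivalent of staticSchema, based on the inferred types shown above
import org.apache.spark.sql.types._

val manualSchema = StructType(Array(
  StructField("InvoiceNo", StringType, true),
  StructField("StockCode", StringType, true),
  StructField("Description", StringType, true),
  StructField("Quantity", IntegerType, true),
  StructField("InvoiceDate", TimestampType, true),
  StructField("UnitPrice", DoubleType, true),
  StructField("CustomerID", DoubleType, true),
  StructField("Country", StringType, true)
))
|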
The following window function includes all data from each day in the aggregation. It is simply a window over the time-series column in our data. This is a helpful tool for manipulating dates and timestamps because we can specify our requirements in a more human form (via intervals), and Spark will group everything together for us.
|
scala> staticDataFrame.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate").groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day")).sum("total_cost").show(5, false)
+----------+------------------------------------------+-----------------+
|CustomerId|window                                    |sum(total_cost)  |
+----------+------------------------------------------+-----------------+
|16057.0   |[2011-12-04 19:00:00, 2011-12-05 19:00:00]|-37.6            |
|14126.0   |[2011-11-28 19:00:00, 2011-11-29 19:00:00]|643.6300000000001|
|13500.0   |[2011-11-15 19:00:00, 2011-11-16 19:00:00]|497.9700000000001|
|17160.0   |[2011-11-07 19:00:00, 2011-11-08 19:00:00]|516.8499999999999|
|15608.0   |[2011-11-10 19:00:00, 2011-11-11 19:00:00]|122.4            |
+----------+------------------------------------------+-----------------+
|
We could have run this as SQL code as well, against the retail_data temporary view we registered earlier; a sketch of the equivalent query follows.
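|
// Sketch of an equivalent SQL query (assumes the retail_data temp view from above;
// row ordering may differ from the DataFrame version)
spark.sql("""
  SELECT CustomerId,
         window(InvoiceDate, '1 day') AS window,
         sum(UnitPrice * Quantity) AS total_cost
  FROM retail_data
  GROUP BY CustomerId, window(InvoiceDate, '1 day')
""").show(5, false)
|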
Now that we've seen how that works, let's take a look at the streaming code! Very little about the code actually changes.
The biggest change is that we use readStream instead of read. Also notice the maxFilesPerTrigger option, which simply specifies the number of files we should read in at once. Notice as well that the schema we created during our static data analysis can be applied directly when reading the stream of data. Note that a streaming DataFrame is returned.
|
scala> val streamingDataFrame = spark.readStream.schema(staticSchema).option("maxFilesPerTrigger", 1).format("csv").option("header", "true").load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day\\*.csv")
streamingDataFrame: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]
|
Now we can check whether our DataFrame is streaming:
|
scala> streamingDataFrame.isStreaming
res4: Boolean = true
|
Next, we set up the same business logic as in the previous DataFrame manipulations:
|
scala> val purchaseByCustomerPerHour = streamingDataFrame.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate").groupBy($"CustomerId", window($"InvoiceDate", "1 day")).sum("total_cost")
purchaseByCustomerPerHour: org.apache.spark.sql.DataFrame = [CustomerId: double, window: struct<start: timestamp, end: timestamp> ... 1 more field]
|
Remember that streaming is still a lazy operation, so we need to call a streaming action to start the execution of this data flow. Streaming actions are a bit different from conventional static actions because, instead of just calling something like count (which does not make any sense on a stream anyway), we will be populating data somewhere. The action we use here outputs to an in-memory table that is updated after each trigger. In this case, each trigger is based on an individual file (the maxFilesPerTrigger read option that we set). Spark will mutate the data in the in-memory table so that we always have the highest values as specified in our previous aggregation.
|
purchaseByCustomerPerHour.writeStream
  .format("memory")                 // write to an in-memory table
  .queryName("customer_purchases")  // the name of the in-memory table
  .outputMode("complete")           // rewrite the full aggregation result after each trigger
  .start()
|
Once the stream is started, we can run queries against it to debug what our result would look like if we were to write this out to a production sink:
|
// in Scala
spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
""").show(5)
|
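Since the memory sink keeps running in the background, it is worth knowing how to stop it once we are done debugging. A minimal sketch (not part of the original example), using the list of active queries on the SparkSession:
|
// Sketch: inspect the active streaming queries and stop the one we started earlier
spark.streams.active.foreach(q => println(q.name))

spark.streams.active
  .filter(_.name == "customer_purchases")
  .foreach(_.stop())
|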
Another option you can use is to write the results out to the
console:
|
purchaseByCustomerPerHour.writeStream
  .format("console")
  .queryName("customer_purchases_2")
  .outputMode("complete")
  .start()
|
You shouldn't use either of these streaming methods in production, but they do make for a convenient demonstration of Structured Streaming's power. Notice that this window is built on event time as well, not the time at which Spark processes the data. This was one of the shortcomings of the older Spark Streaming API that Structured Streaming has resolved.
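Because the window is keyed on event time, we could also tell Spark how long to wait for late-arriving records by adding a watermark on the InvoiceDate column. A minimal sketch (not part of the original example; watermarks mainly matter with the append or update output modes, where they let Spark finalize windows and discard old state):
|
import org.apache.spark.sql.functions.window

// Sketch: same aggregation, but with a watermark so Spark knows how long to
// accept late events for a given one-day window before dropping its state
val purchaseWithWatermark = streamingDataFrame
  .selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
  .withWatermark("InvoiceDate", "1 day")
  .groupBy($"CustomerId", window($"InvoiceDate", "1 day"))
  .sum("total_cost")
|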