Sunday, 14 April 2019

CH7.3 Dataframe Window Functions


    Window Functions:

     
    • We can use window functions to carry out unique aggregations by computing an aggregation on a specific “window” of data, which we define by using a reference to the current row. This window specification determines which rows will be passed in to the function. 

    • A group-by takes data, and every row can go into only one grouping. A window function, in contrast, calculates a return value for every input row of a table based on a group of rows, called a frame. Each row can fall into one or more frames. 

    • Spark supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions.

    • The first step to a window function is to create a window specification. Note that partitionBy here is unrelated to the partitioning scheme concept that we have covered thus far; it is just a similarly named concept that describes how we will be breaking up our group. The ordering determines the ordering within a given partition, and, finally, the frame specification (the rowsBetween statement) states which rows will be included in the frame based on their position relative to the current input row.

    To create a window specification we use the Window object (the companion class does not have any methods). All the methods to define a window spec are available on the Window object.

    def partitionBy(cols: Column*): WindowSpec
    Creates a WindowSpec with the partitioning defined.

    def partitionBy(colName: String, colNames: String*): WindowSpec
    Creates a WindowSpec with the partitioning defined.

    def orderBy(cols: Column*): WindowSpec
    Creates a WindowSpec with the ordering defined.

    def orderBy(colName: String, colNames: String*): WindowSpec
    Creates a WindowSpec with the ordering defined.

    def rangeBetween(start: Long, end: Long): WindowSpec
    Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive).
    Both start and end are offsets relative to the current row's ordering value, not to row positions. For example, "0" means "the current row's value", "-1" means one below the current row's value, and "5" means five above the current row's value (see the sketch after rowsBetween below).

    def rowsBetween(start: Long, end: Long): WindowSpec
    Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive).
    Both start and end are positions relative to the current row. For example, "0" means "current row", while "-1" means the row before the current row, and "5" means the fifth row after the current row.
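    The difference between the two is easiest to see side by side. The following is a minimal sketch (hypothetical data, run in spark-shell where spark.implicits._ is already imported): a ROWS frame counts physical rows around the current row, while a RANGE frame includes every row whose ordering value falls within the given offsets of the current row's value.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, sum}

    val df = Seq(1, 2, 2, 5).toDF("Quantity")   // hypothetical data

    // ROWS: the previous row and the current row, by position.
    val rowsSpec = Window.orderBy("Quantity").rowsBetween(-1, Window.currentRow)

    // RANGE: every row whose Quantity is within 1 below the current row's Quantity
    // (non-unbounded range frames need a single numeric ORDER BY column).
    val rangeSpec = Window.orderBy("Quantity").rangeBetween(-1, Window.currentRow)

    df.select(
      col("Quantity"),
      sum("Quantity").over(rowsSpec).alias("rowsSum"),    // 1, 3, 4, 7
      sum("Quantity").over(rangeSpec).alias("rangeSum")   // 1, 5, 5, 5
    ).show()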

    def currentRow: Long
    Value representing the current row. This can be used to specify the frame boundaries:
    Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)

    def unboundedFollowing: Long
    Value representing the last row in the partition, equivalent to "UNBOUNDED FOLLOWING" in SQL. This can be used to specify the frame boundaries:
    Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    def unboundedPreceding: Long
    Value representing the first row in the partition, equivalent to "UNBOUNDED PRECEDING" in SQL. This can be used to specify the frame boundaries:
    Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)

    Note that all these methods are called directly on the Window object, which is why we never create an instance of it.

    scala>     Window.partitionBy("CustomerId", "date")
    res48: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@2c7c3720

    scala>     Window.partitionBy("CustomerId", "date").orderBy(col("Quantity").desc)
    res49: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7cb0a302

    scala>     val windowSpec = Window.partitionBy("CustomerId", "date").orderBy(col("Quantity").desc).rowsBetween(Window.unboundedPreceding, Window.currentRow)
    windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@37a7269e


    • The next step is to use the same aggregation functions that we saw earlier, passing a column name or expression. In addition, we indicate the window specification that defines the frames of data to which this function will apply. Aggregate functions like max and count return a Column, and the Column type has an "over" method to which we can provide the WindowSpec.

    def over(): Column
    Defines an empty analytic clause. In this case the analytic function is applied to and presented for all rows in the result set.
    df.select(
      sum("price").over(),
      avg("price").over()
    )

    def over(window: WindowSpec): Column
    Defines a windowing column.
    val w = Window.partitionBy("name").orderBy("id")
    df.select(
      sum("price").over(w.rangeBetween(Window.unboundedPreceding, 2)),
      avg("price").over(w.rowsBetween(Window.currentRow, 4))
    )

    scala> val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
    maxPurchaseQuantity: org.apache.spark.sql.Column = max(Quantity) OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)

    Notice that this returns a Column, which we can now use in a DataFrame select statement.
    The function above is an aggregate function; we also have several ranking functions, shown below. These window ranking functions likewise return a Column, on which we can call the over method and provide a window specification:

    def cume_dist(): Column
    Window function: returns the cumulative distribution of values within a window partition, i.e. the fraction of rows that are below the current row.
    N = total number of rows in the partition
    cumeDist(x) = number of values before (and including) x / N
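    As a quick illustration (hypothetical data, spark-shell), four ordered rows give cumulative distributions of 0.25, 0.5, 0.75, and 1.0:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, cume_dist}

    val scores = Seq(1, 2, 3, 4).toDF("score")   // hypothetical data
    val byScore = Window.orderBy("score")

    // Each row's fraction of rows at or below it: 0.25, 0.5, 0.75, 1.0
    scores.select(col("score"), cume_dist().over(byScore).alias("cumeDist")).show()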

    def dense_rank(): Column
    Window function: returns the rank of rows within a window partition, without any gaps.
    The difference between rank and dense_rank is that dense_rank leaves no gaps in the ranking sequence when there are ties. That is, if you were ranking a competition using dense_rank and had three people tie for second place, you would say that all three were in second place and that the next person came in third. rank would instead give sequential numbers, so the person that came after the ties would register as coming in fifth.
    This is equivalent to the DENSE_RANK function in SQL.
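    A small hypothetical example makes the tie behaviour concrete:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, dense_rank, rank}

    // Three rows tie for second place.
    val results = Seq(10, 9, 9, 9, 8).toDF("score")
    val byScoreDesc = Window.orderBy(col("score").desc)

    results.select(col("score"),
      rank().over(byScoreDesc).alias("rank"),            // 1, 2, 2, 2, 5
      dense_rank().over(byScoreDesc).alias("denseRank")  // 1, 2, 2, 2, 3
    ).show()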

    def lag(e: Column, offset: Int, defaultValue: Any): Column
    Window function: returns the value that is offset rows before the current row, and defaultValue if there are fewer than offset rows before the current row. For example, an offset of one will return the previous row at any given point in the window partition.
    This is equivalent to the LAG function in SQL.

    def lag(columnName: String, offset: Int, defaultValue: Any): Column
    Window function: returns the value that is offset rows before the current row, and defaultValue if there are fewer than offset rows before the current row. For example, an offset of one will return the previous row at any given point in the window partition.
    This is equivalent to the LAG function in SQL.

    def lag(columnName: String, offset: Int): Column
    Window function: returns the value that is offset rows before the current row, and null if there are fewer than offset rows before the current row. For example, an offset of one will return the previous row at any given point in the window partition.
    This is equivalent to the LAG function in SQL.

    def lag(e: Column, offset: Int): Column
    Window function: returns the value that is offset rows before the current row, and null if there are fewer than offset rows before the current row. For example, an offset of one will return the previous row at any given point in the window partition.
    This is equivalent to the LAG function in SQL.

    def lead(e: Column, offset: Int, defaultValue: Any): Column
    Window function: returns the value that is offset rows after the current row, and defaultValue if there are fewer than offset rows after the current row. For example, an offset of one will return the next row at any given point in the window partition.
    This is equivalent to the LEAD function in SQL.

    def lead(columnName: String, offset: Int, defaultValue: Any): Column
    Window function: returns the value that is offset rows after the current row, and defaultValue if there are fewer than offset rows after the current row. For example, an offset of one will return the next row at any given point in the window partition.
    This is equivalent to the LEAD function in SQL.

    def lead(e: Column, offset: Int): Column
    Window function: returns the value that is offset rows after the current row, and null if there are fewer than offset rows after the current row. For example, an offset of one will return the next row at any given point in the window partition.
    This is equivalent to the LEAD function in SQL.

    def lead(columnName: String, offset: Int): Column
    Window function: returns the value that is offset rows after the current row, and null if there are fewer than offset rows after the current row. For example, an offset of one will return the next row at any given point in the window partition.
    This is equivalent to the LEAD function in SQL.
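    For example, the following sketch (hypothetical daily totals, spark-shell) pulls the previous and next value into each row, with 0 as the defaultValue at the edges of the window:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, lag, lead}

    val daily = Seq(("2011-01-01", 10), ("2011-01-02", 20), ("2011-01-03", 30))
      .toDF("day", "total")
    val byDay = Window.orderBy("day")

    daily.select(col("day"), col("total"),
      lag("total", 1, 0).over(byDay).alias("prevTotal"),   // 0, 10, 20
      lead("total", 1, 0).over(byDay).alias("nextTotal")   // 20, 30, 0
    ).show()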

    def ntile(n: Int): Column
    Window function: returns the ntile group id (from 1 to n inclusive) in an ordered window partition. For example, if n is 4, the first quarter of the rows will get value 1, the second quarter will get 2, the third quarter will get 3, and the last quarter will get 4.
    This is equivalent to the NTILE function in SQL.
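    For instance, eight ordered rows split into four tiles of two rows each (hypothetical data):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, ntile}

    val nums = (1 to 8).toDF("n")
    // n 1-2 -> tile 1, 3-4 -> 2, 5-6 -> 3, 7-8 -> 4
    nums.select(col("n"), ntile(4).over(Window.orderBy("n")).alias("quartile")).show()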

    def percent_rank(): Column
    Window function: returns the relative rank (i.e. percentile) of rows within a window partition.
    This is computed by:
    (rank of row in its partition - 1) / (number of rows in the partition - 1)
    This is equivalent to the PERCENT_RANK function in SQL.

    def rank(): Column
    Window function: returns the rank of rows within a window partition.
    The difference between rank and dense_rank is that dense_rank leaves no gaps in the ranking sequence when there are ties. That is, if you were ranking a competition using dense_rank and had three people tie for second place, you would say that all three were in second place and that the next person came in third. rank would instead give sequential numbers, so the person that came after the ties would register as coming in fifth.
    This is equivalent to the RANK function in SQL.

    def row_number(): Column
    Window function: returns a sequential number starting at 1 within a window partition.
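    To tie these together, here is a minimal sketch (hypothetical data): percent_rank applies the formula above, while row_number simply numbers the ordered rows:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, percent_rank, row_number}

    val vals = Seq(10, 20, 20, 30).toDF("v")
    val byV = Window.orderBy("v")

    vals.select(col("v"),
      percent_rank().over(byV).alias("pctRank"),  // 0.0, 0.333..., 0.333..., 1.0
      row_number().over(byV).alias("rowNum")      // 1, 2, 3, 4
    ).show()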

    The following shows some examples of window functions. Note how we reuse the windowSpec that we defined earlier:


    scala>     val purchaseDenseRank = dense_rank().over(windowSpec)
    purchaseDenseRank: org.apache.spark.sql.Column = DENSE_RANK() OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)

    scala>     val purchaseRank = rank().over(windowSpec)
    purchaseRank: org.apache.spark.sql.Column = RANK() OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)


    scala>     import org.apache.spark.sql.functions.{col, to_date}
    import org.apache.spark.sql.functions.{col, to_date}

    scala>     val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"),"MM/d/yyyy H:mm"))
    dfWithDate: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 7 more fields]

    scala>     dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId").select(col("CustomerId"), col("date"), col("Quantity"), purchaseRank.alias("quantityRank"), purchaseDenseRank.alias("quantityDenseRank"), maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()
    +----------+----------+--------+------------+-----------------+-------------------+
    |CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
    +----------+----------+--------+------------+-----------------+-------------------+
    |     12346|2011-01-18|   74215|           1|                1|              74215|
    |     12346|2011-01-18|  -74215|           2|                2|              74215|
    |     12347|2010-12-07|      36|           1|                1|                 36|
    |     12347|2010-12-07|      30|           2|                2|                 36|
    |     12347|2010-12-07|      24|           3|                3|                 36|
    |     12347|2010-12-07|      12|           4|                4|                 36|
    |     12347|2010-12-07|      12|           4|                4|                 36|

