Filtering Rows:
- There are two methods to perform this operation: you can use either where or filter; both perform the same operation and accept the same argument types when used with DataFrames.
- The following are the definitions of these methods available on DataFrames. Note that there are two variants: one takes a Column and the other takes a String. When using the Dataset API from either Scala or Java, filter also accepts an arbitrary function that Spark will apply to each record in the Dataset (a typed-Dataset example is sketched after the REPL output below).
def where(conditionExpr: String): Dataset[T]
Filters rows using the given SQL expression.
peopleDs.where("age > 15")
def where(condition: Column): Dataset[T]
Filters rows using the given condition. This is an alias for filter.
// The following are equivalent:
peopleDs.filter($"age" > 15)
peopleDs.where($"age" > 15)
def filter(func: (T) ⇒ Boolean): Dataset[T]
(Scala-specific) Returns a new Dataset that only contains elements where func returns true.
def filter(conditionExpr: String): Dataset[T]
Filters rows using the given SQL expression.
peopleDs.filter("age > 15")
def filter(condition: Column): Dataset[T]
Filters rows using the given condition.
// The following are equivalent:
peopleDs.filter($"age" > 15)
peopleDs.where($"age" > 15)
scala> df.count
res24: Long = 1502

scala> df.where("ORIGIN_COUNTRY_NAME == 'Romania'").collect
res25: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,1], [United States,Romania,3], [United States,Romania,12], [United States,Romania,12])

scala> df.where(col("ORIGIN_COUNTRY_NAME") === "Romania").collect
res26: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,1], [United States,Romania,3], [United States,Romania,12], [United States,Romania,12])
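As mentioned above, when working with a typed Dataset the Scala filter variant also accepts a plain function. Below is a minimal sketch of that variant on the same flight data; the Flight case class (and its field types) is an assumption introduced here for illustration and is not part of the original example.

// Hypothetical case class matching the flight-data JSON columns (assumed types)
case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: Long)

import spark.implicits._   // provides the Encoder needed by .as[Flight]

// Convert the untyped DataFrame to a typed Dataset and filter with a Scala function
val flightsDs = df.as[Flight]
val fromRomania = flightsDs.filter(f => f.ORIGIN_COUNTRY_NAME == "Romania" && f.count > 3)
fromRomania.show()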
- The following shows how to define a Column and use it directly in a where condition.
scala> val FilterCond=expr("ORIGIN_COUNTRY_NAME == 'Romania'")
FilterCond: org.apache.spark.sql.Column = (ORIGIN_COUNTRY_NAME = Romania)

scala> df.where(FilterCond).collect
res27: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,1], [United States,Romania,3], [United States,Romania,12], [United States,Romania,12])
- We can also put multiple filters in the same expression, as shown below. Although this is possible, it is not always useful, because Spark automatically performs all filtering operations at the same time regardless of the filter ordering. This means that if you want to specify multiple AND filters, just chain them sequentially and let Spark handle the rest (the explain() sketch after the chaining example further below illustrates this).
scala> val FilterCond=expr("ORIGIN_COUNTRY_NAME == 'Romania'").and(expr("count > 3"))
FilterCond: org.apache.spark.sql.Column = ((ORIGIN_COUNTRY_NAME = Romania) AND (count > 3))

scala> df.where(FilterCond).collect
res28: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,12], [United States,Romania,12])
The "and" and "or" are methods available
on Column type:
def
and(other: Column): Column
Boolean
AND.
//
Scala: The following selects people that are in school and employed at the
same time.
people.select(
people("inSchool") && people("isEmployed") )
//
Java:
people.select(
people("inSchool").and(people("isEmployed")) );
|
def or(other: Column): Column
Boolean OR.
// Scala: The following selects people that are in school or employed.
people.filter( people("inSchool") || people("isEmployed") )
// Java:
people.filter( people("inSchool").or(people("isEmployed")) );
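As a quick illustration of or on the same flight DataFrame, the sketch below keeps rows whose origin country is either of two values; "Croatia" is just an arbitrary second value chosen for the example, not something taken from the original output.

import org.apache.spark.sql.functions.col

// Column.or combines two boolean Columns; || is the symbolic equivalent in Scala
val eitherCountry = (col("ORIGIN_COUNTRY_NAME") === "Romania").or(col("ORIGIN_COUNTRY_NAME") === "Croatia")
df.where(eitherCountry).show()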
The following shows how to chain multiple filters:
scala> val FilterCond=expr("ORIGIN_COUNTRY_NAME == 'Romania'")
FilterCond: org.apache.spark.sql.Column = (ORIGIN_COUNTRY_NAME = Romania)

scala> val Cond2=col("count") > 3
Cond2: org.apache.spark.sql.Column = (count > 3)

scala> df.where(FilterCond).where(Cond2).collect
res29: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Romania,12], [United States,Romania,12])
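To see that Spark collapses chained filters into a single predicate, you can compare the physical plans of the chained form and the combined-expression form. The sketch below is a minimal way to do that; the exact plan text depends on the Spark version, so treat the expected output as an assumption rather than a guaranteed result.

import org.apache.spark.sql.functions.{col, expr}

// Same two conditions as in the transcript above
val cond1 = expr("ORIGIN_COUNTRY_NAME == 'Romania'")
val cond2 = col("count") > 3

// Catalyst's CombineFilters rule merges adjacent filters, so both plans
// should show one combined Filter predicate rather than two separate ones.
df.where(cond1).where(cond2).explain()
df.where(cond1.and(cond2)).explain()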
Random Splits:
- The randomSplit method can be used to break up your DataFrame into random "splits" of the original DataFrame.
- This is often used with machine learning algorithms to create training, validation, and test sets.
- As shown below, there are two overloaded versions: one takes a seed value and the other uses a random seed.
- It's important to note that if the proportions you specify for the DataFrames don't add up to one, they will be normalized so that they do.
def randomSplit(weights: Array[Double]): Array[Dataset[T]]
Randomly splits this Dataset with the provided weights.
weights - weights for splits, will be normalized if they don't sum to 1
def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
Randomly splits this Dataset with the provided weights.
weights - weights for splits, will be normalized if they don't sum to 1
seed - Seed for sampling
scala> val df = spark.read.format("json").schema(myManualSchema).load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\flight-data\\json")
df: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> val weightsarray=Array(0.33,0.1,0.5)
weightsarray: Array[Double] = Array(0.33, 0.1, 0.5)

scala> spark.conf.set("spark.sql.shuffle.partitions",5)

scala> df.randomSplit(weightsarray).foreach(x => { val perc= (x.count).toDouble/totalcount
     | println(s"Counts in DF ${perc}")})
Counts in DF 0.35153129161118507
Counts in DF 0.1151797603195739
Counts in DF 0.533288948069241
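Note that totalcount in the transcript above is assumed to have been computed earlier (for example as df.count). For a reproducible train/validation/test split, the seeded overload can be used as sketched below; the 0.7/0.15/0.15 proportions and the seed value 42 are just example choices, not values from the original session.

val totalcount = df.count.toDouble   // total rows, used to compute each split's share

// Seeded overload gives a repeatable split; weights are normalized if they don't sum to 1
val Array(train, validation, test) = df.randomSplit(Array(0.7, 0.15, 0.15), seed = 42L)

Seq(train, validation, test).foreach { split =>
  println(s"Counts in DF ${split.count / totalcount}")
}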