|
Text Files:
- Spark also allows you to read plain-text files. Each line in the file becomes a record in the DataFrame; it is then up to you to transform it accordingly. For example, you could parse Apache log files into a more structured format.
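As an illustration, here is a minimal sketch of that kind of parsing (the log path and regex pattern are hypothetical; assumes an active SparkSession named spark):
|
import org.apache.spark.sql.functions.regexp_extract

// Each log line arrives as one row in the "value" column.
val logs = spark.read.text("/path/to/access.log")

// Common Log Format: host ident user [timestamp] "method path protocol" status size
val pattern = """^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) \S+" (\d{3}) (\S+)"""

val parsed = logs.select(
  regexp_extract(logs("value"), pattern, 1).as("host"),
  regexp_extract(logs("value"), pattern, 2).as("timestamp"),
  regexp_extract(logs("value"), pattern, 3).as("method"),
  regexp_extract(logs("value"), pattern, 4).as("path"),
  regexp_extract(logs("value"), pattern, 5).as("status"))
|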
- Reading text files is straightforward: you simply specify the type to be textFile. With textFile, partitioned directory names are ignored. To read and write text files according to partitions, you should use "text", which respects partitioning on reading and writing.
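For example, given a directory previously written with partition subdirectories such as year=2016/ (the /data/logs path is hypothetical), the two entry points behave differently:
|
// "text" keeps the partition directory as a column: schema is (value, year)
val withPartitions = spark.read.text("/data/logs")

// "textFile" returns a Dataset[String] containing only the line contents
val linesOnly = spark.read.textFile("/data/logs")
|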
Following are the method definitions available on the DataFrameReader:
|
def text(paths: String*): DataFrame
Loads text files and returns a DataFrame whose schema starts with a string column named "value", followed by partitioned columns if there are any. Each line in the text files is a new row in the resulting DataFrame. For example:
// Scala:
spark.read.text("/path/to/spark/README.md")

def text(path: String): DataFrame
Loads text files and returns a DataFrame whose schema starts with a string column named "value", followed by partitioned columns if there are any. See the documentation on the other overloaded text() method for more details.

def textFile(paths: String*): Dataset[String]
Loads text files and returns a Dataset of String. The underlying schema of the Dataset contains a single string column named "value". If the directory structure of the text files contains partitioning information, those are ignored in the resulting Dataset. To include partitioning information as columns, use text. Each line in the text files is a new element in the resulting Dataset. For example:
// Scala:
spark.read.textFile("/path/to/spark/README.md")
// Java:
spark.read().textFile("/path/to/spark/README.md")

def textFile(path: String): Dataset[String]
Loads text files and returns a Dataset of String. See the documentation on the other overloaded textFile() method for more details.
|
- Writing to text files: when you write a text file, you need to be sure to have only one string column; otherwise, the write will fail.
- If you perform some partitioning when performing your write, you can write more columns. However, those columns will manifest as directories in the folder to which you're writing, instead of as columns in every single file, as the sketch below shows.
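A minimal sketch of both rules (the DataFrame df, with string columns "value" and "country", and the output path are hypothetical):
|
// Fails at write time: two columns would end up inside the files.
// df.select("value", "country").write.text("/output/lines")

// Works: partitionBy moves "country" out of the file contents and into
// country=.../ directories, leaving only the "value" column in each file.
df.select("value", "country").write.partitionBy("country").text("/output/lines")
|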
Following are the definitions of the "text" and "partitionBy" methods available on DataFrameWriter objects:
|
def text(path: String): Unit
Saves the content of the DataFrame in a text file at the specified path. The DataFrame must have only one column that is of string type. Each row becomes a new line in the output file. For example:
// Scala:
df.write.text("/path/to/output")
// Java:
df.write().text("/path/to/output")
You can set the following option(s) for writing text files:
compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive shortened names (none, bzip2, gzip, lz4, snappy and deflate).

def partitionBy(colNames: String*): DataFrameWriter[T]
Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:
year=2016/month=01/
year=2016/month=02/
Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.
This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.
|
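To make the compression option and partition pruning concrete, a short sketch (hypothetical path; df again has string columns "value" and "year"):
|
// Write gzip-compressed text files, partitioned by "year".
df.write
  .partitionBy("year")
  .option("compression", "gzip")
  .text("/data/events")

// A predicate on the partition column lets Spark read only the matching
// year=2016/ directory instead of scanning the whole dataset.
spark.read.text("/data/events").where("year = 2016").show()
|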
- The following example shows how to read data from text files and write to a directory with partitions. Pay attention to the field names: the CSV files are read as plain text, so each file's header line is treated as data too (hence the Country=Country partition directory in the listing below), and spaces in partition values are escaped in the directory names (e.g. Country=Channel%20Islands).
|
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Manual schema for the retail dataset (not actually used in this text-file example).
    val manualSchema = StructType(Seq(StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    // Reading CSV files as plain text files and extracting information.
    val df_text = spark.read.textFile("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day").repartition(5)
    df_text.printSchema()
    df_text.show(5, false)

    // Split each CSV line on commas and pull out the Description and Country fields.
    val df_text_split = df_text.selectExpr("split(value,',')[2] as Desc", "split(value,',')[7] as Country")
    df_text_split.printSchema()
    df_text_split.show(5, false)

    // Writing files using partitioning.
    df_text_split.write.partitionBy("Country").text("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\textop")
  }
}
|
|
root
 |-- value: string (nullable = true)

+--------------------------------------------------------------------------------------------------+
|value                                                                                             |
+--------------------------------------------------------------------------------------------------+
|580538,21914,BLUE HARMONICA IN BOX ,24,2011-12-05 08:38:00,1.25,14075.0,United Kingdom            |
|580539,21479,WHITE SKULL HOT WATER BOTTLE ,4,2011-12-05 08:39:00,4.25,18180.0,United Kingdom      |
|580539,21411,GINGHAM HEART DOORSTOP RED,8,2011-12-05 08:39:00,1.95,18180.0,United Kingdom         |
|580539,22372,AIRLINE BAG VINTAGE WORLD CHAMPION ,4,2011-12-05 08:39:00,4.25,18180.0,United Kingdom|
|580539,22389,PAPERWEIGHT SAVE THE PLANET,12,2011-12-05 08:39:00,0.39,18180.0,United Kingdom       |
+--------------------------------------------------------------------------------------------------+
only showing top 5 rows
root
 |-- Desc: string (nullable = true)
 |-- Country: string (nullable = true)

+-----------------------------------+--------------+
|Desc                               |Country       |
+-----------------------------------+--------------+
|BLUE HARMONICA IN BOX              |United Kingdom|
|WHITE SKULL HOT WATER BOTTLE       |United Kingdom|
|GINGHAM HEART DOORSTOP RED         |United Kingdom|
|AIRLINE BAG VINTAGE WORLD CHAMPION |United Kingdom|
|PAPERWEIGHT SAVE THE PLANET        |United Kingdom|
+-----------------------------------+--------------+
only showing top 5 rows
|
|
04/15/2019  05:18 AM    <DIR>          Country=Australia
04/15/2019  05:18 AM    <DIR>          Country=Austria
04/15/2019  05:18 AM    <DIR>          Country=Bahrain
04/15/2019  05:18 AM    <DIR>          Country=Belgium
04/15/2019  05:18 AM    <DIR>          Country=Brazil
04/15/2019  05:18 AM    <DIR>          Country=Canada
04/15/2019  05:18 AM    <DIR>          Country=Channel%20Islands
04/15/2019  05:18 AM    <DIR>          Country=Country
04/15/2019  05:18 AM    <DIR>          Country=Cyprus
04/15/2019  05:18 AM    <DIR>          Country=Czech%20Republic
04/15/2019  05:18 AM    <DIR>          Country=Denmark
04/15/2019  05:18 AM    <DIR>          Country=EIRE
04/15/2019  05:18 AM    <DIR>          Country=European%20Community
04/15/2019  05:18 AM    <DIR>          Country=Finland
04/15/2019  05:18 AM    <DIR>          Country=France
04/15/2019  05:18 AM    <DIR>          Country=Germany
04/15/2019  05:18 AM    <DIR>          Country=Greece
04/15/2019  05:18 AM    <DIR>          Country=Hong%20Kong
04/15/2019  05:18 AM    <DIR>          Country=Iceland
|
|