
Monday, 15 April 2019

CH9.6 Reading and Writing Text Files


Text Files:

 
  • Spark also allows you to read plain-text files. Each line in the file becomes a record in the DataFrame, and it is then up to you to transform it accordingly; for example, parsing Apache log files into a more structured format.

  • Reading text files is straightforward: you simply specify the type to be textFile. With textFile, partitioned directory names are ignored. To read and write text files according to partitions, use "text", which respects partitioning on both reads and writes.
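
A minimal sketch of the difference, assuming a hypothetical directory /data/logs that contains partition subdirectories such as country=US/:

// Scala:
// textFile: a Dataset[String] with a single "value" column;
// the country=US/ directory names are ignored
val lines = spark.read.textFile("/data/logs")
lines.printSchema()  // root |-- value: string

// text: a DataFrame with "value" plus a "country" column
// recovered from the partition directory names
val df = spark.read.text("/data/logs")
df.printSchema()     // root |-- value: string |-- country: string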

Following are the method definitions available on the DataFrameReader:


def text(paths: String*): DataFrame
Loads text files and returns a DataFrame whose schema starts with a string column named "value", followed by partitioned columns if there are any.
Each line in the text files is a new row in the resulting DataFrame. For example:
// Scala:
spark.read.text("/path/to/spark/README.md")

def text(path: String): DataFrame
Loads text files and returns a DataFrame whose schema starts with a string column named "value", followed by partitioned columns if there are any. See the documentation on the other overloaded text() method for more details.

def textFile(paths: String*): Dataset[String]
Loads text files and returns a Dataset of String. The underlying schema of the Dataset contains a single string column named "value".
If the directory structure of the text files contains partitioning information, those are ignored in the resulting Dataset. To include partitioning information as columns, use text.
Each line in the text files is a new element in the resulting Dataset. For example:
// Scala:
spark.read.textFile("/path/to/spark/README.md")
// Java:
spark.read().textFile("/path/to/spark/README.md")

def textFile(path: String): Dataset[String]
Loads text files and returns a Dataset of String. See the documentation on the other overloaded textFile() method for more details.
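
Because textFile returns a Dataset[String], typed transformations apply directly to each line. A small sketch (the path is illustrative):

// Scala:
import spark.implicits._
val readme = spark.read.textFile("/path/to/spark/README.md")
// Count the lines that mention Spark
val count = readme.filter(line => line.contains("Spark")).count()
// Split every line into words with a typed flatMap
val words = readme.flatMap(line => line.split(" "))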


  • Writing to text files: When you write a text file, you need to be sure to have only one string column; otherwise, the write will fail.

  • If you perform partitioning when writing, you can write more columns; however, those columns will manifest as directories in the folder you are writing to, rather than as columns in every single file.
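
A short sketch of both cases, assuming a hypothetical DataFrame df with two string columns, value and country:

// Scala:
// Fails: the text source accepts exactly one string column
// df.write.text("/output/flat")   // AnalysisException

// Works: partitionBy consumes the extra column, which becomes
// country=... directories instead of data inside the files
df.write.partitionBy("country").text("/output/partitioned")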
     
Following are the definitions of the methods "text" and "partitionBy" available on the DataFrameWriter:

def text(path: String): Unit
Saves the content of the DataFrame in a text file at the specified path. The DataFrame must have only one column that is of string type. Each row becomes a new line in the output file. For example:
// Scala:
df.write.text("/path/to/output")
// Java:
df.write().text("/path/to/output")
You can set the following option(s) for writing text files:
compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive shortened names (none, bzip2, gzip, lz4, snappy and deflate).
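
For example, to gzip-compress the output (a sketch; df is assumed to hold a single string column):

// Scala:
df.write.option("compression", "gzip").text("/path/to/output-gz")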

def partitionBy(colNames: String*): DataFrameWriter[T]
Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:
year=2016/month=01/
year=2016/month=02/
Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.
This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.
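
A sketch that produces the layout above, assuming a hypothetical DataFrame events with year and month columns (written here as Parquet, since a text write would additionally require a single remaining string column):

// Scala:
events.write.partitionBy("year", "month").parquet("/path/to/events")
// Resulting directories:
//   /path/to/events/year=2016/month=01/
//   /path/to/events/year=2016/month=02/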

  • The following example shows how to read data from text files and write to a directory with partitions. Pay attention to the field names. Because the CSV files are read as plain text, each file's header row is parsed as data too, which is why a Country=Country partition directory appears in the listing below; note also that spaces in partition values are escaped as %20 in the directory names.

import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Schema of the underlying retail data (unused here, since the files are read as plain text).
    val manualSchema = StructType(Seq(StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    // Reading CSV files as plain text files and extracting information.
    val df_text = spark.read.textFile("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day").repartition(5)
    df_text.printSchema()
    df_text.show(5, false)

    // Split each line on commas and pull out the Description and Country fields.
    val df_text_split = df_text.selectExpr("split(value,',')[2] as Desc", "split(value,',')[7] as Country")
    df_text_split.printSchema()
    df_text_split.show(5, false)

    // Writing files using partitioning: each Country value becomes a directory.
    df_text_split.write.partitionBy("Country").text("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\textop")
  }
}
root
 |-- value: string (nullable = true)

+--------------------------------------------------------------------------------------------------+
|value                                                                                             |
+--------------------------------------------------------------------------------------------------+
|580538,21914,BLUE HARMONICA IN BOX ,24,2011-12-05 08:38:00,1.25,14075.0,United Kingdom            |
|580539,21479,WHITE SKULL HOT WATER BOTTLE ,4,2011-12-05 08:39:00,4.25,18180.0,United Kingdom      |
|580539,21411,GINGHAM HEART  DOORSTOP RED,8,2011-12-05 08:39:00,1.95,18180.0,United Kingdom        |
|580539,22372,AIRLINE BAG VINTAGE WORLD CHAMPION ,4,2011-12-05 08:39:00,4.25,18180.0,United Kingdom|
|580539,22389,PAPERWEIGHT SAVE THE PLANET,12,2011-12-05 08:39:00,0.39,18180.0,United Kingdom       |
+--------------------------------------------------------------------------------------------------+
only showing top 5 rows

root
 |-- Desc: string (nullable = true)
 |-- Country: string (nullable = true)

+-----------------------------------+--------------+
|Desc                               |Country       |
+-----------------------------------+--------------+
|BLUE HARMONICA IN BOX              |United Kingdom|
|WHITE SKULL HOT WATER BOTTLE       |United Kingdom|
|GINGHAM HEART  DOORSTOP RED        |United Kingdom|
|AIRLINE BAG VINTAGE WORLD CHAMPION |United Kingdom|
|PAPERWEIGHT SAVE THE PLANET        |United Kingdom|
+-----------------------------------+--------------+
only showing top 5 rows
04/15/2019  05:18 AM    <DIR>          Country=Australia
04/15/2019  05:18 AM    <DIR>          Country=Austria
04/15/2019  05:18 AM    <DIR>          Country=Bahrain
04/15/2019  05:18 AM    <DIR>          Country=Belgium
04/15/2019  05:18 AM    <DIR>          Country=Brazil
04/15/2019  05:18 AM    <DIR>          Country=Canada
04/15/2019  05:18 AM    <DIR>          Country=Channel%20Islands
04/15/2019  05:18 AM    <DIR>          Country=Country
04/15/2019  05:18 AM    <DIR>          Country=Cyprus
04/15/2019  05:18 AM    <DIR>          Country=Czech%20Republic
04/15/2019  05:18 AM    <DIR>          Country=Denmark
04/15/2019  05:18 AM    <DIR>          Country=EIRE
04/15/2019  05:18 AM    <DIR>          Country=European%20Community
04/15/2019  05:18 AM    <DIR>          Country=Finland
04/15/2019  05:18 AM    <DIR>          Country=France
04/15/2019  05:18 AM    <DIR>          Country=Germany
04/15/2019  05:18 AM    <DIR>          Country=Greece
04/15/2019  05:18 AM    <DIR>          Country=Hong%20Kong
04/15/2019  05:18 AM    <DIR>          Country=Iceland
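
To get the Country column back when reading this output, use text rather than textFile, since text respects the partition directories (a sketch reusing the output path from the example above):

// Scala:
val readBack = spark.read.text("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\textop")
readBack.printSchema()
// root
//  |-- value: string (nullable = true)
//  |-- Country: string (nullable = true)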

