CSV files:
 
CSV stands for comma-separated values. The CSV reader has a large number of options. These options let you work around issues such as characters that need to be escaped (for example, commas inside a column when the file is also comma-delimited) or null values labeled in an unconventional way.
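As a minimal sketch of both issues, the following Scala program writes a small hypothetical CSV file in which one value contains a comma (and is therefore quoted) and a missing price is written as NA, then reads it back with the quote and nullValue options. The object name, the file contents, and the NA marker are assumptions made for illustration; the sketch assumes spark-sql is on the classpath and runs in local mode.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical example object; the sample data below is made up for illustration.
object CsvQuotingSketch {

  // Row 1: the description contains a comma, so it is wrapped in quotes.
  // Row 2: the (assumed) marker NA stands for a missing price.
  val sampleLines: Seq[String] = Seq(
    "id,description,price",
    "1,\"Mug, white\",2.50",
    "2,Teapot,NA")

  def readSample(spark: SparkSession): DataFrame = {
    val file = Files.createTempDirectory("csv-quoting").resolve("sample.csv")
    Files.write(file, sampleLines.mkString("\n").getBytes(StandardCharsets.UTF_8))

    spark.read
      .option("header", "true")
      .option("quote", "\"")     // the default quote character, shown explicitly
      .option("nullValue", "NA") // read the string NA as null
      .option("inferSchema", "true")
      .csv(file.toString)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("CSV quoting sketch").getOrCreate()
    readSample(spark).show(false)
    spark.stop()
  }
}
```

Because the comma sits inside quotes, it is kept as part of the description instead of starting a new column.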
 
The following table lists the important options for the CSV reader and writer:
 
  
   
| Read/write | Key | Potential values | Default | Description |
| --- | --- | --- | --- | --- |
| Both | sep | Any single character | , | The single character used as the separator between fields and values. |
| Both | header | true, false | false | A Boolean flag that declares whether the first line in the file(s) contains the column names. |
| Read | escape | Any single character | \ | The character Spark uses to escape other characters in the file. |
| Read | inferSchema | true, false | false | Specifies whether Spark should infer column types when reading the file. |
| Read | ignoreLeadingWhiteSpace | true, false | false | Declares whether leading spaces in values being read should be skipped. |
| Read | ignoreTrailingWhiteSpace | true, false | false | Declares whether trailing spaces in values being read should be skipped. |
| Both | nullValue | Any string | "" (empty string) | Declares what string represents a null value in the file. |
| Both | nanValue | Any string | NaN | Declares what string represents a NaN value in the file. |
| Both | positiveInf | Any string or character | Inf | Declares what string represents a positive infinite value. |
| Both | negativeInf | Any string or character | -Inf | Declares what string represents a negative infinite value. |
| Both | compression or codec | none, uncompressed, bzip2, deflate, gzip, lz4, or snappy | none | Declares what compression codec Spark should use to read or write the file. |
| Both | dateFormat | Any string that conforms to Java's SimpleDateFormat | yyyy-MM-dd | Declares the date format for any columns of date type. |
| Both | timestampFormat | Any string that conforms to Java's SimpleDateFormat | yyyy-MM-dd'T'HH:mm:ss.SSSZZ | Declares the timestamp format for any columns of timestamp type. |
| Read | maxColumns | Any integer | 20480 | Declares the maximum number of columns in the file. |
| Read | maxCharsPerColumn | Any integer | 1000000 | Declares the maximum number of characters in a column. |
| Write | escapeQuotes | true, false | true | Declares whether values that contain a quote character should be enclosed in quotes (with the quotes escaped) when written. |
| Read | maxMalformedLogPerPartition | Any integer | 10 | Sets the maximum number of malformed rows Spark logs for each partition. Malformed records beyond this number are not logged. |
| Write | quoteAll | true, false | false | Specifies whether all values should be enclosed in quotes, as opposed to only escaping values that contain a quote character. |
| Read | multiLine | true, false | false | Allows reading multiline CSV files, where each logical row may span multiple lines in the file itself. |
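Several of these options can be passed at once through DataFrameReader.options, which accepts a Map of string keys and values. The sketch below is illustrative only: it assumes a hypothetical semicolon-separated file whose values are padded with spaces and whose dates are written day-first, and combines sep, header, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, and dateFormat with an explicit schema. (In Spark 3.x, dateFormat follows Spark's own datetime pattern reference rather than SimpleDateFormat, but a simple pattern such as dd/MM/yyyy means the same thing in both.)

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

// Hypothetical example object; the file contents are made up for illustration.
object CsvOptionsSketch {

  // Semicolon-separated, values padded with spaces, dates written day-first.
  val sample: String = Seq(
    "name;joined",
    "  Alice  ;14/04/2019",
    "Bob;01/02/2018").mkString("\n")

  val schema: StructType = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("joined", DateType, nullable = true)))

  def read(spark: SparkSession): DataFrame = {
    val file = Files.createTempDirectory("csv-options").resolve("people.csv")
    Files.write(file, sample.getBytes(StandardCharsets.UTF_8))

    spark.read
      .format("csv")
      .options(Map(
        "sep" -> ";",                        // semicolon instead of comma
        "header" -> "true",                  // skip the first line
        "ignoreLeadingWhiteSpace" -> "true", // trim spaces before values
        "ignoreTrailingWhiteSpace" -> "true",// trim spaces after values
        "dateFormat" -> "dd/MM/yyyy"))       // day-first dates
      .schema(schema)
      .load(file.toString)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("CSV options sketch").getOrCreate()
    read(spark).show(false)
    spark.stop()
  }
}
```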
 
Note that we can specify a compression codec to read or write compressed files.
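For example, the following sketch writes a tiny, made-up DataFrame as gzip-compressed CSV using the compression option and then reads it back. No codec option is passed on the read side: Spark recognizes the .gz extension and decompresses the files. The object and method names are hypothetical.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical example object: writes gzip-compressed CSV and reads it back.
object CsvGzipSketch {

  // Returns the names of the compressed part files and the number of rows read back.
  def roundTrip(spark: SparkSession): (Seq[String], Long) = {
    import spark.implicits._

    val out = Files.createTempDirectory("csv-gzip").resolve("out").toString
    val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "letter")

    df.write
      .mode(SaveMode.Overwrite)
      .option("header", "true")
      .option("compression", "gzip") // "codec" is accepted as an alias
      .csv(out)

    val gzFiles = new File(out).list().toSeq.filter(_.endsWith(".csv.gz"))
    // No codec option here: the .gz extension tells Spark how to decompress.
    val readBack = spark.read.option("header", "true").csv(out)
    (gzFiles, readBack.count())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("CSV gzip sketch").getOrCreate()
    val (files, rows) = roundTrip(spark)
    println(s"compressed files: ${files.mkString(", ")}; rows read back: $rows")
    spark.stop()
  }
}
```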
 
The following example shows how to read CSV files. First, we create a DataFrameReader for the csv format. In the example below, we specify that 1) the CSV files contain a header row, 2) the schema should be inferred from the files, and 3) the read mode is FAILFAST, which fails the job if any record is malformed. Next, we show how to specify the schema explicitly.
 
  
   
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Read all CSV files in the directory: the first line of each file is the header,
    // column types are inferred, and FAILFAST aborts the job on a malformed record.
    val df_csv = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .option("inferSchema", "true")
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")

    println("Count of Dataframe df_csv:" + df_csv.count())

    // Explicit schema for the retail data set
    val manualSchema = StructType(Seq(
      StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    // Same read with the explicit schema; inferSchema is left out because
    // Spark does not infer column types when a schema is supplied.
    val df_csv_manual = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .schema(manualSchema)
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")

    println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())
  }
}
Output:
Count of Dataframe df_csv:541909
Count of Dataframe df_csv_manual:541909
 
 
 
If the data we read does not match the schema, the job fails (in FAILFAST mode) when Spark actually reads the data, not when the code that defines the DataFrame runs. As soon as we execute an action, the job fails because the data does not conform to the specified schema. In general, Spark fails only at job execution time rather than at DataFrame definition time. This is due to lazy evaluation: defining a DataFrame only builds a plan, and the data is read when an action such as count() runs. One exception is a path that does not exist: Spark lists the input files when the DataFrame is defined, so a missing path is usually reported right away.
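The sketch below illustrates this with a hypothetical two-row file whose second row has a non-numeric quantity. Defining the DataFrame with an integer schema succeeds; the error only appears when an action forces Spark to parse the rows. It calls collect() rather than count() because newer Spark versions may skip parsing columns that a query does not request, so a bare count() might never touch the bad value. With PERMISSIVE mode (the default), the same bad value becomes null instead of failing the job. The object name and data are assumptions for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

// Hypothetical example object showing when a schema mismatch is detected.
object CsvFailFastSketch {

  val schema: StructType = StructType(Seq(
    StructField("id", IntegerType, nullable = true),
    StructField("quantity", IntegerType, nullable = true)))

  // Made-up data: "abc" in row 2 does not fit the IntegerType quantity column.
  def writeSample(): String = {
    val file = Files.createTempDirectory("csv-failfast").resolve("orders.csv")
    Files.write(file, "1,10\n2,abc\n".getBytes(StandardCharsets.UTF_8))
    file.toString
  }

  // Only builds a plan; no rows are parsed here (lazy evaluation).
  def define(spark: SparkSession, path: String, mode: String): DataFrame =
    spark.read.schema(schema).option("mode", mode).csv(path)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("FAILFAST sketch").getOrCreate()
    val path = writeSample()

    val failFast = define(spark, path, "FAILFAST") // defining the DataFrame does not fail
    println(s"FAILFAST collect failed: ${Try(failFast.collect()).isFailure}")

    val permissive = define(spark, path, "PERMISSIVE").collect()
    println(s"PERMISSIVE rows: ${permissive.mkString(", ")}")
    spark.stop()
  }
}
```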
 
The following example shows how to write data to tab-separated files in gzip-compressed format. Note that the write method is not available on SparkSession; it is available on the DataFrame itself, where df.write returns a DataFrameWriter.
 
  
   
SparkDefinitiveTesting.scala

import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SparkDefinitiveTesting {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Test App").getOrCreate()
    spark.sparkContext.setLogLevel("FATAL")

    // Explicit schema for the retail data set
    val manualSchema = StructType(Seq(
      StructField("InvoiceNo", StringType, true),
      StructField("StockCode", StringType, true),
      StructField("Description", StringType, true),
      StructField("Quantity", IntegerType, true),
      StructField("InvoiceDate", TimestampType, true),
      StructField("UnitPrice", DoubleType, true),
      StructField("CustomerID", DoubleType, true),
      StructField("Country", StringType, true)))

    // Read with the explicit schema (no inferSchema needed), then spread
    // the rows over 5 partitions; each partition becomes one output file.
    val df_csv_manual = spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .schema(manualSchema)
      .load("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\by-day")
      .repartition(5)

    println("Count of Dataframe df_csv_manual:" + df_csv_manual.count())

    // Write tab-separated, gzip-compressed files, replacing the output directory if it exists.
    df_csv_manual.write.format("csv")
      .mode(SaveMode.Overwrite)
      .option("sep", "\t")
      .option("codec", "gzip")
      .save("C:\\Users\\sukulma\\Downloads\\Spark-Data\\Spark-data\\data\\retail-data\\tsv")
  }
}
C:\Users\sukulma>cd C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\tsv

C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\tsv>dir
 Volume in drive C is OSDisk
 Volume Serial Number is C05C-4437

 Directory of C:\Users\sukulma\Downloads\Spark-Data\Spark-data\data\retail-data\tsv

04/14/2019  02:00 PM    <DIR>          .
04/14/2019  02:00 PM    <DIR>          ..
04/14/2019  01:59 PM            15,328 .part-00000-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  01:59 PM            15,308 .part-00001-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  01:59 PM            15,308 .part-00002-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  01:59 PM            15,328 .part-00003-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  02:00 PM            15,332 .part-00004-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz.crc
04/14/2019  02:00 PM                 8 ._SUCCESS.crc
04/14/2019  01:59 PM         1,960,940 part-00000-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  01:59 PM         1,958,236 part-00001-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  01:59 PM         1,958,342 part-00002-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  01:59 PM         1,960,794 part-00003-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  02:00 PM         1,961,057 part-00004-b190aff7-124c-4991-aed8-f3529d982c33.csv.gz
04/14/2019  02:00 PM                 0 _SUCCESS
              12 File(s)      9,875,981 bytes
               2 Dir(s)  77,774,761,984 bytes free
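Note that five .gz files are created inside the directory, one per partition. This reflects the number of partitions in the DataFrame at the time we write it out, which is five because of the repartition(5) call. The hidden .crc files hold checksums, and the empty _SUCCESS file marks that the write completed successfully.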
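To check the one-file-per-partition relationship, the following sketch writes a hypothetical 100-row DataFrame twice, once after repartition(5) and once after coalesce(1), and counts the part- files each time. It assumes local mode and temporary output directories; the object and method names are made up. coalesce(1) is a common way to get a single output file, at the cost of writing all of the data through one task.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Hypothetical example object relating partitions to output files.
object CsvPartitionFilesSketch {

  // Counts data files only, skipping hidden .crc checksum files and the _SUCCESS marker.
  def partFileCount(dir: String): Int =
    new File(dir).list().count(_.startsWith("part-"))

  // Writes df as CSV and returns (number of partitions, number of part files written).
  def writeAndCount(df: DataFrame): (Int, Int) = {
    val out = Files.createTempDirectory("csv-parts").resolve("out").toString
    df.write.mode(SaveMode.Overwrite).csv(out)
    (df.rdd.getNumPartitions, partFileCount(out))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Partition files sketch").getOrCreate()
    import spark.implicits._

    val df = (1 to 100).toDF("id")
    println(s"repartition(5): ${writeAndCount(df.repartition(5))}")
    println(s"coalesce(1): ${writeAndCount(df.coalesce(1))}")
    spark.stop()
  }
}
```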
 
 