
Chapter 4. Spark SQL and DataFrames: Introduction to Built-in Data Sources

In the previous chapter, we explained the evolution of and justification for structure in Spark. In particular, we discussed how the Spark SQL engine provides a unified foundation for the high-level DataFrame and Dataset APIs. Now, we'll continue our discussion of the DataFrame and explore its interoperability with Spark SQL.

This chapter and the next also explore how Spark SQL interfaces with some of the external components shown in Figure 4-1.

In particular, Spark SQL:

  • Provides the engine upon which the high-level Structured APIs we explored in Chapter 3 are built.

  • Can read and write data in a variety of structured formats (e.g., JSON, Hive tables, Parquet, Avro, ORC, CSV).

  • Lets you query data using JDBC/ODBC connectors from external business intelligence (BI) data sources such as Tableau, Power BI, and Talend, or from RDBMSs such as MySQL and PostgreSQL.

  • Provides a programmatic interface to interact with structured data stored as tables or views in a database from a Spark application.

  • Offers an interactive shell to issue SQL queries on your structured data.

  • Supports ANSI SQL:2003-compliant commands and HiveQL.


Figure 4-1. Spark SQL connectors and data sources

Let's begin with how you can use Spark SQL in a Spark application.

Using Spark SQL in Spark Applications

The SparkSession, introduced in Spark 2.0, provides a unified entry point for programming Spark with the Structured APIs. You can use a SparkSession to access Spark functionality: just import the class and create an instance in your code.

To issue any SQL query, use the sql() method on the SparkSession instance, spark, such as spark.sql("SELECT * FROM myTableName"). All spark.sql queries executed in this manner return a DataFrame on which you may perform further Spark operations if you desire—the kind we explored in Chapter 3 and the ones you will learn about in this chapter and the next.

Basic Query Examples

In this section we'll walk through a few examples of queries on the Airline On-Time Performance and Causes of Flight Delays data set, which contains data on US flights including date, delay, distance, origin, and destination. It's available as a CSV file with over a million records. Using a schema, we'll read the data into a DataFrame and register the DataFrame as a temporary view (more on temporary views shortly) so we can query it with SQL.

Query examples are provided in code snippets, and Python and Scala notebooks containing all of the code presented here are available in the book's GitHub repo. These examples will give you a taste of how to use SQL in your Spark applications via the spark.sql programmatic interface. Similar to the DataFrame API in its declarative flavor, this interface allows you to query structured data in your Spark applications.

Normally, in a standalone Spark application, you will create a SparkSession instance manually, as shown in the following example. However, in a Spark shell (or Databricks notebook), the SparkSession is created for you and accessible via the appropriately named variable spark.

Let's get started by reading the data set into a temporary view:

// In Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("SparkSQLExampleApp")
  .getOrCreate()

// Path to data set
val csvFile = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

// Read and create a temporary view
// Infer schema (note that for larger files you may want to specify the schema)
val df = spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(csvFile)

// Create a temporary view
df.createOrReplaceTempView("us_delay_flights_tbl")
# In Python
from pyspark.sql import SparkSession

# Create a SparkSession
spark = (SparkSession
  .builder
  .appName("SparkSQLExampleApp")
  .getOrCreate())

# Path to data set
csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

# Read and create a temporary view
# Infer schema (note that for larger files you
# may want to specify the schema)
df = (spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(csv_file))
df.createOrReplaceTempView("us_delay_flights_tbl")
Note

If you want to specify a schema, you can use a DDL-formatted string. For example:

// In Scala
val schema = "date STRING, delay INT, distance INT, origin STRING, destination STRING"
# In Python
schema = "`date` STRING, `delay` INT, `distance` INT, `origin` STRING, `destination` STRING"

Now that we have a temporary view, we can issue SQL queries using Spark SQL. These queries are no different from those you might issue against a SQL table in, say, a MySQL or PostgreSQL database. The point here is to show that Spark SQL offers an ANSI:2003–compliant SQL interface, and to demonstrate the interoperability between SQL and DataFrames.

The US flight delays data set has five columns:

  • The date column contains a string like 02190925. When converted, this maps to 02-19 09:25 am.

  • The delay column gives the delay in minutes between the scheduled and actual departure times. Early departures show negative numbers.

  • The distance column gives the distance in miles from the origin airport to the destination airport.

  • The origin column contains the origin IATA airport code.

  • The destination column contains the destination IATA airport code.
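As a quick illustration of the date column's encoding, the conversion described above can be sketched in plain Python (no Spark required); this is a toy helper, assuming the MMddHHmm layout described in the first bullet:

```python
def decode_flight_date(raw: str) -> str:
    """Decode an MMddHHmm string such as '02190925' into 'MM-dd HH:mm'."""
    month, day, hour, minute = raw[0:2], raw[2:4], raw[4:6], raw[6:8]
    return f"{month}-{day} {hour}:{minute}"

print(decode_flight_date("02190925"))  # → 02-19 09:25
```

In the actual data set you would perform this conversion with Spark's date functions instead, but the slicing above shows what the raw string holds.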

With that in mind, let's try some example queries against this data set.

First, we'll find all flights whose distance is greater than 1,000 miles:

spark.sql("""SELECT distance, origin, destination
FROM us_delay_flights_tbl WHERE distance > 1000
ORDER BY distance DESC""").show(10)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows

As the results show, all of the longest flights were between Honolulu (HNL) and New York (JFK). Next, we'll find all flights between San Francisco (SFO) and Chicago (ORD) with at least a two-hour delay:

spark.sql("""SELECT date, delay, origin, destination
FROM us_delay_flights_tbl
WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD'
ORDER BY delay DESC""").show(10)

+--------+-----+------+-----------+
|date    |delay|origin|destination|
+--------+-----+------+-----------+
|02190925|1638 |SFO   |ORD        |
|01031755|396  |SFO   |ORD        |
|01022330|326  |SFO   |ORD        |
|01051205|320  |SFO   |ORD        |
|01190925|297  |SFO   |ORD        |
|02171115|296  |SFO   |ORD        |
|01071040|279  |SFO   |ORD        |
|01051550|274  |SFO   |ORD        |
|03120730|266  |SFO   |ORD        |
|01261104|258  |SFO   |ORD        |
+--------+-----+------+-----------+
only showing top 10 rows
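Because Spark SQL speaks standard SQL, the shape of the query above runs against any ANSI-style engine. As a self-contained sketch, here is the same filter-and-sort executed with Python's built-in sqlite3 module over a handful of invented rows (this is an illustration of the SQL semantics, not of Spark itself):

```python
import sqlite3

# Tiny in-memory stand-in for us_delay_flights_tbl with made-up rows
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE us_delay_flights_tbl (date TEXT, delay INT, origin TEXT, destination TEXT)"
)
rows = [
    ("02190925", 1638, "SFO", "ORD"),
    ("01031755", 396,  "SFO", "ORD"),
    ("01011200", 10,   "SFO", "ORD"),   # under the 2-hour threshold
    ("01021500", 500,  "JFK", "LAX"),   # different route
]
conn.executemany("INSERT INTO us_delay_flights_tbl VALUES (?, ?, ?, ?)", rows)

# Same WHERE/ORDER BY structure as the Spark SQL query above
result = conn.execute("""SELECT date, delay, origin, destination
                         FROM us_delay_flights_tbl
                         WHERE delay > 120 AND origin = 'SFO' AND destination = 'ORD'
                         ORDER BY delay DESC""").fetchall()
print(result)
# → [('02190925', 1638, 'SFO', 'ORD'), ('01031755', 396, 'SFO', 'ORD')]
```

The short-delay flight and the JFK route are filtered out, and the remaining rows come back sorted by delay, exactly as in the Spark output.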

It seems there were many significantly delayed flights between these two cities, on different dates. (As an exercise, convert the date column into a readable format and find the days or months when these delays were most common. Were the delays related to winter months or holidays?)

Let's try a more complicated query where we use the CASE clause in SQL. In the following example, we want to label all US flights, regardless of origin and destination, with an indication of the delays they experienced: Very Long Delays (> 6 hours), Long Delays (2–6 hours), etc. We'll add these human-readable labels in a new column called Flight_Delays:

spark.sql("""SELECT delay, origin, destination,
              CASE
                  WHEN delay > 360 THEN 'Very Long Delays'
                  WHEN delay >= 120 AND delay <= 360 THEN 'Long Delays'
                  WHEN delay >= 60 AND delay < 120 THEN 'Short Delays'
                  WHEN delay > 0 AND delay < 60 THEN 'Tolerable Delays'
                  WHEN delay = 0 THEN 'No Delays'
                  ELSE 'Early'
              END AS Flight_Delays
              FROM us_delay_flights_tbl
              ORDER BY origin, delay DESC""").show(10)

+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|333  |ABE   |ATL        |Long Delays  |
|305  |ABE   |ATL        |Long Delays  |
|275  |ABE   |ATL        |Long Delays  |
|257  |ABE   |ATL        |Long Delays  |
|247  |ABE   |DTW        |Long Delays  |
|247  |ABE   |ATL        |Long Delays  |
|219  |ABE   |ORD        |Long Delays  |
|211  |ABE   |ATL        |Long Delays  |
|197  |ABE   |DTW        |Long Delays  |
|192  |ABE   |ORD        |Long Delays  |
+-----+------+-----------+-------------+
only showing top 10 rows
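The CASE ladder maps a numeric delay to a label. To make the bucket boundaries explicit, the same branching can be sketched in plain Python (a mirror of the SQL expression for illustration, not part of the Spark example):

```python
def flight_delay_label(delay: int) -> str:
    """Mirror of the SQL CASE expression: bucket a delay (in minutes) into a label."""
    if delay > 360:
        return "Very Long Delays"
    elif 120 <= delay <= 360:
        return "Long Delays"
    elif 60 <= delay < 120:
        return "Short Delays"
    elif 0 < delay < 60:
        return "Tolerable Delays"
    elif delay == 0:
        return "No Delays"
    else:                      # negative delays are early departures
        return "Early"

print(flight_delay_label(333))  # → Long Delays
print(flight_delay_label(-5))   # → Early
```

Note how the branches are evaluated top to bottom, just as SQL evaluates the WHEN clauses in order; every possible delay value falls into exactly one bucket.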

As with the DataFrame and Dataset APIs, with the spark.sql interface you can conduct common data analysis operations like those we explored in the previous chapter. The computations undergo an identical journey in the Spark SQL engine (see "The Catalyst Optimizer" in Chapter 3 for details), giving you the same results.

All three of the preceding SQL queries can be expressed with an equivalent DataFrame API query. For example, the first query can be expressed in the Python DataFrame API as:

# In Python
from pyspark.sql.functions import col, desc
(df.select("distance", "origin", "destination")
  .where(col("distance") > 1000)
  .orderBy(desc("distance"))).show(10)

# Or
(df.select("distance", "origin", "destination")
  .where("distance > 1000")
  .orderBy("distance", ascending=False)
  .show(10))

This produces the same results as the SQL query:

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows

As an exercise, try converting the other two SQL queries to use the DataFrame API.

As these examples show, using the Spark SQL interface to query data is similar to writing a regular SQL query to a relational database table. Although the queries are in SQL, you can feel the similarity in readability and semantics to DataFrame API operations, which you encountered in Chapter 3 and will explore further in the next chapter.
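To make that equivalence concrete without a Spark cluster, here is a hedged sketch over a few invented rows: the declarative SQL query (run against sqlite3, a stand-in engine) and the programmatic filter-and-sort pipeline (plain Python, mimicking select/where/orderBy) produce the same result.

```python
import sqlite3

flights = [(4330, "HNL", "JFK"), (2704, "SFO", "ORD"), (500, "ABE", "ATL")]

# Declarative: SQL against an in-memory table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (distance INT, origin TEXT, destination TEXT)")
conn.executemany("INSERT INTO t VALUES (?, ?, ?)", flights)
sql_result = conn.execute(
    "SELECT distance, origin, destination FROM t WHERE distance > 1000 ORDER BY distance DESC"
).fetchall()

# Programmatic: the same select/where/orderBy pipeline expressed as operations
api_result = sorted((r for r in flights if r[0] > 1000),
                    key=lambda r: r[0], reverse=True)

print(sql_result == api_result)  # → True
```

In Spark the analogy is tighter still, since both forms compile down to the same plan in the Spark SQL engine.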

To enable you to query structured data as shown in the preceding examples, Spark manages all the complexities of creating and managing views and tables, both in memory and on disk. That leads us to our next topic: how tables and views are created and managed.

SQL Tables and Views

Tables hold data. Associated with each table in Spark is its relevant metadata, which is information about the table and its data: the schema, description, table name, database name, column names, partitions, physical location where the actual data resides, etc. All of this is stored in a central metastore.

Instead of having a separate metastore for Spark tables, Spark by default uses the Apache Hive metastore, located at /user/hive/warehouse, to persist all the metadata about your tables. However, you may change the default location by setting the Spark config variable spark.sql.warehouse.dir to another location, which can be set to a local or external distributed storage.

Managed Versus Unmanaged Tables

Spark allows you to create two types of tables: managed and unmanaged. For a managed table, Spark manages both the metadata and the data in the file store. This could be a local filesystem, HDFS, or an object store such as Amazon S3 or Azure Blob. For an unmanaged table, Spark only manages the metadata, while you manage the data yourself in an external data source such as Cassandra.

With a managed table, because Spark manages everything, a SQL command such as DROP TABLE table_name deletes both the metadata and the data. With an unmanaged table, the same command will delete only the metadata, not the actual data. We will look at some examples of how to create managed and unmanaged tables in the next section.

Creating SQL Databases and Tables

Tables reside within a database. By default, Spark creates tables under the default database. To create your own database name, you can issue a SQL command from your Spark application or notebook. Using the US flight delays data set, let's create both a managed and an unmanaged table. To begin, we'll create a database called learn_spark_db and tell Spark we want to use that database:

// In Scala/Python
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")

From this point, any commands we issue in our application to create tables will result in the tables being created in this database and residing under the database name learn_spark_db.

Creating a managed table

To create a managed table within the database learn_spark_db, you can issue a SQL query like the following:

// In Scala/Python
spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT,
  distance INT, origin STRING, destination STRING)")

You can do the same thing using the DataFrame API like this:

# In Python
# Path to our US flight delays CSV file
csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
# Schema as defined in the preceding example
schema = "date STRING, delay INT, distance INT, origin STRING, destination STRING"
flights_df = spark.read.csv(csv_file, schema=schema)
flights_df.write.saveAsTable("managed_us_delay_flights_tbl")

Both of these statements will create the managed table managed_us_delay_flights_tbl in the learn_spark_db database.

Creating an unmanaged table

By contrast, you can create unmanaged tables from your own data sources—say, Parquet, CSV, or JSON files stored in a file store accessible to your Spark application.

To create an unmanaged table from a data source such as a CSV file, in SQL use:

spark.sql("""CREATE TABLE us_delay_flights_tbl(date STRING, delay INT,
  distance INT, origin STRING, destination STRING)
  USING csv OPTIONS (PATH
  '/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')""")

And within the DataFrame API use:

(flights_df
  .write
  .option("path", "/tmp/data/us_flights_delay")
  .saveAsTable("us_delay_flights_tbl"))
Note

To enable you to explore these examples, we have created Python and Scala example notebooks that you can find in the book's GitHub repo.

Creating Views

In addition to creating tables, Spark can create views on top of existing tables. Views can be global (visible across all SparkSessions on a given cluster) or session-scoped (visible only to a single SparkSession), and they are temporary: they disappear after your Spark application terminates.

Creating views has a similar syntax to creating tables within a database. Once you create a view, you can query it as you would a table. The difference between a view and a table is that views don't actually hold the data; tables persist after your Spark application terminates, but views disappear.

You can create a view from an existing table using SQL. For example, if you wish to work on only the subset of the US flight delays data set with origin airports of New York (JFK) and San Francisco (SFO), the following queries will create global temporary and temporary views consisting of just that slice of the table:

-- In SQL
CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_SFO_global_tmp_view AS
  SELECT date, delay, origin, destination FROM us_delay_flights_tbl
  WHERE origin = 'SFO';

CREATE OR REPLACE TEMP VIEW us_origin_airport_JFK_tmp_view AS
  SELECT date, delay, origin, destination FROM us_delay_flights_tbl
  WHERE origin = 'JFK'

You can accomplish the same thing with the DataFrame API as follows:

# In Python
df_sfo = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'SFO'")
df_jfk = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'JFK'")

# Create a temporary and global temporary view
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")
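The key property of a view, in Spark as in any SQL engine, is that it is a named query slice rather than a copy of the data. This sketch illustrates that behavior with Python's built-in sqlite3 (a stand-in engine, not Spark) and a couple of invented rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE us_delay_flights_tbl (date TEXT, delay INT, origin TEXT, destination TEXT)"
)
conn.executemany(
    "INSERT INTO us_delay_flights_tbl VALUES (?, ?, ?, ?)",
    [("01010900", 30, "SFO", "ORD"), ("01011000", 15, "JFK", "LAX")],
)

# A view holds no data of its own; it re-runs its defining query each time
conn.execute("""CREATE TEMP VIEW us_origin_airport_SFO_tmp_view AS
                SELECT date, delay, origin, destination
                FROM us_delay_flights_tbl WHERE origin = 'SFO'""")

rows = conn.execute("SELECT origin FROM us_origin_airport_SFO_tmp_view").fetchall()
print(rows)  # → [('SFO',)]

# New rows in the base table appear in the view automatically
conn.execute("INSERT INTO us_delay_flights_tbl VALUES ('01011100', 5, 'SFO', 'SEA')")
count_after = conn.execute(
    "SELECT COUNT(*) FROM us_origin_airport_SFO_tmp_view").fetchone()[0]
print(count_after)  # → 2
```

Spark's temporary views behave the same way over DataFrames: the view definition is stored, and queries against it are resolved against the current contents of the underlying table.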

Once you've created these views, you can issue queries against them just as you would against a table. Keep in mind that when accessing a global temporary view you must use the prefix global_temp.<view_name>, because Spark creates global temporary views in a global temporary database called global_temp. For example:

-- In SQL
SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view

By contrast, you can access the normal temporary view without the global_temp prefix:

-- In SQL
SELECT * FROM us_origin_airport_JFK_tmp_view
// In Scala/Python
spark.read.table("us_origin_airport_JFK_tmp_view")
// Or
spark.sql("SELECT * FROM us_origin_airport_JFK_tmp_view")

You can also drop a view just like you would a table:

-- In SQL
DROP VIEW IF EXISTS us_origin_airport_SFO_global_tmp_view;
DROP VIEW IF EXISTS us_origin_airport_JFK_tmp_view
// In Scala/Python
spark.catalog.dropGlobalTempView("us_origin_airport_SFO_global_tmp_view")
spark.catalog.dropTempView("us_origin_airport_JFK_tmp_view")

Temporary views versus global temporary views

The difference between temporary and global temporary views being subtle, it can be a source of mild confusion among developers new to Spark. A temporary view is tied to a single SparkSession within a Spark application. In contrast, a global temporary view is visible across multiple SparkSessions within a Spark application. Yes, you can create multiple SparkSessions within a single Spark application—this can be handy, for example, in cases where you want to access (and combine) data from two different SparkSessions that don't share the same Hive metastore configurations.

Caching SQL Tables

Although we will discuss table caching strategies in the next chapter, it's worth mentioning here that, like DataFrames, you can cache and uncache SQL tables and views. In Spark 3.0, in addition to other options, you can specify a table as LAZY, meaning that it should only be cached when it is first used instead of immediately:

-- In SQL
CACHE [LAZY] TABLE <table-name>
UNCACHE TABLE <table-name>
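The LAZY keyword defers the caching work until first use. As a loose analogy in plain Python (a toy sketch, not Spark's actual cache implementation), compare declaring a cache with materializing it:

```python
class LazyTableCache:
    """Toy analogy for LAZY caching: materialize on first access, then reuse."""

    def __init__(self, compute):
        self._compute = compute
        self._value = None
        self.materialized = False

    def get(self):
        if not self.materialized:       # first use pays the computation cost
            self._value = self._compute()
            self.materialized = True
        return self._value              # later uses hit the cached value

cache = LazyTableCache(lambda: [("HNL", "JFK", 4330)])
print(cache.materialized)  # → False (declared, but nothing cached yet)
cache.get()
print(cache.materialized)  # → True
```

With a non-LAZY CACHE TABLE, by contrast, the materialization step would happen immediately at declaration time.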

Reading Tables into DataFrames

Frequently, data engineers build data pipelines as part of their regular data ingestion and ETL processes. They populate Spark SQL databases and tables with cleansed data for consumption by applications downstream.

Let's assume you have an existing database, learn_spark_db, and table, us_delay_flights_tbl, ready for use. Instead of reading from an external JSON file, you can simply use SQL to query the table and assign the returned result to a DataFrame:

// In Scala
val usFlightsDF = spark.sql("SELECT * FROM us_delay_flights_tbl")
val usFlightsDF2 = spark.table("us_delay_flights_tbl")
# In Python
us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl")
us_flights_df2 = spark.table("us_delay_flights_tbl")

Now you have a cleansed DataFrame read from an existing Spark SQL table. You can also read data in other formats using Spark's built-in data sources, giving you the flexibility to interact with various common file formats.

Data Sources for DataFrames and SQL Tables

As shown in Figure 4-1, Spark SQL provides an interface to a variety of data sources. It also provides a set of common methods for reading and writing data to and from these data sources using the Data Sources API.

In this section we will cover some of the built-in data sources, available file formats, and ways to load and write data, along with specific options pertaining to these data sources. But first, let's take a closer look at two high-level Data Source API constructs that dictate the manner in which you interact with different data sources: DataFrameReader and DataFrameWriter.

DataFrameReader

DataFrameReader is the core construct for reading data from a data source into a DataFrame. It has a defined format and a recommended pattern for usage:

DataFrameReader.format(args).option("key", "value").schema(args).load()

This pattern of stringing methods together is common in Spark, and easy to read. We saw it in Chapter 3 when exploring common data analysis patterns.

Note that you can only access a DataFrameReader through a SparkSession instance. That is, you cannot create an instance of DataFrameReader. To get an instance handle to it, use:

SparkSession.read
// or
SparkSession.readStream

While read returns a handle to DataFrameReader to read into a DataFrame from a static data source, readStream returns an instance to read from a streaming source. (We will cover Structured Streaming later in the book.)

Arguments to each of the public methods to DataFrameReader take different values. Table 4-1 enumerates these, with a subset of the supported arguments.

Table 4-1. DataFrameReader methods, arguments, and options

format()
  Arguments: "parquet", "csv", "txt", "json", "jdbc", "orc", "avro", etc.
  Description: If you don't specify this method, then the default is Parquet or whatever is set in spark.sql.sources.default.

option()
  Arguments: ("mode", {PERMISSIVE | FAILFAST | DROPMALFORMED}), ("inferSchema", {true | false}), ("path", "path_file_data_source")
  Description: A series of key/value pairs and options. The Spark documentation shows some examples and explains the different modes and their actions. The default mode is PERMISSIVE. The "inferSchema" and "mode" options are specific to the JSON and CSV file formats.

schema()
  Arguments: DDL String or StructType, e.g., 'A INT, B STRING' or StructType(...)
  Description: For JSON or CSV format, you can specify to infer the schema in the option() method. Generally, providing a schema for any format makes loading faster and ensures your data conforms to the expected schema.

load()
  Arguments: "/path/to/data/source"
  Description: The path to the data source. This can be empty if specified in option("path", "...").

While we won't comprehensively enumerate all the different combinations of arguments and options, the documentation for Python, Scala, R, and Java offers suggestions and guidance. It's worthwhile to show a couple of examples, though:

// In Scala
// Use Parquet
val file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet"""
val df = spark.read.format("parquet").load(file)

// Use Parquet; you can omit format("parquet") if you wish as it's the default
val df2 = spark.read.load(file)

// Use CSV
val df3 = spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*")

// Use JSON
val df4 = spark.read.format("json")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/json/*")
Note

In general, no schema is needed when reading from a static Parquet data source—the Parquet metadata usually contains the schema, so it's inferred. However, for streaming data sources you will have to provide a schema. (We will cover reading from streaming data sources in Chapter 8.)

Parquet is the default and preferred data source for Spark because it's efficient, uses columnar storage, and employs a fast compression algorithm. You will see additional benefits later (such as columnar pushdown), when we cover the Catalyst optimizer in greater depth.

DataFrameWriter

DataFrameWriter does the reverse of its counterpart: it saves or writes data to a specified built-in data source. Unlike with DataFrameReader, you access its instance not from a SparkSession but from the DataFrame you wish to save. It has a few recommended usage patterns:

DataFrameWriter.format(args)
  .option(args)
  .bucketBy(args)
  .partitionBy(args)
  .save(path)

DataFrameWriter.format(args)
  .option(args)
  .sortBy(args)
  .saveAsTable(table)

To get an instance handle, use:

DataFrame.write
// or
DataFrame.writeStream

Arguments to each of the methods to DataFrameWriter also take different values. We list these in Table 4-2, with a subset of the supported arguments.

Table 4-2. DataFrameWriter methods, arguments, and options
Method Arguments Description
format() "parquet", "csv", "txt", "json", "jdbc", "orc", "avro", etc. If you don't specify this method, then the default is Parquet or whatever is set in spark.sql.sources.default.
option() ("mode", {append | overwrite | ignore | error or errorifexists})
("mode", {SaveMode.Overwrite | SaveMode.Append | SaveMode.Ignore | SaveMode.ErrorIfExists})
("path", "path_to_write_to")
A series of key/value pairs and options. The Spark documentation shows some examples. This is an overloaded method. The default mode options are error or errorifexists and SaveMode.ErrorIfExists; they throw an exception at runtime if the data already exists.
bucketBy() (numBuckets, col, col..., coln) The number of buckets and names of columns to bucket by. Uses Hive's bucketing scheme on a filesystem.
save() "/path/to/data/source" The path to save to. This can be empty if specified in option("path", "...").
saveAsTable() "table_name" The table to save to.
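To make the four save modes in Table 4-2 concrete, here is a small stdlib-Python sketch of their semantics against a directory on disk. This is not Spark's implementation; the helper `save_with_mode` and the part-file naming are hypothetical, used only to illustrate how append, overwrite, ignore, and errorifexists behave:

```python
import os
import shutil
import tempfile

def save_with_mode(path: str, rows: list, mode: str = "errorifexists") -> None:
    """Sketch of DataFrameWriter save-mode semantics (not Spark's code)."""
    if os.path.exists(path):
        if mode in ("error", "errorifexists"):
            raise FileExistsError(f"Data already exists at {path}")
        if mode == "ignore":
            return                  # silently skip the write
        if mode == "overwrite":
            shutil.rmtree(path)     # drop the old data first
    os.makedirs(path, exist_ok=True)
    # "append" and first-time writes both land here; write one new part file
    part = os.path.join(path, f"part-{len(os.listdir(path)):05d}.txt")
    with open(part, "w") as f:
        f.writelines(f"{r}\n" for r in rows)

target = os.path.join(tempfile.mkdtemp(), "out")
save_with_mode(target, ["a", "b"], mode="overwrite")  # first write
save_with_mode(target, ["c"], mode="append")          # adds a second part file
save_with_mode(target, ["d"], mode="ignore")          # no-op: path exists
print(sorted(os.listdir(target)))                     # ['part-00000.txt', 'part-00001.txt']
```

The key point the sketch captures: only the default mode raises when the target already holds data; the other three resolve the conflict silently, each in its own way.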

Here's a short example snippet to illustrate the use of methods and arguments:

// In Scala
// Use JSON
val location = ...
df.write.format("json").mode("overwrite").save(location)

Parquet

We'll start our exploration of data sources with Parquet, because it's the default data source in Spark. Supported and widely used by many big data processing frameworks and platforms, Parquet is an open source columnar file format that offers many I/O optimizations (such as compression, which saves storage space and allows for quick access to data columns).

Because of its efficiency and these optimizations, we recommend that after you have transformed and cleansed your data, you save your DataFrames in the Parquet format for downstream consumption. (Parquet is also the default table open format for Delta Lake, which we will cover in Chapter 9.)

Reading Parquet files into a DataFrame

Parquet files are stored in a directory structure that contains the data files, metadata, a number of compressed files, and some status files. Metadata in the footer contains the version of the file format, the schema, and column data such as the path, etc.

For example, a directory in a Parquet file might contain a set of files like this:

_SUCCESS
_committed_1799640464332036264
_started_1799640464332036264
part-00000-tid-1799640464332036264-91273258-d7ef-4dc7-<...>-c000.snappy.parquet

There may be a number of part-XXXX compressed files in a directory (the names shown here have been shortened to fit on the page).

To read Parquet files into a DataFrame, you simply specify the format and path:

// In Scala
val file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"""
val df = spark.read.format("parquet").load(file)
# In Python
file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"""
df = spark.read.format("parquet").load(file)

Unless you are reading from a streaming data source, there's no need to supply the schema, because Parquet saves it as part of its metadata.

Reading Parquet files into a Spark SQL table

As well as reading Parquet files into a Spark DataFrame, you can also create a Spark SQL unmanaged table or view directly using SQL:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
  USING parquet
  OPTIONS (
    path "/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"
  )

Once you've created the table or view, you can read data into a DataFrame using SQL, as we saw in some earlier examples:

// In Scala
spark.sql("SELECT * FROM us_delay_flights_tbl").show()
# In Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()

Both of these operations return the same results:

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrames to Parquet files

Writing or saving a DataFrame as a table or file is a common operation in Spark. To write a DataFrame you simply use the methods and arguments to the DataFrameWriter outlined earlier in this chapter, supplying the location to save the Parquet files to. For example:

// In Scala
df.write.format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/parquet/df_parquet")
# In Python
(df.write.format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/parquet/df_parquet"))
Note

Recall that Parquet is the default file format. If you don't include the format() method, the DataFrame will still be saved as a Parquet file.

This will create a set of compact and compressed Parquet files at the specified path. Since we used snappy as our compression choice here, we'll have snappy compressed files. For brevity, this example generated only one file; normally, there may be a dozen or so files created:

-rw-r--r--  1 jules  wheel    0 May 19 10:58 _SUCCESS
-rw-r--r--  1 jules  wheel  966 May 19 10:58 part-00000-<...>-c000.snappy.parquet

Writing DataFrames to Spark SQL tables

Writing a DataFrame to a SQL table is as easy as writing to a file: just use saveAsTable() instead of save(). This will create a managed table called us_delay_flights_tbl:

// In Scala
df.write
  .mode("overwrite")
  .saveAsTable("us_delay_flights_tbl")
# In Python
(df.write
  .mode("overwrite")
  .saveAsTable("us_delay_flights_tbl"))

To sum up, Parquet is the preferred and default built-in data source file format in Spark, and it has been adopted by many other frameworks. We recommend that you use this format in your ETL and data ingestion processes.

JSON

JavaScript Object Notation (JSON) is also a popular data format. It came to prominence as an easy-to-read and easy-to-parse format compared to XML. It has two representational formats: single-line mode and multiline mode. Both modes are supported in Spark.

In single-line mode each line denotes a single JSON object, whereas in multiline mode the entire multiline object constitutes a single JSON object. To read in this mode, set multiLine to true in the option() method.
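To see the difference between the two physical layouts, here is a small stdlib-Python sketch (the data is made up for illustration). The first string is what Spark reads by default; the second is what `option("multiLine", "true")` expects:

```python
import json

# Single-line mode (a.k.a. JSON Lines): one complete JSON object per line.
single_line = '{"name": "flight1", "count": 1}\n{"name": "flight2", "count": 2}'
records = [json.loads(line) for line in single_line.splitlines()]
print(len(records))          # 2 records, parsed line by line

# Multiline mode: the whole file is one JSON value (here, an array of objects).
multiline = """[
  {"name": "flight1", "count": 1},
  {"name": "flight2", "count": 2}
]"""
records2 = json.loads(multiline)
print(records == records2)   # True: same data, different physical layout
```

Single-line mode is splittable (each line can be parsed independently, in parallel), which is why it's the default; a multiline file must be read as a whole.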

Reading a JSON file into a DataFrame

You can read a JSON file into a DataFrame the same way you did with Parquet: just specify "json" in the format() method:

// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
val df = spark.read.format("json").load(file)
# In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
df = spark.read.format("json").load(file)

Reading a JSON file into a Spark SQL table

You can also create a SQL table from a JSON file just like you did with Parquet:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
  USING json
  OPTIONS (
    path "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
  )

Once the table is created, you can read data into a DataFrame using SQL:

// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
|United States    |Singapore          |1    |
|United States    |Grenada            |62   |
|Costa Rica       |United States      |588  |
|Senegal          |United States      |40   |
|Moldova          |United States      |1    |
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrames to JSON files

Saving a DataFrame as a JSON file is simple. Specify the appropriate DataFrameWriter methods and arguments, and supply the location to save the JSON files to:

// In Scala
df.write.format("json")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/json/df_json")
# In Python
(df.write.format("json")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/json/df_json"))

This creates a directory at the specified path populated with a set of compact JSON files:

-rw-r--r--  1 jules  wheel   0 May 16 14:44 _SUCCESS
-rw-r--r--  1 jules  wheel  71 May 16 14:44 part-00000-<...>-c000.json

JSON information source options

Table 4-3 describes common JSON options for DataFrameReader and DataFrameWriter. For a comprehensive list, we refer you to the documentation.

Table 4-3. JSON options for DataFrameReader and DataFrameWriter
Property name Values Meaning Scope
compression none, uncompressed, bzip2, deflate, gzip, lz4, or snappy Use this compression codec for writing. Note that read will only detect the compression or codec from the file extension. Write
dateFormat yyyy-MM-dd or DateTimeFormatter Use this format or any format from Java's DateTimeFormatter. Read/write
multiLine true, false Use multiline mode. Default is false (single-line mode). Read
allowUnquotedFieldNames true, false Allow unquoted JSON field names. Default is false. Read

CSV

As widely used as plain text files, this common text file format captures each datum or field delimited by a comma; each line with comma-separated fields represents a record. Even though a comma is the default separator, you may use other delimiters to separate fields in cases where commas are part of your data. Popular spreadsheets can generate CSV files, so it's a popular format among data and business analysts.
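The "commas in your data" problem, and the use of an alternate delimiter to sidestep it, can be sketched with Python's stdlib csv module (the field values here are made up; in Spark the equivalent knob is `option("sep", "|")`):

```python
import csv
import io

# A record whose field contains a comma, written with a pipe delimiter instead.
rows = [["DEST", "ORIGIN", "count"],
        ["United States", "Washington, D.C.", "10"]]

buf = io.StringIO()
csv.writer(buf, delimiter="|").writerows(rows)
print(buf.getvalue().splitlines()[1])   # United States|Washington, D.C.|10

# Reading it back works only if the reader agrees on the delimiter,
# just as a Spark read must use the same sep the file was written with.
parsed = list(csv.reader(io.StringIO(buf.getvalue()), delimiter="|"))
print(parsed[1][1])                     # Washington, D.C.
```

With a comma delimiter, the same writer would instead quote the embedded-comma field; either approach (quoting or an alternate separator) keeps the record's field boundaries unambiguous.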

Reading a CSV file into a DataFrame

As with the other built-in data sources, you can use the DataFrameReader methods and arguments to read a CSV file into a DataFrame:

// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
val schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
val df = spark.read.format("csv")
  .schema(schema)
  .option("header", "true")
  .option("mode", "FAILFAST")   // Exit if any errors
  .option("nullValue", "")      // Replace any null data with quotes
  .load(file)
# In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
df = (spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .option("mode", "FAILFAST")   # Exit if any errors
  .option("nullValue", "")      # Replace any null data field with quotes
  .load(file))

Reading a CSV file into a Spark SQL table

Creating a SQL table from a CSV data source is no different from using Parquet or JSON:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
  USING csv
  OPTIONS (
    path "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*",
    header "true",
    inferSchema "true",
    mode "FAILFAST"
  )

Once you've created the table, you can read data into a DataFrame using SQL as before:

// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show(10)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrames to CSV files

Saving a DataFrame as a CSV file is simple. Specify the appropriate DataFrameWriter methods and arguments, and supply the location to save the CSV files to:

// In Scala
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")
# In Python
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")

This generates a folder at the specified location, populated with a bunch of compressed and compact files:

-rw-r--r--  1 jules  wheel   0 May 16 12:17 _SUCCESS
-rw-r--r--  1 jules  wheel  36 May 16 12:17 part-00000-251690eb-<...>-c000.csv

CSV data source options

Table 4-4 describes some of the common CSV options for DataFrameReader and DataFrameWriter. Because CSV files can be complex, many options are available; for a comprehensive list we refer you to the documentation.

Table 4-4. CSV options for DataFrameReader and DataFrameWriter
Property name Values Meaning Scope
compression none, bzip2, deflate, gzip, lz4, or snappy Use this compression codec for writing. Write
dateFormat yyyy-MM-dd or DateTimeFormatter Use this format or any format from Java's DateTimeFormatter. Read/write
multiLine true, false Use multiline mode. Default is false (single-line mode). Read
inferSchema true, false If true, Spark will determine the column data types. Default is false. Read
sep Any character Use this character to separate column values in a row. Default delimiter is a comma (,). Read/write
escape Any character Use this character to escape quotes. Default is \. Read/write
header true, false Indicates whether the first line is a header denoting each column name. Default is false. Read/write

Avro

Introduced in Spark 2.4 as a built-in data source, the Avro format is used, for example, by Apache Kafka for message serializing and deserializing. It offers many benefits, including direct mapping to JSON, speed and efficiency, and bindings available for many programming languages.
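That "direct mapping to JSON" is literal: an Avro schema is itself a JSON document. As an illustration only (this schema is hypothetical, not taken from the dataset used below), a record schema matching the flight summary data might look like this, and could be passed to a reader via the avroSchema option described later in Table 4-5:

```json
{
  "type": "record",
  "name": "FlightSummary",
  "fields": [
    {"name": "DEST_COUNTRY_NAME",   "type": "string"},
    {"name": "ORIGIN_COUNTRY_NAME", "type": "string"},
    {"name": "count",               "type": "long"}
  ]
}
```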

Reading an Avro file into a DataFrame

Reading an Avro file into a DataFrame using DataFrameReader is consistent in usage with the other data sources we have discussed in this section:

// In Scala
val df = spark.read.format("avro")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*")
df.show(false)
# In Python
df = (spark.read.format("avro")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"))
df.show(truncate=False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Reading an Avro file into a Spark SQL table

Again, creating SQL tables using an Avro data source is no different from using Parquet, JSON, or CSV:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW episode_tbl
  USING avro
  OPTIONS (
    path "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
  )

Once you've created a table, you can read data into a DataFrame using SQL:

// In Scala
spark.sql("SELECT * FROM episode_tbl").show(false)
# In Python
spark.sql("SELECT * FROM episode_tbl").show(truncate=False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrames to Avro files

Writing a DataFrame as an Avro file is simple. As usual, specify the appropriate DataFrameWriter methods and arguments, and supply the location to save the Avro files to:

// In Scala
df.write
  .format("avro")
  .mode("overwrite")
  .save("/tmp/data/avro/df_avro")

# In Python
(df.write
  .format("avro")
  .mode("overwrite")
  .save("/tmp/data/avro/df_avro"))

This generates a folder at the specified location, populated with a bunch of compressed and compact files:

-rw-r--r--  1 jules  wheel    0 May 17 11:54 _SUCCESS
-rw-r--r--  1 jules  wheel  526 May 17 11:54 part-00000-ffdf70f4-<...>-c000.avro

Avro data source options

Table 4-5 describes common options for DataFrameReader and DataFrameWriter. A comprehensive list of options is in the documentation.

Table 4-5. Avro options for DataFrameReader and DataFrameWriter

Property name   | Default value  | Meaning                                                            | Scope
avroSchema      | None           | Optional Avro schema provided by a user in JSON format. The data type and naming of record fields should match the input Avro data or Catalyst data (Spark internal data type); otherwise the read/write action will fail. | Read/write
recordName      | topLevelRecord | Top-level record name in the write result, which is required in the Avro spec. | Write
recordNamespace | ""             | Record namespace in the write result.                              | Write
ignoreExtension | true           | If this option is enabled, all files (with and without the .avro extension) are loaded. Otherwise, files without the .avro extension are ignored. | Read
compression     | snappy         | Allows you to specify the compression codec to use in writing. Currently supported codecs are uncompressed, snappy, deflate, bzip2, and xz. If this option is not set, the value in spark.sql.avro.compression.codec is taken into account. | Write
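The avroSchema option above takes the schema as a JSON string. As a minimal plain-Python sketch (not Spark-specific; the field names here mirror the flight data used earlier, and the final reader call is shown only as a comment), here is how such a schema document is typically assembled and serialized:

```python
import json

# A record schema in Avro's JSON declaration format. Field names and types
# must match the Avro data being read, or the read will fail (see Table 4-5).
avro_schema = {
    "type": "record",
    "name": "topLevelRecord",
    "fields": [
        {"name": "DEST_COUNTRY_NAME", "type": ["string", "null"]},
        {"name": "ORIGIN_COUNTRY_NAME", "type": ["string", "null"]},
        {"name": "count", "type": ["long", "null"]},
    ],
}
schema_json = json.dumps(avro_schema)

# The JSON string would then be passed to the reader, e.g.:
# df = spark.read.format("avro").option("avroSchema", schema_json).load(path)
```

Building the schema as a dict and serializing it with json.dumps avoids quoting mistakes that are easy to make when writing the JSON string by hand.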

ORC

As an additional optimized columnar file format, Spark 2.x supports a vectorized ORC reader. Two Spark configurations dictate which ORC implementation to use. When spark.sql.orc.impl is set to native and spark.sql.orc.enableVectorizedReader is set to true, Spark uses the vectorized ORC reader. A vectorized reader reads blocks of rows (often 1,024 per block) instead of one row at a time, streamlining operations and reducing CPU usage for intensive operations like scans, filters, aggregations, and joins.

For Hive ORC SerDe (serialization and deserialization) tables created with the SQL command USING HIVE OPTIONS (fileFormat 'ORC'), the vectorized reader is used when the Spark configuration parameter spark.sql.hive.convertMetastoreOrc is set to true.
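These properties can be set per session with spark.conf.set, or fixed cluster-wide in spark-defaults.conf. A sketch of the latter (the values shown enable the vectorized reader; check your Spark version's documentation, as they are already the defaults in recent releases):

```
spark.sql.orc.impl                    native
spark.sql.orc.enableVectorizedReader  true
spark.sql.hive.convertMetastoreOrc    true
```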

Reading an ORC file into a DataFrame

To read in a DataFrame using the ORC vectorized reader, you can simply use the normal DataFrameReader methods and options:

// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
val df = spark.read.format("orc").load(file)
df.show(10, false)

# In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
df = spark.read.format("orc").option("path", file).load()
df.show(10, False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Reading an ORC file into a Spark SQL table

There is no difference from Parquet, JSON, CSV, or Avro when creating a SQL view using an ORC data source:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING orc
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
    )

Once a table is created, you can read data into a DataFrame using SQL as usual:

// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
|    United States|   Marshall Islands|   44|
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrames to ORC files

Writing back a transformed DataFrame after reading is just as simple using the DataFrameWriter methods:

// In Scala
df.write.format("orc")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/orc/df_orc")

# In Python
(df.write.format("orc")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/orc/flights_orc"))

The result will be a folder at the specified location containing some compressed ORC files:

-rw-r--r--  1 jules  wheel    0 May 16 17:23 _SUCCESS
-rw-r--r--  1 jules  wheel  547 May 16 17:23 part-00000-<...>-c000.snappy.orc

Images

In Spark 2.4 the community introduced a new data source, image files, to support deep learning and machine learning frameworks such as TensorFlow and PyTorch. For computer vision–based machine learning applications, loading and processing image data sets is important.

Reading an image file into a DataFrame

As with all of the previous file formats, you can use the DataFrameReader methods and options to read in an image file, as shown here:

// In Scala
import org.apache.spark.ml.source.image

val imageDir = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val imagesDF = spark.read.format("image").load(imageDir)
imagesDF.printSchema
imagesDF.select("image.height", "image.width", "image.nChannels", "image.mode",
  "label").show(5, false)

# In Python
from pyspark.ml import image

image_dir = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
images_df = spark.read.format("image").load(image_dir)
images_df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
 |-- label: integer (nullable = true)

images_df.select("image.height", "image.width", "image.nChannels", "image.mode",
  "label").show(5, truncate=False)

+------+-----+---------+----+-----+
|height|width|nChannels|mode|label|
+------+-----+---------+----+-----+
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |1    |
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |0    |
+------+-----+---------+----+-----+
only showing top 5 rows

Binary Files

Spark 3.0 adds support for binary files as a data source. The DataFrameReader converts each binary file into a single DataFrame row (record) that contains the raw content and metadata of the file. The binary file data source produces a DataFrame with the following columns:

  • path: StringType

  • modificationTime: TimestampType

  • length: LongType

  • content: BinaryType
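To make that column mapping concrete, the same four values can be assembled for a single local file in plain Python (the throwaway file below exists only for illustration; Spark builds these rows for you):

```python
import datetime
import os
import tempfile

# Create a small throwaway file to stand in for a binary input.
tmp = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False)
tmp.write(b"\xff\xd8\xff\xe0")  # JPEG magic bytes
tmp.close()

stat = os.stat(tmp.name)
with open(tmp.name, "rb") as f:
    content = f.read()

# One dict per file, mirroring the binary file data source's columns.
row = {
    "path": "file:" + tmp.name,                                          # StringType
    "modificationTime": datetime.datetime.fromtimestamp(stat.st_mtime),  # TimestampType
    "length": stat.st_size,                                              # LongType
    "content": content,                                                  # BinaryType
}
os.unlink(tmp.name)
```

The content column holds the entire file as raw bytes, which is why show() renders it as a truncated hex preview like [FF D8 FF E0 00 1...].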

Reading a binary file into a DataFrame

To read binary files, specify the data source format as binaryFile. You can load files with paths matching a given glob pattern while preserving the behavior of partition discovery with the data source option pathGlobFilter. For example, the following code reads all JPG files from the input directory with any partitioned directories:

// In Scala
val path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val binaryFilesDF = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .load(path)
binaryFilesDF.show(5)
# In Python
path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
binary_files_df = (spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .load(path))
binary_files_df.show(5)

+--------------------+-------------------+------+--------------------+-----+
|                path|   modificationTime|length|             content|label|
+--------------------+-------------------+------+--------------------+-----+
|file:/Users/jules...|2020-02-12 12:04:24| 55037|[FF D8 FF E0 00 1...|    0|
|file:/Users/jules...|2020-02-12 12:04:24| 54634|[FF D8 FF E0 00 1...|    1|
|file:/Users/jules...|2020-02-12 12:04:24| 54624|[FF D8 FF E0 00 1...|    0|
|file:/Users/jules...|2020-02-12 12:04:24| 54505|[FF D8 FF E0 00 1...|    0|
|file:/Users/jules...|2020-02-12 12:04:24| 54475|[FF D8 FF E0 00 1...|    0|
+--------------------+-------------------+------+--------------------+-----+
only showing top 5 rows

To ignore partition data discovery in a directory, you can set recursiveFileLookup to "true":

// In Scala
val binaryFilesDF = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .option("recursiveFileLookup", "true")
  .load(path)
binaryFilesDF.show(5)

# In Python
binary_files_df = (spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .option("recursiveFileLookup", "true")
  .load(path))
binary_files_df.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|file:/Users/jules...|2020-02-12 12:04:24| 55037|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54634|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54624|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54505|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54475|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

Note that the label column is absent when the recursiveFileLookup option is set to "true".
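The label column disappears because it comes from partition discovery: directory names of the form key=value (here, label=0/ and label=1/) are interpreted as columns, and recursive lookup skips that interpretation. A small sketch of that parsing in plain Python (the directory layout shown is hypothetical, for illustration only):

```python
import posixpath

def partition_values(path):
    """Extract key=value partition directories from a file path."""
    parts = posixpath.dirname(path).split("/")
    return dict(p.split("=", 1) for p in parts if "=" in p)

# A partitioned layout yields a column value...
partition_values("train_images/label=1/frame42.jpg")  # → {"label": "1"}
# ...while a plain directory tree yields nothing.
partition_values("plain/dir/frame42.jpg")             # → {}
```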

Currently, the binary file data source does not support writing a DataFrame back to the original file format.

In this section, you got a tour of how to read data into a DataFrame from a range of supported file formats. We also showed you how to create temporary views and tables from the existing built-in data sources. Whether you're using the DataFrame API or SQL, the queries produce identical outcomes. You can examine some of these queries in the notebook available in the GitHub repo for this book.

Summary

To recap, this chapter explored the interoperability between the DataFrame API and Spark SQL. In particular, you got a flavor of how to use Spark SQL to:

  • Create managed and unmanaged tables using Spark SQL and the DataFrame API.

  • Read from and write to various built-in data sources and file formats.

  • Use the spark.sql programmatic interface to issue SQL queries on structured data stored as Spark SQL tables or views.

  • Peruse the Spark Catalog to inspect metadata associated with tables and views.

  • Use the DataFrameWriter and DataFrameReader APIs.

Through the code snippets in the chapter and the notebooks available in the book's GitHub repo, you got a feel for how to use DataFrames and Spark SQL. Continuing in this vein, the next chapter further explores how Spark interacts with the external data sources shown in Figure 4-1. You'll see some more in-depth examples of transformations and the interoperability between the DataFrame API and Spark SQL.


Source: https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch04.html
