Introduction to Spark
A practical, beginner-friendly guide to getting started with Apache Spark and Python, covering everything from setting up a Dockerized Jupyter environment to mastering fundamental concepts like DataFrames, lazy evaluation, and Spark SQL.
What is Apache Spark?
Apache Spark is an open-source, distributed computing system designed for fast processing of large-scale data. Before Spark, big data processing relied heavily on Hadoop MapReduce, which read from and wrote to physical disk drives between every step. Spark revolutionized this by caching data in-memory (RAM), making it up to 100 times faster for certain workloads.
It provides a unified analytics engine with built-in modules for SQL, streaming, machine learning, and graph processing. While Spark is written in Scala, we will be using PySpark, the Python API for Spark. PySpark allows data engineers and scientists to harness the power of distributed computing using familiar Python syntax.
Let's dive into setting up our environment and writing our first Spark application.
Docker for Spark with Jupyter
# Use the official Jupyter PySpark image
FROM jupyter/pyspark-notebook:latest
# Set the user to root to install system packages if necessary
USER root
# (Optional) Install any system-level dependencies
# RUN apt-get update && apt-get install -y some-package
# Switch back to the default user (jovyan) for safety
USER jovyan
# Install additional Python libraries
# We install pandas and matplotlib for data visualization/analysis alongside Spark
# Not used in this article
RUN pip install --no-cache-dir pandas matplotlib pyarrow
services:
pyspark-notebook:
build: .
container_name: pyspark-jupyter
ports:
- "8888:8888" # Jupyter Lab Port
- "4040:4040" # Spark Master Web UI Port
volumes:
# Maps the local 'work' folder to the container's notebook directory
# This ensures your work is saved even if the container stops
- ./work:/home/jovyan/work
environment:
# This allows the notebook to be accessible from localhost without token hassles
- JUPYTER_ENABLE_LAB=yes
We'll use data from https://data.gov.gr/datasets/hyperion. Download some data from there, and place them in ./work/data/.
Start a Session
SparkSession is a unified wrapper that allows us to use Spark for creating DataFrames and Datasets, executing SQL queries, reading data from various sources (parquet, JSON, CSV, online data warehouse, etc.), registering User Defined Functions (UDFs), and managing cluster resources and configurations, among others.
The standard way to create a SparkSession is by using the builder pattern.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("My First Application") \
.getOrCreate()
The above setup is perfectly fine for learning, but in a production environment, we'd often need to specify things like memory, cores, or other specific runtime configs.
from pyspark.sql import SparkSession
# Note: master("local[*]") is only for testing; usually set via spark-submit
# spark.sql.shuffle.partitions is set to 10 for tuning shuffle performance
spark = SparkSession.builder \
.appName("Production ETL Job") \
.master("local[*]") \
.config("spark.driver.memory", "4g") \
.config("spark.executor.memory", "4g") \
.config("spark.sql.shuffle.partitions", "10") \
.getOrCreate()
Here are the basic builder methods:
.appName(name): Sets the name shown in the Spark UI (more on that in a bit)..master(url): Where to run:"local": Run locally with one thread."local[4]": Run locally with 4 threads."local[*]": Run locally with as many threads as logical cores."spark://master:7077": Runs across many machines.
.config(key, value): Sets generic Spark configurations:spark.driver.memory: The memory allocated to the Driver process. The Driver is the "master" of the application. It coordinates tasks and collects results.spark.executor.memory: The memory allocated to the Executor processes. Executors are the "workers" on the cluster nodes. They perform the actual calculations (filtering, joining, caching data).spark.sql.shuffle.partitions: The number of partitions created when data is shuffled (re-distributed) across the cluster. It happens during operations likegroupBy,join, ororderBy. The default is 200.- Too Low: We might run out of memory on a single executor because too much data lands in one partition.
- Too High: We create too many small files, and the driver wastes time scheduling thousands of tiny tasks (overhead).
- Rule of Thumb: We set it based on our data size. For small data (\< 1GB), we set it to something low like 10-20. For massive data (TBs), keep it at 200 or higher.
.getOrCreate(): Checks if a session already exists and returns that; otherwise, it creates a new one. This prevents creating multiple sessions accidentally. In general, we stick to a singleSparkSession.
Let's use our newly created spark session (which we assigned to the spark variable) to check our Spark version:
print(f"Spark Version: {spark.version}")
Reading Data
As we mentioned earlier, we can use the session for more than just printing the spark version. One of those things is reading a CSV file.
df_csv = spark.read \
.csv(path)
The path variable could point to a single CSV file, like 'path/to/file.csv', or to a path containing multiple CSV files, like 'path/to/files/'. Additionally, glob syntax is also supported, like path/year=*/month=*/.
We can provide additional options to the spark.read command using the option method. For example, if we want to set the CSV's delimiter to ; instead of , (which is the default), we use:
df_csv = spark.read \
.option("delimiter", ";") \
.csv(path)
If we want to infer the schema of the data, we set inferSchema to True. If we want to set two or more options at the same time, we can either chain them:
df_csv = spark.read \
.option("delimiter", ";") \
.option("inferSchema", True) \
.csv(path)
Or use the options method:
df_csv = spark.read \
.options(delimiter=";", inferSchema=True) \
.csv(path)
Similarly, we can read a parquet file by using spark.read.parquet(path).
For our example, we can load the data like this:
path = './data/'
internet_df = spark.read \
.options(header="true", inferSchema=True) \
.csv(path)
To see if Spark did a good job with the schema inference, we write:
internet_df.printSchema()
Which should print:
root
|-- measurement_time: timestamp (nullable = true)
|-- connection_id: double (nullable = true)
|-- client_ip: string (nullable = true)
|-- measurement_id: integer (nullable = true)
|-- measured_downstream_mbps: double (nullable = true)
|-- measured_upstream_mbps: double (nullable = true)
|-- measured_rtt_msec: double (nullable = true)
|-- measured_loss_percentage: double (nullable = true)
|-- measured_jitter_msec: double (nullable = true)
|-- client_operating_system: string (nullable = true)
|-- client_operating_system_version: string (nullable = true)
|-- client_operating_system_architecture: string (nullable = true)
|-- ISP: string (nullable = true)
|-- contract_download_mbps: double (nullable = true)
|-- contract_upload_mbps: double (nullable = true)
|-- connection_postal_code: double (nullable = true)
|-- connection_municipality_id: double (nullable = true)
|-- connection_municipality: string (nullable = true)
|-- connection_regional_unit_id: double (nullable = true)
|-- connection_regional_unit: string (nullable = true)
|-- connection_periphery_id: double (nullable = true)
|-- connection_periphery: string (nullable = true)
Spark UI
Now that we have created a session, we can visit http://localhost:4040 (the second port we exposed in our Docker compose) to access the Spark UI.
Spark UI is a web interface built into Spark. It allows us to monitor the status of our application, debug performance bottlenecks, and visualize resource usage.
When we open the UI, we will see several tabs. Let's discuss some of them:
- Jobs: This tab lists all the Actions (like
.count(),.collect(),.saveAsTextFile()) our code triggered. We use it to check if our job has succeeded, failed, or is still running. - Stages: This is the most important tab for optimization. A Job is broken down into Stages based on dependencies (wide transformations like
groupByKeyorjoincreate new stages). We use it to find bottlenecks in our code.
Transformations vs Actions
Transformations
Up until now, we have read a file using Spark. However, if we check the Spark UI, we will notice that we see nothing in the Jobs tab. This might seem weird, but the methods we used thus far, such as read and option, are lazy.
In programming, lazy refers to a strategy called lazy evaluation, where the computation of a value is delayed until it is actually needed, rather than being executed immediately. This approach can improve efficiency and resource management by avoiding unnecessary calculations. In Spark, these operations are called Transformations.
Actions
On the other hand, there's eager evaluation. It is a method where expressions are evaluated immediately as they are defined, leading to faster execution for small datasets and simpler computations. In Spark, such operations are called Actions.
One of the most basic Actions is the .show() method, which will allow us to view the Spark DataFrame we read earlier.
# Assuming we have read some df earlier, i.e., df = spark.read.csv(path)
df.show()
If we now check the Spark UI, we will see that our job of showing the DataFrame will have been completed.
Spark DataFrame Methods
A quick note on best practices: When working with DataFrame columns in PySpark, it is generally safer to use F.col('column_name') rather than df.column_name. This prevents errors if dataframes are joined or overwritten later in your code.
We've already seen a Spark DataFrame method: show().
Another one is called select and works pretty much like its SQL equivalent, SELECT.
from pyspark.sql import functions as F
df.select(F.col('col_A'), F.col('col_B'), F.col('col_C'))
To select all rows, we use * just like in SQL:
df.select('*')
Additionally, filter is the equivalent of SQL's WHERE. For example, assuming we want to keep only the rows where col_A is equal to 'element_B', we write:
df.filter(F.col('col_A') == 'element_B')
Hence, writing:
df \
.select(F.col('col_A'), F.col('col_B'), F.col('col_C')) \
.filter(F.col('col_A') == 'element_B')
Would be the exact same as writing:
SELECT col_A, col_B, col_C
FROM some_table
WHERE col_A = 'element_B'
Another very useful method is withColumn, which adds or replaces a column in a DataFrame. For example, assuming that col_B contains some floats, and we want to multiply that column by the number 2, we write:
df \
.select(F.col('col_A'), F.col('col_B'), F.col('col_C')) \
.withColumn('col_B_but_multiplied_by_2', F.col('col_B') * 2)
In SQL, this would be:
SELECT
col_A,
col_B,
col_C,
col_B * 2 AS col_B_but_multiplied_by_2
FROM some_table
functions
What we just did was far too simple. If we want to do something a bit more complicated, Spark has us covered with its own library of defined functions. The usual convention for importing those Spark-defined functions is:
from pyspark.sql import functions as F
A very useful function that allows us to extract the date part from a timestamp is to_date. Let's use internet_df's 'measurement_time' column:
internet_df \
.withColumn('date_at', F.to_date(F.col('measurement_time')))
With the respective SQL code being:
SELECT
CAST(measurement_time AS DATE) AS date_at
FROM some_table
User Defined Functions (UDF)
Spark has an extensive library of defined functions, but often enough, we need to create our own.
Let's say we want to flag the data in the column 'measured_downstream_mbps' based on if it's a large number, say bigger than 100 Mbps. Let's create a Python function that will categorize a number based on that assumption.
def number_flagger(number):
if number > 100:
return True
else:
return False
Then, we register the Python function as a Spark UDF:
from pyspark.sql.types import BooleanType
number_flagger_udf = F.udf(number_flagger, returnType=BooleanType())
In order to specify a udf, we need to also specify its output type in Spark. Since the result of our function is a Boolean, we import BooleanType from Spark and set returnType to be equal to BooleanType().
Then, we just use it:
internet_df \
.withColumn('is_fast', number_flagger_udf(F.col('measured_downstream_mbps')))
This will create a new column called is_fast filled with True or False depending on the value of the respective row of measured_downstream_mbps.
This is one of the main advantages of Spark. The number_flagger function could have been very easy to implement in SQL; however, this isn't the case for more complex custom logic, such as performing machine learning inferences row-by-row.
SQL with Spark
If someone is more familiar with SQL, they can use SQL directly within Spark. To do that, we first have to define a temporary view of a DataFrame using the createOrReplaceTempView method.
df.createOrReplaceTempView('internet_table')
Then, we can use the sql function to run our SQL code. Because sql is a lazy transformation, we also need to append an action like .show() to execute it and view the results.
spark.sql("""
SELECT
connection_periphery,
AVG(measured_downstream_mbps) as avg_downstream,
AVG(measured_upstream_mbps)
FROM internet_table
GROUP BY connection_periphery
ORDER BY avg_downstream DESC
""").show()
Of course, if we don't want to use SQL, we can use Spark's groupBy and agg methods to achieve the exact same results natively in Python:
df \
.groupBy('connection_periphery') \
.agg(
F.avg("measured_downstream_mbps").alias("avg_downstream"),
F.avg("measured_upstream_mbps")
) \
.sort('avg_downstream', ascending=False) \
.show()
Stopping the Session
When we're done, we stop the session to release the resources back to our cluster.
spark.stop()