Apache Spark

Architecture

Spark execution components

Driver

  • The process running the main() function of the application and creating the SparkContext
  • Contains the SparkContext object
  • Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads.
  • Data cannot be shared across different Spark applications (instances of SparkContext), meaning no inter-process sharing.
  • Because the driver schedules tasks on the cluster, it should run close to the worker nodes, preferably on the same local area network, to minimize network latency between the driver and the executors.
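
A minimal sketch of the driver-side entry point (the application name and local master below are placeholders):

from pyspark.sql import SparkSession

# The driver process starts here: building the SparkSession creates the
# underlying SparkContext, which schedules tasks on the executors.
spark = (
    SparkSession.builder
    .appName("example-app")   # placeholder application name
    .master("local[*]")       # placeholder master; point this at your cluster manager in practice
    .getOrCreate()
)

sc = spark.sparkContext       # the SparkContext object held by the driver
print(sc.applicationId)

spark.stop()                  # executors for this application are released here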

Driver - Deploy mode

  • Client (default)

    The driver runs locally, as an external client of the cluster.

  • Cluster

    The driver is deployed on one of the worker nodes inside the cluster.

Cluster Manager

  • Types

    • Standalone
    • Hadoop YARN
    • Kubernetes
  • Master URL

    • Kubernetes

      k8s://https://${k8s_apiserver_host}:${k8s_apiserver_port}
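
A hedged Python sketch of pointing a SparkSession at a Kubernetes cluster manager; the API server address and container image are placeholders, and in practice these options are usually passed to spark-submit:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("k8s://https://127.0.0.1:6443")                              # placeholder API server host:port
    .appName("k8s-example")
    .config("spark.kubernetes.container.image", "apache/spark:latest")   # placeholder executor image
    .getOrCreate()
)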

Playground

Code

Deployment

Docker

Resources

AWS Prescriptive Guidance - Key topics in Apache Spark

References

Reference - Apache Spark JIRA

Reference - Spark - ScalaDoc

Reference - Spark - Configuration

PySpark

Reference - PySpark API Reference

Reference - PySpark - Examples

PySpark - Python Spark Connect Client

PySpark - Spark SQL and DataFrames

Spark SQL, DataFrames and Datasets Guide

PySpark - Spark SQL API Reference

PySpark - Pandas API on Spark

PySpark - Structured Streaming

PySpark - Machine Learning (MLlib)

PySpark - Spark Core and RDDs

PySpark - Compatibility Matrix

Spark Python Supportability Matrix

Spark SQL

Spark SQL - Built-in Functions

Global Temporary View

Structured Streaming

Input Sources

  • Built-in sources

    • File source
    • Kafka source
    • Socket source (testing)
    • Rate source (testing)
    • Rate per Micro-Batch source (testing)
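
A minimal sketch of reading from one of these sources in Python, using the rate source (the row rate and console sink are for testing only):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rate-source-demo").getOrCreate()

# Rate source: generates (timestamp, value) rows at a fixed rate, for testing.
events = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)   # arbitrary test rate
    .load()
)

# Console sink, also intended for testing.
query = events.writeStream.format("console").start()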

Stateful streaming

  • A stateful Structured Streaming query requires incremental updates to intermediate state information.

    • Streaming aggregation
    • Streaming dropDuplicates
    • Stream-stream joins
    • Custom stateful applications
  • A stateless Structured Streaming query only tracks information about which rows have been processed from the source to the sink.
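
A sketch of the difference in Python (the rate source is used only to have a streaming DataFrame to work with):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("stateful-vs-stateless").getOrCreate()
events = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# Stateless: each row is handled on its own; nothing is carried between micro-batches.
stateless = events.filter("value % 2 = 0").select("timestamp", "value")

# Stateful: a streaming aggregation must keep running counts as intermediate state.
stateful = events.groupBy("value").count()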

Output mode

  • Only stateful streams containing aggregations require an output mode configuration.
  • For stateless streaming, all output modes behave the same.

Append mode (default)

Only the new rows added to the Result Table since the last trigger will be outputted to the sink.

Complete mode

The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.

Update mode

Only the rows in the Result Table that were updated since the last trigger will be outputted to the sink.
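
A sketch of setting the output mode on a writeStream; the aggregated DataFrame and console sink are illustrative, and complete mode is chosen because the query contains an aggregation:

# 'append' and 'update' are configured the same way via outputMode().
query = (
    stateful.writeStream       # hypothetical aggregated streaming DataFrame
    .outputMode("complete")
    .format("console")
    .start()
)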

Streaming Joins

Stream-Static Joins

Inner joins
Left outer joins (left side is a streaming DataFrame)
Right outer joins (right side is a streaming DataFrame)
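
A minimal sketch of a stream-static inner join (table, source, and column names are placeholders):

# Static side: an ordinary batch DataFrame, e.g. a dimension table.
static_users = spark.read.table("users")      # placeholder table name

# Streaming side: rate source renamed so both sides share a join key.
events = (
    spark.readStream.format("rate").load()
    .withColumnRenamed("value", "user_id")
)

# Stream-static joins are stateless: the static side is joined against each micro-batch.
joined = events.join(static_users, on="user_id", how="inner")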

Stream-Stream Joins

Trigger intervals

Unspecified (default)

The query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing.

Fixed interval micro-batches

  • Exactly-once guarantees
  • By default, ProcessingTime=500ms
.trigger(processingTime='2 seconds')

Available-now micro-batch

  • Exactly-once guarantees
  • Replaces One-time micro-batch (.trigger(once=True))
  • Consumes all available records in multiple micro-batches with configurable batch size
# Deprecated
.trigger(once=True)
 
# Recommended
.trigger(availableNow=True)

Continuous with fixed checkpoint interval

  • Continuously process data as it arrives
  • Low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees.
.trigger(continuous='1 second')

Checkpoints

  • Checkpoints and write-ahead logs work together to provide fault tolerance.

  • A checkpoint tracks the information that identifies the query, including state information and processed records.

  • Each query must have a different checkpoint location.

  • Specifying the checkpointLocation option enables checkpointing.

    (df.writeStream
      .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
      .toTable("catalog.schema.table")
    )

Watermarking

  • Gist

    • Allows Spark to understand when to close the aggregate window and produce the correct aggregate result.

    • Spark waits to close and output the windowed aggregation once the max event time seen minus the specified watermark is greater than the upper bound of the window.

    • Watermarks can only be used when you are running your streaming application in append or update output modes.

    • You define a watermark by specifying a timestamp field and a value representing the time threshold for late data to arrive.

      WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
  • Feature Deep Dive: Watermarking in Apache Spark Structured Streaming
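
The same idea in Python, as a hedged sketch (the DataFrame, column name, and 10-minute window are placeholders; the watermark delay mirrors the 3 minutes above):

from pyspark.sql.functions import window

windowed_counts = (
    events                                       # hypothetical streaming DataFrame with a 'timestamp' column
    .withWatermark("timestamp", "3 minutes")     # tolerate events arriving up to 3 minutes late
    .groupBy(window("timestamp", "10 minutes"))  # placeholder tumbling window
    .count()
)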

Spark Declarative Pipelines (SDP)

SDP - Flows

  • A flow is a transformation that reads data from a source and writes it to a target dataset.
  • Supports both streaming and batch semantics
  • SDP shares the same streaming flow type (Append, Update, Complete) as Spark Structured Streaming.

SDP - Datasets

Streaming Table

  • Incremental processing to process only new data as it arrives

Materialized View

  • Precomputed
  • Always has exactly one batch flow writing to it

Temporary View

SDP - Pipelines

  • A pipeline can contain one or more flows, streaming tables, and materialized views.

SDP - Pipeline Projects

  • A pipeline project is a set of source files containing the code that defines the datasets and flows that make up a pipeline.
  • Source files can be .py or .sql files.
  • The pipeline spec file is called pipeline.yml by default.

SDP - Python

  • SDP evaluates the code that defines a pipeline multiple times during planning and pipeline runs. Python functions that define datasets should include only the code required to define the table or view.
  • The function used to define a dataset must return a Spark DataFrame.
  • Never use methods that save or write to files or tables as part of your SDP dataset code.
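
A hedged sketch of what a dataset definition can look like; the pipelines import path and the materialized_view decorator name are assumptions about the SDP Python API and may differ between Spark versions, and the table and column names are placeholders:

# Assumed import path for the SDP Python API; verify against your Spark version.
from pyspark import pipelines as dp
from pyspark.sql import functions as F

@dp.materialized_view   # assumed decorator name
def daily_orders():
    # Only dataset-definition logic: read, transform, and return a DataFrame.
    # No writes or other side effects; SDP materializes the result itself.
    return (
        spark.read.table("orders")                      # placeholder source table; assumes a spark session in scope
        .groupBy(F.to_date("order_ts").alias("day"))    # placeholder columns
        .count()
    )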

SDP - SQL

  • The PIVOT clause is not supported in SDP SQL.
  • When using the for loop pattern to define datasets in Python, ensure that the list of values passed to the for loop is always additive.

Kubernetes

Kubernetes - Kubeflow Spark Operator

Local set up

  1. Install a k8s cluster (k3d, minikube, kind, etc.)

  2. Install Helm

  3. Add Helm Repo

    helm repo add spark-operator https://kubeflow.github.io/spark-operator
    helm repo update
  4. Install Spark Operator chart

    helm install $release spark-operator/spark-operator \
      --namespace $namespace \
      --create-namespace
  5. Install a service account and RBAC for Spark applications

    kubectl apply -f https://raw.githubusercontent.com/kubeflow/spark-operator/master/config/rbac/spark-application-rbac.yaml
  6. Run a Spark application as demo

    Ensure the name of the service account created in the previous step is used here.

    kubectl apply -f https://raw.githubusercontent.com/kubeflow/spark-operator/master/examples/spark-pi.yaml
  7. Check the Spark application status

    kubectl get sparkapp -A

Kubernetes - Apache Spark Kubernetes Operator

  • Official project
  • Younger project compared to Kubeflow

Catalog API

Performance Tuning

Join Strategies

BROADCAST (Broadcast Join)

  • Use cases

    • When each key within the smaller and larger data sets is hashed to the same partition by Spark
    • When one data set is much smaller than the other (and within the default config of 10 MB, or more if you have sufficient memory)
    • When you only want to perform an equi-join, to combine 2 data sets based on matching unsorted keys
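
A sketch of requesting a broadcast join from Python (table and key names are placeholders; broadcast comes from pyspark.sql.functions):

from pyspark.sql.functions import broadcast

large_df = spark.read.table("transactions")   # placeholder large fact table
small_df = spark.read.table("currencies")     # placeholder small dimension table

# Hint Spark to ship the small side to every executor instead of shuffling both sides.
joined = large_df.join(broadcast(small_df), on="currency_code", how="inner")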

MERGE (Shuffle Sort Merge Join)

  • Aliases

    • SHUFFLE_MERGE
    • MERGEJOIN

SHUFFLE_HASH (Shuffle Hash Join)

SHUFFLE_REPLICATE_NL (Shuffle-and-replicate Nested Loop Join)
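
The other strategies can be requested with DataFrame join hints; the hint names map to the strategies above, and the DataFrames are the placeholders from the broadcast example:

# Shuffle sort merge join
merged = large_df.join(small_df.hint("SHUFFLE_MERGE"), "currency_code")

# Shuffle hash join
hashed = large_df.join(small_df.hint("SHUFFLE_HASH"), "currency_code")

# Shuffle-and-replicate nested loop join
nested = large_df.join(small_df.hint("SHUFFLE_REPLICATE_NL"), "currency_code")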

Bucketing

Spark UI

Symptoms

Symptoms - Spill

  • Spill is what happens when Spark runs low on memory. It starts to move data from memory to disk, and this can be quite expensive.
  • It is most common during data shuffling.

Symptoms - Skew

  • Skew is when one or just a few tasks take much longer than the rest.
  • This results in poor cluster utilization and longer jobs.

UDF

UDF - SQL

-- Example
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight DOUBLE, height DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
SELECT weight / (height * height);
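
Once created, the function can be invoked like any built-in; a small usage sketch via spark.sql (catalog and schema names match the placeholders above):

# Arguments are weight in kilograms and height in metres.
spark.sql("SELECT my_catalog.my_schema.calculate_bmi(70.0, 1.75) AS bmi").show()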

Optimizations

Partitioning

| Feature | Repartition | Coalesce |
| --- | --- | --- |
| Description | Adjusts the number of partitions, redistributing data across the specified number of partitions. | Reduces the number of partitions without shuffling data, merging existing partitions. |
| Full shuffle | Yes | No |
| Expensiveness | Can be expensive, especially for large datasets, as it involves a full shuffle of data. | Less expensive than repartitioning, as it minimizes data movement by only combining partitions when possible. |
| Data movement | Distributes data evenly across partitions, potentially leading to balanced partition sizes. | May result in imbalanced partition sizes, especially when reducing the number of partitions. |
| Use cases | Useful when changing the number of partitions or evenly distributing data across partitions. | Useful when decreasing the number of partitions without incurring the cost of a full shuffle. |

Repartition

  • Redistributes data across a specified number of partitions.
  • A very expensive operation, as it shuffles the data across many partitions; hence, try to minimize its use as much as possible.
  • When you call repartition(n), where n is the desired number of partitions, Spark reshuffles the data in the RDD into exactly n partitions.
  • If you increase/decrease the number of partitions using repartition(), Spark will perform a full shuffle of the data across the cluster, which can be an expensive operation, especially for large datasets.

Coalesce

  • Reduces the number of partitions without shuffling data across the cluster.
  • When you call coalesce(n), where n is the desired number of partitions, Spark merges existing partitions to create n partitions.
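
A small sketch contrasting the two (the partition counts are arbitrary):

df = spark.range(1_000_000)             # example DataFrame

repartitioned = df.repartition(200)     # full shuffle into exactly 200 partitions
coalesced = repartitioned.coalesce(50)  # merges existing partitions down to 50; no full shuffle

print(repartitioned.rdd.getNumPartitions())  # 200
print(coalesced.rdd.getNumPartitions())      # 50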

Apache Gluten

  • Gist

    • Transform Spark’s whole stage physical plan to Substrait plan and send to native
    • Offload performance-critical data processing to native library
    • Define clear JNI interfaces for native libraries
    • Switch available native backends easily
    • Reuse Spark’s distributed control flow
    • Manage data sharing between JVM and native
    • Extensible to support more native accelerators