Delta Lake

Set up

To set up a Python project (for example, for unit testing), you can install Delta Lake using pip install delta-spark and then configure the SparkSession with the configure_spark_with_delta_pip() utility function in Delta Lake:

from delta import *
 
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

Design

  • File listing

    • Store the paths to Parquet files in the transaction log to avoid performing an expensive file listing. Doesn’t need to list all Parquet files in the cloud object store to fetch their paths.
  • Small file problem

    • Data processing engines don’t perform well when reading datasets with many small files. You typically want files that are between 64 MB and 1 GB. You don't want tiny 1 KB files that require excessive I/O overhead.
  • ACID transactions

  • Schema-on-write enforcement

    Schema validation before allowing a write transaction to occur.

  • Schema evolution

    • Permissive approach

    • You can set the mergeSchema option to true to write to a Delta table and enable data with a mismatched schema to be appended.

    • New columns would be added with null values populated.

    • New rows would be added with null values populated.

Transaction Logs

Transaction Protocol

  • Transaction log is the single source of truth, any client who wants to read or write to a Delta table must first query the transaction log.
  • Transaction log is an append-only JSONL format WAL, and it provides ACID transaction guarantees.

Checkpoint

  • Checkpoint files save the entire state of the table at a point in time - in native Parquet format that is quick and easy for Spark to read.
  • Avoid the need to process from the beginning of transaction logs to restore the state.

Optimistic Concurrency Control

  • Gist

    • Assume most of the time there are not conflicts or conflicts are rare.
    • Best-effort auto merging
    • Decline commit if merging fails
  • Benefits

    • Reduce disruption and user intervention
  • Process

    • Record the starting table version.
    • Record reads/writes.
    • Attempt a commit.
    • If someone else wins, check whether anything you read has changed.
    • Repeat.

Time Travel / Data Versioning

Any table state can be reproduced by replaying the transaction logs.

Data Lineage

Data Layout

Operation - OPTIMIZE (opens in a new tab)

  • Solution

    Suppose you have a dataset with 10,000 small files that are slow to query. You can compact these 10,000 small files into a dataset with 100 right-sized files.

  • Gist

    • Data practitioners will commonly want to compact the small files into larger files with a process referred to as small file compaction or bin-packing.
    • The optimization process is idempotent.
    • Small file compaction doesn’t help much when the dataset is relatively small.
    • By default, Delta Lake targets 1 GB files when OPTIMIZE is run.
    • Configurable with spark.databricks.delta.optimize.maxFileSize
    • Specify predicates to only compact a subset of data to avoid processing data already been compacted.
  • Delta Lake Small File Compaction with OPTIMIZE (opens in a new tab)

Operation - Z Order

  • Example

    OPTIMIZE events
    WHERE date >= current_timestamp() - INTERVAL 1 day
    ZORDER BY (eventType)
  • Gist

    • Z Ordering your data reorganizes the data in storage and allows certain queries to read less data, so they run faster.
    • If you expect a column to be commonly used in query predicates and if that column has high cardinality (that is, a large number of distinct values), then use ZORDER BY.
    • You can also Z Order a Delta table on multiple columns. Z Ordering the table by id1 and id2 helps queries that filter on id1, id2, and both id1 and id2.
    • To be replaced by liquid clustering (opens in a new tab)
  • Delta Lake Z Order (opens in a new tab)

Liquid Clustering (opens in a new tab)

  • Databricks - Use liquid clustering for tables (opens in a new tab)

  • Gist

    • Replaces table partitioning and ZORDER

    • Applies to both streaming tables and materialized views

    • You can redefine clustering keys without rewriting existing data.

    • Liquid clustering is incremental. (only new data needs to be clustered)

    • To enable liquid clustering, add the CLUSTER BY phrase to a table creation statement.

      CREATE TABLE table1(col0 INT, col1 string) CLUSTER BY (col0);
    • Automatic liquid clustering allows Databricks to intelligently choose clustering keys to optimize query performance, using the CLUSTER BY AUTO clause.

    • Liquid clustering is not compatible with Hive-style partitioning and Z-ordering. You may want to avoid liquid clustering if downstream systems require Hive-style partitioning.

    • You can specify up to 4 clustering columns per Delta table.

    • Your clustering columns need to be columns with statistics collected in the Delta logs.

    • You can manually trigger a liquid clustering operation using the OPTIMIZE command.

    • When you change the clustering columns, all new data writes and OPTIMIZE operations will follow the new clustering columns. Existing data is not rewritten.

  • Use cases

    • Tables that are often filtered by high cardinality columns.
    • Tables that have skew in data distribution.
    • Tables that grow quickly and require maintenance and tuning effort.
    • Tables that have concurrent write requirements.
    • Tables that have access patterns that change over time.
    • Tables where a typical partition key could leave the table with too many or too few partitions.

Operation - VACUUM

  • You can remove data files no longer referenced by a Delta table that are older than the retention threshold by running the VACUUM command on the table.

  • Default retention period is 7 days.

  • You can't roll back the Delta Lake to a version that’s farther back than the retention period if you’ve already run a VACUUM command. That’s why you need to be careful before vacuuming your Delta Lake.

  • Resources

Partition

Schema Enforcement

  • Validation

    • Cannot contain any additional columns that are not present in the target table's schema.
    • Cannot have column data types that differ from the column data types in the target table.
    • Can not contain column names that differ only by case.
  • Use cases

    • Schema evolution can be used anytime you intend to change the schema of your table (as opposed to where you accidentally added columns to your DataFrame that shouldn't be there). It's the easiest way to migrate your schema because it automatically adds the correct column names and data types, without having to declare them explicitly.

Logical column names

Delta Lake abstracted the concept of physical column names and logical column names. The physical column name is the actual column name in the Parquet file. The logical column name is the column name humans use when referencing the column.

Delta Lake lets users quickly rename columns by changing the logical column name, a pure-metadata operation. It’s just a simple entry in the Delta transaction log.

Dropping columns

You can add an entry to the Delta transaction log and instruct Delta to ignore columns on future operations - it’s a pure metadata operation.

Change Data Feed (CDF) (opens in a new tab)

Delta Lake feature compatibility and protocols (opens in a new tab)

Compatibility with Apache Spark (opens in a new tab)

Delta Lake versionApache Spark version
4.0.x4.0.x
3.3.x3.5.x
3.2.x3.5.x
3.1.x3.5.x
3.0.x3.5.x
2.4.x3.4.x
2.3.x3.3.x
2.2.x3.3.x
2.1.x3.3.x
2.0.x3.2.x
1.2.x3.2.x
1.1.x3.2.x
1.0.x3.1.x
0.7.x and 0.8.x3.0.x
Below 0.7.02.4.2 - 2.4.<latest>

Cheatsheet

Tables - Show table details (opens in a new tab)

Detail Schema (opens in a new tab)

DESC[RIBE] DETAIL @table_name

Tables - Show schema

DESC[RIBE] @table_name

Tables - Show schema

Tables - Show schema and detailed table info

DESC[RIBE] EXTENDED @table_name

Tables - Show schema and table properties

Tables - Shows information for all tables matching the given regular expression

SHOW TABLES EXTENDED IN @catalog_name LIKE @regex

Includes basic table information and file system information

Tables - Shows information for all tables matching the given regular expression

Tables - Show transaction logs

DESC HISTORY @table_name;

Tables - Retrieve a specific version of table

SELECT * FROM @table_name VERSION AS OF @version 

or

SELECT * FROM @table_name VERSION@v<version>

Tables - Restore to a previous version of table / Time travel a table

RESTORE TABLE @table_name TO VERSION AS OF @version

Tables - Vacuum with a specified retention period

Example:

VACUUM @table_name RETAIN 0 HOURS

Tables - List all tables in default schema

SHOW TABLES;

Tables - List all tables in the specified schema

SHOW TABLES IN @schema_name;

Tables - List all tables in all catalogs

SHOW TABLES IN ALL CATALOGS;

Tables - Create an external table

When creating an external table, you specify the location of the data files using the LOCATION keyword:

CREATE TABLE @table_name LOCATION @path

Since the metastore does not own the underlying data files, dropping an external table only removes the metadata associated with the table, leaving its data files intact.

Tables - Clone

  • Delta Lake - Delta Lake Clone (opens in a new tab)

  • Understanding CLONE Functionality in Databricks for Delta Tables (opens in a new tab)

  • Gist

    • Clone tables have their independent and new transaction logs, so any changes made to either deep or shallow clones affect only the clones themselves and not the source table.
    • Can only make clones
      • From Managed Table to Managed Table
      • From External Table to External Table
      • Error otherwise
    • Clones are only statically sourced from a specific table version.
  • Use cases

    • Data alignment between environments/teams.
    • Editable copy of production data without copying the data (shallow clone).
    • Disaster recovery: You can clone your table to a table in another cloud region after finishing a transaction(s).
    • Test your pipeline on production data: You can shallow clone your entire production data to a test environment and test your pipeline before release.
    • ML experiments on dev environment over a snapshot of production data.

Tables - Clone - Deep

  • Copies the source table data to the clone target in addition to the metadata of the existing table.
  • Deep clones do not depend on the source from which they were cloned, but are expensive to create because a deep clone copies the data as well as the metadata.
  • Stream metadata is also cloned for smooth migration from source table to the clone table.

Tables - Clone - Shallow

  • Does not copy data files, and reference data files in the source directory.
  • If you run VACUUM against the source table and the files are out of the retention period of the source table but are still referenced by other clones, only the files that are not needed for either the source table or any clone will be removed.
  • Shallow clone support for Unity Catalog allows you to create tables with access control privileges independent from their parent tables without needing to copy underlying data files.

Delta Sharing (opens in a new tab)

  • Gist

    • Share

      A share is a logical grouping to share with recipients. A share can be shared with one or multiple recipients. A recipient can access all resources in a share. A share may contain multiple schemas.

    • Schema

      A schema is a logical grouping of tables. A schema may contain multiple tables.

    • Table

      A table is a Delta Lake table or a view on top of a Delta Lake table.

    • Recipient

      A principal that has a bearer token to access shared tables.

    • Sharing Server

      A server that implements this protocol.

  • Resources

Delta Sharing - REST API Protocol (opens in a new tab)

Delta Sharing - Databricks-to-Databricks Sharing

Delta Sharing - Delta Sharing open sharing protocol (for providers)