Understanding Delta Lake's consistency model

A few days ago I released my analysis of Apache Hudi’s consistency model, with the help of a TLA+ specification. This post will do the same for Delta Lake. Just like the Hudi post, I will not comment on subjects such as performance, efficiency or how use cases such as batch and streaming are supported. This post focuses solely on the consistency model using a logical model of the core Delta Lake protocol.

The basics

Delta Lake is one of the big three table formats that provide a table abstraction over object storage. Like Apache Hudi, it consists of a write-ahead log and a set of data files (usually Parquet). The log in Delta Lake is called the delta log.

The delta log is a write-ahead log that consists of metadata about the transactions that have occurred, including the data files that were added and removed in each transaction. Using this transaction log, a reader or writer can build up a snapshot of all the data files that make up the table at a given version. If a data file is not referenced from the delta log, then it is unreadable. The basic idea behind Delta Lake read and write operations is:

  • Writers write data files (Parquet usually) and commit those files by writing a log entry to the delta log that includes the files it added and logically removed.

  • Readers scan the delta log to find out the latest snapshot of data files that exist, and then read those files to satisfy queries.

ACID transactional guarantees

Delta Lake states that it supports ACID transactional guarantees, scoped per table. 

Delta Lake achieves atomicity by committing all file changes in a single write to the delta log. Delta Lake does not update files in place, nor overwrite them. All changes happen via adding files and by noting which files are logically removed via the delta log. If a Delta Lake writer failed midway through the transaction, after it had written new data files, those files would remain unreadable because the corresponding log entry file was never written to the delta log.

Durability is achieved as all writes go to a redundant storage service such as cloud object storage. Cloud object storage these days offers strong consistency.

The remainder of this analysis will focus on Consistency and Isolation as the remaining properties of ACID to understand and verify. In terms of isolation, Delta Lake states it supports serializable isolation for writes and snapshot isolation for reads. Reads can also reach serializable isolation if the read itself goes through the log; however, this analysis will focus on snapshot isolation for reads. This isolation is achieved via multi-version concurrency control (MVCC). Delta Lake does not modify files in place, which means that until a version is cleaned, its files remain in place and changes are materialized as new versions of the files.

The delta log and data files

All operations go through the delta log and Delta Lake calls these Actions. A few notable actions include (but are not limited to):

  • Add/remove files.

  • Add CDC files.

  • Change metadata.

In this analysis we’ll only be looking at the add/remove action, which covers both regular writes and background jobs such as compaction and cleaning (which are not explicitly part of the Delta Lake protocol).

Each delta log entry is a JSON file with a zero-left-padded integer file name which corresponds to the ordinal position in the log and the version number of the table that the entry represents.

If the delta log had only ever had 3 entries written to it, it would contain the files: 

  • ./_delta_log/00000000000000000000.json

  • ./_delta_log/00000000000000000001.json

  • ./_delta_log/00000000000000000002.json

The log entry ids are monotonically increasing by one. The delta log is not actually a software component, but a “directory” in object storage. The ordering of the log is established by a lexicographical sort over the entry file names, performed by the Delta Lake client when it lists the directory. Delta Lake supports snapshot isolation for reads, which means that readers perform queries at a given table version. Each delta log entry represents a new version of the table. The sketch below shows how replaying the log builds a snapshot.
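To make this concrete, here is a minimal Python sketch of that log replay: list the _delta_log “directory”, sort the entry file names lexicographically, and apply each entry’s removed and added sets in order. The JSON shape ({"Added": [...], "Removed": [...]}) follows the simplified log entries used later in this post, not the real Delta Lake action format, and checkpoints are ignored.

```python
import json
import os

def load_snapshot(table_path):
    """Rebuild the set of live data files by replaying the delta log in order.

    A toy sketch: a real client also handles checkpoints, other action types
    and the full Delta Lake action schema.
    """
    log_dir = os.path.join(table_path, "_delta_log")
    # Lexicographic sort gives log order because entry names are zero-padded integers.
    entries = sorted(f for f in os.listdir(log_dir) if f.endswith(".json"))

    live_files = set()
    for entry in entries:
        with open(os.path.join(log_dir, entry)) as fh:
            action = json.load(fh)
        live_files -= set(action.get("Removed", []))
        live_files |= set(action.get("Added", []))

    # With zero-based entry names (00000...0.json is version 0), the latest
    # table version is the number of entries minus one.
    return len(entries) - 1, live_files
```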

So that readers and writers don’t have to read the entire log, which can grow long, writers regularly perform checkpoints which roll up the current snapshot of data files and table metadata into a single parquet file. That way readers and writers only need to read, at most, the latest checkpoint and a few new delta log entry files. The rest of this analysis will ignore checkpointing, but just be aware that it exists and is a key component of the design.

Each add/remove action log entry includes a set of files that were added and a set of files that were removed. Delta Lake offers both copy-on-write (COW) and merge-on-read (MOR). We’ll look at COW first then MOR.

Copy-on-write (COW)

Copy-on-write refers to the behavior that mutations to a data file do not affect the original data file but cause a new file to be written with the changes applied. For example, if a data file has a billion rows and one row must be deleted or updated, then a new version of the data file is written with this change applied.

Example: a table with two columns “color” and “count”.

The delta log above shows how a reader or writer can learn the current snapshot of files that constitute the table by reading the log in ascending order and noting which files are added and removed. In this very simple example, the snapshot contains at most one file, but a large real-world table could have thousands or even millions of files.

Merge-on-read (MOR)

Copy-on-write can be inefficient for workloads that perform a lot of deletions and updates. Just deleting or updating one row from a multi-gigabyte parquet file causes the entire file to be rewritten with the single row change applied. Merge-on-read is a different approach that adds new files to invalidate specific rows in existing data files that have been deleted or updated. These additional files are known as deletion vector (DV) files. This avoids the need to rewrite whole data files in the write path but adds the cost of merging data files with deletion vectors on reading. 

When a reader loads a data file that has an associated DV file, it skips the rows that the deletion vector marks as deleted. When a row of a data file is deleted, the row’s position is recorded in a newly written deletion vector file. When a row is updated, a new data file can be written with the new row values, and a deletion vector is added to invalidate the row in the original data file.

With merge-on-read, added and removed files are uniquely identified by the combination of the data file name and the DV file name. If no DV file exists yet, a null value is used. The sketch below illustrates how a reader merges a data file with its deletion vector.
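To illustrate the merge, here is a small Python sketch of a reader that takes a data file’s rows and skips the positions recorded in the associated deletion vector, if one exists. Representing the deletion vector as a plain set of 1-based row positions is a simplification chosen to match the walkthrough later in this post; real DV files use a compressed bitmap encoding.

```python
def live_rows(data_file_rows, deleted_positions=None):
    """Return the rows of a data file, skipping the positions that the
    associated deletion vector (if any) marks as deleted.

    Sketch only: data_file_rows stands in for parquet contents and
    deleted_positions for a decoded DV file.
    """
    if not deleted_positions:
        return list(data_file_rows)
    return [row for pos, row in enumerate(data_file_rows, start=1)
            if pos not in deleted_positions]


# Matches the merge-on-read walkthrough below: dv1.bin invalidates row 2 ("green").
datafile1 = [("red", 1), ("green", 1), ("blue", 1)]
dv1 = {2}   # hypothetical decoded contents of dv1.bin
print(live_rows(datafile1, dv1))   # [('red', 1), ('blue', 1)]
```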

I will often refer to data files as file 1, file 2 and so on, but real-world filenames include, among other things, the partition, a UUID and the column, such as part-00000-3935a07c-416b-4344-ad97-2a38342ee2fc.c000.snappy.parquet.

A simple logical model of the write path

If we ignore concurrency control for now, a write in our logical model can be condensed down to 4 steps:

  1. Load the delta log to learn the current table version and the current snapshot of data files.

  2. Read any data files relevant to the operation.

  3. Write the new data files (and DV files in the case of MOR).

  4. Write a new log entry to the delta log, at a version one higher than the loaded table version, listing the files added and removed.

This simplified model applies to both COW and MOR tables, though the specifics of steps 2 and 3 vary. We’ll look at concurrency control in some detail further down. A minimal sketch of these steps follows, then worked copy-on-write and merge-on-read examples.
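Here is a minimal Python sketch of those 4 steps for a copy-on-write insert, with no concurrency control at all (the unsafe case examined later). The function and file names are illustrative placeholders, not Delta Lake APIs, and a plain JSON file stands in for a parquet data file.

```python
import json
import os

def write_parquet(path, rows):
    # Placeholder for writing a parquet data file; plain JSON keeps the sketch runnable.
    with open(path, "w") as fh:
        json.dump(rows, fh)

def commit_cow_write(table_path, new_rows, replaced_files):
    """The 4 write steps for copy-on-write, with no concurrency control (unsafe)."""
    log_dir = os.path.join(table_path, "_delta_log")

    # Step 1: load the delta log to learn the current table version.
    entries = sorted(f for f in os.listdir(log_dir) if f.endswith(".json"))
    table_version = len(entries)      # matches the walkthrough: the first commit writes 00001.json
    commit_version = table_version + 1

    # Step 2 happens in the caller: read the rows of any files being rewritten.
    # Step 3: optimistically write the new data file.
    data_file = f"datafile{commit_version}.parquet"
    write_parquet(os.path.join(table_path, data_file), new_rows)

    # Step 4: commit by writing the next delta log entry.
    entry = {"Added": [data_file], "Removed": list(replaced_files)}
    with open(os.path.join(log_dir, f"{commit_version:05d}.json"), "w") as fh:
        json.dump(entry, fh)          # a blind write: unsafe without concurrency control
    return commit_version
```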

Copy-on-write example

Example:

  1. Txn 1. Insert rows: [(“red”, 1),(“green”, 1),(“blue”, 1)].

    1. Load delta log, version at 0.

    2. No files to read.

    3. Create a new data file datafile1.parquet with those rows.

    4. Write the log entry 00001.json to the delta log. Added files = [”datafile1.parquet”], removed files = [].

  2. Txn 2. Delete row where Color=”green”.

    1. Load delta log, version at 1.

    2. Read datafile1.parquet. Delete the row in memory.

    3. Write a new parquet file datafile2.parquet with the rows  [(“red”, 1),(“blue”, 1)].

    4. Write the log entry 00002.json to the delta log. Added files = [”datafile2.parquet”], removed files = [”datafile1.parquet”].

  3. Txn 3. Update the blue row with a count of 2.

    1. Load delta log, version at 2.

    2. Read datafile2.parquet. Apply the update to the blue row in memory.

    3. Write datafile3.parquet with rows [(“red”, 1),(“blue”, 2)].

    4. Write the log entry 00003.json to the delta log. Added files = [”datafile3.parquet”], removed files = [”datafile2.parquet”].

  4. Txn 4. Insert two new rows, [(“cyan”, 1),(“magenta”, 2)].

    1. Load delta log, version at 3.

    2. Will create a new file, so no read necessary.

    3. Write these rows to a new data file datafile4.parquet.

    4. Write the log entry 00004.json to the delta log. Added files = [”datafile4.parquet”], removed files = [].

A simplified delta log structure would look as follows:

00001.json
{
	“Added”: [“datafile1.parquet”],
	“Removed”: []
}
00002.json
{
	“Added”: [“datafile2.parquet”],
	“Removed”: [“datafile1.parquet”]
}
00003.json
{
	“Added”: [“datafile3.parquet”],
	“Removed”: [“datafile2.parquet”]
}
00004.json
{
	“Added”: [“datafile4.parquet”],
	“Removed”: []
}
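Replaying these four entries in order, exactly as the snapshot-building sketch earlier does, leaves datafile3.parquet and datafile4.parquet as the live snapshot; datafile1.parquet and datafile2.parquet still exist in storage but are no longer referenced. A quick check with plain Python sets:

```python
entries = [
    {"Added": ["datafile1.parquet"], "Removed": []},
    {"Added": ["datafile2.parquet"], "Removed": ["datafile1.parquet"]},
    {"Added": ["datafile3.parquet"], "Removed": ["datafile2.parquet"]},
    {"Added": ["datafile4.parquet"], "Removed": []},
]

live = set()
for entry in entries:
    live -= set(entry["Removed"])
    live |= set(entry["Added"])

print(sorted(live))  # ['datafile3.parquet', 'datafile4.parquet']
```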

Merge-on-read example

Using the same example as before, but with deletion vectors enabled.

  1. Txn 1. Insert rows: [(“red”, 1),(“green”, 1),(“blue”, 1)].

    1. Load delta log, version at 0.

    2. No files to read.

    3. Create a new data file datafile1.parquet with those rows.

    4. Write the log entry 00001.json to the delta log. Added files = [(”datafile1.parquet”, NULL)], removed files = [].

  2. Txn 2. Delete row where Color=”green”.

    1. Load delta log, version at 1.

    2. Read datafile1.parquet and find the green row.

    3. Write a new DV file dv1.bin with contents: (datafile1.parquet, rows: [2]) which identifies row 2 as invalidated.

    4. Write the log entry 00002.json to the delta log. Added files = [(”datafile1.parquet”, “dv1.bin”)], removed files = [(”datafile1.parquet”, NULL)].

  3. Txn 3. Update the blue row with a count of 2.

    1. Load delta log, version at 2.

    2. Read datafile1.parquet and dv1.bin. Find the blue row.

    3. Write a new data file datafile2.parquet with rows [(“blue”, 2)]. Write a new DV file dv2.bin with contents: (datafile1.parquet, rows: [2, 3]) which identifies rows 2 and 3 as invalidated.

    4. Write the log entry 00003.json to the delta log. Added files = [(”datafile2.parquet”, NULL),(”datafile1.parquet”, “dv2.bin”)], removed files = [(”datafile1.parquet”, “dv1.bin”)].

  4. Txn 4. Insert two new rows, [(“cyan”, 1),(“magenta”, 2)].

    1. Load delta log, version at 3.

    2. Will create a new file, so no read necessary.

    3. Write these rows to a new data file datafile3.parquet.

    4. Write the log entry 00004.json to the delta log. Added files = [(”datafile3.parquet”, NULL)], removed files = [].

A simplified delta log structure would look as follows:

00001.json
{
	“Added”: [(“datafile1.parquet”, NULL)],
	“Removed”: []
}
00002.json
{
	“Added”: [(“datafile1.parquet”,“dv1.bin”)],
	“Removed”: [(“datafile1.parquet”, NULL)]
}
00003.json
{
	“Added”: [(“datafile1.parquet”,“dv2.bin”),
               (“datafile2.parquet”, NULL)],
	“Removed”: [(“datafile1.parquet”,“dv1.bin”)]
}
00004.json
{
	“Added”: [(“datafile3.parquet”, NULL)],
	“Removed”: []
}

Concurrency control

Delta Lake supports multiple concurrent writers with the support of optimistic concurrency control (OCC). There are two broad approaches to OCC with Delta Lake:

  1. PutIfAbsent storage: Many object storage services allow a client to write an object only if it does not already exist - hence PutIfAbsent. Amazon S3 is one notable holdout that, at the time of writing, does not offer any conditional put logic.

  2. Writer coordination: When the storage service does not support PutIfAbsent (as is the case with S3), an additional coordination step is required.

The basic principle of OCC in Delta Lake, no matter which of the two approaches is used, is to avoid overwriting delta log entry files. As we saw with Apache Hudi, overwriting log files leads to lost inserts/updates/deletes.

Let’s look at an example without any kind of concurrency control available. In the following sequence, two concurrent writers load the same delta log, write different data files and ultimately write to the same delta log entry. The second operation to write to the delta log overwrites the log file of the first.

This results in data loss: the data file committed by operation #1 is orphaned, because no surviving log entry references it.

If the storage service had failed the delta log write of operation 2, data loss would have been prevented. Likewise, if a table lock had been used to create a critical section over the right steps, this could have been avoided.

OCC approach 1: PutIfAbsent storage

When using an object store such as Google Cloud Storage or Azure Data Lake Storage, no locking is required to correctly handle concurrent writers.

In step 4 of our simple Delta Lake model, the write operation uses PutIfAbsent. If the write fails then we have a table version conflict - another writer wrote that version first. The writer now knows there is a version conflict and must now determine if there is a data conflict. Delta Lake allows concurrent writes to succeed if they touch disjoint sets of partitions. This is similar to Apache Hudi, except that Delta Lake is more coarse grained as partitions are larger than data files.

The data conflict check involves reloading the delta log and scanning the log entry files with versions equal to or higher than the writer’s commit version, to see if any of the added or removed files touch the same partitions as this writer’s transaction. If it finds any such log entries, then the writer must abort the transaction; otherwise the writer can attempt to repeat the delta log write. To repeat it, it must change its commit version to the current table version (the version of the last delta log entry) + 1. A sketch of this commit loop follows.
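The sketch below models this commit loop against a toy in-memory delta log. The put_if_absent method stands in for the object store’s conditional put, and the per-entry Partitions field is a simplification; a real implementation would derive the touched partitions from the add and remove actions in each log entry.

```python
class CommitConflict(Exception):
    """Another writer committed a conflicting change to the same partition(s)."""

class DeltaLog:
    """Toy in-memory delta log standing in for an object store that supports
    PutIfAbsent (e.g. GCS or ADLS). Entries are keyed by version number."""
    def __init__(self):
        self.entries = {}                        # version -> log entry

    def put_if_absent(self, version, entry):
        if version in self.entries:
            return False                         # an entry for this version already exists
        self.entries[version] = entry
        return True

    def latest_version(self):
        return max(self.entries, default=0)

def try_commit(log, commit_version, added, removed, my_partitions):
    """Commit loop: write the log entry with PutIfAbsent; on a version conflict,
    abort if a newer entry touches our partitions, otherwise retry higher up."""
    entry = {"Added": added, "Removed": removed, "Partitions": sorted(my_partitions)}
    while True:
        if log.put_if_absent(commit_version, entry):
            return commit_version                # committed at this version

        # Version conflict: another writer wrote this version first.
        for version in range(commit_version, log.latest_version() + 1):
            other = log.entries.get(version)
            if other and set(other["Partitions"]) & set(my_partitions):
                raise CommitConflict(f"data conflict with version {version}")

        # All newer entries touched disjoint partitions: retry at the next version.
        commit_version = log.latest_version() + 1
```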

OCC approach 2: Writer coordination 

When PutIfAbsent is not supported, then some kind of writer coordination is required. This simple Delta Lake model uses a table lock.

As before, the data files are written optimistically. However, before writing the delta log entry file, the writer acquires the table lock. Once acquired, it reloads the delta log to check for a version conflict (another writer already wrote a log entry with this writer’s commit version). If the writer finds a log entry file with the commit version it intended to use, it then performs a data conflict check. This is exactly the same as in the PutIfAbsent approach: it scans the newly loaded delta log entries to see whether any touch the partitions it is trying to change. If a data conflict is found, the writer must abort; if no data conflict exists, it resets its commit version to the table version + 1 and writes the delta log entry file, successfully this time. A sketch of this lock-based commit follows.
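Here is a sketch of the same commit step under writer coordination, reusing the DeltaLog and CommitConflict classes from the previous sketch. A threading.Lock stands in for the external table lock service; with the lock held, a plain (unconditional) write of the log entry is safe.

```python
import threading

table_lock = threading.Lock()    # stands in for an external table lock service

def try_commit_with_lock(log, commit_version, added, removed, my_partitions):
    """Under the table lock: re-check for a version conflict, then for a data
    conflict, then write the log entry with a plain put."""
    entry = {"Added": added, "Removed": removed, "Partitions": sorted(my_partitions)}
    with table_lock:
        if commit_version in log.entries:
            # Version conflict: check the entries written since we loaded the log.
            for version in range(commit_version, log.latest_version() + 1):
                other = log.entries.get(version)
                if other and set(other["Partitions"]) & set(my_partitions):
                    raise CommitConflict(f"data conflict with version {version}")
            # Disjoint partitions: bump our commit version past the newer entries.
            commit_version = log.latest_version() + 1
        log.entries[commit_version] = entry      # safe only because we hold the lock
        return commit_version
```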

One other point to note is that only delta log files can get overwritten if no concurrency control mechanism is used or available. Data files cannot be overwritten as the Delta Lake protocol calls for file names to be unique. 

“Data files MUST be uniquely named and MUST NOT be overwritten. The reference implementation uses a GUID in the name to ensure this property.” (Delta Lake PROTOCOL.md)

A simple logical model of the read path

A reader must first load the delta log in order to build a snapshot of the data files (and deletion vectors if using MOR). Next, it can prune files based on column statistics, and finally it reads the remaining data files. A sketch of this read path follows.
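Here is a sketch of the pruning and scanning part in the same simplified Python model, assuming the snapshot (with per-file min/max column statistics) has already been built from the delta log. The snapshot_stats shape and the scan_file callable are illustrative assumptions; in Delta Lake, per-file statistics are carried in the log’s add actions.

```python
def read_where_equals(snapshot_stats, column, value, scan_file):
    """Sketch of the read path: prune files whose min/max statistics for the
    queried column cannot contain the value, then scan the remaining files.

    snapshot_stats: {file_name: {column_name: (min, max)}} for the current snapshot.
    scan_file: callable that reads one data file and returns its rows as dicts.
    """
    matching_rows = []
    for file_name, stats in snapshot_stats.items():
        min_val, max_val = stats.get(column, (None, None))
        if min_val is not None and not (min_val <= value <= max_val):
            continue                  # pruned: this file cannot contain the value
        matching_rows.extend(r for r in scan_file(file_name) if r[column] == value)
    return matching_rows
```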

TLA+ specification and model checking results

The TLA+ specification uses a simple two column table, with column1 and column2. The workload treats column1 as a non-unique row identifier - so deletes and updates use “where column1=value”. Partitioning can be enabled and uses column1 as the partitioning column. This workload makes the data model for the linearized history checker simple.

The TLA+ specification has the following parameters:

  • Writers. E.g. {w1, w2}.

  • Column1Values. Any set of strings, numbers or model values. From the earlier examples: {Red, Green, Blue}.

  • Column2Values. Any set of strings, numbers or model values. From the earlier examples: {1, 2}.

  • PartitionByColumn1: True/False. When set to FALSE, a version conflict always causes a data conflict. When set to TRUE, a data conflict only occurs when another operation touched the same partition.

  • PutIfAbsentSupport.

    • True = Log entry files cannot be overwritten (and will cause the writer to abort on trying).

    • False = Log entry files can be silently overwritten.

  • WriterCoordination.

    • True = Includes the extra step to acquire the table lock and perform a conflict check before writing the log entry.

    • False = No extra coordination step used.

The next-state formula of the TLA+ specification has 6 actions. If we ignore the writer failure action, copy-on-write with PutIfAbsent uses only 4 actions:

  1. StartOperation. The delta log is loaded and the commit version recorded as the table version + 1.

  2. ReadDataFiles. Any relevant data files are loaded into memory.

  3. WriteDataFiles. The new files are optimistically written to the object store.

  4. TryCommitTxn. The writer attempts to write the delta log entry. If it fails, it performs the data conflict check.

When using writer coordination, the additional CoordinatePrepareWrite action is included, which acquires the table lock and performs the version and data conflict checks.

Merge-on-read is not modeled as it would complicate the specification and I don’t believe it adds any additional behavior that would jeopardize correctness.

The TLA+ specification uses the following approach to checking consistency:

  • On each commit (a successful write of a log entry file), the committed rows are recorded in a serialized history variable. This is used by the ConsistentRead invariant to compare table reads against the recorded history to ensure that both are consistent with each other.

  • The ConsistentRead invariant, in each step, checks that a read of each possible column1 value at each committed table version so far returns the correct data. If a row with a given column1 value has not been written yet at a given version, then both the history and the table read should return no rows.

  • If the ConsistentRead invariant detects divergence between a table read and the linearized history, the model checker stops and prints an error trace.

An invariant is a property that must always be true in every reachable state - as soon as it is false it means we have violated safety. The ConsistentRead invariant is checked in every single reachable state. If, in a given state, the latest table version is 2, then the invariant compares the result of a read at each version (1-2) for each of the column1 values (“red”, “green”, “blue”). If the table read does not match the expected result (based on the recorded history), then the invariant is violated.

| Version | Col1 Value | Expected rows  | Table read values | Result |
| ------- | ---------- | -------------- | ----------------- | ------ |
| 1       | Red        | []             | []                | OK     |
| 1       | Blue       | [("blue", 1)]  | [("blue", 1)]     | OK     |
| 1       | Green      | [("green", 2)] | [("green", 2)]    | OK     |
| 2       | Red        | [("red", 2)]   | [("red", 2)]      | OK     |
| 2       | Blue       | [("blue", 1)]  | [("blue", 1)]     | OK     |
| 2       | Green      | [("green", 3)] | [("green", 2)]    | FAIL   |

The above example shows a violation of ConsistentRead, which could happen if OCC were not performed and two different operations succeeded in writing version 2.
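For readers who prefer code to TLA+, the same idea can be expressed as a small Python analogue: record the expected rows for each (version, column1 value) pair as commits happen, then assert that a snapshot read at every committed version returns exactly those rows. This only illustrates the invariant’s logic; it is not the specification itself.

```python
def check_consistent_reads(history, read_at_version, versions, column1_values):
    """Analogue of the ConsistentRead invariant: for every committed version and
    every column1 value, a snapshot read must match the recorded history.

    history: {(version, col1_value): expected list of rows}
    read_at_version: callable (version, col1_value) -> rows read from the table
    """
    for version in versions:
        for value in column1_values:
            expected = history.get((version, value), [])
            actual = read_at_version(version, value)
            if sorted(actual) != sorted(expected):
                raise AssertionError(
                    f"version {version}, column1={value}: "
                    f"expected {expected}, read {actual}")
```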

To check for liveness issues, the specification ensures that once a transaction has started, it always terminates: either it commits successfully, or it is abandoned due to a conflict.

Model checking was uneventful. When either PutIfAbsent or writer coordination was enabled, no consistency issues were found. When both of these controls were disabled, the log entry overwrite issue resulted in a consistency violation, as expected.

Conclusions

This analysis has focused on the logical model of the core Delta Lake design (at the time of writing). There are many details left out: the specifics of file naming, file contents, other action types, delta log checkpoints and so on. However, I believe this work acts as a good mental model which can be used to explore deeper into Delta Lake internals. You can read the protocol description in the Delta Lake GitHub repository.

I’m satisfied that the core Delta Lake protocol provides ACID transactional guarantees in multi-writer scenarios, with writer serializable transactions and read snapshot isolation. You can review this Delta Lake TLA+ specification in my table-formats-tlaplus GitHub repository.

