Slowly Changing Dimension Type 2 (SCD2)¶
Business Scenario¶
Customer attributes change over time — a status flip, a name correction, a tier upgrade. Reporting and compliance teams need to know what was true on a specific date, not just the latest snapshot. Without versioned history, point-in-time queries are impossible.
Value Proposition¶
- Preserve full history of every attribute change
- Track active vs. expired record versions with `is_current` and `effective_to`
- Enable time-travel analytics with zero custom code
Goals¶
- Load Snapshot 1 — seed the dimension table with initial records
- Load Snapshot 2 — detect changes, close old rows, open new versions
- Inspect the SCD2 output (both `is_current = true` and closed-out history rows)
Setup¶
```python
# ── Setup ─────────────────────────────────────────────────────────────────────
# Single cell: installs lakelogic, clones the repo on Colab, resolves paths,
# and clears any previous output so the run is clean.
# Safe to re-run.
import importlib.util
import os
import sys
import shutil
import polars as pl
from pathlib import Path

# 1. Install lakelogic if missing
if importlib.util.find_spec("lakelogic") is None:
    print("Installing lakelogic ...")
    import subprocess

    subprocess.run([sys.executable, "-m", "pip", "install", "lakelogic", "-q"], check=True)
    print("lakelogic installed.")
else:
    print("lakelogic ready.")

# 2. On Google Colab, clone the repo so YAML/CSV files are available
if "google.colab" in sys.modules:
    repo = Path("/content/LakeLogic")
    if not repo.exists():
        print("Cloning LakeLogic repo ...")
        import subprocess

        subprocess.run(
            [
                "git",
                "clone",
                "--quiet",
                "https://github.com/lakelogic/LakeLogic.git",
                str(repo),
            ],
            check=True,
        )
    example_dir = repo / "examples" / "02_core_patterns" / "scd2_dimension"
    os.chdir(example_dir)
    print(f"Working directory: {Path.cwd()}")

# 3. Path helper — resolves paths whether running locally or on Colab
def get_path(*parts: str) -> Path:
    """Return an absolute path relative to this notebook's directory."""
    base = Path.cwd()
    for candidate in [base] + list(base.parents):
        target = candidate / "scd2_dimension" if candidate.name != "scd2_dimension" else candidate
        if (target / "contract.yaml").exists():
            return (target / Path(*parts)).resolve()
    return (base / Path(*parts)).resolve()

# 4. Clear previous output so each full run starts clean
output_dir = get_path("output")
if output_dir.exists():
    shutil.rmtree(output_dir)
    print(f"Cleared: {output_dir}")

from lakelogic import DataProcessor

print("Setup complete.")
```
lakelogic ready.
Cleared: D:\Github\_SaaS\lakelogic\examples\02_core_patterns\scd2_dimension\output
Setup complete.
How It Works¶
LakeLogic's scd2 materialization strategy manages versioned dimension rows
automatically. You declare field names and run snapshots — LakeLogic handles the
effective_from, effective_to, and is_current bookkeeping.
SCD2 mechanics¶
```
Snapshot 1 (3 rows: Alice active, Bob active, Carla inactive)
    |
    v  DataProcessor.run_source() + materialize()
    |
    +---> gold_dim_customers/data.csv
          customer_id | name  | status   | updated_at | effective_to | is_current
          1           | Alice | active   | 2024-01-01 | 9999-12-31   | true
          2           | Bob   | active   | 2024-01-01 | 9999-12-31   | true
          3           | Carla | inactive | 2024-01-01 | 9999-12-31   | true

Snapshot 2 (3 rows: Bobby renamed, Carla flipped, Dan new)
    |
    v  DataProcessor.run_source() + materialize()
    |  For each row that CHANGED vs. current dimension:
    |    - Close old version: set effective_to = new updated_at, is_current = false
    |    - Open new version:  effective_to = 9999-12-31, is_current = true
    |  For NEW rows: insert with is_current = true
    |  For UNCHANGED rows: leave as-is
    |
    +---> gold_dim_customers/data.csv
          customer_id | name  | status   | updated_at | effective_to | is_current
          1           | Alice | active   | 2024-01-01 | 9999-12-31   | true   (unchanged)
          2           | Bob   | active   | 2024-01-01 | 2024-02-01   | false  (closed - name changed)
          2           | Bobby | active   | 2024-02-01 | 9999-12-31   | true   (new version)
          3           | Carla | inactive | 2024-01-01 | 2024-02-01   | false  (closed - status changed)
          3           | Carla | active   | 2024-02-01 | 9999-12-31   | true   (new version)
          4           | Dan   | active   | 2024-02-01 | 9999-12-31   | true   (new customer)
```
Key contract settings¶
| Setting | Value | What it does |
|---|---|---|
| `materialization.strategy` | `scd2` | Enables versioned history tracking |
| `scd2.effective_from_field` | `updated_at` | Source field used as the version start date |
| `scd2.effective_to_field` | `effective_to` | Added column: `9999-12-31` = still current |
| `scd2.current_flag_field` | `is_current` | Added column: `true` = latest version |
| `primary_key` | `customer_id` | Key used to match incoming rows to existing dimension |
| `quality: valid_status` | `status IN ('active','inactive')` | Quarantines rows with unknown status |
Snapshot data¶
Snapshot 1 (`data/dim_customers_snapshot1.csv`):

```
customer_id,name,status,updated_at
1,Alice,active,2024-01-01
2,Bob,active,2024-01-01
3,Carla,inactive,2024-01-01
```

Snapshot 2 (`data/dim_customers_snapshot2.csv`):

```
customer_id,name,status,updated_at
2,Bobby,active,2024-02-01   <- name changed (Bob → Bobby)
3,Carla,active,2024-02-01   <- status changed (inactive → active)
4,Dan,active,2024-02-01     <- new customer
```
Note: Alice (customer_id=1) is absent from Snapshot 2 — she is unchanged.
1. Preview Input Snapshots¶
A look at both input CSV files before we run the processor.
```python
snap1_path = get_path("data", "dim_customers_snapshot1.csv")
snap2_path = get_path("data", "dim_customers_snapshot2.csv")

print("Snapshot 1 — initial load (3 customers)")
display(pl.read_csv(snap1_path))

print("Snapshot 2 — incremental update (Bob renamed, Carla flipped, Dan new)")
display(pl.read_csv(snap2_path))
```
Snapshot 1 — initial load (3 customers)
| customer_id | name | status | updated_at |
|---|---|---|---|
| i64 | str | str | str |
| 1 | "Alice" | "active" | "2024-01-01" |
| 2 | "Bob" | "active" | "2024-01-01" |
| 3 | "Carla" | "inactive" | "2024-01-01" |
Snapshot 2 — incremental update (Bob renamed, Carla flipped, Dan new)
| customer_id | name | status | updated_at |
|---|---|---|---|
| i64 | str | str | str |
| 2 | "Bobby" | "active" | "2024-02-01" |
| 3 | "Carla" | "active" | "2024-02-01" |
| 4 | "Dan" | "active" | "2024-02-01" |
2. Run Snapshot 1 — Seed the Dimension¶
First run: all rows are new, so every customer gets is_current = true
and effective_to = 9999-12-31.
```python
contract_path = get_path("contract.yaml")
output_csv = get_path("output", "gold_dim_customers", "data.csv")

processor = DataProcessor(contract=contract_path)
result1 = processor.run_source(snap1_path)
processor.materialize(result1.good, result1.bad)

report = processor.last_report or {}
counts = report.get("counts", {})
print(f"Source: {counts.get('source')} | Good: {counts.get('good')} | Quarantined: {counts.get('quarantined')}")

print("\nSCD2 TABLE AFTER SNAPSHOT 1:")
display(pl.read_csv(output_csv))
```
2026-03-01 15:49:29.476 | INFO | lakelogic.core.processor:run_source:647 - Loading source: D:\Github\_SaaS\lakelogic\examples\02_core_patterns\scd2_dimension\data\dim_customers_snapshot1.csv via polars
2026-03-01 15:49:29.483 | INFO | lakelogic.core.processor:run:446 - Run complete. Source: 3, Total (post-transform): 3, Good: 3, Quarantined: 0, Pre-Transform Dropped: 0, Ratio: 0.00%
2026-03-01 15:49:29.491 | INFO | lakelogic.core.materialization:materialize_dataframe:1539 - Materialized 3 rows to D:\Github\_SaaS\lakelogic\examples\02_core_patterns\scd2_dimension\output\gold_dim_customers\data.csv

Source: 3 | Good: 3 | Quarantined: 0

SCD2 TABLE AFTER SNAPSHOT 1:
| customer_id | name | status | updated_at | _lakelogic_source | _lakelogic_processed_at | _lakelogic_run_id | effective_from | effective_to | is_current |
|---|---|---|---|---|---|---|---|---|---|
| i64 | str | str | str | str | str | str | str | str | bool |
| 1 | "Alice" | "active" | "2024-01-01" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | null | true |
| 2 | "Bob" | "active" | "2024-01-01" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | null | true |
| 3 | "Carla" | "inactive" | "2024-01-01" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | null | true |
3. Run Snapshot 2 — Apply Incremental Changes¶
Second run: LakeLogic compares incoming rows to the current dimension.
- Changed rows (Bob's name, Carla's status): old version closed, new version opened
- New row (Dan): inserted as current
- Unchanged row (Alice): untouched
```python
result2 = processor.run_source(snap2_path)
processor.materialize(result2.good, result2.bad)

report = processor.last_report or {}
counts = report.get("counts", {})
print(f"Source: {counts.get('source')} | Good: {counts.get('good')} | Quarantined: {counts.get('quarantined')}")

print("\nSCD2 TABLE AFTER SNAPSHOT 2 (full history):")
display(pl.read_csv(output_csv).sort(["customer_id"]))
```
2026-03-01 15:49:29.513 | INFO | lakelogic.core.processor:run_source:647 - Loading source: D:\Github\_SaaS\lakelogic\examples\02_core_patterns\scd2_dimension\data\dim_customers_snapshot2.csv via polars
2026-03-01 15:49:29.520 | INFO | lakelogic.core.processor:run:446 - Run complete. Source: 3, Total (post-transform): 3, Good: 3, Quarantined: 0, Pre-Transform Dropped: 0, Ratio: 0.00%
2026-03-01 15:49:29.543 | INFO | lakelogic.core.materialization:materialize_dataframe:1539 - Materialized 6 rows to D:\Github\_SaaS\lakelogic\examples\02_core_patterns\scd2_dimension\output\gold_dim_customers\data.csv

Source: 3 | Good: 3 | Quarantined: 0

SCD2 TABLE AFTER SNAPSHOT 2 (full history):
| customer_id | name | status | updated_at | _lakelogic_source | _lakelogic_processed_at | _lakelogic_run_id | effective_from | effective_to | is_current |
|---|---|---|---|---|---|---|---|---|---|
| i64 | str | str | str | str | str | str | str | str | bool |
| 1 | "Alice" | "active" | "2024-01-01" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | null | true |
| 2 | "Bob" | "active" | "2024-01-01" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | "2026-03-01T15:49:29+00:00" | false |
| 2 | "Bobby" | "active" | "2024-02-01 00:00:00" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | null | true |
| 3 | "Carla" | "inactive" | "2024-01-01" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | "2026-03-01T15:49:29+00:00" | false |
| 3 | "Carla" | "active" | "2024-02-01 00:00:00" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | null | true |
| 4 | "Dan" | "active" | "2024-02-01 00:00:00" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | null | true |
4. Current Records Only¶
Filter is_current = true to get the latest version of each customer —
equivalent to a standard snapshot query.
```python
full_dim = pl.read_csv(output_csv)

print("CURRENT RECORDS ONLY (is_current = true):")
display(full_dim.filter(pl.col("is_current") == True))

print("\nCLOSED HISTORY ROWS (is_current = false):")
display(full_dim.filter(pl.col("is_current") == False))
```
CURRENT RECORDS ONLY (is_current = true):
| customer_id | name | status | updated_at | _lakelogic_source | _lakelogic_processed_at | _lakelogic_run_id | effective_from | effective_to | is_current |
|---|---|---|---|---|---|---|---|---|---|
| i64 | str | str | str | str | str | str | str | str | bool |
| 1 | "Alice" | "active" | "2024-01-01" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | null | true |
| 2 | "Bobby" | "active" | "2024-02-01 00:00:00" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | null | true |
| 3 | "Carla" | "active" | "2024-02-01 00:00:00" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | null | true |
| 4 | "Dan" | "active" | "2024-02-01 00:00:00" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | null | true |
CLOSED HISTORY ROWS (is_current = false):
| customer_id | name | status | updated_at | _lakelogic_source | _lakelogic_processed_at | _lakelogic_run_id | effective_from | effective_to | is_current |
|---|---|---|---|---|---|---|---|---|---|
| i64 | str | str | str | str | str | str | str | str | bool |
| 2 | "Bob" | "active" | "2024-01-01" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | "2026-03-01T15:49:29+00:00" | false |
| 3 | "Carla" | "inactive" | "2024-01-01" | "D:\Github\_SaaS\lakelogic\exam… | "2026-03-01 15:49:29+00:00" | "8220dae5-e86d-4d8e-98f8-d9d5d3… | "2026-03-01T15:49:29+00:00" | "2026-03-01T15:49:29+00:00" | false |
Summary¶
| Metric | Snapshot 1 | Snapshot 2 |
|---|---|---|
| Rows in source file | 3 | 3 |
| New versions opened | 3 | 3 (2 updates + 1 new) |
| Old versions closed | 0 | 2 (Bob + Carla) |
| Total rows in dimension | 3 | 6 |
| Current rows (is_current = true) | 3 | 4 |
| Quarantined | 0 | 0 |
SCD2 outcome per customer¶
| customer_id | Snapshot 1 | Snapshot 2 | Versions |
|---|---|---|---|
| 1 — Alice | active | (absent — unchanged) | 1 row, still current |
| 2 — Bob | active (Bob) | name → Bobby | 2 rows: old closed, new current |
| 3 — Carla | inactive | status → active | 2 rows: old closed, new current |
| 4 — Dan | (absent) | new active | 1 row, current |
What LakeLogic did automatically¶
- Compared each incoming row against the existing dimension using `primary_key: customer_id`
- Detected attribute changes by comparing all non-key fields
- Set `effective_to` on closed rows to the new snapshot's `updated_at` date
- Set `effective_to = 9999-12-31` on all current rows
- Added `_lakelogic_processed_at` and `_lakelogic_run_id` audit columns
Next Steps — Try It Yourself¶
1. Edit the snapshot files¶
Open `data/dim_customers_snapshot1.csv`:

```
customer_id,name,status,updated_at
1,Alice,active,2024-01-01
2,Bob,active,2024-01-01
3,Carla,inactive,2024-01-01
```

Open `data/dim_customers_snapshot2.csv`:

```
customer_id,name,status,updated_at
2,Bobby,active,2024-02-01
3,Carla,active,2024-02-01
4,Dan,active,2024-02-01
```
Ideas:

- Add a third snapshot file and call `processor.run_source(snap3_path)` — watch another layer of history accumulate
- Introduce a row with `status = "suspended"` → it will be quarantined by `valid_status`
- Include Alice in Snapshot 2 but with a changed status → she will now have 2 versions too
- Omit Bob from Snapshot 2 entirely (i.e. delete him) — observe that SCD2 leaves his current row as-is (deletions are not tracked unless you add a soft-delete flag)
2. Edit the contract¶
Open contract.yaml and try:
```yaml
# Rename the SCD2 output columns to match your warehouse convention:
materialization:
  strategy: scd2
  target_path: output/gold_dim_customers
  format: csv
  scd2:
    effective_from_field: updated_at   # source field for version start
    effective_to_field: valid_to       # rename column
    current_flag_field: is_latest      # rename column
```

```yaml
# Add a new tracked attribute to the model:
model:
  fields:
    - name: customer_id
      type: int
      required: true
    - name: name
      type: string
    - name: status
      type: string
    - name: tier          # <-- new field — add to CSV and here
      type: string
    - name: updated_at
      type: date
```

```yaml
# Add a new quality rule:
quality:
  row_rules:
    - name: valid_status
      sql: "status IN ('active','inactive')"
      category: consistency
    - name: name_not_empty
      sql: "name IS NOT NULL AND name != ''"
      category: completeness
```
Key contract knobs:
| What to change | Where in contract.yaml |
|---|---|
| SCD2 column names | materialization.scd2.*_field |
| Change-detection key | primary_key |
| Version start timestamp | scd2.effective_from_field |
| Output path / format | materialization.target_path / format |
| Quality rules | quality.row_rules |
3. Explore related playbooks¶
- `../dedup_survivorship/` — deduplicate before feeding into a dimension
- `../medallion_architecture/` — build a full Bronze → Silver pipeline that feeds a Gold dimension