Medallion Architecture — Bronze to Silver¶
Business Scenario¶
A CRM export contains duplicates, invalid emails, missing fields, and bad spend values. You need a Bronze layer to preserve every raw row, and a Silver layer to apply quality rules and produce analytics-ready data — without losing sight of what was rejected.
Value Proposition¶
- Separate raw ingestion from validated outputs
- Make failures visible without stopping ingestion
- Create a repeatable, auditable Bronze → Silver pipeline in a single contract file
Goals¶
- Run Bronze ingestion — capture all raw rows, even bad ones
- Apply Silver validation — deduplicate, type-cast, enforce business rules
- Inspect clean outputs and quarantined rejects
Setup¶
# ── Setup ─────────────────────────────────────────────────────────────────────
# Single cell: installs lakelogic, clones the repo on Colab, and resolves paths.
# Safe to re-run; skips work already done.
import importlib.util
import os
import sys
import csv
import shutil
import subprocess
from pathlib import Path

# 1. Install lakelogic if missing
if importlib.util.find_spec("lakelogic") is None:
    print("Installing lakelogic ...")
    subprocess.run([sys.executable, "-m", "pip", "install", "lakelogic", "-q"], check=True)
    print("lakelogic installed.")
else:
    print("lakelogic ready.")

# 2. On Google Colab, clone the repo so YAML/CSV files are available
if "google.colab" in sys.modules:
    repo = Path("/content/LakeLogic")
    if not repo.exists():
        print("Cloning LakeLogic repo ...")
        subprocess.run(
            ["git", "clone", "--quiet", "https://github.com/lakelogic/LakeLogic.git", str(repo)],
            check=True,
        )
    example_dir = repo / "examples" / "02_core_patterns" / "medallion_architecture"
    os.chdir(example_dir)
    print(f"Working directory: {Path.cwd()}")

# 3. Path helper — resolves paths whether running locally or on Colab
def get_path(*parts: str) -> Path:
    """Return an absolute path relative to this notebook's directory."""
    base = Path.cwd()
    for candidate in [base] + list(base.parents):
        target = candidate / "medallion_architecture" if candidate.name != "medallion_architecture" else candidate
        if (target / "contract.yaml").exists():
            return (target / Path(*parts)).resolve()
    return (base / Path(*parts)).resolve()

from lakelogic import DataProcessor

print("Setup complete.")
lakelogic ready.
Setup complete.
How It Works¶
LakeLogic uses a single YAML contract to define both pipeline stages. No orchestration framework, no custom transform code.
Pipeline stages¶
data/crm_*.csv (8 rows — duplicates, bad emails, negative spend, non-numeric values)
|
v [Bronze stage]
| - Reads all matching CSV files via glob pattern
| - No type enforcement, no quality rules — capture everything
| - Deduplication is OFF (Bronze preserves raw volume)
| - Writes Parquet to data/bronze/bronze_customers.parquet
|
v [Silver stage (default)]
| - Reads Bronze Parquet
| - Renames plan_type --> tier
| - Deduplicates on customer_id (keeps latest signup_date)
| - Enforces model types (date, float, boolean)
| - Applies quality rules: valid email regex, total_spend >= 0
| - Merges into data/silver/silver_customers.parquet
|
+---> good_df production-ready records
+---> bad_df quarantined rejects with _lakelogic_errors
Key contract settings¶
| Setting | Bronze | Silver |
|---|---|---|
| Source | data/crm_*.csv (glob) | data/bronze/bronze_customers.parquet |
| Quality rules | None | email regex + positive spend |
| Deduplication | Off | customer_id, sort by signup_date DESC |
| Output | Append Parquet | Merge Parquet |
| Run logs | — | DuckDB (logs/lakelogic_run_logs.duckdb) |
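All of these settings live in one file. As an illustrative sketch only (assembled from the contract fragments shown later under "Edit the contract"; treat field names as approximate, not the authoritative schema), the two stages might be declared roughly like:

```yaml
# Illustrative sketch, not the shipped contract.yaml; open that file for the
# real schema. Keys below are borrowed from fragments elsewhere in this page.
metadata:
  run_log_database: "logs/lakelogic_run_logs.duckdb"

stages:
  bronze:
    source:
      type: raw_landing
      path: "data/crm_*.csv"          # glob: matches every CRM export
    materialization:
      target_path: "data/bronze/bronze_customers.parquet"

transformations:
  - deduplicate:
      on: ["customer_id"]
      sort_by: ["signup_date"]
      order: "desc"                   # latest record wins
      phase: "pre"

quality:
  row_rules:
    - name: positive_spend
      sql: "total_spend >= 0"
      category: correctness
```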
Known issues in the raw data¶
| Customer | Problem | Expected outcome |
|---|---|---|
| 102 Bob | Invalid email format | Quarantined at Silver |
| 103 Charlie | Missing email | Quarantined at Silver |
| 104 Dup User | Appears twice (2024-01-15 and 2024-01-18) | Latest row survives |
| 105 Eve | Negative total_spend (-500) | Quarantined at Silver |
| 106 Frank | signup_date = "not-a-date" | Quarantined at Silver |
| 107 Grace | total_spend = "high" (non-numeric) | Quarantined at Silver |
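The email rule that will quarantine Bob and Charlie is an ordinary regular expression (the exact pattern appears in the quarantine output further down). Independent of LakeLogic, you can preview which rows it rejects with the stdlib `re` module:

```python
import re

# Pattern copied from the contract's email_regex_match rule (see the
# quarantine output below for its SQL form).
EMAIL_RE = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")

samples = {
    "101": "alice.smith@example.com",  # valid
    "102": "bob.jones_invalid_email",  # no @ sign: will fail
    "103": "",                         # missing: will fail
}

rejected = [cid for cid, email in samples.items() if not EMAIL_RE.match(email)]
print(rejected)  # ['102', '103']
```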
# ── Helper functions ──────────────────────────────────────────────────────────
def reset_outputs():
    """Delete any previously written Bronze/Silver Parquet files for a clean run."""
    targets = [
        get_path("data", "bronze", "bronze_customers.parquet"),
        get_path("data", "silver", "silver_customers.parquet"),
    ]
    for t in targets:
        if t.exists():
            shutil.rmtree(t) if t.is_dir() else t.unlink()

def preview_frame(df, title=None, limit=10):
    """Print up to `limit` rows from any DataFrame-like object or list of dicts."""
    if title:
        print(title)
    if df is None:
        print("<empty>")
        return
    # Polars / Spark / list support
    for method, call in [
        ("show", lambda: df.show(limit, truncate=False)),
        ("head", lambda: display(df.head(limit))),
        ("limit", lambda: display(df.limit(limit))),
    ]:
        if hasattr(df, method):
            try:
                call()
                return
            except Exception:
                pass
    if isinstance(df, list):
        for row in df[:limit]:
            print(row)
        return
    display(df)

def print_run_summary(proc, title=None):
    """Print a compact run summary from a DataProcessor."""
    if title:
        print(title)
    report = proc.last_report or {}
    counts = report.get("counts") or {}
    print(
        {
            "stage": report.get("stage"),
            "source": counts.get("source"),
            "good": counts.get("good"),
            "quarantined": counts.get("quarantined"),
        }
    )

contract_path = get_path("contract.yaml")
data_path = get_path("data", "crm_export.csv")
1. Peek at the Raw Data¶
Let's confirm what's coming in before we run anything.
if not data_path.exists():
    raise FileNotFoundError(f"Missing input: {data_path}")

with open(data_path, newline="", encoding="utf-8") as f:
    rows = list(csv.DictReader(f))

print(f"Raw rows: {len(rows)}")
for row in rows:
    print(row)
Raw rows: 8
{'customer_id': '101', 'name': 'Alice Smith', 'email': 'alice.smith@example.com', 'signup_date': '2024-01-15', 'plan_type': 'premium', 'total_spend': '1250.50', 'is_active': 'true'}
{'customer_id': '102', 'name': 'Bob Jones', 'email': 'bob.jones_invalid_email', 'signup_date': '2024-01-16', 'plan_type': 'basic', 'total_spend': '45.00', 'is_active': 'true'}
{'customer_id': '103', 'name': 'Charlie Brown', 'email': '', 'signup_date': '2024-01-17', 'plan_type': 'free', 'total_spend': '0', 'is_active': 'true'}
{'customer_id': '104', 'name': 'Duplicate User', 'email': 'dup@example.com', 'signup_date': '2024-01-15', 'plan_type': 'free', 'total_spend': '10.00', 'is_active': 'true'}
{'customer_id': '104', 'name': 'Duplicate User', 'email': 'dup@example.com', 'signup_date': '2024-01-18', 'plan_type': 'free', 'total_spend': '25.00', 'is_active': 'true'}
{'customer_id': '105', 'name': 'Eve Adams', 'email': 'eve@example.com', 'signup_date': '2024-02-01', 'plan_type': 'premium', 'total_spend': '-500', 'is_active': 'false'}
{'customer_id': '106', 'name': 'Frank White', 'email': 'frank@example.com', 'signup_date': 'not-a-date', 'plan_type': 'basic', 'total_spend': '100', 'is_active': 'true'}
{'customer_id': '107', 'name': 'Grace Lee', 'email': 'grace@example.com', 'signup_date': '2024-02-10', 'plan_type': 'gold', 'total_spend': 'high', 'is_active': 'true'}
2. Bronze Ingestion¶
Bronze captures all raw rows — no type enforcement, no quality gates. Its job is to land data exactly as received and write it to Parquet for Silver to consume.
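Conceptually (and independent of LakeLogic), Bronze-style ingestion is just "read every file the glob matches and keep every row". A minimal stdlib sketch of that idea, using throwaway files and a hypothetical `bronze_ingest` helper:

```python
import csv
import glob
import tempfile
from pathlib import Path

def bronze_ingest(pattern: str) -> list[dict]:
    """Read every CSV matching `pattern`: no typing, no validation, no dedup."""
    rows = []
    for path in sorted(glob.glob(pattern)):
        with open(path, newline="", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                row["_source"] = path  # lineage, in the spirit of _lakelogic_source
                rows.append(row)
    return rows

# Demo with two throwaway files: even the "bad" row is kept.
tmp = Path(tempfile.mkdtemp())
(tmp / "crm_a.csv").write_text("customer_id,total_spend\n101,10\n105,-500\n")
(tmp / "crm_b.csv").write_text("customer_id,total_spend\n104,25\n")
raw = bronze_ingest(str(tmp / "crm_*.csv"))
print(len(raw))  # 3: every row lands, including the negative spend
```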
RESET_OUTPUTS = True  # Set False to skip re-writing outputs on reruns
if RESET_OUTPUTS:
    reset_outputs()

bronze_proc = DataProcessor(contract=contract_path, stage="bronze")
bronze_result = bronze_proc.run_source()
bronze_proc.materialize(bronze_result.good, bronze_result.bad)

print_run_summary(bronze_proc, "Bronze summary:")
preview_frame(bronze_result.good, "BRONZE OUTPUT (sample):")
2026-03-01 13:33:09.791 | INFO | lakelogic.core.processor:run_source:647 - Loading source: data\crm_*.csv via polars
2026-03-01 13:33:09.798 | INFO | lakelogic.core.processor:run:451 - Run complete. [domain=customer_analytics, system=crm_export] Source: 8, Total: 8, Pre-Transform Dropped: 0
2026-03-01 13:33:09.868 | INFO | lakelogic.core.run_log:_write_run_log_table:325 - Wrote run log to DuckDB table lakelogic_run_logs (D:\Github\_SaaS\lakelogic\examples\02_core_patterns\medallion_architecture\logs\lakelogic_run_logs.duckdb)
2026-03-01 13:33:09.870 | INFO | lakelogic.core.materialization:materialize_dataframe:1299 - Materialized 8 rows to D:\Github\_SaaS\lakelogic\examples\02_core_patterns\medallion_architecture\data\bronze\bronze_customers.parquet
Bronze summary:
{'stage': 'bronze', 'source': 8, 'good': 8, 'quarantined': 0}
BRONZE OUTPUT (sample):
| customer_id | name | email | signup_date | plan_type | total_spend | is_active | _lakelogic_source | _lakelogic_processed_at | _lakelogic_run_id | _lakelogic_domain | _lakelogic_system |
|---|---|---|---|---|---|---|---|---|---|---|---|
| i64 | str | str | str | str | str | bool | str | datetime[μs, UTC] | str | str | str |
| 101 | "Alice Smith" | "alice.smith@example.com" | "2024-01-15" | "premium" | "1250.50" | true | "data\crm_*.csv" | 2026-03-01 13:33:09 UTC | "400d1be9-d908-48f9-b9b8-2c4079… | "customer_analytics" | "crm_export" |
| 102 | "Bob Jones" | "bob.jones_invalid_email" | "2024-01-16" | "basic" | "45.00" | true | "data\crm_*.csv" | 2026-03-01 13:33:09 UTC | "400d1be9-d908-48f9-b9b8-2c4079… | "customer_analytics" | "crm_export" |
| 103 | "Charlie Brown" | null | "2024-01-17" | "free" | "0" | true | "data\crm_*.csv" | 2026-03-01 13:33:09 UTC | "400d1be9-d908-48f9-b9b8-2c4079… | "customer_analytics" | "crm_export" |
| 104 | "Duplicate User" | "dup@example.com" | "2024-01-15" | "free" | "10.00" | true | "data\crm_*.csv" | 2026-03-01 13:33:09 UTC | "400d1be9-d908-48f9-b9b8-2c4079… | "customer_analytics" | "crm_export" |
| 104 | "Duplicate User" | "dup@example.com" | "2024-01-18" | "free" | "25.00" | true | "data\crm_*.csv" | 2026-03-01 13:33:09 UTC | "400d1be9-d908-48f9-b9b8-2c4079… | "customer_analytics" | "crm_export" |
| 105 | "Eve Adams" | "eve@example.com" | "2024-02-01" | "premium" | "-500" | false | "data\crm_*.csv" | 2026-03-01 13:33:09 UTC | "400d1be9-d908-48f9-b9b8-2c4079… | "customer_analytics" | "crm_export" |
| 106 | "Frank White" | "frank@example.com" | "not-a-date" | "basic" | "100" | true | "data\crm_*.csv" | 2026-03-01 13:33:09 UTC | "400d1be9-d908-48f9-b9b8-2c4079… | "customer_analytics" | "crm_export" |
| 107 | "Grace Lee" | "grace@example.com" | "2024-02-10" | "gold" | "high" | true | "data\crm_*.csv" | 2026-03-01 13:33:09 UTC | "400d1be9-d908-48f9-b9b8-2c4079… | "customer_analytics" | "crm_export" |
3. Silver Quality Gate¶
Silver reads the Bronze Parquet, deduplicates, enforces types, and applies business quality rules. Records that fail any rule go to the quarantine table.
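The survivorship step ("one row per customer_id, keep the latest signup_date") is easy to reason about in plain Python. The sketch below shows the logic only, under a hypothetical `dedup_latest` helper; it is not LakeLogic's actual implementation. ISO dates compare correctly as strings, so a plain `>` works here:

```python
def dedup_latest(rows: list[dict], key: str, sort_by: str) -> list[dict]:
    """Keep one row per `key`, preferring the largest `sort_by` value."""
    survivors: dict = {}
    for row in rows:
        k = row[key]
        if k not in survivors or row[sort_by] > survivors[k][sort_by]:
            survivors[k] = row
    return list(survivors.values())

rows = [
    {"customer_id": "104", "signup_date": "2024-01-15", "total_spend": "10.00"},
    {"customer_id": "104", "signup_date": "2024-01-18", "total_spend": "25.00"},
    {"customer_id": "101", "signup_date": "2024-01-15", "total_spend": "1250.50"},
]
kept = dedup_latest(rows, key="customer_id", sort_by="signup_date")
winner = next(r for r in kept if r["customer_id"] == "104")
print(winner["signup_date"])  # 2024-01-18: the later row survives
```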
silver_proc = DataProcessor(contract=contract_path) # no stage = default (Silver)
silver_result = silver_proc.run_source()
silver_proc.materialize(silver_result.good, silver_result.bad)
print_run_summary(silver_proc, "Silver summary:")
2026-03-01 13:33:22.195 | INFO | lakelogic.core.processor:run_source:647 - Loading source: D:\Github\_SaaS\lakelogic\examples\02_core_patterns\medallion_architecture\data\bronze\bronze_customers.parquet via polars
2026-03-01 13:33:22.215 | INFO | lakelogic.core.processor:run:446 - Run complete. [domain=customer_analytics, system=crm_export] Source: 8, Total (post-transform): 7, Good: 2, Quarantined: 5, Pre-Transform Dropped: 1, Ratio: 71.43%
2026-03-01 13:33:22.280 | INFO | lakelogic.core.run_log:_write_run_log_table:325 - Wrote run log to DuckDB table lakelogic_run_logs (D:\Github\_SaaS\lakelogic\examples\02_core_patterns\medallion_architecture\logs\lakelogic_run_logs.duckdb)
2026-03-01 13:33:22.286 | INFO | lakelogic.core.materialization:materialize_dataframe:1460 - Materialized 2 rows to D:\Github\_SaaS\lakelogic\examples\02_core_patterns\medallion_architecture\data\silver\silver_customers.parquet
Silver summary:
{'stage': 'default', 'source': 8, 'good': 2, 'quarantined': 5}
4. Inspect Results¶
preview_frame(silver_result.good, "PRODUCTION-READY RECORDS:")
PRODUCTION-READY RECORDS:
| customer_id | name | email | signup_date | tier | total_spend | is_active | _lakelogic_source | _lakelogic_processed_at | _lakelogic_run_id | _lakelogic_domain | _lakelogic_system |
|---|---|---|---|---|---|---|---|---|---|---|---|
| i64 | str | str | date | str | f64 | bool | str | datetime[μs, UTC] | str | str | str |
| 104 | "Duplicate User" | "dup@example.com" | 2024-01-18 | "free" | 25.0 | true | "D:\Github\_SaaS\lakelogic\exam… | 2026-03-01 13:33:22 UTC | "544724ec-5fff-4036-9025-0c5259… | "customer_analytics" | "crm_export" |
| 101 | "Alice Smith" | "alice.smith@example.com" | 2024-01-15 | "premium" | 1250.5 | true | "D:\Github\_SaaS\lakelogic\exam… | 2026-03-01 13:33:22 UTC | "544724ec-5fff-4036-9025-0c5259… | "customer_analytics" | "crm_export" |
preview_frame(silver_result.bad, "QUARANTINED RECORDS (with error reasons):")
QUARANTINED RECORDS (with error reasons):
| customer_id | name | email | signup_date | tier | total_spend | is_active | quarantine_state | quarantine_reprocessed | _lakelogic_source | _lakelogic_processed_at | _lakelogic_run_id | _lakelogic_domain | _lakelogic_system | _lakelogic_errors | _lakelogic_categories |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| i64 | str | str | date | str | f64 | bool | str | bool | str | datetime[μs, UTC] | str | str | str | list[str] | list[str] |
| 106 | "Frank White" | "frank@example.com" | null | "basic" | 100.0 | true | "active" | false | "D:\Github\_SaaS\lakelogic\exam… | 2026-03-01 13:33:22 UTC | "544724ec-5fff-4036-9025-0c5259… | "customer_analytics" | "crm_export" | ["Rule failed: signup_date_required ("signup_date" IS NOT NULL)"] | ["completeness"] |
| 107 | "Grace Lee" | "grace@example.com" | 2024-02-10 | "gold" | null | true | "active" | false | "D:\Github\_SaaS\lakelogic\exam… | 2026-03-01 13:33:22 UTC | "544724ec-5fff-4036-9025-0c5259… | "customer_analytics" | "crm_export" | ["Rule failed: positive_spend (total_spend >= 0)"] | ["correctness"] |
| 105 | "Eve Adams" | "eve@example.com" | 2024-02-01 | "premium" | -500.0 | false | "active" | false | "D:\Github\_SaaS\lakelogic\exam… | 2026-03-01 13:33:22 UTC | "544724ec-5fff-4036-9025-0c5259… | "customer_analytics" | "crm_export" | ["Rule failed: positive_spend (total_spend >= 0)"] | ["correctness"] |
| 103 | "Charlie Brown" | null | 2024-01-17 | "free" | 0.0 | true | "active" | false | "D:\Github\_SaaS\lakelogic\exam… | 2026-03-01 13:33:22 UTC | "544724ec-5fff-4036-9025-0c5259… | "customer_analytics" | "crm_export" | ["Rule failed: email_required ("email" IS NOT NULL)", "Rule failed: email_regex_match (REGEXP_LIKE("email", '^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'))"] | ["completeness", "correctness"] |
| 102 | "Bob Jones" | "bob.jones_invalid_email" | 2024-01-16 | "basic" | 45.0 | true | "active" | false | "D:\Github\_SaaS\lakelogic\exam… | 2026-03-01 13:33:22 UTC | "544724ec-5fff-4036-9025-0c5259… | "customer_analytics" | "crm_export" | ["Rule failed: email_regex_match (REGEXP_LIKE("email", '^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'))"] | ["correctness"] |
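Because every reject carries _lakelogic_errors and _lakelogic_categories, quarantine triage can be a simple group-by. A stdlib sketch over hand-built records shaped like the quarantine table above (not the live DataFrame):

```python
from collections import Counter

# Minimal records mirroring the quarantine output above.
bad_rows = [
    {"customer_id": 106, "_lakelogic_categories": ["completeness"]},
    {"customer_id": 107, "_lakelogic_categories": ["correctness"]},
    {"customer_id": 105, "_lakelogic_categories": ["correctness"]},
    {"customer_id": 103, "_lakelogic_categories": ["completeness", "correctness"]},
    {"customer_id": 102, "_lakelogic_categories": ["correctness"]},
]

# Count how often each quality category fired across all rejects.
by_category = Counter(cat for row in bad_rows for cat in row["_lakelogic_categories"])
print(dict(by_category))  # {'completeness': 2, 'correctness': 4}
```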
silver_out = get_path("data", "silver", "silver_customers.parquet")
print(f"Silver Parquet: {silver_out}")
print(f"Exists: {silver_out.exists()}")
Silver Parquet: D:\Github\_SaaS\lakelogic\examples\02_core_patterns\medallion_architecture\data\silver\silver_customers.parquet
Exists: True
Summary¶
| Metric | Bronze | Silver |
|---|---|---|
| Source rows | 8 (all raw) | 8 (from Bronze Parquet) |
| Good (output) rows | 8 | ~2 |
| Duplicate rows dropped | 0 (Bronze keeps all) | 1 (customer 104) |
| Quarantined rows | 0 | ~5 (bad email / spend / date / type) |
Exact quarantine counts depend on which rules fire first. Run the notebook to see live numbers.
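One sanity check worth automating: every Bronze row should be accounted for at Silver as good, quarantined, or dropped by deduplication. Using this run's counts (substitute your own live numbers):

```python
source = 8         # Bronze rows read by Silver
good = 2           # production-ready records
quarantined = 5    # rows that failed a quality rule
dedup_dropped = 1  # customer 104's older duplicate

# If rows silently vanish, the pipeline has a bug somewhere.
assert good + quarantined + dedup_dropped == source, "rows went missing!"
print("reconciled:", good + quarantined + dedup_dropped, "of", source)
```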
What LakeLogic did automatically¶
- Bronze: Glob-matched data/crm_*.csv, wrote all 8 rows to Parquet — no code needed
- Silver: Renamed plan_type → tier, deduplicated on customer_id (latest record wins), cast types, applied email regex + spend rules
- Audit columns: _lakelogic_processed_at, _lakelogic_run_id, _lakelogic_errors added to every output row automatically
- Run logs: Execution history written to logs/lakelogic_run_logs.duckdb for watermarking and incremental load tracking
Next Steps — Try It Yourself¶
The two files that drive this entire pipeline are small and easy to edit.
1. Edit the input data¶
Open data/crm_export.csv and try:
customer_id,name,email,signup_date,plan_type,total_spend,is_active
101,Alice Smith,alice.smith@example.com,2024-01-15,premium,1250.50,true
102,Bob Jones,bob.jones_invalid_email,2024-01-16,basic,45.00,true
103,Charlie Brown,,2024-01-17,free,0,true
104,Duplicate User,dup@example.com,2024-01-15,free,10.00,true
104,Duplicate User,dup@example.com,2024-01-18,free,25.00,true
105,Eve Adams,eve@example.com,2024-02-01,premium,-500,false
106,Frank White,frank@example.com,not-a-date,basic,100,true
107,Grace Lee,grace@example.com,2024-02-10,gold,high,true
Ideas:
- Add a new valid row (e.g. customer 108) and watch it flow cleanly through both layers
- Add a second CSV file (data/crm_export_v2.csv) — Bronze picks it up automatically via the glob data/crm_*.csv
- Fix Bob's email to bob.jones@example.com and confirm he moves from quarantine to good
2. Edit the contract¶
Open contract.yaml and try:
# Add a new quality rule — reject free-tier customers with zero spend:
quality:
  row_rules:
    - name: paying_or_trial
      sql: "tier != 'free' OR total_spend > 0"
      category: business_logic

# Change Bronze to incremental loading (only new files since last run):
stages:
  bronze:
    source:
      type: raw_landing
      path: "data/crm_*.csv"
      load_mode: incremental  # <-- uncomment this line

# Change dedup sort direction (keep the oldest record instead of the latest):
transformations:
  - deduplicate:
      on: ["customer_id"]
      sort_by: ["signup_date"]
      order: "asc"  # <-- change desc to asc
      phase: "pre"
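Before wiring the paying_or_trial rule into the contract, you can sanity-check its predicate in plain Python. The helper below is hypothetical and mirrors the rule's intent (free-tier rows must show spend strictly greater than zero to pass):

```python
def paying_or_trial(tier: str, total_spend: float) -> bool:
    """True if the row passes the suggested rule: non-free tier, or spend > 0."""
    return tier != "free" or total_spend > 0

print(paying_or_trial("premium", 1250.50))  # True
print(paying_or_trial("free", 0.0))         # False: would be quarantined
```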
Key contract knobs:
| What to change | Where in contract.yaml |
|---|---|
| Bronze source glob | stages.bronze.source.path |
| Bronze output path | stages.bronze.materialization.target_path |
| Silver dedup key | transformations[deduplicate].on |
| Survivorship order | transformations[deduplicate].order |
| Quality rules | quality.row_rules |
| Run log location | metadata.run_log_database |
3. Explore related playbooks¶
../dedup_survivorship/ — dive deeper into survivorship rules in isolation