Bronze Quality Gate¶
Business Scenario¶
Raw signups and web leads arrive with missing emails, invalid ages, and inconsistent formats. Before this data hits Silver, you need a fast quality gate to stop obvious issues at ingestion. Catching errors early prevents bad data from silently polluting downstream analytics.
Value Proposition¶
- Stop bad data early without breaking the pipeline
- Quarantine invalid rows with a clear, auditable error reason
- Keep Bronze ingestion lightweight and contract-driven — no custom filtering code
- Full reconciliation: every row lands in either `good` or `bad`, nothing is silently dropped
Goals¶
- Load raw web signups from CSV
- Validate schema fields and row-level quality rules from `contract.yaml`
- Inspect the good (Silver-ready) and quarantined (bad) rows
Setup¶
```python
import importlib.util
import os
import subprocess
import sys
from pathlib import Path

# Install lakelogic if it is not already available.
if importlib.util.find_spec("lakelogic") is None:
    subprocess.run([sys.executable, "-m", "pip", "install", "lakelogic", "-q"], check=True)
    print("lakelogic installed.")
else:
    print("lakelogic ready.")

# On Colab, clone the repo and switch into this example's directory.
if "google.colab" in sys.modules:
    repo = Path("/content/LakeLogic")
    if not repo.exists():
        subprocess.run(
            ["git", "clone", "--quiet", "https://github.com/lakelogic/LakeLogic.git", str(repo)],
            check=True,
        )
    os.chdir(repo / "examples" / "02_core_patterns" / "bronze_quality_gate")
    print(f"Working directory: {Path.cwd()}")

def get_path(*parts):
    """Resolve a path relative to the bronze_quality_gate directory, Colab-safe."""
    cwd = Path.cwd()
    for base in [cwd] + list(cwd.parents):
        target = base if base.name == "bronze_quality_gate" else base / "bronze_quality_gate"
        if (target / "contract.yaml").exists():
            return (target / Path(*parts)).resolve()
    return (cwd / Path(*parts)).resolve()

from lakelogic import DataProcessor

print("Setup complete.")
```
How It Works¶
LakeLogic reads the CSV, applies the contract's schema rules and quality rules row by row, and returns a `ValidationResult` with three DataFrames:

Data flow¶

```text
data/raw_signups.csv
        |
        v  DataProcessor.run_source()
        |    1. Schema validation (required fields, type coercion)
        |    2. Row rules (email_format, age_positive)
        |
        ├── result.good → Silver-ready rows (all rules passed)
        └── result.bad  → Quarantine (failed rows + error reason)
```
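The good/bad routing can be sketched in plain pandas. This is a conceptual illustration of the gate, not LakeLogic's actual implementation; only the two rule expressions come from the contract, everything else here is a hypothetical stand-in:

```python
import pandas as pd

def apply_quality_gate(raw: pd.DataFrame):
    """Split rows into good/bad using the two row rules from contract.yaml."""
    # Boolean mask per rule: True means the row passes that rule.
    rules = {
        "email_format": raw["email"].fillna("").str.contains("@"),
        "age_positive": raw["age"].isna() | (raw["age"] >= 0),
    }
    passed = pd.Series(True, index=raw.index)
    reasons = pd.Series("", index=raw.index)
    for name, ok in rules.items():
        passed &= ok
        # Accumulate the names of every failed rule as the error reason.
        reasons = reasons.where(ok, reasons + name + ";")
    good = raw[passed].copy()
    bad = raw[~passed].copy()
    bad["_lakelogic_error_reason"] = reasons[~passed].str.rstrip(";")
    return good, bad

raw = pd.DataFrame({
    "signup_id": ["S1", "S2", "S3"],
    "email": ["a@x.com", "not-an-email", "b@y.com"],
    "age": [30, 25, -5],
})
good, bad = apply_quality_gate(raw)
```

Note the reconciliation invariant holds by construction: `passed` and `~passed` partition the index, so every raw row appears in exactly one output.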
Quality rules in contract.yaml¶
| Rule | SQL | Catches |
|---|---|---|
| `email_format` | `email LIKE '%@%'` | Missing or malformed email addresses |
| `age_positive` | `age IS NULL OR age >= 0` | Negative age values |
Schema fields enforced¶
| Field | Type | Required |
|---|---|---|
| `signup_id` | string | ✅ yes |
| `email` | string | ✅ yes |
| `event_date` | date | ✅ yes |
| `source` | string | no |
| `age` | int | no |
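Required-field enforcement from the table above can be sketched the same way. A minimal pandas illustration (the `REQUIRED` list mirrors the schema table; the helper name is hypothetical, not a LakeLogic API):

```python
import pandas as pd

REQUIRED = ["signup_id", "email", "event_date"]  # required fields per the schema table

def check_required(raw: pd.DataFrame) -> pd.Series:
    """True for rows where every required field is present and non-null."""
    return raw[REQUIRED].notna().all(axis=1)

raw = pd.DataFrame({
    "signup_id": ["S1", "S2"],
    "email": ["a@x.com", None],          # S2 is missing a required email
    "event_date": ["2024-01-15", "2024-01-16"],
    "source": [None, "paid"],            # optional field, nulls allowed
})
ok = check_required(raw)
```

A null in an optional column (`source`, `age`) does not affect the mask; only nulls in required columns mark a row for quarantine.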
1. Run the Bronze Quality Gate¶
Point `DataProcessor` at `contract.yaml` and run it against the raw signups CSV. LakeLogic handles schema coercion, rule evaluation, and quarantine routing automatically.
```python
contract_path = get_path("contract.yaml")
data_path = get_path("data", "raw_signups.csv")

processor = DataProcessor(contract=contract_path)
result = processor.run_source(data_path)

print(f"Source rows : {len(result.raw)}")
print(f"Good rows   : {len(result.good)}")
print(f"Bad rows    : {len(result.bad)}")
print(f"Reconciled  : {len(result.raw)} = {len(result.good)} + {len(result.bad)}")
```
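The reconciliation print above can be hardened into an assertion. A small sketch, independent of LakeLogic (the helper name and the use of `signup_id` as the key are assumptions):

```python
import pandas as pd

def assert_reconciled(raw, good, bad, key="signup_id"):
    """Fail loudly unless every source row lands in exactly one of good/bad."""
    assert len(raw) == len(good) + len(bad), "row counts do not reconcile"
    overlap = set(good[key]) & set(bad[key])
    assert not overlap, f"rows routed to both outputs: {overlap}"

raw = pd.DataFrame({"signup_id": ["S1", "S2", "S3"]})
good = pd.DataFrame({"signup_id": ["S1", "S3"]})
bad = pd.DataFrame({"signup_id": ["S2"]})
assert_reconciled(raw, good, bad)  # passes silently when the gate is lossless
```

Dropping this after the run turns "nothing is silently dropped" from a claim into a checked invariant.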
2. Inspect Raw Data¶
Every row exactly as read from the CSV, before any validation.
```python
print("RAW DATA (all source rows):")
display(result.raw)
```
3. Inspect Good Data (Silver-Ready)¶
Rows that passed all schema checks and quality rules — safe to promote to Silver.
```python
print("GOOD DATA (passed all rules):")
display(result.good)
```
4. Inspect Quarantined Data¶
Rows that failed one or more rules — routed to quarantine with `_lakelogic_error_reason`.

```python
print("BAD DATA (quarantined rows + reason):")
display(result.bad)
```
Summary¶
| What happened | Detail |
|---|---|
| Source | Raw CSV read — schema coercion applied automatically |
| `email_format` rule | Rows without `@` in email → quarantine |
| `age_positive` rule | Rows with negative age → quarantine |
| `result.good` | Silver-ready rows |
| `result.bad` | Quarantined with `_lakelogic_error_reason` |
What LakeLogic did automatically¶
- Read and coerced the raw CSV against the schema definition
- Evaluated each row against every quality rule in the contract
- Routed passing rows to `result.good` with zero custom code
- Routed failing rows to `result.bad` with an error reason column
- Added `_lakelogic_processed_at` and `_lakelogic_run_id` audit columns
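The audit-column step in that list could look roughly like the following pandas sketch. The column names come from the list above; the timestamp format, `uuid` choice, and helper name are assumptions, not LakeLogic's documented behavior:

```python
import uuid
from datetime import datetime, timezone

import pandas as pd

def stamp_audit_columns(df: pd.DataFrame, run_id=None) -> pd.DataFrame:
    """Attach run-level audit metadata to every row of a validated output."""
    out = df.copy()
    # One processing timestamp and one run id shared by all rows of the run.
    out["_lakelogic_processed_at"] = datetime.now(timezone.utc).isoformat()
    out["_lakelogic_run_id"] = run_id or uuid.uuid4().hex
    return out

good = pd.DataFrame({"signup_id": ["S1", "S2"]})
stamped = stamp_audit_columns(good, run_id="run-001")
```

Stamping both `good` and `bad` with the same run id lets a later audit join quarantined rows back to the exact ingestion run that rejected them.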
Next Steps — Try It Yourself¶
1. Edit the source data¶
Open `data/raw_signups.csv` and add a new row with a bad email or negative age:

```csv
signup_id,email,event_date,source,age
S999,not-an-email,2024-01-15,organic,-5
```
Re-run the notebook — both rules should fire on that row.
2. Edit the contract¶
```yaml
quality:
  row_rules:
    - name: email_format
      sql: "email LIKE '%@%'"
    - name: age_positive
      sql: "age IS NULL OR age >= 0"
    - name: age_max            # <-- add this
      sql: "age IS NULL OR age <= 120"
    - name: known_source       # <-- add this
      sql: "source IN ('organic', 'paid', 'referral')"
      severity: warning        # warning = flag the row but still route it to good
```
Key contract knobs:
| What to change | Where in contract.yaml | Effect |
|---|---|---|
| Quality rules | `quality.row_rules` | Any SQL expression — failures route to `result.bad` |
| Rule severity | `severity: warning` | Flag row but still route to `result.good` |
| Required fields | `model.fields[].required: true` | Null values in required columns are quarantined |
| Unknown column handling | `schema_policy.unknown_fields` | `drop` or `error` |
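How `severity: warning` changes the routing can be sketched in pandas. A conceptual illustration only: the split between error and warning rules follows the table above, while the `_quality_warnings` column name and helper are hypothetical:

```python
import pandas as pd

def route_with_severity(raw, rules):
    """rules: list of (name, passing_mask, severity) tuples."""
    hard_fail = pd.Series(False, index=raw.index)
    warnings = pd.Series("", index=raw.index)
    for name, ok, severity in rules:
        if severity == "error":
            hard_fail |= ~ok          # error rules quarantine the row
        else:
            # warning rules flag the row but keep it in the good output
            warnings = warnings.where(ok, warnings + name + ";")
    good = raw[~hard_fail].copy()
    good["_quality_warnings"] = warnings[~hard_fail].str.rstrip(";")
    bad = raw[hard_fail].copy()
    return good, bad

raw = pd.DataFrame({
    "signup_id": ["S1", "S2", "S3"],
    "source": ["organic", "tiktok", "paid"],
    "age": [30, 25, -5],
})
rules = [
    ("age_positive", raw["age"] >= 0, "error"),
    ("known_source", raw["source"].isin(["organic", "paid", "referral"]), "warning"),
]
good, bad = route_with_severity(raw, rules)
```

Here `S3` is quarantined by the error rule, while `S2`'s unknown source only earns a flag, so it still reaches Silver with its warning attached.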
3. Explore related playbooks¶
- `../dedup_survivorship/` — deduplicate records after the quality gate
- `../scd2_dimension/` — add full history tracking downstream
- `../../04_compliance_governance/hipaa_pii_masking/` — layer in PII masking and HIPAA/GDPR policy packs