Database Governance Quickstart¶
Business Scenario¶
Most organizations have critical data locked in traditional SQL databases (ERP, CRM, legacy apps). To unlock analytics and ML, this data must move to a modern Data Lakehouse — but database records are often messy: null emails, invalid IDs, duplicates. Copying dirty data into your lake builds a Data Swamp.
LakeLogic acts as a Governance Gate: extract from the database, validate against a contract, route clean rows to Silver, and quarantine bad ones for review — automatically.
Value Proposition¶
- Only validated records reach your analytics layer
- Bad records are isolated with an error reason column — no silent data loss
- Rules live in a YAML contract, not scattered across ETL scripts
Goals¶
- Seed a local SQLite database with intentionally dirty data (1 good + 2 bad)
- Define quality rules in a contract
- Run the governance pipeline — inspect what passed and what was quarantined
Setup¶
import importlib.util
import os
import sys
import sqlite3
from pathlib import Path
if importlib.util.find_spec("lakelogic") is None:
    import subprocess

    subprocess.run([sys.executable, "-m", "pip", "install", "lakelogic", "-q"], check=True)
    print("lakelogic installed.")
else:
    print("lakelogic ready.")
if "google.colab" in sys.modules:
    repo = Path("/content/LakeLogic")
    if not repo.exists():
        import subprocess

        subprocess.run(
            ["git", "clone", "--quiet", "https://github.com/lakelogic/LakeLogic.git", str(repo)],
            check=True,
        )
    os.chdir(repo / "examples" / "01_quickstart")
print(f"Working directory: {Path.cwd()}")
def get_path(filename):
    """Resolve a path relative to this notebook's directory, Colab-safe."""
    cwd = Path.cwd()
    for base in [cwd] + list(cwd.parents):
        p = base / filename
        if p.exists():
            return str(p.resolve())
    return str((cwd / filename).resolve())
from lakelogic import DataProcessor
print("Setup complete.")
How It Works¶
SQLite DB (3 rows: 1 good, 2 bad)
        |
        v
DataProcessor.run(rows)
        |   applies quality rules from contract.yaml row by row
        |
        ├── result.good → analytics-ready Silver layer
        |       (only Alice — valid email, valid id)
        |
        └── result.bad → quarantine
                Bob     — failed Valid Email rule (no @ in email)
                Charlie — failed Valid ID rule (id < 0)
Quality rules in users_contract.yaml¶
| Rule name | SQL expression | Catches |
|---|---|---|
| Valid Email | email LIKE '%@%' | Any row without @ in the email field |
| Valid ID | id > 0 | Any row with a zero or negative ID |
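To see exactly what these expressions catch, you can evaluate them directly in SQLite, outside LakeLogic. This is a plain-sqlite3 sketch using the three seeded rows; the `rules` dict mirrors the contract but is not LakeLogic's internal representation.

```python
import sqlite3

# Evaluate the contract's rule expressions directly in SQLite against
# the three seeded rows, to show which rows each rule catches.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, email TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [
        (1, "Alice", "alice@example.com"),
        (2, "Bob", "bob-invalid-email"),
        (-99, "Charlie", "charlie@example.com"),
    ],
)

rules = {"Valid Email": "email LIKE '%@%'", "Valid ID": "id > 0"}
failures = {}
for rule_name, expr in rules.items():
    # A row fails a rule when the rule's expression evaluates to false.
    failures[rule_name] = [
        row[0] for row in conn.execute(f"SELECT name FROM users WHERE NOT ({expr})")
    ]
conn.close()
print(failures)  # {'Valid Email': ['Bob'], 'Valid ID': ['Charlie']}
```

Each rule independently flags its offending row: Bob's email has no @, and Charlie's ID is negative.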
Known data seeded into the database¶
| id | name | email | Expected outcome |
|---|---|---|---|
| 1 | Alice | alice@example.com | ✅ Good — passes both rules |
| 2 | Bob | bob-invalid-email | ❌ Bad — fails Valid Email |
| -99 | Charlie | charlie@example.com | ❌ Bad — fails Valid ID |
1. Seed the Database with Dirty Data¶
We use SQLite to simulate a source ERP/CRM database. One good record, two intentionally bad ones.
db_file = "example.db"
conn = sqlite3.connect(db_file)
c = conn.cursor()
c.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER, name TEXT, email TEXT)")
c.execute("DELETE FROM users")
# Parameterized inserts avoid SQLite's deprecated double-quoted string literals
c.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [
        (1, "Alice", "alice@example.com"),        # good
        (2, "Bob", "bob-invalid-email"),          # bad: no @
        (-99, "Charlie", "charlie@example.com"),  # bad: id < 0
    ],
)
conn.commit()
conn.close()
print("Database seeded: 1 good + 2 bad records.")
2. Review the Contract¶
The users_contract.yaml file defines the quality rules. It lives in Git — when rules change,
no code changes are needed; just update the YAML.
contract_path = get_path("users_contract.yaml")
print(f"Contract: {contract_path}")
print("---")
print(Path(contract_path).read_text())
3. Run the Governance Pipeline¶
Extract the rows from SQLite and pass them to DataProcessor.run(). LakeLogic validates each row
and splits the batch into good and bad automatically.
conn = sqlite3.connect("example.db")
conn.row_factory = sqlite3.Row
rows = [dict(r) for r in conn.execute("SELECT * FROM users").fetchall()]
conn.close()
processor = DataProcessor(contract=contract_path)
result = processor.run(rows)
print(f"Engine             : {processor.engine_name}")
print(f"Records passed     : {len(result.good)}")
print(f"Records quarantined: {len(result.bad)}")
4. Inspect Results¶
print("CLEAN SILVER DATA (analytics-ready):")
display(result.good)
print("QUARANTINED ROWS (failed quality rules):")
display(result.bad)
Summary¶
| Row | Name | Outcome | Reason |
|---|---|---|---|
| 1 | Alice | ✅ Good | Passes both rules |
| 2 | Bob | ❌ Quarantined | Failed Valid Email (no @) |
| -99 | Charlie | ❌ Quarantined | Failed Valid ID (negative id) |
What LakeLogic did automatically¶
- Evaluated each database row against every rule in the contract
- Routed passing rows to result.good — zero custom filtering code
- Routed failing rows to result.bad with an _lakelogic_error_reason column
- Added _lakelogic_processed_at and _lakelogic_run_id audit columns
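The routing described above can be sketched in plain Python. This is a conceptual illustration only, not LakeLogic's actual implementation; the `split_rows` helper and the lambda predicates are assumptions that mirror the quickstart's observable output (the `_lakelogic_*` columns), nothing more.

```python
import uuid
from datetime import datetime, timezone

def split_rows(rows, rules):
    """Conceptual sketch of LakeLogic-style routing (not the library's real code)."""
    run_id = str(uuid.uuid4())
    processed_at = datetime.now(timezone.utc).isoformat()
    good, bad = [], []
    for row in rows:
        # A row is quarantined if it fails any rule; record every failed rule name.
        failed = [name for name, passes in rules.items() if not passes(row)]
        stamped = {
            **row,
            "_lakelogic_processed_at": processed_at,  # audit columns
            "_lakelogic_run_id": run_id,
        }
        if failed:
            bad.append({**stamped, "_lakelogic_error_reason": "; ".join(failed)})
        else:
            good.append(stamped)
    return good, bad

rows = [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob-invalid-email"},
    {"id": -99, "name": "Charlie", "email": "charlie@example.com"},
]
rules = {
    "Valid Email": lambda r: "@" in r["email"],
    "Valid ID": lambda r: r["id"] > 0,
}
good, bad = split_rows(rows, rules)
print(len(good), len(bad))  # 1 2
```

The value of the contract approach is that the `rules` dict above is declared in YAML rather than hard-coded as Python predicates.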
Next Steps — Try It Yourself¶
1. Change the seeded data¶
# Re-open the database and add a new bad record — null name:
conn = sqlite3.connect("example.db")
conn.execute("INSERT INTO users VALUES (?, ?, ?)", (4, None, "diana@example.com"))
conn.commit()
conn.close()
# Then add a rule to catch it:
2. Edit the contract¶
Open users_contract.yaml and add new rules:
version: 1.0.0
dataset: users
source:
type: landing
quality:
row_rules:
- name: Valid Email
sql: "email LIKE '%@%'"
- name: Valid ID
sql: "id > 0" # already here — catches Charlie
- name: Name Required # <-- add this
sql: "name IS NOT NULL AND name != ''"
- name: Known Domain # <-- add this
sql: "email LIKE '%@example.com'"
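Before re-running the pipeline, you can sanity-check the two new expressions directly in SQLite against the extra record (Diana, with a null name). This is plain sqlite3, outside LakeLogic:

```python
import sqlite3

# Evaluate the two new rule expressions against Diana's row (null name).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, email TEXT)")
conn.execute("INSERT INTO users VALUES (?, ?, ?)", (4, None, "diana@example.com"))
row = conn.execute(
    "SELECT (name IS NOT NULL AND name != ''),"
    "       (email LIKE '%@example.com') FROM users"
).fetchone()
conn.close()
print(row)  # (0, 1): fails Name Required, passes Known Domain
```

Note that SQL three-valued logic works in your favor here: `name != ''` is NULL for a null name, and `0 AND NULL` still evaluates to false, so the row fails the Name Required rule.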
Key contract knobs:

| What to change | Where in contract.yaml | Effect |
|---|---|---|
| Quality rules | quality.row_rules | Any SQL expression — rows that fail are quarantined |
| Rule severity | severity: warning | Mark a rule as a warning instead of an error |
| Required fields | model.fields[].required: true | Quarantine rows with null in required columns |
| Write output | materialization.target_path + format | Persist good rows to Parquet/CSV/Delta |
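Combining the knobs from the table, a fuller contract might look like the sketch below. The exact placement of severity under a rule and the shape of the model and materialization sections are assumptions based on the table above — check your generated users_contract.yaml for the authoritative schema.

```yaml
version: 1.0.0
dataset: users
quality:
  row_rules:
    - name: Valid Email
      sql: "email LIKE '%@%'"
    - name: Known Domain
      sql: "email LIKE '%@example.com'"
      severity: warning        # flag but do not quarantine (per the table above)
model:
  fields:
    - name: name
      required: true           # null names are quarantined
materialization:
  target_path: "./silver/users"
  format: parquet              # persist good rows
```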
3. Explore related quickstarts¶
- 01_hello_world.ipynb — the same governance applied to a remote URL
- ../02_core_patterns/scd2_dimension/ — add history tracking downstream
- ../02_core_patterns/soft_delete/ — flag deletes instead of removing rows