Dedup & Survivorship¶
Business Scenario¶
CRMs and event streams often produce multiple updates for the same customer. Without survivorship rules, metrics inflate and teams act on stale attributes.
Value Proposition¶
- Enforce one canonical record per entity
- Keep the most recent or highest-quality version
- Reduce downstream noise in analytics
Goals¶
- Sort updates by timestamp
- Keep the best record per customer
- Return a clean, deduplicated dataset
1. Setup¶
In [ ]:
# ── Setup ─────────────────────────────────────────────────────────────────────
# Single cell: installs lakelogic, clones the repo on Colab, and resolves paths.
# Safe to re-run; skips work already done.
import csv
import importlib.util
import os
import subprocess
import sys
from pathlib import Path

# 1. Install lakelogic if missing
if importlib.util.find_spec("lakelogic") is None:
    print("Installing lakelogic ...")
    subprocess.run([sys.executable, "-m", "pip", "install", "lakelogic", "-q"], check=True)
    print("lakelogic installed.")
else:
    print("lakelogic ready.")

# 2. On Google Colab, clone the repo so YAML/CSV files are available
if "google.colab" in sys.modules:
    repo = Path("/content/LakeLogic")
    if not repo.exists():
        print("Cloning LakeLogic repo ...")
        subprocess.run(
            ["git", "clone", "--quiet", "https://github.com/lakelogic/LakeLogic.git", str(repo)],
            check=True,
        )
    example_dir = repo / "examples" / "02_core_patterns" / "dedup_survivorship"
    os.chdir(example_dir)
    print(f"Working directory: {Path.cwd()}")

# 3. Path helper — resolves paths whether running locally or on Colab
def get_path(*parts: str) -> Path:
    """Return an absolute path relative to this notebook's directory."""
    # __file__ is absent in notebooks; check globals() (dir() inside a function
    # only sees locals, so it would never find it) and fall back to cwd
    base = Path(__file__).parent if "__file__" in globals() else Path.cwd()
    # Walk up until we find the dedup_survivorship folder
    for candidate in [base] + list(base.parents):
        target = candidate if candidate.name == "dedup_survivorship" else candidate / "dedup_survivorship"
        if (target / "contract.yaml").exists():
            return (target / Path(*parts)).resolve()
    return (base / Path(*parts)).resolve()

from lakelogic import DataProcessor

print("Setup complete.")
How It Works¶
LakeLogic uses a YAML contract to describe dedup and survivorship rules — no custom Python required.
Key contract settings for this example¶
| Setting | Value | What it does |
|---|---|---|
| dedup key | `customer_id` | Groups rows by this field |
| sort field | `updated_at DESC` | Picks the latest update as the survivor |
| post-transform | `status = 'active' AS is_active` | Derives a boolean column from status |
| quality rules | `email LIKE '%@%'`, `status IN (...)` | Quarantines invalid rows |
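Putting the table's settings together, a contract might look roughly like the sketch below. This is illustrative only: the `transformations`/`phase`/`quality.row_rules` keys appear in the "Edit the contract" section later, but the exact wrapping of the post-transform SQL and the full rule list are assumptions — consult the repo's real `contract.yaml`.

```yaml
# Illustrative sketch — see the repo's contract.yaml for the authoritative schema
transformations:
  - phase: pre            # dedup: keep the latest row per customer_id
    sql: |
      SELECT * FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at DESC) AS rn
        FROM source
      ) AS t
      WHERE rn = 1
  - phase: post           # derive a boolean column from status
    sql: "SELECT *, status = 'active' AS is_active FROM source"

quality:
  row_rules:
    - name: email_format
      sql: "email LIKE '%@%'"
      category: validity
    - name: valid_status
      sql: "status IN ('active', 'inactive')"   # assumed allowed set
      category: validity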
Data flow¶
customer_updates.csv (5 rows, 2 customers appear twice)
        |
        v
[pre-transform]  ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at DESC)
        |        → keeps rn = 1 (most recent row per customer)
        v
[post-transform] derives is_active from status
        |
        v
[quality rules]  email format check, valid status check
        |
        +---------> good_df (3 canonical records)
        +---------> bad_df  (0 quarantined in this dataset)
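The pre-transform step is the standard "keep `rn = 1`" survivorship pattern. As a plain-Python illustration of the same logic (using the sample rows from this playbook, not the LakeLogic API):

```python
# Sample rows mirroring data/customer_updates.csv
rows = [
    {"customer_id": "1", "email": "alice@example.com", "status": "active",   "updated_at": "2024-01-01"},
    {"customer_id": "1", "email": "alice@example.com", "status": "inactive", "updated_at": "2024-02-01"},
    {"customer_id": "2", "email": "bob@example.com",   "status": "active",   "updated_at": "2024-01-05"},
    {"customer_id": "2", "email": "bob@example.com",   "status": "active",   "updated_at": "2023-12-20"},
    {"customer_id": "3", "email": "carla@example.com", "status": "active",   "updated_at": "2024-01-20"},
]

# Keep the most recent row per customer_id (ISO dates sort lexicographically)
survivors = {}
for row in rows:
    current = survivors.get(row["customer_id"])
    if current is None or row["updated_at"] > current["updated_at"]:
        survivors[row["customer_id"]] = row

good = sorted(survivors.values(), key=lambda r: r["customer_id"])
for r in good:
    print(r["customer_id"], r["updated_at"], r["status"])
```

Three survivors remain: Alice's 2024-02-01 row, Bob's 2024-01-05 row, and Carla's single row — the same outcome the contract's window function produces in SQL.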
2. Run¶
In [ ]:
# Load the CSV and run the processor
contract = get_path("contract.yaml")
csv_file = get_path("data", "customer_updates.csv")

with open(csv_file, newline="", encoding="utf-8") as f:
    data = list(csv.DictReader(f))

processor = DataProcessor(contract=contract)
result = processor.run(data)

raw_df = result.raw
good_df = result.good
bad_df = result.bad

report = processor.last_report or {}
counts = report.get("counts", {})
print(
    f"Source: {counts.get('source')} | "
    f"Good: {counts.get('good')} | "
    f"Dropped (dedup): {counts.get('pre_transform_dropped')} | "
    f"Quarantined: {counts.get('quarantined')}"
)
Raw Input Data¶
In [ ]:
print("RAW DATA (5 rows, note duplicate customer_ids 1 and 2)")
display(raw_df)
Good Data — Survivors¶
One canonical record per customer_id — always the row with the latest updated_at.
In [ ]:
print("GOOD DATA (deduplicated)")
display(good_df)
Quarantined Data¶
Rows that failed quality rules (e.g. invalid email, unknown status) land here with error reasons attached.
In [ ]:
print("QUARANTINED DATA")
display(bad_df)
Summary¶
| Metric | Value |
|---|---|
| Source records ingested | 5 |
| Canonical records (good) | 3 |
| Duplicate rows dropped | 2 |
| Quarantined (bad) records | 0 |
Survivorship outcome¶
| customer_id | Rows in | Survivor updated_at | Survivor status | is_active |
|---|---|---|---|---|
| 1 (Alice) | 2 | 2024-02-01 | inactive | false |
| 2 (Bob) | 2 | 2024-01-05 | active | true |
| 3 (Carla) | 1 | 2024-01-20 | active | true |
What LakeLogic did automatically¶
- Ran a window function (`ROW_NUMBER`) to rank rows per customer by recency — no Pandas/Polars code needed
- Derived `is_active` via a SQL post-transform
- Added `_lakelogic_processed_at` and `_lakelogic_run_id` audit columns to every output row
- Reported schema drift (`is_active` not in source) as a warning, not a failure
Next Steps — Try It Yourself¶
The two files that drive this entire playbook are small and easy to edit.
1. Edit the input data¶
Open data/customer_updates.csv and try:
customer_id,email,status,updated_at,last_login
1,alice@example.com,active,2024-01-01,2024-01-10
1,alice@example.com,inactive,2024-02-01,2024-02-05
2,bob@example.com,active,2024-01-05,2024-02-01
2,bob@example.com,active,2023-12-20,2023-12-30
3,carla@example.com,active,2024-01-20,2024-01-25
Ideas:

- Add a row with `status = "unknown"` → watch it land in `bad_df` (fails the `valid_status` rule)
- Add a row with `email = "notanemail"` → quarantined for failing the `email_format` rule
- Add a 4th customer with 3 competing rows and verify only the latest survives
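To see why those first two rows would be quarantined, the two named rules can be sketched as plain predicates. This is an illustration of the rule logic only — the allowed-status set is an assumption, and LakeLogic evaluates the real rules as SQL from the contract:

```python
VALID_STATUSES = {"active", "inactive"}  # assumed allowed set

def check_row(row: dict) -> list[str]:
    """Return the names of any quality rules this row fails."""
    failures = []
    if "@" not in row.get("email", ""):       # mirrors: email LIKE '%@%'
        failures.append("email_format")
    if row.get("status") not in VALID_STATUSES:  # mirrors: status IN (...)
        failures.append("valid_status")
    return failures

print(check_row({"email": "notanemail", "status": "active"}))   # fails email_format
print(check_row({"email": "x@y.com", "status": "unknown"}))     # fails valid_status
print(check_row({"email": "a@b.com", "status": "active"}))      # passes: []
```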
2. Edit the contract¶
Open contract.yaml and try:
# Change the survivorship sort field — e.g. keep the OLDEST record instead:
transformations:
  - sql: |
      SELECT * FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at ASC) AS rn
        FROM source
      ) AS t
      WHERE rn = 1
    phase: pre

# Add a new quality rule — e.g. reject records with no last_login:
quality:
  row_rules:
    - name: has_last_login
      sql: "last_login IS NOT NULL"
      category: completeness
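The modified pre-transform SQL can also be tried standalone, outside LakeLogic, against an in-memory SQLite database (SQLite ≥ 3.25 supports window functions; the sample values below mirror the CSV above):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (customer_id TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO source VALUES (?, ?)",
    [("1", "2024-01-01"), ("1", "2024-02-01"),
     ("2", "2024-01-05"), ("2", "2023-12-20"),
     ("3", "2024-01-20")],
)

# ORDER BY updated_at ASC now keeps the OLDEST row per customer
sql = """
SELECT customer_id, updated_at FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at ASC) AS rn
  FROM source
) WHERE rn = 1
ORDER BY customer_id
"""
for customer_id, updated_at in conn.execute(sql):
    print(customer_id, updated_at)
```

With `ASC`, Alice's survivor flips to her 2024-01-01 row and Bob's to 2023-12-20 — a quick way to sanity-check a sort-order change before editing the contract.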
Key contract knobs:
| What to change | Where in contract.yaml |
|---|---|
| Dedup / partition key | PARTITION BY <field> in pre-transform SQL |
| Survivorship order | ORDER BY <field> DESC/ASC in pre-transform SQL |
| Derived columns | phase: post SQL SELECT list |
| Quality rules | quality.row_rules |
| Quarantine behaviour | quarantine.include_error_reason |
3. Explore related playbooks¶
- `../medallion_architecture/` — combine dedup with a Bronze → Silver pipeline