Reference Joins¶
Business Scenario¶
Customer records are often missing context like region or opt-out status. Without reference tables, analytics and compliance checks are inconsistent. You need a repeatable way to enrich records using trusted dimension tables — without writing custom join code in every pipeline.
Value Proposition¶
- Declare reference tables once in the contract; reuse across pipelines
- Enrich raw records with standard dimensions via SQL post-transforms
- Validate referential integrity and quality in the same pass
Goals¶
- Load source customer records and reference tables
- Join and enrich via SQL transforms declared in the contract
- Inspect enriched output and quarantined rejects
Setup¶
# ── Setup ─────────────────────────────────────────────────────────────────────
# Single cell: installs lakelogic, clones the repo on Colab, and resolves paths.
# Safe to re-run; skips work already done.
import importlib.util
import os
import sys
import csv
import polars as pl
from pathlib import Path

# 1. Install lakelogic if missing
if importlib.util.find_spec("lakelogic") is None:
    print("Installing lakelogic ...")
    import subprocess
    subprocess.run([sys.executable, "-m", "pip", "install", "lakelogic", "-q"], check=True)
    print("lakelogic installed.")
else:
    print("lakelogic ready.")

# 2. On Google Colab, clone the repo so YAML/CSV files are available
if "google.colab" in sys.modules:
    repo = Path("/content/LakeLogic")
    if not repo.exists():
        print("Cloning LakeLogic repo ...")
        import subprocess
        subprocess.run(
            [
                "git",
                "clone",
                "--quiet",
                "https://github.com/lakelogic/LakeLogic.git",
                str(repo),
            ],
            check=True,
        )
    example_dir = repo / "examples" / "02_core_patterns" / "reference_joins"
    os.chdir(example_dir)
    print(f"Working directory: {Path.cwd()}")

# 3. Path helper — resolves paths whether running locally or on Colab
def get_path(*parts: str) -> Path:
    """Return an absolute path relative to this notebook's directory."""
    base = Path.cwd()
    for candidate in [base] + list(base.parents):
        target = candidate / "reference_joins" if candidate.name != "reference_joins" else candidate
        if (target / "contract.yaml").exists():
            return (target / Path(*parts)).resolve()
    return (base / Path(*parts)).resolve()

from lakelogic import DataProcessor
print("Setup complete.")
How It Works¶
LakeLogic's links section in the YAML contract declares reference tables.
They are loaded automatically alongside the source data and made available inside
SQL transform blocks — no manual join code required.
Data flow¶
data/customers.csv (9 rows — has nulls, duplicates, invalid tiers)
|
v [pre-transforms]
| 1. rename email_address --> email
| 2. filter WHERE email IS NOT NULL (drops row 4: NoEmail User)
| 3. dedup on email, keep latest created_at (drops row 5: earliest duplicate)
|
v [links loaded automatically]
| dim_geography.csv (country_id --> region: 1=USA, 2=UK, 3=Unknown)
| marketing_opt_outs.csv (email --> opted_out: opted@out.com=true)
|
v [post-transform SQL]
| LEFT JOIN dim_geography --> region (COALESCE to 'Unknown' if no match)
| LEFT JOIN marketing_opt_outs --> opted_out (COALESCE to 'false' if no match)
| CONCAT first_name + last_name --> full_name
|
v [quality rules]
| email_format: email LIKE '%@%' (rejects 'invalid-email')
| valid_membership: IN ('GOLD','SILVER','BRONZE','PLATINUM') (rejects 'INVALID_TIER')
|
+---> good_df enriched, validated records
+---> bad_df quarantined rejects with _lakelogic_errors
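The pre-transform stage of the flow above (rename, filter, dedup keep-latest) can be sketched in plain Python. This is an illustrative simplification using the example's field names; the real pipeline is driven entirely by the contract, not hand-written code like this:

```python
# Minimal sketch of the pre-transform stage: rename -> filter -> dedup.
rows = [
    {"email_address": "a@x.com", "first_name": "First", "created_at": "2023-01-01"},
    {"email_address": "a@x.com", "first_name": "Second", "created_at": "2023-01-10"},
    {"email_address": "", "first_name": "NoEmail", "created_at": "2022-12-15"},
]

# 1. rename email_address -> email
rows = [
    {**{k: v for k, v in r.items() if k != "email_address"}, "email": r["email_address"]}
    for r in rows
]

# 2. filter WHERE email IS NOT NULL (empty string treated as null here)
rows = [r for r in rows if r["email"]]

# 3. dedup on email, keeping the latest created_at (ISO dates sort lexically)
latest = {}
for r in sorted(rows, key=lambda r: r["created_at"]):
    latest[r["email"]] = r  # rows with a later created_at overwrite earlier ones
rows = list(latest.values())

print(rows)  # one canonical row per email: the "Second" duplicate survives
```

The same keep-latest rule is why customer 6 survives dedup while customer 5 is dropped.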
Key contract settings¶
| Setting | Value | What it does |
|---|---|---|
| links[0] | dim_geography.csv | Maps country_id → region |
| links[1] | marketing_opt_outs.csv | Maps email → opted_out |
| pre: rename | email_address → email | Normalises field name from source |
| pre: filter | email IS NOT NULL | Drops records with no email before dedup |
| pre: deduplicate | on email, latest created_at | One canonical row per email |
| post: SQL JOIN | LEFT JOIN both reference tables | Enriches each surviving row |
| quality: email_format | LIKE '%@%' | Quarantines malformed emails |
| quality: valid_membership | IN (GOLD, SILVER, ...) | Quarantines unknown tiers |
| quality: valid_rejoin | region IN ('USA','UK') | Quarantines rows where country resolves to 'Unknown' |
Known issues in the raw data¶
| Customer | Problem | Expected outcome |
|---|---|---|
| 4 — NoEmail User | email_address is empty | Dropped by pre-filter |
| 5 — First Dup | Duplicate email (earlier created_at) | Dropped by dedup |
| 6 — Second Dup | Duplicate email (later created_at) | Survives as canonical |
| 7 — Bad Email | email = "invalid-email" | Quarantined (email_format) |
| 8 — Old Timer | membership_level = "INVALID_TIER" | Quarantined (valid_membership) |
| 3 — Billy Kid | country_id = 1 (has match) | region = USA → passes valid_rejoin |
| 9 — Opted Out | email = opted@out.com (in opt-out list) | opted_out = true, region = USA |
| Any valid-email customer with country_id = 3 | No dim_geography match → region = 'Unknown' | Quarantined (valid_rejoin) after join |
1. Preview Source Files¶
A quick look at the raw records and the two reference tables before processing.
customers_path = get_path("data", "customers.csv")
geography_path = get_path("data", "dim_geography.csv")
opt_outs_path = get_path("data", "marketing_opt_outs.csv")
# Keep raw dicts for the run step (DataProcessor expects list-of-dicts)
with open(customers_path, newline="", encoding="utf-8") as f:
    customers = list(csv.DictReader(f))
print("customers.csv — 9 rows")
display(pl.read_csv(customers_path))
print("dim_geography.csv")
display(pl.read_csv(geography_path))
print("marketing_opt_outs.csv")
display(pl.read_csv(opt_outs_path))
2. Run the Contract¶
DataProcessor loads the source rows, resolves the links reference tables,
applies all pre/post transforms, and enforces quality rules in one pass.
contract_path = get_path("contract.yaml")
processor = DataProcessor(contract=contract_path)
result = processor.run(customers, source_path=str(customers_path))
good_df = result.good
bad_df = result.bad
report = processor.last_report or {}
counts = report.get("counts", {})
print(
    f"Source: {counts.get('source')} | "
    f"Good: {counts.get('good')} | "
    f"Pre-dropped: {counts.get('pre_transform_dropped')} | "
    f"Quarantined: {counts.get('quarantined')}"
)
3. Enriched Output (Good Records)¶
Surviving records have region, opted_out, and full_name added by the
reference joins and post-transform SQL.
print("ENRICHED GOOD RECORDS:")
display(good_df)
4. Quarantined Records¶
Records that failed quality rules land here with _lakelogic_errors explaining why.
print("QUARANTINED RECORDS (with error reasons):")
display(bad_df)
Summary¶
| Metric | Value |
|---|---|
| Source rows loaded | 9 |
| Dropped by pre-filter (null email) | 1 |
| Dropped by dedup (older duplicate) | 1 |
| Good (enriched output) | ~4 |
| Quarantined (bad email / bad tier / unknown region) | ~3 |
Exact counts depend on rule execution order; run the notebook to see live numbers. The valid_rejoin rule (region IN ('USA','UK')) now also quarantines any customer whose country_id has no match in dim_geography (resolved to 'Unknown').
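The good/bad split that valid_rejoin produces can be pictured with a small Python sketch. The _lakelogic_errors shape below is an assumption for illustration only, not the library's exact schema:

```python
# Illustrative sketch: how a rule like valid_rejoin partitions enriched rows.
# The _lakelogic_errors field is assumed here for illustration.
enriched = [
    {"customer_id": 1, "region": "USA"},
    {"customer_id": 4, "region": "Unknown"},  # country_id had no dim_geography match
]
allowed = ("USA", "UK")
good = [r for r in enriched if r["region"] in allowed]
bad = [
    {**r, "_lakelogic_errors": ["valid_rejoin"]}
    for r in enriched if r["region"] not in allowed
]
print(good, bad)
```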
Enrichment outcome per customer¶
| customer_id | email | region | opted_out | full_name | Outcome |
|---|---|---|---|---|---|
| 1 | john.doe@example.com | USA | false | John Doe | Good |
| 2 | jane.smith@gmail.com | UK | false | Jane Smith | Good |
| 3 | kid@outlook.com | USA | false | Billy Kid | Good |
| 4 | (empty) | — | — | — | Dropped (null email pre-filter) |
| 5 | duplicate@example.com | — | — | — | Dropped (older dedup) |
| 6 | duplicate@example.com | USA | false | Second Dup | Good (latest) |
| 7 | invalid-email | — | — | — | Quarantined (email_format) |
| 8 | old@timer.com | USA | false | Old Timer | Quarantined (valid_membership) |
| 9 | opted@out.com | USA | true | Opted Out | Good |
| (any country_id=3) | (no dim_geography match) | Unknown | — | — | Quarantined (valid_rejoin) |
What LakeLogic did automatically¶
- Renamed email_address → email in the pre-transform stage
- Filtered null emails before dedup to avoid biasing the survivor selection
- Resolved dim_geography and marketing_opt_outs as in-memory SQL tables from the links section — no extra Python needed
- Applied COALESCE for country_id = 3 (no geography match → 'Unknown')
- Added _lakelogic_processed_at, _lakelogic_run_id, _lakelogic_errors audit columns automatically
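The LEFT JOIN plus COALESCE behaviour in the list above can be reproduced in isolation with the standard library's sqlite3. This is a hedged sketch of the SQL semantics only; table and column names mirror the example data, not LakeLogic internals:

```python
import sqlite3

# In-memory tables mirroring the example's source and reference data.
con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE source (customer_id INT, email TEXT, first_name TEXT,
                     last_name TEXT, country_id INT);
CREATE TABLE dim_geography (country_id INT, region TEXT);
CREATE TABLE marketing_opt_outs (email TEXT, opted_out TEXT);
INSERT INTO source VALUES
    (1, 'john.doe@example.com', 'John', 'Doe', 1),
    (9, 'opted@out.com', 'Opted', 'Out', 1),
    (4, 'no@match.com', 'No', 'Match', 3);   -- country_id 3: no geography match
INSERT INTO dim_geography VALUES (1, 'USA'), (2, 'UK');
INSERT INTO marketing_opt_outs VALUES ('opted@out.com', 'true');
""")

# LEFT JOIN keeps every source row; COALESCE fills the no-match defaults.
rows = con.execute("""
    SELECT src.customer_id,
           COALESCE(geo.region, 'Unknown')  AS region,
           COALESCE(opt.opted_out, 'false') AS opted_out,
           src.first_name || ' ' || src.last_name AS full_name
    FROM source src
    LEFT JOIN dim_geography geo ON src.country_id = geo.country_id
    LEFT JOIN marketing_opt_outs opt ON src.email = opt.email
""").fetchall()
for r in rows:
    print(r)
```

Customer 4 comes back with region 'Unknown', which is exactly what the valid_rejoin rule then quarantines.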
Next Steps — Try It Yourself¶
1. Edit the source customers¶
Open data/customers.csv and try:
customer_id,email_address,first_name,last_name,birth_date,membership_level,country_id,created_at
1,john.doe@example.com,John,Doe,1985-05-15,GOLD,1,2023-01-05
2,jane.smith@gmail.com,Jane,Smith,1990-10-20,SILVER,2,2023-02-10
3,kid@outlook.com,Billy,Kid,2015-01-01,BRONZE,1,2023-03-01
4,,NoEmail,User,1970-01-01,PLATINUM,3,2022-12-15
5,duplicate@example.com,First,Dup,1980-01-01,GOLD,1,2023-01-01
6,duplicate@example.com,Second,Dup,1980-01-01,GOLD,1,2023-01-10
7,invalid-email,Bad,Email,1990-01-01,BRONZE,2,2023-04-10
8,old@timer.com,Old,Timer,1940-01-01,INVALID_TIER,1,2021-05-05
9,opted@out.com,Opted,Out,1988-08-08,GOLD,1,2023-06-06
Ideas:
- Add a new customer with country_id = 3 → watch region become 'Unknown'
- Add a new email to data/marketing_opt_outs.csv → see opted_out = 'true' flow through
- Add country_id = 4 to data/dim_geography.csv as CANADA → then use it in a customer row
2. Edit the reference tables¶
Open data/dim_geography.csv and try adding a region for country_id 3 (unmatched in the shipped data), e.g.:
country_id,region
1,USA
2,UK
3,Canada
Open data/marketing_opt_outs.csv and try opting out an existing customer, e.g.:
email,opted_out
opted@out.com,true
john.doe@example.com,true
3. Edit the contract¶
Open contract.yaml and try:
# Add a third reference link — e.g. a loyalty tier label lookup:
links:
  - name: dim_geography
    path: ./data/dim_geography.csv
    type: csv
  - name: marketing_opt_outs
    path: ./data/marketing_opt_outs.csv
    type: csv
  - name: tier_labels          # <-- new
    path: ./data/tier_labels.csv
    type: csv

# Then use it in the post-transform SQL:
transformations:
  - sql: |
      SELECT
        src.*,
        COALESCE(geo.region, 'Unknown') AS region,
        COALESCE(opt.opted_out, 'false') AS opted_out,
        COALESCE(tl.label, src.membership_level) AS tier_label,
        src.first_name || ' ' || src.last_name AS full_name
      FROM source src
      LEFT JOIN dim_geography geo ON src.country_id = geo.country_id
      LEFT JOIN marketing_opt_outs opt ON src.email = opt.email
      LEFT JOIN tier_labels tl ON src.membership_level = tl.code
    phase: post

# Add a new quality rule — reject customers under 18:
quality:
  row_rules:
    - name: adult_only
      sql: "DATEDIFF('year', birth_date, CURRENT_DATE) >= 18"
      category: business_logic
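As a rough Python equivalent of the adult_only predicate above (note that DATEDIFF('year', ...) in many SQL dialects counts calendar-year boundaries rather than full ages, so results near birthdays can differ from this sketch):

```python
from datetime import date

def is_adult(birth_date: str, today: date = date(2024, 1, 1)) -> bool:
    """Illustrative age check; the contract evaluates the rule as SQL instead."""
    born = date.fromisoformat(birth_date)
    # Full-years age: subtract one if this year's birthday hasn't happened yet.
    age = today.year - born.year - ((today.month, today.day) < (born.month, born.day))
    return age >= 18

print(is_adult("1985-05-15"))  # customer 1 (John Doe): True
print(is_adult("2015-01-01"))  # customer 3 (Billy Kid): False, would be quarantined
```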
Key contract knobs:
| What to change | Where in contract.yaml |
|---|---|
| Reference table paths | links[*].path |
| Field rename | transformations[rename].mappings |
| Pre-filter condition | transformations[filter].sql |
| Dedup key / sort | transformations[deduplicate].on / sort_by |
| Join SQL | transformations[sql (phase: post)] |
| Quality rules | quality.row_rules |
4. Explore related playbooks¶
- ../dedup_survivorship/ — focus on the dedup/survivorship pattern in isolation
- ../medallion_architecture/ — combine joins with a full Bronze → Silver pipeline