# Complete Contract Template Reference

> [!NOTE]
> Looking for focused, topic-specific guides? This is the full kitchen-sink reference (2,700+ lines). For faster navigation, use the modular documentation:

| Topic | Page |
| --- | --- |
| Domain ownership, SLOs, governance | Domain Config → |
| Storage, environments, placeholders | System Config → |
| Your first data product contract | Data Product Contracts → |
| Ingestion, watermarks, transforms, quality | Sub-pages → |
| SCD2 dimensions, fact tables | Dimensional Modeling → |
| PII masking, schema, lineage | Schema & Model → |
| Notifications & alerting | Notifications → |
| GDPR, EU AI Act compliance | Compliance → |

This is a fully annotated contract template showing every available configuration option with detailed comments explaining business value and use case scenarios.

Use this as a reference to understand what's possible, then create your own contracts by copying only the sections you need.


## Template Structure

# ============================================================
# LAKELOGIC DATA CONTRACT - COMPLETE REFERENCE TEMPLATE
# ============================================================
# This template shows ALL available options with detailed comments.
# Copy and customize only the sections you need for your use case.

# ============================================================
# 1. VERSION & METADATA
# ============================================================
# REQUIRED: Contract version for compatibility tracking
version: 1.0.0

# OPTIONAL: Human-readable metadata about this contract
info:
  title: "Customer Master Data - Silver Layer"
  # Business value: Clear identification in logs and monitoring

  version: "2.1.0"
  # Business value: Track contract evolution over time

  description: "Validated, deduplicated customer records with full quality enforcement"
  # Business value: Documentation for team members and stakeholders

  owner: "data-platform-team@company.com"
  # Business value: Clear ownership for questions and incidents

  contact:
    email: "data-platform@company.com"
    slack: "#data-quality"
  # Business value: Quick escalation path for issues

  target_layer: "silver"
  # Business value: Clarifies position in medallion architecture

  status: "production"
  # Options: development, staging, production, deprecated
  # Business value: Lifecycle management

  classification: "confidential"
  # Options: public, internal, confidential, restricted
  # Business value: Data governance and compliance

  domain: "sales"
  # Data mesh domain (e.g. "sales", "finance", "real-estate")
  # Business value: Domain ownership in data mesh architectures

  system: "crm"
  # Source system identifier (e.g. "salesforce", "sap", "zoopla")
  # Business value: Trace data back to source system

# OPTIONAL: Custom metadata for tagging and organization
metadata:
  domain: "sales"
  # Business value: Data mesh domain ownership

  system: "crm"
  # Business value: Source system identification

  data_layer: "silver"
  # Business value: Medallion layer classification

  pii_present: true
  # Business value: Privacy compliance tracking

  retention_period: "P7Y"
  # ISO 8601 duration — data retention policy (7 years)
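  # Common ISO 8601 duration values (standard notation, shown for reference):
  #   "P7Y"  → 7 years      "P6M"   → 6 months
  #   "P90D" → 90 days      "P1Y6M" → 1 year 6 months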

  cost_center: "CC-1234"
  # Business value: Chargeback and cost allocation

  sla_tier: "tier1"
  # Business value: SLA classification (tier1 = critical)

  # ── Run Log Persistence ────────────────────────────────────
  # Controls where pipeline run metadata is stored.
  # All three targets can be used simultaneously.

  run_log_dir: "{log_root}/runs/{domain}_{system}_{bronze_layer}_events"
  # Per-run JSON files — one file per run. Works on local + ADLS + S3 + GCS.
  # Best for: lightweight logging, cloud storage

  # run_log_path: "{log_root}/run_log.json"
  # Single JSON file — overwritten each run (no history).
  # Best for: simplest possible logging

  run_log_table: "{domain_catalog}._run_logs"
  # Queryable table — auto-detects backend from engine:
  #   spark engine  → Spark Delta table (Unity Catalog)
  #   polars engine → Delta table via delta-rs (ADLS/S3/local)
  #   duckdb engine → local DuckDB file
  # Best for: production — queryable, partitionable, Delta format

  # run_log_backend: "delta"
  # Explicit backend override. Options: spark, delta, duckdb, sqlite
  # If omitted, auto-detected from engine (spark → spark, everything else → delta)

  # run_log_table_partition_by: ["domain", "data_layer"]
  # Partition the run log table for faster queries (Spark + delta backends)

  # run_log_database: "logs/lakelogic_run_logs.duckdb"
  # Only for duckdb/sqlite backends: path to embedded database file
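  # Illustrative placeholder resolution (values assumed here — actual
  # placeholders come from your system config):
  #   log_root=/mnt/logs, domain=sales, system=crm,
  #   bronze_layer=bronze, domain_catalog=sales_catalog
  #   run_log_dir   → /mnt/logs/runs/sales_crm_bronze_events/
  #   run_log_table → sales_catalog._run_logs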

# ============================================================
# 2. DATA SOURCE CONFIGURATION
# ============================================================
# OPTIONAL: Where to load data from (for run_source() method)
source:
  type: "landing"
  # Options: landing (files), stream (Kafka), table (database/catalog)
  # Business value: Defines acquisition pattern

  path: "s3://bronze-bucket/customers/*.parquet"
  # Supports: Local paths, S3, GCS, ADLS, glob patterns
  # Business value: Flexible source location

  load_mode: "incremental"
  # Options: full, incremental, cdc
  # Business value: Optimize processing (only new/changed data)
  #
  # ── Load Mode Validation ─────────────────────────────────────
  # The pipeline validates required properties per load_mode:
  #   incremental → watermark_field recommended (defaults to _lakelogic_processed_at)
  #   cdc         → at least ONE of: cdc_op_field, cdc_timestamp_field
  #                 (both optional individually, but at least one required)
  #   full        → no additional requirements

  pattern: "*.parquet"
  # OPTIONAL: File pattern filter
  # Business value: Select specific files from directory

  format: "csv"
  # OPTIONAL: Source file format (default: auto-detected from path/extension)
  # Options: parquet | csv | json | delta | avro | orc | xml
  # Business value: Explicit format when auto-detection isn't sufficient

  options:
    header: "true"
    inferSchema: "true"
    recursiveFileLookup: "true"
    multiLine: "true"
    delimiter: ","
  # OPTIONAL: Reader options passed directly to the engine (Spark, Polars, etc.)
  # Common CSV options: header, inferSchema, delimiter, quote, escape, encoding
  # Common JSON options: multiLine, allowComments
  # Common Parquet options: mergeSchema
  # Business value: Fine-grained control over file parsing behavior

  cdc_op_field: "operation"
  # For cdc: Field indicating the operation type (insert/update/delete).
  # At least one of cdc_op_field / cdc_timestamp_field must be set when load_mode is cdc.
  # Business value: Change data capture support

  cdc_delete_values: ["D", "DELETE"]
  # Values indicating delete operations
  # Business value: Handle deletes in CDC streams (see materialization.soft_delete_column for detail on Tombstone vs Hard Delete behavior)

  cdc_timestamp_field: "source_record_updated_at"
  # OPTIONAL for cdc: Source column providing the exact event time for soft deletes.
  # If specified, the engine will use this timestamp (coalescing to current time if null).
  # Business value: High accuracy tracking of exactly when a record was deleted upstream.

  # ── CDC Example Usage ────────────────────────────────────────
  # To enable CDC, set load_mode: cdc and map your source operation flags:
  #
  # Example:
  #   source:
  #     type: table
  #     path: "{domain_catalog}.{bronze_layer}_{system}_events"
  #     load_mode: cdc
  #     cdc_op_field: "__START_AT"  # Or whatever column holds the CDC op
  #     cdc_delete_values: ["D", "Delete", "Remove"]
  #     cdc_timestamp_field: "_lakelogic_processed_at"

  watermark_strategy: "max_target"
  # Options: max_target (default), pipeline_log, manifest, lookback, date_range, delta_version
  # Business value: Flexible incremental boundary resolution
  #
  # ── Strategy Comparison ──────────────────────────────────────
  #
  #  Strategy        Best For                 Source Type     State Stored In          Requires Spark?
  #  ──────────────  ───────────────────────  ──────────────  ───────────────────────  ───────────────
  #  max_target      Most batch pipelines     Table only      Target table (self-heal) Yes
  #  pipeline_log    File-based incremental   File only ⚠️    _run_logs table          No
  #  lookback        Simple rolling windows   File or Table   None (stateless)         No
  #  date_range      Backfills & widgets      File or Table   None (explicit dates)    No
  #  manifest        Non-Spark pipelines      File only       JSON manifest file       No
  #  delta_version   Large streaming tables   Table only ⚠️   Target TBLPROPERTIES     Yes
  #
  # ⚠️ CROSS-VALIDATION (enforced at runtime):
  #   - pipeline_log on a table source → raises ValueError (no file mtimes to compare)
  #   - delta_version on a file source → raises ValueError (no Delta transaction log)
  #   Use load_mode: 'cdc' with cdc_timestamp_field for table-to-table incremental loads.
  #
  # ── Strategy 1: max_target (DEFAULT) ──────────────────────────────
  # Queries MAX(watermark_field) on the target Delta table.
  # Self-healing: if target is manually edited, next run re-reads from
  # the actual high-water mark.
  #
  # Example:
  #   source:
  #     type: table
  #     path: "{domain_catalog}.{bronze_layer}_{system}_events"
  #     load_mode: incremental
  #     watermark_strategy: max_target
  #     watermark_field: "_lakelogic_processed_at"
  #     target_path: "abfss://silver@acct.dfs.core.windows.net/events"
  #   # Requires: target_path pointing to the target Delta table
  #   # Uses watermark_field: YES — to query MAX and to filter source
  #
  # ── Strategy 2: pipeline_log ──────────────────────────────────────
  # Queries the _run_logs table (written by LakeLogic after each run)
  # for the last successful max_source_mtime of this dataset.
  # Compares file modification times against the watermark.
  #
  # ⚠️ FILE SOURCES ONLY — raises ValueError on table sources
  #    (tables have no file mtimes to compare).
  #    For table-to-table incremental, use load_mode: 'cdc' with
  #    cdc_timestamp_field or watermark_strategy: 'delta_version'.
  #
  # Example:
  #   source:
  #     type: landing
  #     path: "abfss://landing@acct.dfs.core.windows.net/events/"
  #     load_mode: incremental
  #     watermark_strategy: pipeline_log
  #   # State: _run_logs table (configured via metadata.run_log_table)
  #   # Filters by: dataset (target table name), data_layer, domain, system
  #   # Excludes: failed runs, no_new_data runs, reprocess runs
  #   # Fallback: if no prior runs, scans all files (initial load)
  #
  # ── Strategy 3: lookback ──────────────────────────────────────────
  # Sliding window: processes everything from NOW - duration to NOW.
  # No state needed. Simple and predictable.
  #
  # Example (daily):
  #   source:
  #     type: table
  #     path: "{domain_catalog}.{bronze_layer}_{system}_events"
  #     load_mode: incremental
  #     watermark_strategy: lookback
  #     lookback: "7 days"
  #     watermark_field: "_lakelogic_processed_at"
  #
  # Example (near-real-time):
  #   source:
  #     watermark_strategy: lookback
  #     lookback: "3 hours"
  #     watermark_field: "event_timestamp"
  #
  # Example (monthly batch):
  #   source:
  #     watermark_strategy: lookback
  #     lookback: "1 month"
  #
  # Supported durations: "N days", "N hours", "N mins", "N month(s)"
  # Uses watermark_field: YES — to filter source
  # Overlap: allows reprocessing of data within the window (idempotent writes recommended)
  #
  # ── Strategy 4: date_range ────────────────────────────────────────
  # Explicit from/to dates. Ideal for backfills and Databricks Widgets.
  #
  # Example:
  #   source:
  #     type: table
  #     path: "{domain_catalog}.{bronze_layer}_{system}_events"
  #     load_mode: incremental
  #     watermark_strategy: date_range
  #     from_date: "2024-01-01"
  #     to_date: "2024-12-31"
  #     watermark_field: "event_date"
  #   # Uses watermark_field: YES — to filter source
  #   # Tip: from_date/to_date can be overridden at runtime via pipeline params
  #
  # ── Strategy 5: manifest ──────────────────────────────────────────
  # Reads a JSON manifest file listing already-processed partitions.
  # Lightweight alternative for non-Spark environments (Polars, Azure Functions).
  #
  # Example:
  #   source:
  #     type: landing
  #     path: "abfss://landing@acct.dfs.core.windows.net/events/"
  #     load_mode: incremental
  #     watermark_strategy: manifest
  #     manifest_path: "abfss://meta@acct.dfs.core.windows.net/manifests/events.json"
  #     watermark_field: "_snapshot_date"
  #   # Manifest JSON format:
  #   #   { "processed_partitions": ["2024-03-20", "2024-03-21"], "last_updated": "..." }
  #   # State: the manifest file itself
  #   # Uses watermark_field: YES — as the partition field tracked in the manifest
  #   # Note: external process must update the manifest after each successful run
  #
  # ── Strategy 6: delta_version ─────────────────────────────────────
  # Uses Delta transaction log versions instead of timestamp columns.
  # Reads only the specific Parquet files added/changed between versions.
  # Fastest strategy for large tables — no column scanning required.
  #
  # ⚠️ TABLE SOURCES ONLY — raises ValueError on file sources
  #    (files have no Delta transaction log).
  #
  # Example:
  #   source:
  #     type: table
  #     path: "catalog.bronze.sessions"
  #     load_mode: incremental
  #     watermark_strategy: delta_version
  #     target_path: "table:catalog.silver.sessions"
  #   # State: target table TBLPROPERTIES('lakelogic.last_source_version')
  #   # Also captured in _run_logs for full audit trail
  #   # Uses watermark_field: NO — versions replace timestamps entirely
  #   # Safeguards:
  #   #   - Detects source rollback (from_v > to_v) and auto-resets
  #   #   - Exempt from mandatory run_log requirement (uses TBLPROPERTIES)
  #   # Requirement: Spark-only, both source and target must be Delta tables

  watermark_field: "_lakelogic_processed_at"
  # For incremental loads: Field to track processing progress.
  # If omitted, defaults to "_lakelogic_processed_at" (stamped by lineage as TIMESTAMP).
  # For partition-pruned reads, use the partition column (e.g. session_date).
  # NOT used by delta_version strategy (operates on version numbers).
  # Business value: Efficient incremental loading

  watermark_date_parts: ["year", "month", "day"]
  # Multi-column partition support when temporal boundary is
  # spread across multiple columns instead of a single date field
  # Also accepts named dict: {year: "partition_year", month: "partition_month"}
  # Business value: Works with Hive-style year/month/day partitions
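  # One possible compiled filter for a watermark of 2026-03-22 (illustrative —
  # exact boundary semantics depend on the engine):
  #   (year > 2026)
  #   OR (year = 2026 AND month > 3)
  #   OR (year = 2026 AND month = 3 AND day >= 22)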

  partition_filters:
    country: "GB"
    region: "south"
  # Static partition values ANDed into every incremental filter
  # Business value: Scope incremental reads to specific partitions
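  # Illustrative effect — the incremental read predicate becomes:
  #   country = 'GB' AND region = 'south' AND <watermark filter>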

  flatten_nested: ["derived", "pricing", "location"]
  # Options: false (default), true (all), [col, col, ...] (named)
  # Flatten JSON-string columns from bronze tables into flat columns
  # Business value: Bronze → Silver workflows with nested JSON data
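  # Illustrative before/after (flattened column naming assumed as <parent>_<key>):
  #   pricing = '{"amount": 450000, "currency": "GBP"}'
  #   → pricing_amount = 450000, pricing_currency = "GBP"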

  # ── Date-Partitioned Landing ───────────────────────────────
  # Limits file globbing to only relevant date partitions instead
  # of scanning the entire landing directory. Critical for scale.
  partition:
    format: "y_%Y/m_%m/d_%d"
    # Strftime-style format defining the directory structure.
    # Examples:
    #   "y_%Y/m_%m/d_%d"     → events/y_2026/m_03/d_22/*.json
    #   "date=%Y%m%d"         → events/date=20260322/*.json
    #   "%Y/%m/%d"            → events/2026/03/22/*.json
    #   "year=%Y/month=%m"    → events/year=2026/month=03/*.json
    #   "dt=%Y-%m-%d/%H"      → events/dt=2026-03-22/17/*.json (hourly)
    # Business value: Scan scope drops from millions of files to days

    lookback_days: 3
    # How many days back to scan from today. Default: 1.
    # On FIRST RUN (no watermark), lookback is respected to prevent massive backfills.
    # Can be overridden at runtime: pipeline.run(lookback_days=30)
    # Business value: Controls daily scan scope and initial load bounds

    # start_date: "2026-01-01"
    # end_date: "2026-03-22"
    # Optional: explicit date range for backfills.
    # Also auto-populated from pipeline.run(reprocess_from=..., reprocess_to=...)
    # Business value: Targeted historical reprocessing

    # file_pattern: "*.json"
    # Optional: glob pattern for files within each partition.
    # Auto-derived from source.format if omitted (format: json → *.json)
    # Business value: Only needed when file_pattern differs from format

# ============================================================
# 3. SERVER/STORAGE CONFIGURATION
# ============================================================
# OPTIONAL: Output storage and ingestion controls
server:
  type: "s3"
  # Options: s3, gcs, adls, azure, local, glue
  # Business value: Cloud platform flexibility

  format: "delta"
  # Options: parquet, delta, iceberg, csv, json
  # Business value: Choose optimal storage format

  path: "s3://silver-bucket/customers"
  # Output location for materialized data
  # Business value: Centralized data lake organization

  mode: "validate"
  # Options: validate (quality gate), ingest (raw capture)
  # Business value: Bronze uses "ingest", Silver/Gold use "validate"

  schema_policy:
    evolution: "strict"
    # Options: strict, append, merge, overwrite, compatible, allow
    # strict: Fail on any schema change (production safety)
    # append: Allow new fields (flexible ingestion)
    # merge: Merge new fields into schema (smart evolution)
    # overwrite: Replace schema completely (reprocessing)
    # compatible: Allow backward-compatible changes only
    # allow: Accept all changes
    # Business value: Control schema change behavior

    unknown_fields: "quarantine"
    # Options: drop, quarantine, allow
    # drop: Prune undocumented columns from the final table.
    # quarantine: Log/alert and send to dead-letter queue.
    # allow: Pass unknown columns through.
    # Business value: Monitoring vs strict contract enforcement

  cast_to_string: false
  # If true, cast all columns to string (Bronze "all strings" pattern)
  # Business value: Zero ingestion failures, defer type validation

# ============================================================
# 4. ENVIRONMENT-SPECIFIC OVERRIDES
# ============================================================
# OPTIONAL: Override paths/formats per environment
environments:
  dev:
    path: "s3://dev-bucket/customers"
    format: "parquet"
    # Business value: Cheaper storage for dev/test

  staging:
    path: "s3://staging-bucket/customers"
    format: "delta"
    # Business value: Production-like testing

  prod:
    path: "s3://prod-bucket/customers"
    format: "delta"
    # Business value: Production configuration

# Usage: export LAKELOGIC_ENV=dev

# ============================================================
# 5. REFERENCE DATA LINKS
# ============================================================
# OPTIONAL: Link to dimension tables or reference data
links:
  - name: "dim_countries"
    path: "./reference/countries.parquet"
    type: "parquet"
    # Options: parquet, csv, table
    broadcast: true
    # If true, the engine broadcasts this table in joins (use for small tables)
    columns: ["code", "name", "region"]
    # Column projection — only load these columns from the linked table.
    # Reduces DataFrame footprint by avoiding unused columns.
    # If omitted or empty, all columns are loaded (default).
    # Business value: Memory optimization for wide reference tables

  - name: "dim_products"
    table: "catalog.reference.products"
    type: "table"
    broadcast: false
    columns: ["id", "product_name", "category", "price"]
    # Business value: Unity Catalog / Hive table reference with projection

  - name: "valid_emails"
    path: "s3://reference/email_domains.csv"
    type: "csv"
    # Business value: External validation lists

# ============================================================
# 6. DATASET IDENTIFICATION
# ============================================================
# OPTIONAL: Logical dataset name (used in SQL transformations)
dataset: "customers"
# Business value: SQL table alias in transformations

# OPTIONAL: Business key(s) for the entity
primary_key:
  - "customer_id"
# Business value: Uniqueness validation, merge operations

# OPTIONAL: Business/natural key for SCD2 dimensions
# In SCD2, primary_key is the surrogate (unique per row), while natural_key
# is the business key that repeats across versions.
natural_key:
  - "customer_id"
# Business value: Separates surrogate identity from business identity
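# Illustrative SCD2 rows — the surrogate key is unique per version, while the
# natural key repeats across versions (column names here are hypothetical):
#   customer_sk  customer_id  valid_from   valid_to     is_current
#   1001         42           2024-01-01   2025-06-30   false
#   1002         42           2025-07-01   null         true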

# ============================================================
# 7. SCHEMA MODEL
# ============================================================
# OPTIONAL: Define expected schema with types and constraints
model:

  fields:
    - name: "customer_id"
      type: "long"
      # Types: string, int, long, double, boolean, date, timestamp
      required: true
      # If true, generates automatic not_null rule
      pii: false
      classification: "public"
      description: "Unique customer identifier"
      # Business value: Schema documentation and enforcement

      # OPTIONAL: Generator hints (used by DataGenerator)
      accepted_values: ["premium", "standard", "basic"]
      # Generator picks from this list; validator checks IN rule
      min: 1
      max: 999999
      # Generator stays within range; validator checks >= / <= rules

      # OPTIONAL: Foreign key reference to another contract
      foreign_key:
        contract: "silver_agents"
        column: "agent_id"
        severity: "error"
      # Generator samples from PK pool of referenced contract
      # Business value: Referential integrity + synthetic data generation

      # OPTIONAL: Kimball Dimensional Modelling flags
      nullable: true
      # Explicit nullability declaration — critical for accumulating snapshot milestones
      # where dates start as null until that lifecycle stage is reached.
      # Business value: Generates COALESCE merge logic for milestone columns

      milestone: true
      # Flag for accumulating snapshot milestone date columns.
      # Business value: Signals that this column tracks a lifecycle stage

      generated: true
      # Flag for auto-generated fields (surrogate keys, SCD2 validity columns).
      # Fields marked generated: true are produced by LakeLogic, not the source.
      # Business value: Complete, honest description of the output table schema

      # OPTIONAL: Field-level quality rules
      rules:
        - name: "customer_id_positive"
          sql: "customer_id > 0"
          category: "correctness"
          severity: "error"
          # Business value: Field-specific validation

    - name: "email"
      type: "string"
      required: true
      pii: true
      classification: "confidential"
      description: "Customer email address"
      # ── PII Security Group Mapping ──
      security_groups: ["pii-readers", "compliance-team"]
      # Groups allowed to see unmasked values.
      # Maps to IAM/AD groups or Databricks UC is_member() groups.
      # Business value: Role-based data masking per field
      masking: "partial"
      # Default mask when user NOT in security_groups.
      # Options: nullify | hash | redact | partial | encrypt
      #
      # ── Masking Strategies ──────────────────────────────
      #
      # nullify → NULL
      #   Joinable: No | Reversible: No | GDPR: ✓
      #   Hard removal — value gone forever.
      #   Use for GDPR Art.17 erasure writes.
      #
      # hash → a3f8b2c1d4... (SHA-256)
      #   Joinable: Yes | Reversible: No | GDPR: ✓
      #   One-way pseudonymisation. Analytics teams
      #   can COUNT DISTINCT and JOIN across tables
      #   without seeing real values.
      #
      # redact → ***REDACTED***
      #   Joinable: No | Reversible: No | GDPR: ✓
      #   Visible marker that data existed. Audit-
      #   friendly — shows masking was applied
      #   (vs NULL which is ambiguous).
      #
      # partial → j***@company.com | ***-***-1234
      #   Joinable: No | Reversible: No | GDPR: ✗
      #   Preserves structure for support/debug.
      #   NOT suitable for erasure — still leaks
      #   enough to narrow identification.
      #
      # encrypt → enc:gAAAAABh... (AES-256)
      #   Joinable: Yes | Reversible: Yes | GDPR: ✓
      #   Reversible for authorised users via Key
      #   Vault. Delete the key = crypto-shredding.
      #   Best for regulated environments
      #   (healthcare, finance, GDPR Art.17).
      #
      # The encryption key is injected via the LAKELOGIC_PII_KEY
      # environment variable, which your orchestrator can pull
      # from Azure Key Vault or AWS KMS before running the pipeline.
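      # Illustrative key injection via Azure CLI (vault and secret names
      # are hypothetical):
      #   export LAKELOGIC_PII_KEY=$(az keyvault secret show \
      #     --vault-name my-kv --name lakelogic-pii-key \
      #     --query value -o tsv)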
      masking_format: "{first1}***@{domain}"
      # Custom format for 'partial' strategy.
      # Tokens: {first1}-{first9}, {last1}-{last9}, {domain}
      # If omitted, auto-detects email/phone/generic.
      #
      # Examples:
      # "{first1}***@{domain}"  → j***@company.com
      # "***-***-{last4}"       → ***-***-1234
      # "{first2}** ***"        → SW** *** (postcode)
      # "GB**-****-{last4}"     → GB**-****-7890 (IBAN)
      rules:
        - name: "email_format"
          sql: "email RLIKE '^[^@]+@[^@]+\\.[^@]+$'"
          category: "correctness"
          description: "Valid email format"

    - name: "age"
      type: "int"
      required: false
      pii: false
      classification: "internal"
      description: "Customer age in years"
      rules:
        - name: "age_range"
          sql: "age BETWEEN 18 AND 120"
          category: "correctness"

    - name: "status"
      type: "string"
      required: true
      pii: false
      description: "Customer account status"

    - name: "created_at"
      type: "timestamp"
      required: true
      pii: false
      description: "Account creation timestamp"

    - name: "updated_at"
      type: "timestamp"
      required: true
      pii: false
      description: "Last update timestamp"

# ============================================================
# 8. SCHEMA POLICY
# ============================================================
# OPTIONAL: How to handle schema evolution and unknown fields
schema_policy:
  evolution: "strict"
  # Options: strict, compatible, allow
  # strict: Fail on any schema change
  # compatible: Allow backward-compatible changes
  # allow: Accept all changes
  # Business value: Production safety vs flexibility

  unknown_fields: "quarantine"
  # Options: quarantine, drop, allow
  # quarantine: Send rows with unknown fields to quarantine
  # drop: Remove unknown fields
  # allow: Keep unknown fields
  # Business value: Handle unexpected columns
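  # Illustrative scenario: the source starts sending a column "loyalty_tier"
  # (hypothetical) that is not declared in the model:
  #   evolution: strict          → run fails fast
  #   unknown_fields: quarantine → rows carrying the column go to quarantine
  #   unknown_fields: drop       → column is removed, run continues
  #   unknown_fields: allow      → column passes through to the target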

# ============================================================
# 9. TRANSFORMATIONS
# ============================================================
# OPTIONAL: Data transformations (pre and post quality checks)
transformations:
  # ──────────────────────────────────────────────────────
  # PRE-PROCESSING (before quality checks)
  # ──────────────────────────────────────────────────────

  # Rename columns to standardize naming
  - rename:
      from: "cust_id"
      to: "customer_id"
    phase: "pre"
    # Business value: Align source schema to target schema

  # Or rename multiple columns at once
  - rename:
      mappings:
        "cust_id": "customer_id"
        "email_addr": "email"
        "cust_status": "status"
    phase: "pre"

  # Drop junk rows early
  - filter:
      sql: "customer_id IS NOT NULL AND email IS NOT NULL"
    phase: "pre"
    # Business value: Remove obvious garbage before validation

  # Deduplicate before validation
  - deduplicate:
      "on": ["customer_id"]
      sort_by: ["updated_at"]
      order: "desc"
    phase: "pre"
    # Business value: Keep most recent record per customer

  # Select specific columns
  - select:
      columns: ["customer_id", "email", "age", "status"]
    phase: "pre"
    # Business value: Drop unnecessary columns

  # Drop specific columns
  - drop:
      columns: ["internal_notes", "temp_field"]
    phase: "pre"
    # Business value: Remove sensitive or temporary fields

  # Cast data types
  - cast:
      columns:
        customer_id: "long"
        age: "int"
        created_at: "timestamp"
    phase: "pre"
    # Business value: Type coercion before validation

  # Trim whitespace
  - trim:
      fields: ["email", "status"]
      side: "both"
    # Options: both, left, right
    phase: "pre"
    # Business value: Clean string data

  # Convert to lowercase
  - lower:
      fields: ["email", "status"]
    phase: "pre"
    # Business value: Normalize string values

  # Convert to uppercase
  - upper:
      fields: ["country_code"]
    phase: "pre"
    # Business value: Standardize codes

  # Coalesce multiple fields
  - coalesce:
      field: "email"
      sources: ["primary_email", "secondary_email", "backup_email"]
      default: "unknown@example.com"
      output: "email"
    phase: "pre"
    # Business value: Fill nulls from multiple sources

  # Split string into array
  - split:
      field: "tags"
      delimiter: ","
      output: "tag_array"
    phase: "pre"
    # Business value: Parse delimited strings

  # Explode array into rows
  - explode:
      field: "tag_array"
      output: "tag"
    phase: "pre"
    # Business value: Normalize nested data

  # Map values
  - map_values:
      field: "status"
      mapping:
        "A": "ACTIVE"
        "I": "INACTIVE"
        "P": "PENDING"
      default: "UNKNOWN"
      output: "status"
    phase: "pre"
    # Business value: Standardize code values

  # ──────────────────────────────────────────────────────
  # POST-PROCESSING (after quality checks, on good data)
  # ──────────────────────────────────────────────────────

  # Derive new fields
  - derive:
      field: "age_group"
      sql: "CASE WHEN age < 25 THEN 'young' WHEN age < 65 THEN 'adult' ELSE 'senior' END"
    phase: "post"
    # Business value: Calculated fields for analytics

  # Lookup/join dimension data
  - lookup:
      field: "country_name"
      reference: "dim_countries"
      "on": "country_code"
      key: "code"
      value: "name"
      default_value: "Unknown"
    phase: "post"
    # Business value: Enrich with reference data

  # Full join with multiple fields
  - join:
      reference: "dim_products"
      "on": "product_id"
      key: "id"
      fields: ["product_name", "category", "price"]
      type: "left"
      # Options: left, inner, right, full
      prefix: "product_"
      defaults:
        product_name: "Unknown Product"
        category: "Uncategorized"
    phase: "post"
    # Business value: Multi-field enrichment

  # SQL transformation
  - sql: |
      SELECT
        *,
        DATEDIFF(CURRENT_DATE, created_at) AS days_since_signup,
        CASE
          WHEN status = 'ACTIVE' AND age > 65 THEN 'senior_active'
          WHEN status = 'ACTIVE' THEN 'active'
          ELSE 'inactive'
        END AS segment
      FROM source
    phase: "post"
    # Business value: Complex transformations

  # Rollup/aggregation with lineage tracking
  - rollup:
      group_by: ["customer_segment", "country"]
      aggregations:
        total_customers: "COUNT(*)"
        avg_age: "AVG(age)"
        total_revenue: "SUM(lifetime_value)"
      keys: "customer_id"
      # Track which customer IDs rolled into each group
      rollup_keys_column: "_lakelogic_rollup_keys"
      rollup_keys_count_column: "_lakelogic_rollup_keys_count"
      upstream_run_id_column: "_upstream_run_id"
      upstream_run_ids_column: "_upstream_lakelogic_run_ids"
      distinct: true
    phase: "post"
    # Business value: Compute metrics (Pair with `fact: type: aggregate` below for auto-lineage!)

  # Pivot long metrics into wide columns
  - pivot:
      id_vars: ["customer_id"]
      pivot_col: "metric"
      value_cols: ["value"]
      values: ["clicks", "impressions"]
      # values list required for portable SQL pivot
      agg: "sum"
      name_template: "{pivot_alias}"
    phase: "post"
    # Business value: Wide analytics-ready metrics
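    # Illustrative reshape (values are hypothetical):
    #   input long rows:  (customer_id=1, metric="clicks", value=10)
    #                     (customer_id=1, metric="impressions", value=200)
    #   pivoted wide row: (customer_id=1, clicks=10, impressions=200)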

  # Unpivot wide columns back to long form
  - unpivot:
      id_vars: ["customer_id"]
      value_vars: ["clicks", "impressions"]
      key_field: "metric"
      value_field: "value"
      include_nulls: false
    phase: "post"
    # Business value: Normalize wide metrics to long rows

  # Bucket numeric values into labelled bands
  - bucket:
      field: "price_band"
      source: "listing_price"
      bins:
        - lt: 250000
          label: "sub_250k"
        - lt: 500000
          label: "250k_500k"
        - lt: 1000000
          label: "500k_1m"
      default: "1m_plus"
    phase: "post"
    # Compiles to SQL CASE expression — identical across all engines
    # Business value: Categorize values into named segments
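    # Illustrative compiled form of the bins above (exact SQL may vary by engine):
    #   CASE WHEN listing_price < 250000  THEN 'sub_250k'
    #        WHEN listing_price < 500000  THEN '250k_500k'
    #        WHEN listing_price < 1000000 THEN '500k_1m'
    #        ELSE '1m_plus'
    #   END AS price_band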

  # Extract scalar values from JSON string columns
  - json_extract:
      field: "latitude"
      source: "location_coordinates"
      path: "$.latitude"
      cast: "float"
    phase: "post"
    # Engine-agnostic: Polars/DuckDB/Spark each use native JSON ops
    # Business value: Parse nested JSON without full flattening

  # Compute date differences
  - date_diff:
      field: "listing_age_days"
      from_col: "creation_date"
      to_col: "event_date"
      unit: "days"
    # Options: days, hours, months
    phase: "post"
    # Business value: Calculate durations between events

  # Explode date ranges into one row per day
  - date_range_explode:
      output: "snapshot_date"
      start_col: "creation_date"
      end_col: "deleted_at"
      # end_col is nullable — defaults to today when null
      interval: "1d"
    phase: "post"
    # Business value: Create daily snapshots from validity ranges
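    # Illustrative expansion (dates are hypothetical):
    #   input:  creation_date=2024-01-01, deleted_at=2024-01-03
    #   output: one snapshot_date row per day across 2024-01-01 … 2024-01-03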

# ============================================================
# 10. QUALITY RULES
# ============================================================
# OPTIONAL: Data quality validation rules
#
# ── Pipeline Execution Order ──────────────────────────────────
# 1. Pre-transforms  → renames, filters, dedup (phase: pre)
# 2. Schema enforcement → cast columns to contract types
# 3. Pre quality rules (default) → validate source columns
# 4. Good/bad split → quarantine rows failing pre-rules
# 5. Post-transforms → derives, lookups, joins (phase: post)
# 6. Post quality rules (phase: post) → validate derived columns,
#    quarantine failures
#
# Pre-phase rules can only reference SOURCE columns.
# Post-phase rules can reference both source AND derived columns.
# ──────────────────────────────────────────────────────────────
quality:
  enforce_required: true
  # If true, generate not_null rules for required fields
  # Business value: Automatic completeness checks

  # ──────────────────────────────────────────────────────
  # ROW-LEVEL RULES (quarantine individual bad rows)
  # ──────────────────────────────────────────────────────
  row_rules:
    # Simple not-null check
    - not_null: "email"
      # Business value: Ensure critical fields are populated

    # Not-null with custom config
    - not_null:
        field: "customer_id"
        name: "customer_id_required"
        category: "completeness"
        description: "Customer ID must be present"
        severity: "error"
      # Severity: error (quarantine), warning (log only), info

    # Multiple not-null checks
    - not_null:
        fields: ["email", "status", "created_at"]
        category: "completeness"

    # Accepted values (enum validation)
    - accepted_values:
        field: "status"
        values: ["ACTIVE", "INACTIVE", "PENDING", "SUSPENDED"]
        category: "consistency"
        description: "Status must be valid enum value"
      # Business value: Enforce controlled vocabularies

    # Regex pattern matching
    - regex_match:
        field: "email"
        pattern: "^[^@]+@[^@]+\\.[^@]+$"
        category: "correctness"
        description: "Email must be valid format"
      # Business value: Format validation

    # Numeric range validation
    - range:
        field: "age"
        min: 18
        max: 120
        inclusive: true
        category: "correctness"
        description: "Age must be between 18 and 120"
      # Business value: Plausibility checks

    # Referential integrity (foreign key)
    - referential_integrity:
        field: "country_code"
        reference: "dim_countries"
        key: "code"
        category: "consistency"
        description: "Country code must exist in reference table"
      # Business value: Prevent orphaned records

    # Lifecycle window validation (SCD Type 2)
    - lifecycle_window:
        event_ts: "order_date"
        event_key: "customer_id"
        reference: "dim_customers"
        reference_key: "customer_id"
        start_field: "valid_from"
        end_field: "valid_to"
        end_default: "9999-12-31"
        category: "consistency"
        description: "Order must fall within customer validity window"
      # Business value: Temporal referential integrity
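      # Illustrative check (values are hypothetical): an order with
      # order_date=2024-03-10 for customer_id=42 passes only if dim_customers
      # contains a version of 42 whose valid_from/valid_to window contains
      # 2024-03-10 (open-ended versions use valid_to=9999-12-31).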

    # Custom SQL rule
    - name: "email_domain_valid"
      sql: "email NOT LIKE '%@temp-mail.%' AND email NOT LIKE '%@disposable.%'"
      category: "validity"
      description: "Block disposable email domains"
      severity: "error"
      # Business value: Business-specific validation

    # Post-phase rule (runs AFTER transforms — can reference derived columns)
    - name: "derived_date_not_null"
      sql: "event_date_parsed IS NOT NULL"
      phase: "post"
      category: "correctness"
      description: "Derived date must parse successfully"
      # phase: pre (default) — runs before transforms, source columns only
      # phase: post — runs after transforms, can reference derived columns
      # Errors tagged [pre] or [post] in _lakelogic_errors for traceability

  # ──────────────────────────────────────────────────────
  # DATASET-LEVEL RULES (aggregate checks on good data)
  # ──────────────────────────────────────────────────────
  dataset_rules:
    # Uniqueness check
    - unique: "customer_id"
      # Business value: Prevent duplicates

    # Uniqueness with custom config
    - unique:
        field: "email"
        name: "email_unique"
        category: "uniqueness"
        description: "Email addresses must be unique"
        severity: "error"

    # Null ratio threshold
    - null_ratio:
        field: "phone"
        max: 0.20
        # Max 20% null values allowed
        category: "completeness"
        description: "Phone number should be present for most customers"
      # Business value: Data completeness monitoring

    # Row count validation
    - row_count_between:
        min: 1000
        max: 10000000
        category: "completeness"
        description: "Expected customer count range"
      # Business value: Detect missing or duplicate data

    # Custom SQL dataset rule
    - name: "active_customer_ratio"
      sql: "SELECT SUM(CASE WHEN status = 'ACTIVE' THEN 1 ELSE 0 END) / COUNT(*) FROM source"
      category: "validity"
      description: "At least 60% of customers should be active"
      must_be_greater_than: 0.60
      # Business value: Business metric validation

# ============================================================
# 11. QUARANTINE CONFIGURATION
# ============================================================
# OPTIONAL: Quarantine settings and notifications
quarantine:
  enabled: true
  # If false, pipeline fails on any quality rule failure
  # Business value: Fail-fast vs graceful degradation

  notifications_enabled: true
  # If false, mutes all alerts for this contract, overriding
  # domain/system settings for this contract only

  fail_on_quarantine: true
  # If true, fail the pipeline immediately when ANY records are quarantined,
  # instead of letting the good records flow through.
  # Business value: Aggressive data quality enforcement

  target: "s3://quarantine-bucket/customers"
  # Where to write quarantined records
  # Business value: Centralized bad data repository

  include_error_reason: true
  # If true, include _lakelogic_errors column
  # Business value: Root cause analysis

  strict_notifications: true
  # If true, fail pipeline if notification fails
  # Business value: Ensure alerts are delivered

  format: "parquet"
  # Options: parquet (default), csv, delta, json
  # Output format for file-based quarantine targets
  # Business value: Match quarantine format to your tooling

  write_mode: "append"
  # Options: append (default), overwrite
  # append: add bad records to existing quarantine data
  # overwrite: replace quarantine target on every run
  # Business value: Control quarantine growth vs freshness

# ============================================================
# 12. NOTIFICATION CHANNELS
# ============================================================
# OPTIONAL: Pipeline event notifications via Apprise.
# Handles ALL event types — not just quarantine.
# Each entry uses a `target:` URL — Apprise auto-detects the channel
# from the URL scheme (mailto://, slack://, msteams://, etc.).
#
# `target:` MUST be a full Apprise URL with a scheme.
# ❌ Wrong:  target: "to=alerts@company.com"
# ✅ Right:  target: "mailto://user:pass@smtp.co.com?to=alerts@company.com"
#
# Also accepts `type:` for legacy built-in adapters (slack, teams, webhook).
# `channel:`, `url:`, `to:` are aliases for `target:`.
#
# Supported events: quarantine, failure, schema_drift, dataset_rule_failed,
#                   slo_breach, slo_recovery
#
# Secrets in URLs: use env:VAR_NAME, keyvault:secret-name, etc.
# Full Apprise reference: https://github.com/caronc/apprise/wiki
notifications:

  # ── Slack (via webhook URL) ──────────────────────────────
  - target: "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
    on_events: ["quarantine", "failure", "schema_drift", "slo_breach", "slo_recovery"]
    subject_template: "[{{ event | upper }}] {{ contract.title }}"
    message_template: "Run={{ run_id }}\nMessage={{ message }}"
    # Apprise auto-detects Slack from the hooks.slack.com domain.
    # Business value: Real-time alerting

  # ── Microsoft Teams (via webhook URL) ────────────────────
  - target: "https://outlook.office.com/webhook/YOUR/WEBHOOK/URL"
    on_events: ["quarantine", "failure"]
    # Apprise auto-detects Teams from the outlook.office.com domain.
    # Legacy alternative: type: "teams" + channel: "https://..."

  # ── Email via SMTP (Gmail) ──────────────────────────────
  # SMTP credentials are embedded in the URL. No server config needed.
  - target: "mailto://user:app_password@gmail.com?to=data-platform@company.com"
    on_events: ["failure", "dataset_rule_failed"]
    subject_template: "[{{ event | upper }}] {{ contract.title }}"
    # Other SMTP providers:
    #   O365:     mailto://user:pass@outlook.com?to=team@co.com
    #   Custom:   mailtos://user:pass@smtp.company.com:587?to=alerts@co.com
    #   Multiple: mailto://user:pass@smtp.co.com?to=a@co.com,b@co.com
    # With secrets: mailto://env:SMTP_USER:env:SMTP_PASS@smtp.co.com?to=alerts@co.com
    # Global Fallback: Setting the LAKELOGIC_SMTP_URI environment variable 
    # lets you use bare emails (target: "alerts@co.com") directly.

  # ── Email via SendGrid API (no SMTP needed) ─────────────
  # Uses SendGrid's HTTP API directly; credentials resolve from environment variables
  - type: sendgrid
    target: "data-platform@company.com"
    api_key: "env:SENDGRID_API_KEY"
    from_email: "env:EMAIL_FROM_ADDRESS"
    from_name: "env:EMAIL_FROM_NAME"
    on_events: ["failure", "slo_breach"]
    subject_template: "[{{ event | upper }}] {{ contract.title }}"

  # ── Generic Webhook ─────────────────────────────────────
  - target: "json://api.company.com/data-quality/alerts"
    on_events: ["quarantine", "failure", "schema_drift", "slo_breach"]
    # json:// sends a JSON POST to any HTTP endpoint.
    # Alternatives: xml://, form://
    # Business value: Integration with custom monitoring systems

  # ── Multi-channel fan-out ───────────────────────────────
  # Send the same alert to multiple channels at once.
  - targets:
      - "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
      - "mailto://user:pass@smtp.co.com?to=data-platform@company.com"
      - "sendgrid://apikey:env:SENDGRID_API_KEY@co.com/alerts@co.com"
    on_events: ["failure"]
    # Business value: Broadcast critical alerts to multiple channels

# ============================================================
# 13. MATERIALIZATION
# ============================================================
# OPTIONAL: How to write output data
materialization:
  strategy: "merge"
  # Options: append, merge, scd2, overwrite
  # append:    Add new rows (e.g. Bronze raw logs, Gold fact tables)
  #            (Add a fact: block below to enable Kimball auto-governance and structural validations)
  # merge:     Upsert based on primary key (e.g. Silver CDC, Gold dimensions)
  #            (Add an scd1: block below to enable SCD Type 1 auto-SK and Unknown Member)
  # scd2:      Slowly Changing Dimension Type 2 (history tracking; configured via the scd2: block below)
  # overwrite: Replace all data (e.g. daily snapshots)
  # Business value: Choose the appropriate write pattern across all Medallion layers

  # merge_dedup_guard: true
  # OPTIONAL (merge strategy only): If true, deduplicates the incoming batch
  # by primary key before merging. Prevents MERGE_CARDINALITY_VIOLATION errors
  # when the source contains duplicate keys.
  # Default: false (fail loudly on duplicate PKs).
  # Business value: Production stability against dirty source data

  partition_by:
    - "country"
    - "created_date"
  # Partition columns for performance
  # Business value: Query optimization

  cluster_by:
    - "customer_id"
  # Clustering columns (Delta/Iceberg)
  # Business value: Further query optimization

  reprocess_policy: "overwrite_partition"
  # Options: overwrite_partition, append, fail
  # How to handle re-running same partition
  # Business value: Idempotent pipeline execution

  target_path: "s3://silver-bucket/customers"
  # Override default output path
  # Business value: Explicit output control

  format: "delta"
  # Override default format
  # Options: parquet, delta, iceberg, csv
  # Business value: Format selection

  # ── Fact Table Configuration (Auto-Governance) ───────────────
  # OPTIONAL: Define the business purpose of this fact table.
  # LakeLogic uses this to automatically enforce data quality rules 
  # based on Kimball best practices, so you don't have to write them manually.
  fact:
    type: "accumulating_snapshot"  
    # Options & Business Value (Pick One):
    # - transaction: Immutable log of events (e.g. Sales, Clicks). Cannot be updated.
    # - periodic_snapshot: State at a point in time (e.g. Daily Account Balances).
    # - accumulating_snapshot: Tracks lifecycle of a process (e.g. Order Pipelines, SLAs).
    # - factless: Tracks events that have no numeric metrics (e.g. Student Attendance).
    # - aggregate: Pre-summarized metrics for dashboards (e.g. Monthly Revenue).
    #   (Pair this with a `rollup:` transformation block below to compute the actual math!)

    milestone_dates:
      # Required ONLY for 'accumulating_snapshot'. 
      # Tracks the progression of an entity through defined stages.
      # LakeLogic automatically ensures these dates always flow sequentially
      # (e.g. preventing 'shipped_date' from arriving before 'placed_date').
      - "placed_date"
      - "shipped_date"
      - "delivered_date"

  # ── SCD Type 2 specific configuration ────────────────────────
  scd2:
    timestamp_field: "updated_at"
    # Field to determine record version

    start_date_field: "valid_from"
    # Column to store validity start date

    end_date_field: "valid_to"
    # Column to store validity end date

    current_flag_field: "is_current"
    # Boolean flag for current record

    start_date_default: "1900-01-01"
    # Default value for initial load records

    end_date_default: "9999-12-31"
    # Default value for open-ended records

    track_columns: ["email", "status", "age"]
    # OPTIONAL: Only open a new version when one of these columns changes.
    # If omitted, any incoming row for a known key triggers a new version.
    # Business value: Reduce version churn from no-op updates

    # ── Surrogate Key (auto-generated) ─────────────────────────
    surrogate_key: "_sk"
    # Column name for auto-generated surrogate key. Default: "_sk"
    # Set to null/empty to disable.
    # Business value: Unique row-level ID for downstream joins

    surrogate_key_strategy: "hash"
    # Options: hash (default), uuid
    # hash: SHA256(primary_key|effective_from)[:16] — deterministic, reprocess-safe
    # uuid: random 16-char hex — unique but non-deterministic
    # Business value: Idempotent surrogate keys for reprocessing

    # ── Version Number (auto-generated) ────────────────────────
    version_column: "_version"
    # Column name for auto-incrementing version per business key. Default: "_version"
    # Computed as ROW_NUMBER() OVER (PARTITION BY primary_key ORDER BY effective_from)
    # Set to null/empty to disable.
    # Business value: "Give me version N of this customer"

    # ── Change Reason (auto-generated) ─────────────────────────
    change_reason_column: "_change_reason"
    # Column name for tracking which fields triggered a new version.
    # Default: "_change_reason". Set to null/empty to disable.
    # Values:
    #   "initial_load"   — first appearance of this business key
    #   "email,status"   — comma-separated list of changed tracked fields
    #   "all"            — no track_columns specified (all changes trigger version)
    # Business value: Audit trail — why was a new version created?

    # ── Unknown Member Injection ───────────────────────────────
    # Enabled by default. Injects a deterministic fallback dimension row so
    # that late-arriving facts always have a member to join to.
    unknown_member:
      enabled: true
      surrogate_key_value: "-1"
      # OPTIONAL: Explicit default values to override lakelogic's auto-inference.
      # If omitted or a column is left out, lakelogic automatically infers defaults 
      # from schema data types (Numeric->-1, Boolean->False). 
      # Strings default to "Unknown" but safely truncate to fit VARCHAR(N) constraints.
      # Examples: VARCHAR(2) -> "Un", VARCHAR(4) -> "Unkn", VARCHAR(7) or wider -> "Unknown".
      # default_values:
      #   customer_id: -1
      #   email: "Unknown"
      #   status: "Unknown"

  # SCD Type 1 specific configuration (only applied if strategy: "merge")
  scd1:
    surrogate_key: "_sk"
    # Auto-generates a Surrogate Key for the dimension (Hash of primary key)
    # Business value: Fast integer/hash joins for downstream facts where history is not needed

    surrogate_key_strategy: "hash"
    # Options: hash (default), uuid
    # hash: SHA256(primary_key)[:16] — deterministic

    # ── Unknown Member Injection ───────────────────────────────
    # Enabled by default. Injects the same deterministic fallback row for
    # late-arriving facts, even when history is not tracked (SCD1).
    unknown_member:
      enabled: true
      surrogate_key_value: "-1"
      # OPTIONAL: Explicit default values to override lakelogic's auto-inference.
      # If omitted or a column is left out, lakelogic automatically infers defaults 
      # from schema data types (Numeric->-1, Boolean->False). 
      # Strings default to "Unknown" but safely truncate to fit VARCHAR(N) constraints.
      # Examples: VARCHAR(2) -> "Un", VARCHAR(4) -> "Unkn", VARCHAR(7) or wider -> "Unknown".
      # default_values:
      #   customer_id: -1
      #   email: "Unknown"
      #   status: "Unknown"

  # OPTIONAL: Soft-delete support (mark deleted instead of removing). 
  # - If set (Tombstone): Deletes from CDC flip this flag to true and preserve the row. 
  #   Late-arriving inserts for already deleted records will safely insert as tombstones.
  # - If omitted (Hard Delete): Deletes from CDC physically remove the row from the target.
  #
  # ✨ NOTE: When `source.load_mode` is set to `cdc`, LakeLogic automatically 
  # enables soft-deletes (Tombstone) by injecting the default columns below. 
  # To force Hard Deletes during a CDC load, explicitly set `soft_delete_column: ""` (empty string).
  soft_delete_column: "_lakelogic_is_deleted"
  # Column name for soft-delete boolean flag
  soft_delete_value: true
  # Value to set when record is deleted
  soft_delete_time_column: "_lakelogic_deleted_at"
  # Timestamp for when soft-delete occurred (auto-filled on CDC deletes if empty)
  soft_delete_reason_column: "_lakelogic_delete_reason"
  # Reason for deletion (auto-filled to 'cdc_delete_signal' on CDC deletes if empty)
  # Business value: GDPR compliance & Auditability — maintain history without hard deletes

  # OPTIONAL: External storage location for Unity Catalog tables
  location: "abfss://container@account.dfs.core.windows.net/silver/customers/"
  # Business value: Control physical storage for UC managed tables
  # Supports placeholders: "{silver_path}/{silver_layer}_{system}_customers"
  # See _system.yaml Placeholders section below

  # OPTIONAL: Delta/Iceberg table properties
  table_properties:
    "delta.autoOptimize.optimizeWrite": "true"
    "delta.autoOptimize.autoCompact": "true"
  # Business value: Optimize table performance without manual maintenance

  # OPTIONAL: Auto-compaction and vacuum settings
  compaction:
    auto: true
    vacuum_retention_hours: 168
  # Business value: Automated storage optimization

# ============================================================
# 14. LINEAGE & OBSERVABILITY
# ============================================================
# OPTIONAL: Lineage capture configuration.
# When enabled, up to 8 lineage columns are injected into every output row.
#
# All 8 lineage columns:
#   _lakelogic_source         — source file/table path
#   _lakelogic_processed_at   — updated every pipeline run
#   _lakelogic_run_id         — unique run identifier
#   _lakelogic_contract_name  — YAML contract filename
#   _lakelogic_domain         — domain from metadata
#   _lakelogic_system         — system from metadata
#   _lakelogic_created_at     — first-insert timestamp (immutable)
#   _lakelogic_created_by     — username/service principal
#
lineage:
  enabled: true
  # If true, inject lineage columns
  # Business value: Data provenance tracking

  capture_source_path: true
  source_column_name: "_lakelogic_source"
  # Capture source file/table path

  capture_timestamp: true
  timestamp_column_name: "_lakelogic_processed_at"
  # Updated every pipeline run (processing time)
  # Also serves as default watermark for incremental loading

  capture_run_id: true
  run_id_column_name: "_lakelogic_run_id"
  # Capture unique run identifier

  capture_contract_name: true
  contract_name_column_name: "_lakelogic_contract_name"
  # Inject contract title into every output row
  # Business value: Identify which contract produced each record

  capture_domain: true
  domain_column_name: "_lakelogic_domain"
  # Capture domain from metadata

  capture_system: true
  system_column_name: "_lakelogic_system"
  # Capture source system from metadata

  # ── Record Creation Tracking ─────────────────────────────
  capture_created_at: true
  created_at_column_name: "_lakelogic_created_at"
  # Stamped when record is FIRST created (immutable on re-runs).
  # Unlike _lakelogic_processed_at (updated every run),
  # this preserves the original insertion timestamp.
  # Business value: Audit trail — when was this record first ingested?

  capture_created_by: true
  created_by_column_name: "_lakelogic_created_by"
  # Captures the user/service principal that created the record.
  # Default: getpass.getuser() — resolves to service principal in Databricks.
  # Business value: Track who/what system inserted each record

  created_by_override: "etl_pipeline_svc"
  # OPTIONAL: Override the default user detection with a static value.
  # Useful for CI/CD pipelines or shared service accounts.
  # If omitted, falls back to getpass.getuser().
  # Can also be passed from pipeline_driver.py or Databricks Widgets.
  # Business value: Consistent identity in multi-user environments

  # ── Upstream Lineage ─────────────────────────────────────
  preserve_upstream: ["_upstream_run_id", "_upstream_source"]
  # Preserve lineage columns from upstream datasets
  # Business value: Multi-hop lineage tracking

  upstream_prefix: "_upstream"
  # Prefix for preserved upstream columns

  run_id_source: "run_id"
  # Options: run_id, pipeline_run_id
  # Use pipeline_run_id for cross-contract correlation

# ============================================================
# 15. SERVICE LEVEL OBJECTIVES
# ============================================================
# OPTIONAL: Contract-level SLO overrides.
# These merge with system-level SLOs defined in _system.yaml.
# Contract values take precedence (deep merge — field by field).
# If a contract doesn't define a specific SLO section, the
# system default applies automatically.
#
# SLO results are captured in the run log for every pipeline run.
# - Load-time freshness:   slo_freshness_seconds / slo_freshness_pass
# - Source-time freshness: slo_source_freshness_seconds / slo_source_freshness_pass
# - Availability:          slo_availability_ratio / slo_availability_pass
# - Row count:             slo_row_count_min / slo_row_count_max
# - Row count anomaly:     slo_row_count_anomaly_pass / _ratio
# - Quality:               slo_quality_pass / _ratio / _severity
# - Schedule:              slo_schedule_pass / slo_duration_seconds
#
# SLO breaches emit "slo_breach" notification events that route
# through the existing notifications system (Apprise/webhook/email).

service_levels:
  freshness:
    threshold: "24h"
    # Data must be refreshed within 24 hours (supports: "30m", "6h", "1d")
    field: "updated_at"
    # Field to check for freshness (MAX of this column vs current time)
    description: "Customer data must be updated daily"
    # Business value: Timeliness monitoring

    # ── Source-Time Freshness (optional) ─────────────────────
    # Checks the actual data age, not just when the pipeline ran.
    # Catches the case where the pipeline runs on time but
    # processes stale upstream data.
    source_field: "event_timestamp"
    # Column containing the upstream source timestamp
    source_threshold: "2h"
    # Max age for source data (separate from load-time freshness)
    # Business value: Distinguish "pipeline is slow" from "upstream data is stale"

  availability:
    threshold: 99.9
    # ── NOTE: This is FIELD COMPLETENESS, not uptime! ──
    # Percentage of rows that must have a non-null value in the specified field.
    # Example: 99.9 means <= 0.1% of rows can have a null value.
    field: "customer_id"
    # Field to measure non-null ratio against
    description: "customer_id must be populated in 99.9% of rows"
    # Business value: Data completeness monitoring

  row_count:
    min_rows: 100
    # Minimum expected rows per pipeline run (fail if below)
    max_rows: 10000000
    # Maximum expected rows per pipeline run (fail if above — catches runaway cartesian joins)
    check_field: "counts_source"
    # Which run log count to validate against:
    #   counts_source: rows received from upstream (default for Bronze)
    #   counts_good:   rows after quality filtering (default)
    #   counts_total:  good + quarantined before filtering
    skip_reprocess_days: 3
    # When the reprocess date range (reprocess_from → reprocess_to) exceeds
    # this many days, SLO row count checks AND counts_source computation
    # are skipped entirely — avoids expensive Spark wide-transformation
    # actions on large backfills where thresholds don't apply.
    # Default: 3 (skip for backfills > 3 days). Set to 0 to never skip.
    description: "Customer load must produce between 100 and 10M rows"
    # Business value: Volumetric anomaly detection — catches empty loads AND data explosions
    # NOTE: min_rows, max_rows, and check_field are captured in the run log
    # at pipeline time for point-in-time auditability.

    warn_only: false
    # If true, SLO breach logs a warning instead of failing the pipeline.
    # Useful during onboarding to observe thresholds before enforcing them.
    # Business value: Risk-free SLO rollout

    # ── Anomaly Detection (optional) ─────────────────────────
    # Compares current run count against historical baseline from
    # the run log. Catches silent data drops or volume spikes
    # that static min/max thresholds miss.
    anomaly:
      enabled: true
      lookback_runs: 14
      # Number of historical runs to compute baseline from.
      # 14 gives a stable baseline; 7 is more sensitive to trends.
      min_ratio: 0.5
      # Alert if row count is <50% of historical baseline
      max_ratio: 2.0
      # Alert if row count is >200% of historical baseline
      method: "median"
      # Options: median (default, robust to outliers), rolling_average
      # median: single spike doesn't permanently inflate baseline
      # rolling_average: more sensitive but skews after anomalies
      min_runs_before_enforcement: 5
      # Skip anomaly checks until at least N historical runs exist.
      # Prevents false positives during initial pipeline setup.
      # Business value: Catches silent data drops (e.g. upstream went quiet)
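      # Worked example (numbers are hypothetical): with a median baseline of
      # 1,200,000 rows over the last 14 runs, a run producing 500,000 rows
      # has ratio 0.42, below min_ratio 0.5, so an slo_breach event fires.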

  # ── Quality (optional — overrides system-level) ───────────
  # Contract-level quality override. If omitted, system-level
  # quality config from _system.yaml applies.
  # quality:
  #   min_good_ratio: 0.99          # stricter than system default (0.95)
  #   max_quarantine_ratio: 0.01

# ============================================================
# 16. EXTERNAL LOGIC
# ============================================================
# OPTIONAL: Custom Python/Notebook processing
external_logic:
  type: "python"
  # Options: python, notebook

  path: "./gold/build_customer_gold.py"
  # Path to Python file or Jupyter notebook

  entrypoint: "build_gold"
  # Function name to call (Python only, default: "run")
  #
  # ─── Required function signature ───────────────────────
  # def build_gold(df, *, contract, engine, **kwargs):
  #     """
  #     Args:
  #         df:       Validated good DataFrame (Polars/Spark)
  #         contract: DataContract instance (read-only)
  #         engine:   Engine name ("polars", "spark")
  #         **kwargs: Values from args: below (e.g. apply_ml_scoring)
  #
  #     Returns:
  #         DataFrame  → replaces good_df for materialization
  #         str/Path   → LakeLogic reads this file as output
  #         None       → original good_df is kept
  #     """
  #     return df
  # ────────────────────────────────────────────────────────
  # Optional extra kwargs (accepted but not required):
  #   add_trace:  callback to append TraceStep objects
  #   trace_step: context manager for traced blocks
  #
  # Runs in a sandboxed thread with timeout (default 300s).
  # Blocked imports: subprocess, shutil, socket.
  # Blocked builtins: exec, eval, compile.
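  # ─── Minimal sketch (hypothetical ./gold/build_customer_gold.py) ──
  # Assumes the Polars engine; column names are illustrative.
  #
  #   def build_gold(df, *, contract, engine, **kwargs):
  #       if engine == "polars":
  #           import polars as pl
  #           # Derive a simple gold-layer flag from an existing column
  #           df = df.with_columns(
  #               (pl.col("status") == "ACTIVE").alias("is_active")
  #           )
  #       return df  # returned DataFrame replaces good_df for materialization
  # ──────────────────────────────────────────────────────────────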

  args:
    apply_ml_scoring: true
    model_path: "s3://models/churn_predictor.pkl"
    target_table: "gold_customers"
  # Custom arguments passed as **kwargs to the function
  # Business value: ML scoring, complex business logic

  output_path: "s3://gold-bucket/customers"
  # Override output path

  output_format: "delta"
  # Override output format

  handles_output: false
  # If true, external logic writes output itself
  # If false, LakeLogic materializes the returned DataFrame

  kernel_name: "python3"
  # Jupyter kernel for notebook execution

# ============================================================
# 17. ORCHESTRATION & DEPENDENCIES
# ============================================================
# OPTIONAL: Pipeline orchestration metadata
upstream:
  - "bronze_crm_contacts"
  - "bronze_web_signups"
# List of upstream datasets this depends on
# Business value: DAG construction in orchestrators

schedule: "0 2 * * *"
# Cron expression for scheduling
# Business value: Automated execution timing

# ============================================================
# 18. TIER / LAYER
# ============================================================
# OPTIONAL: Explicit medallion tier for single-contract mode
tier: "silver"
# Options: bronze, silver, gold, reference
# Also accepts synonyms: raw, landing, ingest → bronze
#                        stage, staging, cleansed, transform → silver
#                        curated, presentation, consumption → gold
#                        ref, seed, lookup, masterdata → reference
# Business value: Automatic tier-aware defaults and classification

# ============================================================
# 19. DOWNSTREAM CONSUMERS
# ============================================================
# OPTIONAL: Declare what uses this contract's output
downstream:
  - type: "dashboard"
    name: "Monthly Revenue Dashboard"
    platform: "power_bi"
    # Options: power_bi, tableau, looker, databricks_sql, metabase, grafana
    url: "https://app.powerbi.com/groups/.../dashboards/..."
    owner: "analytics-team"
    description: "Executive revenue dashboard"
    refresh: "daily 06:00 UTC"
    columns_used: ["customer_segment", "total_revenue", "country"]
    sla: "< 4 hours"
    # Business value: Know who consumes your data

  - type: "ml_model"
    name: "Churn Prediction"
    platform: "mlflow"
    owner: "data-science"
    # Business value: End-to-end lineage from source → gold → ML

  - type: "api"
    name: "Customer Lookup API"
    platform: "internal"
    url: "https://api.internal.com/v1/customers"
    # Types: dashboard, report, api, ml_model, application, notebook, export

# ============================================================
# 20. LLM EXTRACTION (Unstructured → Structured)
# ============================================================
# OPTIONAL: Extract structured data from unstructured text via LLM
extraction:
  provider: "openai"
  # ─── Cloud Providers (require API key via env var) ───
  #   openai       → OPENAI_API_KEY
  #   azure_openai → AZURE_OPENAI_API_KEY + AZURE_OPENAI_ENDPOINT
  #   anthropic    → ANTHROPIC_API_KEY
  #   google       → GOOGLE_API_KEY
  #   bedrock      → AWS credentials (boto3)
  #
  # ─── Local Providers (no API key required) ──────────
  #   ollama       → Local Ollama server (default: http://localhost:11434)
  #                  Override with OLLAMA_BASE_URL env var
  #                  Install Ollama: https://ollama.com
  #   local        → Direct HuggingFace Transformers (Phi-3-mini default)
  #                  No server needed, downloads to ~/.cache/huggingface/
  #                  Install with: pip install lakelogic[local]
  #
  # Global env var overrides:
  #   LAKELOGIC_AI_PROVIDER → default provider for all contracts
  #   LAKELOGIC_AI_MODEL    → default model for all contracts

  model: "gpt-4o"
  # Cloud models: gpt-4o, gpt-4o-mini, claude-sonnet-4-20250514, gemini-2.0-flash
  # Ollama models: llama3.1, mistral, phi3, codellama (any pulled model)
  # Local models: auto (uses per-field extraction_task routing), or any HuggingFace model ID
  temperature: 0.1
  # Low temperature for deterministic extraction
  max_tokens: 1000
  response_format: "json"

  prompt_template: |
    Extract the following from this support ticket:
    {{ ticket_body }}
  system_prompt: "You are a data extraction assistant."

  text_column: "ticket_body"
  # Column containing text to extract from
  context_columns: ["customer_id", "ticket_date"]
  # Extra columns available in prompt template

  output_schema:
    - name: "sentiment"
      type: "string"
      accepted_values: ["positive", "neutral", "negative"]
      extraction_task: "classification"
    - name: "issue_category"
      type: "string"
      extraction_task: "classification"
      extraction_examples: ["billing", "technical", "account"]

  # Processing controls
  batch_size: 50
  concurrency: 5
  retry:
    max_attempts: 3
    backoff: "exponential"
    initial_delay: 1.0

  # Confidence scoring
  confidence:
    enabled: true
    method: "field_completeness"
    # Options: log_probs, self_assessment, consistency, field_completeness
    column: "_lakelogic_extraction_confidence"

  # Cost controls
  max_cost_per_run: 50.00
  max_rows_per_run: 10000
  # Business value: Budget safety for LLM API costs

  # Fallback model (cheaper/faster if primary fails)
  fallback_model: "gpt-4o-mini"
  fallback_provider: "openai"

  # PII safety
  redact_pii_before_llm: true
  pii_fields: ["email", "phone"]
  # Business value: Never send PII to external LLM providers

  # OPTIONAL: Preprocessing pipeline for raw files (PDF, image, audio, video)
  preprocessing:
    content_type: "pdf"
    # Options: pdf, image, video, audio, html, email, text
    ocr:
      enabled: true
      engine: "tesseract"
      # Options: tesseract, azure_di, textract, google_vision
      language: "eng"
    chunking:
      strategy: "page"
      # Options: page, paragraph, sentence, fixed_size
      max_chunk_tokens: 4000
      overlap_tokens: 200
    # Business value: Process PDFs, images, audio, video into structured data
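
The effect of `redact_pii_before_llm` can be approximated with a regex pass over the text column before it reaches the provider. The patterns below are a simple illustrative sketch, not LakeLogic's actual redaction rules:

```python
import re

# Illustrative redaction patterns -- not LakeLogic's internal rules
PII_PATTERNS = {
    "email": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),
    "phone": re.compile(r"\+?\d[\d\s().-]{7,}\d"),
}

def redact_pii(text: str, pii_fields=("email", "phone")) -> str:
    """Replace PII matches with placeholders before sending text to an LLM."""
    for field in pii_fields:
        pattern = PII_PATTERNS.get(field)
        if pattern:
            text = pattern.sub(f"[REDACTED_{field.upper()}]", text)
    return text
```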

# ============================================================
# 21. COMPLIANCE — Regulatory Metadata
# ============================================================
# OPTIONAL: Multi-framework compliance metadata.
# Supports GDPR, EU AI Act, CCPA, HIPAA, SOX, PIPEDA, LGPD,
# and BCBS 239. Inherits from _domain.yaml → _system.yaml.
#
# → Full reference with examples: contracts/compliance.md

compliance:
  gdpr:
    applicable: true
    legal_basis: "legitimate_interest"
    consent_type: "opt_in"             # opt_in | opt_out | implicit | not_required
    retention_period: "P24M"            # ISO 8601 duration
    dpia_status: "not_required"         # not_required | planned | in_progress | completed
  eu_ai_act:
    applicable: true
    risk_tier: "limited"               # prohibited | high | gpai | limited | minimal
  # ccpa, hipaa, sox, pipeda, lgpd, bcbs_239 — see compliance docs

Common Use Case Templates

Use Case 1: Bronze Ingestion (Capture Everything)

version: 1.0.0
info:
  title: "Bronze CRM Contacts"
  table_name: "{bronze_layer}_{system}_contacts"
  target_layer: "bronze"

# ── The Reader: Defines WHAT and WHEN to read (State Tracking) ────
source:
  type: "landing"
  path: "s3://landing/crm/*.csv"
  load_mode: "incremental"
  watermark_field: "file_modified_time"

# ── The Adapter: Defines HOW to physically parse the raw payload ──
# OPTIONAL: Only strictly needed when connecting to a database, or when you
# need raw-format parsing rules (like safely casting messy JSON/CSV to strings).
server:
  type: "s3"
  path: "s3://bronze/crm_contacts"
  format: "parquet"
  mode: "ingest"
  cast_to_string: true    # Safety net against schema crashes
  schema_evolution: "append"
  allow_schema_drift: true

quality:
  row_rules:
    - name: "has_id"
      sql: "id IS NOT NULL"

materialization:
  strategy: "append"
  partition_by: ["ingestion_date"]

lineage:
  enabled: true

Use Case 2: Silver Validation (Quality Gate)

version: 1.0.0
info:
  title: "Silver Customers"
  table_name: "{silver_layer}_{system}_customers"
  target_layer: "silver"

source:
  type: "table"
  path: "{domain_catalog}.{bronze_layer}_{system}_customers"
  load_mode: "incremental"
  watermark_strategy: "pipeline_log"

dataset: "customers"
primary_key: ["customer_id"]

model:
  fields:
    - name: "customer_id"
      type: "long"
      required: true
    - name: "email"
      type: "string"
      required: true
      pii: true
    - name: "status"
      type: "string"
      required: true

transformations:
  - sql: |
      WITH ranked_data AS (
        SELECT 
          *,
          ROW_NUMBER() OVER(PARTITION BY customer_id ORDER BY updated_at DESC) as _rn
        FROM source
      )
      SELECT 
        s.* EXCEPT(_rn),
        COALESCE(p.marketing_opt_in, FALSE) AS is_opted_in 
      FROM ranked_data s
      LEFT JOIN {domain_catalog}.{silver_layer}_{system}_preferences p
        ON s.customer_id = p.customer_id
      WHERE s._rn = 1
    phase: "pre"
  # ── Alternative YAML syntax:
  # - deduplicate:
  #     "on": ["customer_id"]
  #     sort_by: ["updated_at"]
  #     order: "desc"
  #   phase: "pre"

quality:
  enforce_required: true
  row_rules:
    - regex_match:
        field: "email"
        pattern: "^[^@]+@[^@]+\\.[^@]+$"
    - accepted_values:
        field: "status"
        values: ["ACTIVE", "INACTIVE"]
  dataset_rules:
    - unique: "customer_id"

quarantine:
  enabled: true
  target: "s3://quarantine/customers"
  notifications:
    - type: "slack"
      target: "https://hooks.slack.com/..."
      on_events: ["quarantine"]

materialization:
  strategy: "merge"
  merge_dedup_guard: true
  partition_by: ["country"]
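
The `regex_match` rule above uses a deliberately loose email pattern. You can sanity-check it (or your own pattern) in plain Python before committing it to a contract; note the YAML form escapes the backslash as `\\.`:

```python
import re

# Same pattern as the contract's regex_match rule
EMAIL_RE = re.compile(r"^[^@]+@[^@]+\.[^@]+$")

rows = ["alice@example.com", "not-an-email", "a@b@c.com", "bob@sub.domain.org"]
valid = [r for r in rows if EMAIL_RE.match(r)]
# "a@b@c.com" fails because [^@]+ forbids a second @ on either side
```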

Use Case 3: Gold Aggregation (Analytics)

version: 1.0.0
info:
  title: "Gold Customer Metrics"
  table_name: "{gold_layer}_{system}_customer_metrics"
  target_layer: "gold"

source:
  type: "table"
  path: "{domain_catalog}.{silver_layer}_{system}_customers"
  load_mode: "incremental"
  watermark_strategy: "pipeline_log"

dataset: "silver_customers"

transformations:
  - sql: |
      SELECT
        customer_segment,
        country,
        DATE_TRUNC('month', created_at) AS month,
        COUNT(*) AS customer_count,
        AVG(lifetime_value) AS avg_ltv,
        SUM(total_orders) AS total_orders
      FROM source
      WHERE status = 'ACTIVE'
      GROUP BY customer_segment, country, month
    phase: "post"
  # ── Alternative YAML syntax:
  # - filter:
  #     sql: "status = 'ACTIVE'"
  #   phase: "post"
  # - derive:
  #     field: "month"
  #     sql: "DATE_TRUNC('month', created_at)"
  #   phase: "post"
  # - rollup:
  #     group_by: ["customer_segment", "country", "month"]
  #     aggregations:
  #       customer_count: "COUNT(*)"              # → output column: customer_count
  #       avg_ltv: "AVG(lifetime_value)"          # → output column: avg_ltv
  #       total_orders: "SUM(total_orders)"       # → output column: total_orders
  #   phase: "post"

materialization:
  strategy: "overwrite"
  partition_by: ["month"]
  fact:
    type: "aggregate"

lineage:
  enabled: true

Use Case 4: Gold Fact Table (Transaction Ledger)

version: 1.0.0
info:
  title: "Gold Revenue Transactions"
  table_name: "{gold_layer}_{system}_fact_revenue"
  target_layer: "gold"

source:
  type: "table"
  path: "{domain_catalog}.{silver_layer}_{system}_transactions"
  load_mode: "incremental"
  watermark_strategy: "pipeline_log"

dataset: "fact_revenue"
primary_key: ["transaction_id"]

model:
  fields:
    - name: "transaction_id"
      type: "string"
      primary_key: true
    - name: "user_pseudo_id"
      type: "string"
      foreign_key:
        contract: "gold_marketing_dim_users"
        column: "user_pseudo_id"
    - name: "revenue_amount"
      type: "double"
    - name: "user_surrogate_key"
      type: "string"
      generated: true
      description: "Fetched dynamically from dim_users via SQL join"

transformations:
  - sql: |
      -- Drive SQL-first point-in-time (SCD2) dimension lookups natively!
      SELECT 
        s.*,
        COALESCE(d.user_key, '-1') AS user_surrogate_key
      FROM source s
      LEFT JOIN {domain_catalog}.{gold_layer}_marketing_dim_users d
        ON s.user_pseudo_id = d.user_pseudo_id
        AND s.transaction_date >= d.effective_from 
        AND s.transaction_date < COALESCE(d.effective_to, '9999-12-31')
    phase: "post"

materialization:
  strategy: "append"
  partition_by: ["transaction_date"]
  fact:
    type: "transaction"

Use Case 5: Gold Fact Table (Accumulating Snapshot)

version: 1.0.0
info:
  title: "Gold Order Funnel"
  table_name: "{gold_layer}_{system}_fact_order_timeline"
  target_layer: "gold"

source:
  type: "table"
  path: "{domain_catalog}.{silver_layer}_{system}_orders"
  load_mode: "incremental"
  watermark_strategy: "pipeline_log"

dataset: "fact_order_timeline"
primary_key: ["order_id"]

model:
  fields:
    - name: "order_id"
      type: "string"
      primary_key: true
    - name: "customer_id"
      type: "string"
      foreign_key:
        contract: "gold_sales_dim_customers"
        column: "customer_id"
    - name: "placed_date"
      type: "timestamp"
    - name: "shipped_date"
      type: "timestamp"
      nullable: true
      milestone: true
    - name: "delivered_date"
      type: "timestamp"
      nullable: true
      milestone: true
    - name: "customer_surrogate_key"
      type: "string"
      generated: true

transformations:
  - sql: |
      SELECT 
        o.*,
        COALESCE(c.customer_key, '-1') AS customer_surrogate_key
      FROM source o
      LEFT JOIN {domain_catalog}.{gold_layer}_sales_dim_customers c
        ON o.customer_id = c.customer_id
        AND o.placed_date >= c.effective_from 
        AND o.placed_date < COALESCE(c.effective_to, '9999-12-31')
    phase: "post"

materialization:
  strategy: "merge"
  fact:
    type: "accumulating_snapshot"
    milestone_dates:
      - "placed_date"
      - "shipped_date"
      - "delivered_date"

Use Case 6: Gold Fact Table (Periodic Snapshot)

version: 1.0.0
info:
  title: "Gold Daily Account Balances"
  table_name: "{gold_layer}_{system}_fact_account_balance"
  target_layer: "gold"

source:
  type: "table"
  path: "{domain_catalog}.{silver_layer}_{system}_accounts"
  load_mode: "incremental"
  watermark_strategy: "pipeline_log"

dataset: "fact_account_balance"
primary_key: ["account_id", "snapshot_date"]

model:
  fields:
    - name: "account_id"
      type: "string"
      primary_key: true
      foreign_key:
        contract: "gold_sales_dim_accounts"
        column: "account_id"
    - name: "snapshot_date"
      type: "date"
      primary_key: true
    - name: "ending_balance"
      type: "double"
    - name: "account_surrogate_key"
      type: "string"
      generated: true

transformations:
  - sql: |
      SELECT 
        b.*,
        COALESCE(a.account_key, '-1') AS account_surrogate_key
      FROM source b
      LEFT JOIN {domain_catalog}.{gold_layer}_sales_dim_accounts a
        ON b.account_id = a.account_id
        AND b.snapshot_date >= a.effective_from 
        AND b.snapshot_date < COALESCE(a.effective_to, '9999-12-31')
    phase: "post"

materialization:
  strategy: "append"
  partition_by: ["snapshot_date"]
  fact:
    type: "periodic_snapshot"

Use Case 7: Gold Fact Table (Factless)

version: 1.0.0
info:
  title: "Gold Student Attendance"
  table_name: "{gold_layer}_{system}_fact_attendance"
  target_layer: "gold"

source:
  type: "table"
  path: "{domain_catalog}.{silver_layer}_{system}_events"
  load_mode: "incremental"
  watermark_strategy: "pipeline_log"

dataset: "fact_attendance"
primary_key: ["student_id", "class_id", "date"]

model:
  fields:
    - name: "student_id"
      type: "string"
      foreign_key:
        contract: "dim_students"
        column: "student_id"
    - name: "class_id"
      type: "string"
    - name: "date"
      type: "date"
    - name: "student_surrogate_key"
      type: "string"
      generated: true

transformations:
  - sql: |
      SELECT 
        e.*,
        COALESCE(s.student_key, '-1') AS student_surrogate_key
      FROM source e
      LEFT JOIN {domain_catalog}.{gold_layer}_school_dim_students s
        ON e.student_id = s.student_id
        AND e.date >= s.effective_from 
        AND e.date < COALESCE(s.effective_to, '9999-12-31')
    phase: "post"

materialization:
  strategy: "append"
  partition_by: ["date"]
  fact:
    type: "factless"
    # LakeLogic automatically verifies no metric/numeric columns exist here!

Use Case 8: SCD Type 1 (Upsert & Unknown Member)

version: 1.0.0
info:
  title: "Silver Products (SCD1)"
  table_name: "{silver_layer}_{system}_dim_products"
  target_layer: "silver"

source:
  type: "table"
  path: "{domain_catalog}.{bronze_layer}_{system}_products"
  load_mode: "incremental"
  watermark_strategy: "pipeline_log"

dataset: "dim_products"
primary_key: ["product_id"]

model:
  fields:
    - name: "product_id"
      type: "string"
      primary_key: true
    - name: "product_name"
      type: "string"
    - name: "category"
      type: "string"
    - name: "product_key"
      type: "string"
      generated: true
      description: "Surrogate key - SHA2 hash of product_id"

materialization:
  strategy: "merge"
  scd1:
    surrogate_key: "product_key"
    surrogate_key_strategy: "hash"
    unknown_member: true
    # Automatically generates a -1 hash record to catch late-arriving facts
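
The hash surrogate-key strategy can be sketched with SHA-256; LakeLogic's exact hash input, separator, and encoding may differ, and the column names below mirror the contract above for illustration only:

```python
import hashlib

def surrogate_key(*natural_key_parts) -> str:
    """Deterministic surrogate key: SHA-256 over the natural key parts."""
    raw = "||".join(str(p) for p in natural_key_parts)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()

# unknown_member: true seeds a sentinel row so late-arriving facts can
# join to the "-1" member instead of producing a dangling foreign key
UNKNOWN_MEMBER = {
    "product_id": "-1",
    "product_key": surrogate_key("-1"),
    "product_name": "Unknown",
    "category": "Unknown",
}
```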

Use Case 9: SCD Type 2 (History Tracking)

version: 1.0.0
info:
  title: "Silver Customer History (SCD2)"
  table_name: "{silver_layer}_{system}_dim_customers"
  target_layer: "silver"

source:
  type: "table"
  path: "{domain_catalog}.{bronze_layer}_{system}_customers"
  load_mode: "incremental"
  watermark_strategy: "pipeline_log"

dataset: "customers"
primary_key: ["customer_key"]
natural_key: ["customer_id"]

model:
  fields:
    - name: "customer_key"
      type: "string"
      primary_key: true
      generated: true
      description: "Surrogate key - SHA2 hash of customer_id + valid_from"
    - name: "customer_id"
      type: "string"
    - name: "email"
      type: "string"
    - name: "status"
      type: "string"
    - name: "address"
      type: "string"
    - name: "updated_at"
      type: "timestamp"
    - name: "valid_from"
      type: "timestamp"
      generated: true
    - name: "valid_to"
      type: "timestamp"
      generated: true
      nullable: true
    - name: "is_current"
      type: "boolean"
      generated: true

materialization:
  strategy: "scd2"
  scd2:
    timestamp_field: "updated_at"
    start_date_field: "valid_from"
    end_date_field: "valid_to"
    current_flag_field: "is_current"
    end_date_default: "9999-12-31"
    track_columns: ["email", "status", "address"]
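
Conceptually, the scd2 strategy end-dates the current version and inserts a new one whenever a tracked column changes. A dict-based sketch of that logic (not the engine's actual implementation):

```python
from datetime import datetime

TRACKED = ["email", "status", "address"]  # track_columns from the contract

def apply_scd2(current_rows, incoming, now=None):
    """Close the current version and append a new one if tracked columns changed."""
    now = now or datetime.now()
    out = list(current_rows)
    match = next((r for r in out
                  if r["customer_id"] == incoming["customer_id"] and r["is_current"]),
                 None)
    if match and all(match[c] == incoming[c] for c in TRACKED):
        return out  # no tracked change -> no new version
    if match:
        match["valid_to"] = now      # end-date the old version
        match["is_current"] = False
    out.append({**incoming, "valid_from": now, "valid_to": None, "is_current": True})
    return out
```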

Use Case 10: Marketing Data with Compliance (GDPR + EU AI Act)

version: 1.0.0
info:
  title: "Bronze Marketing Events"
  table_name: "{bronze_layer}_{system}_events"
  target_layer: "bronze"
  domain: "marketing"
  system: "klaviyo"

source:
  type: "landing"
  path: "s3://landing/klaviyo/events/"
  load_mode: "incremental"
  watermark_strategy: "manifest"

model:
  fields:
    - name: "event_id"
      type: "string"
      required: true
    - name: "email"
      type: "string"
      pii: true
      pii_classification: "email_address"
    - name: "phone_number"
      type: "string"
      pii: true
      pii_classification: "phone_number"
    - name: "event_name"
      type: "string"
      required: true
    - name: "timestamp"
      type: "timestamp"
      required: true

downstream:
  - name: "Marketing Engagement Dashboard"
    type: "dashboard"
    platform: "Tableau"
    owner: "Marketing Team"

lineage:
  enabled: true
  upstream:
    - name: "klaviyo_api"
      type: "external_api"

compliance:
  gdpr:
    applicable: true
    legal_basis: "legitimate_interest"
    purpose: "Marketing campaign analytics"
    retention_period: "P24M"             # ISO 8601 duration
    consent_type: "opt_in"               # opt_in | opt_out | implicit | not_required
  eu_ai_act:
    applicable: true
    risk_tier: "limited"                 # prohibited | high | gpai | limited | minimal

_system.yaml Placeholders & Layer Aliases

Contracts support placeholder variables resolved from the domain registry's _system.yaml. This keeps paths DRY and portable across environments.

Available Placeholders

# _system.yaml — domain-level configuration
domain: marketing
system: google_analytics

# Configurable layer aliases (default values shown)
bronze_layer: bronze           # → {bronze_layer} in contracts
silver_layer: silver           # → {silver_layer}
gold_layer: gold               # → {gold_layer}

# Storage paths
bronze_path: "abfss://bronze@acct.dfs.core.windows.net"
silver_path: "abfss://silver@acct.dfs.core.windows.net"
gold_path: "abfss://gold@acct.dfs.core.windows.net"
landing_root: "abfss://landing@acct.dfs.core.windows.net"
log_root: "abfss://logs@acct.dfs.core.windows.net"

# Catalog
domain_catalog: "retail_marketing"

# Materialization Defaults
materialization:
  silver:
    strategy: merge
    format: delta
    merge_dedup_guard: true    # Enable safety guard for all silver contracts in this domain

Placeholder Usage in Contracts

source:
  path: "{landing_root}/events"

materialization:
  target_path: "{bronze_path}/{bronze_layer}_{system}_events"
  location: "{bronze_path}/{bronze_layer}/{domain}/{system}/events"

quarantine:
  target: "{bronze_path}/{bronze_layer}_{system}_events_quarantine"

metadata:
  run_log_dir: "{log_root}/runs/{domain}/{system}/{bronze_layer}_events"
  run_log_table: "{domain_catalog}._run_logs"
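
Placeholder resolution is plain `{name}` substitution from the `_system.yaml` values. A sketch with an illustrative variable mapping (LakeLogic's handling of missing keys may differ):

```python
# Illustrative _system.yaml values
SYSTEM_VARS = {
    "domain": "marketing",
    "system": "google_analytics",
    "bronze_layer": "bronze",
    "bronze_path": "abfss://bronze@acct.dfs.core.windows.net",
    "domain_catalog": "retail_marketing",
}

def resolve(template: str, variables=SYSTEM_VARS) -> str:
    """Fill {placeholder} tokens the way contract paths are resolved."""
    return template.format(**variables)
```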

Layer Alias Flexibility

Override layer names per domain — useful for migrating naming conventions:

# _system.yaml
bronze_layer: raw         # {bronze_layer} → "raw" everywhere
silver_layer: cleansed    # {silver_layer} → "cleansed"
gold_layer: curated       # {gold_layer} → "curated"

Run Log Backend Selection

# Metadata in contract or _system.yaml defaults
metadata:
  # Auto-detected from engine:
  #   spark  → Spark Delta table (Unity Catalog)
  #   polars → Delta table via delta-rs (cloud or local)
  #   duckdb → DuckDB file
  run_log_table: "{domain_catalog}._run_logs"

  # Explicit override:
  # run_log_backend: delta    # spark | delta | duckdb | sqlite

Pipeline Runtime Parameters

from lakelogic.pipeline import LakehousePipeline

pipeline = LakehousePipeline(registry)

# Daily run — uses contract defaults (lookback_days: 3)
pipeline.run()

# Override lookback at runtime
pipeline.run(lookback_days=30)

# Backfill — auto-scopes partition scan to date range
pipeline.run(reprocess_from="2026-01-01", reprocess_to="2026-03-22")

_system.yaml: External Sources (Cross-Domain Lineage)

When your domain consumes tables managed by another domain's pipeline, declare them as external_sources so the DAG shows the full lineage:

# In _system.yaml
external_sources:
  - name: "silver_ga4_sessions"
    catalog_path: "catalog.silver.ga4_sessions"
    source_domain: "marketing/google_analytics"
    consumed_by: ["gold_marketing_funnel", "gold_attribution"]
    # consumed_by: entity names of contracts in THIS registry that read this table

  - name: "silver_crm_customers"
    catalog_path: "catalog.silver.crm_customers"
    source_domain: "sales/crm"
    consumed_by: ["gold_customer_360"]

What this does:

- 🌐 External nodes appear in the DAG with dashed borders (teal) in an "EXTERNAL" column
- Edges connect external sources → consuming contracts. Generic entity matches (e.g. ['events']) automatically restrict to the Bronze layer to prevent duplicates, but you can explicitly map to other layers via ID (e.g. ['silver_events']).
- Metadata-only — LakeLogic does not orchestrate the external pipeline
- Late-arriving data in the external source is picked up automatically if consuming contracts use watermark_field: _lakelogic_processed_at

Contract YAML: Downstream Consumers (DAG Lineage)

When a contract is consumed by reports, dashboards, APIs, or external teams, declare them at the top level of the contract file (downstream:) to visualize them in the DAG:

# In cross-domain or Gold contracts
downstream:
  - name: "Executive KPI Dashboard"
    type: "dashboard"    # Icons: dashboard (📊), api (🔌), report (📈), table (📋)
    platform: "PowerBI"
    owner: "Executive Team"

  - name: "Churn Prediction Model"
    type: "api"
    platform: "Databricks Model Serving"
    owner: "Data Science"

What this does:

- 🌐 Creates purple DOWNSTREAM nodes at the very end of your DAG hierarchy.
- Edges connect the current contract → the defined business consumers.
- Helps teams immediately identify which critical business assets will be impacted by upstream schema drift or freshness delays.


SCD1 (Merge) Notes

SCD1 uses strategy: merge with the contract-level primary_key for the merge ON condition:

primary_key: [customer_id]

materialization:
  strategy: merge
  # Merge uses primary_key to match rows (NOT incremental_key)
  # incremental_key is only used for source filtering (what's new?)

Key behaviors:

- Matched rows → UPDATE all non-key columns
- Unmatched incoming rows → INSERT
- Unmatched existing rows → no change (kept as-is)
- _lakelogic_processed_at → updated on every merge (serves as "last modified")
- _lakelogic_created_at → immutable (preserves first-insert time)
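
These merge semantics can be sketched with plain dicts keyed on the primary key; this is an illustrative upsert, not the engine's Delta MERGE:

```python
def scd1_merge(target, incoming, key="customer_id"):
    """Upsert: matched rows updated, new rows inserted, absent rows kept."""
    merged = {row[key]: dict(row) for row in target}
    for row in incoming:
        if row[key] in merged:
            merged[row[key]].update(row)   # matched -> UPDATE non-key columns
        else:
            merged[row[key]] = dict(row)   # unmatched incoming -> INSERT
    return list(merged.values())           # unmatched existing rows kept as-is
```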


Quick Reference: When to Use What

| Feature | Bronze | Silver | Gold |
|---------|--------|--------|------|
| `tier` | bronze | silver | gold |
| `server.mode` | ingest | validate | validate |
| `server.cast_to_string` | true | false | false |
| `server.schema_evolution` | append | strict | strict |
| `source.partition` | Recommended | N/A | N/A |
| `source.partition.lookback_days` | 1-7 | N/A | N/A |
| `source.flatten_nested` | N/A | true / [cols] | true / [cols] |
| `quality.enforce_required` | false | true | true |
| `quality.row_rules` | Minimal | Full | Minimal |
| `quality.dataset_rules` | None | Yes | Yes |
| `materialization.strategy` | append | merge/scd2 | overwrite |
| `lineage.enabled` | true | true | true |
| `metadata.run_log_table` | Optional | Optional | Optional |
| `downstream` | N/A | Optional | Recommended |
| `external_sources` | N/A | N/A | Via _system.yaml |
| `extraction` | Optional | N/A | N/A |
| `cloud.enabled` | Optional | Optional | Optional |
| `external_logic` | N/A | N/A | Optional |
| `compliance` | Recommended | Recommended | Optional |

For more examples, see the LakeLogic Examples directory.