
Complete Contract Template Reference

This is a fully annotated contract template showing every available configuration option, with detailed comments explaining the business value and typical use cases of each.

Use this as a reference to understand what's possible, then create your own contracts by copying only the sections you need.


Template Structure

# ============================================================
# LAKELOGIC DATA CONTRACT - COMPLETE REFERENCE TEMPLATE
# ============================================================
# This template shows ALL available options with detailed comments.
# Copy and customize only the sections you need for your use case.

# ============================================================
# 1. VERSION & METADATA
# ============================================================
# REQUIRED: Contract version for compatibility tracking
version: 1.0.0

# OPTIONAL: Human-readable metadata about this contract
info:
  title: "Customer Master Data - Silver Layer"
  # Business value: Clear identification in logs and monitoring

  version: "2.1.0"
  # Business value: Track contract evolution over time

  description: "Validated, deduplicated customer records with full quality enforcement"
  # Business value: Documentation for team members and stakeholders

  owner: "data-platform-team@company.com"
  # Business value: Clear ownership for questions and incidents

  contact:
    email: "data-platform@company.com"
    slack: "#data-quality"
  # Business value: Quick escalation path for issues

  target_layer: "silver"
  # Business value: Clarifies position in medallion architecture

  status: "production"
  # Options: development, staging, production, deprecated
  # Business value: Lifecycle management

  classification: "confidential"
  # Options: public, internal, confidential, restricted
  # Business value: Data governance and compliance

  domain: "sales"
  # Data mesh domain (e.g. "sales", "finance", "real-estate")
  # Business value: Domain ownership in data mesh architectures

  system: "crm"
  # Source system identifier (e.g. "salesforce", "sap", "zoopla")
  # Business value: Trace data back to source system

# OPTIONAL: Custom metadata for tagging and organization
metadata:
  domain: "sales"
  # Business value: Data mesh domain ownership

  system: "crm"
  # Business value: Source system identification

  data_layer: "silver"
  # Business value: Medallion layer classification

  pii_present: true
  # Business value: Privacy compliance tracking

  retention_days: 2555
  # Business value: Data retention policy (7 years)

  cost_center: "CC-1234"
  # Business value: Chargeback and cost allocation

  sla_tier: "tier1"
  # Business value: SLA classification (tier1 = critical)

# ============================================================
# 2. DATA SOURCE CONFIGURATION
# ============================================================
# OPTIONAL: Where to load data from (for run_source() method)
source:
  type: "landing"
  # Options: landing (files), stream (Kafka), table (database/catalog)
  # Business value: Defines acquisition pattern

  path: "s3://bronze-bucket/customers/*.parquet"
  # Supports: Local paths, S3, GCS, ADLS, glob patterns
  # Business value: Flexible source location

  load_mode: "incremental"
  # Options: full, incremental, cdc
  # Business value: Optimize processing (only new/changed data)

  pattern: "*.parquet"
  # OPTIONAL: File pattern filter
  # Business value: Select specific files from directory

  watermark_field: "updated_at"
  # REQUIRED for incremental: Field to track progress
  # Business value: Efficient incremental loading

  cdc_op_field: "operation"
  # REQUIRED for cdc: Field indicating operation type
  # Business value: Change data capture support

  cdc_delete_values: ["D", "DELETE"]
  # Values indicating delete operations
  # Business value: Handle deletes in CDC streams

  watermark_strategy: "max_target"
  # Options: max_target, pipeline_log, manifest, lookback, date_range
  # max_target: MAX(watermark_field) on target Delta table (default)
  # pipeline_log: last successful run from audit log table
  # manifest: JSON manifest file listing processed partitions
  # lookback: sliding window back from NOW (e.g. "7 days")
  # date_range: explicit from_date / to_date (backfills)
  # Business value: Flexible incremental boundary resolution

  target_path: "s3://silver-bucket/customers"
  # REQUIRED for max_target strategy: target table path

  lookback: "7 days"
  # For lookback strategy: "3 hours", "30 mins", "1 month" etc.

  from_date: "2024-01-01"
  to_date: "2024-12-31"
  # For date_range strategy: explicit ISO date boundaries

  pipeline_log_table: "meta.pipeline_runs"
  pipeline_name: "bronze_to_silver_customers"
  # For pipeline_log strategy: audit table and pipeline ID

  manifest_path: "/dbfs/mnt/meta/manifests/customers.json"
  # For manifest strategy: JSON manifest file path

  watermark_date_parts: ["year", "month", "day"]
  # Multi-column partition support when temporal boundary is
  # spread across multiple columns instead of a single date field
  # Also accepts named dict: {year: "partition_year", month: "partition_month"}
  # Business value: Works with Hive-style year/month/day partitions
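
  # Illustrative only (not contract keys): each strategy above resolves
  # to an incremental filter roughly like the following (exact SQL may
  # differ by engine):
  #   max_target: updated_at > (SELECT MAX(updated_at) FROM target)
  #   lookback:   updated_at > NOW() - INTERVAL 7 DAYS
  #   date_range: updated_at BETWEEN '2024-01-01' AND '2024-12-31'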

  partition_filters:
    country: "GB"
    region: "south"
  # Static partition values ANDed into every incremental filter
  # Business value: Scope incremental reads to specific partitions

  flatten_nested: ["derived", "pricing", "location"]
  # Options: false (default), true (all), [col, col, ...] (named)
  # Flatten JSON-string columns from bronze tables into flat columns
  # Business value: Bronze → Silver workflows with nested JSON data

# ============================================================
# 3. SERVER/STORAGE CONFIGURATION
# ============================================================
# OPTIONAL: Output storage and ingestion controls
server:
  type: "s3"
  # Options: s3, gcs, adls, azure, local, glue
  # Business value: Cloud platform flexibility

  format: "delta"
  # Options: parquet, delta, iceberg, csv, json
  # Business value: Choose optimal storage format

  path: "s3://silver-bucket/customers"
  # Output location for materialized data
  # Business value: Centralized data lake organization

  mode: "validate"
  # Options: validate (quality gate), ingest (raw capture)
  # Business value: Bronze uses "ingest", Silver/Gold use "validate"

  schema_evolution: "strict"
  # Options: strict, append, merge, overwrite
  # strict: Fail on schema changes (production safety)
  # append: Allow new fields (flexible ingestion)
  # merge: Merge new fields into schema (smart evolution)
  # overwrite: Replace schema completely (reprocessing)
  # Business value: Control schema change behavior

  allow_schema_drift: false
  # If true, log drift but don't fail
  # Business value: Monitoring vs enforcement trade-off

  cast_to_string: false
  # If true, cast all columns to string (Bronze "all strings" pattern)
  # Business value: Zero ingestion failures, defer type validation

# ============================================================
# 4. ENVIRONMENT-SPECIFIC OVERRIDES
# ============================================================
# OPTIONAL: Override paths/formats per environment
environments:
  dev:
    path: "s3://dev-bucket/customers"
    format: "parquet"
    # Business value: Cheaper storage for dev/test

  staging:
    path: "s3://staging-bucket/customers"
    format: "delta"
    # Business value: Production-like testing

  prod:
    path: "s3://prod-bucket/customers"
    format: "delta"
    # Business value: Production configuration

# Usage: export LAKELOGIC_ENV=dev

# ============================================================
# 5. REFERENCE DATA LINKS
# ============================================================
# OPTIONAL: Link to dimension tables or reference data
links:
  - name: "dim_countries"
    path: "./reference/countries.parquet"
    type: "parquet"
    # Options: parquet, csv, table
    broadcast: true
    # If true, Spark will broadcast join (for small tables)
    # Business value: Lookup/join enrichment, referential integrity

  - name: "dim_products"
    table: "catalog.reference.products"
    type: "table"
    broadcast: false
    # Business value: Unity Catalog / Hive table reference

  - name: "valid_emails"
    path: "s3://reference/email_domains.csv"
    type: "csv"
    # Business value: External validation lists

# ============================================================
# 6. DATASET IDENTIFICATION
# ============================================================
# OPTIONAL: Logical dataset name (used in SQL transformations)
dataset: "customers"
# Business value: SQL table alias in transformations

# OPTIONAL: Business key(s) for the entity
primary_key:
  - "customer_id"
# Business value: Uniqueness validation, merge operations

# ============================================================
# 7. SCHEMA MODEL
# ============================================================
# OPTIONAL: Define expected schema with types and constraints
model:
  fields:
    - name: "customer_id"
      type: "long"
      # Types: string, int, long, double, boolean, date, timestamp
      required: true
      # If true, generates automatic not_null rule
      pii: false
      classification: "public"
      description: "Unique customer identifier"
      # Business value: Schema documentation and enforcement

      # OPTIONAL: Generator hints (used by DataGenerator)
      min: 1
      max: 999999
      # Generator stays within range; validator checks >= / <= rules

      # OPTIONAL: Foreign key reference to another contract
      foreign_key:
        contract: "silver_agents"
        column: "agent_id"
        severity: "error"
      # Generator samples from PK pool of referenced contract
      # Business value: Referential integrity + synthetic data generation

      # OPTIONAL: Field-level quality rules
      rules:
        - name: "customer_id_positive"
          sql: "customer_id > 0"
          category: "correctness"
          severity: "error"
          # Business value: Field-specific validation

    - name: "email"
      type: "string"
      required: true
      pii: true
      classification: "confidential"
      description: "Customer email address"
      rules:
        - name: "email_format"
          sql: "email RLIKE '^[^@]+@[^@]+\\.[^@]+$'"
          category: "correctness"
          description: "Valid email format"

    - name: "age"
      type: "int"
      required: false
      pii: false
      classification: "internal"
      description: "Customer age in years"
      rules:
        - name: "age_range"
          sql: "age BETWEEN 18 AND 120"
          category: "correctness"

    - name: "status"
      type: "string"
      required: true
      pii: false
      description: "Customer account status"
      accepted_values: ["ACTIVE", "INACTIVE", "PENDING", "SUSPENDED"]
      # Generator picks from this list; validator checks IN rule

    - name: "created_at"
      type: "timestamp"
      required: true
      pii: false
      description: "Account creation timestamp"

    - name: "updated_at"
      type: "timestamp"
      required: true
      pii: false
      description: "Last update timestamp"

# ============================================================
# 8. SCHEMA POLICY
# ============================================================
# OPTIONAL: How to handle schema evolution and unknown fields
schema_policy:
  evolution: "strict"
  # Options: strict, compatible, allow
  # strict: Fail on any schema change
  # compatible: Allow backward-compatible changes
  # allow: Accept all changes
  # Business value: Production safety vs flexibility

  unknown_fields: "quarantine"
  # Options: quarantine, drop, allow
  # quarantine: Send rows with unknown fields to quarantine
  # drop: Remove unknown fields
  # allow: Keep unknown fields
  # Business value: Handle unexpected columns

# ============================================================
# 9. TRANSFORMATIONS
# ============================================================
# OPTIONAL: Data transformations (pre and post quality checks)
transformations:
  # ──────────────────────────────────────────────────────
  # PRE-PROCESSING (before quality checks)
  # ──────────────────────────────────────────────────────

  # Rename columns to standardize naming
  - rename:
      from: "cust_id"
      to: "customer_id"
    phase: "pre"
    # Business value: Align source schema to target schema

  # Or rename multiple columns at once
  - rename:
      mappings:
        "cust_id": "customer_id"
        "email_addr": "email"
        "cust_status": "status"
    phase: "pre"

  # Drop junk rows early
  - filter:
      sql: "customer_id IS NOT NULL AND email IS NOT NULL"
    phase: "pre"
    # Business value: Remove obvious garbage before validation

  # Deduplicate before validation
  - deduplicate:
      on: ["customer_id"]
      sort_by: ["updated_at"]
      order: "desc"
    phase: "pre"
    # Business value: Keep most recent record per customer
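
    # Illustrative only: the deduplicate step above behaves roughly like
    # keeping rows where
    #   ROW_NUMBER() OVER (PARTITION BY customer_id
    #                      ORDER BY updated_at DESC) = 1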

  # Select specific columns
  - select:
      columns: ["customer_id", "email", "age", "status"]
    phase: "pre"
    # Business value: Drop unnecessary columns

  # Drop specific columns
  - drop:
      columns: ["internal_notes", "temp_field"]
    phase: "pre"
    # Business value: Remove sensitive or temporary fields

  # Cast data types
  - cast:
      columns:
        customer_id: "long"
        age: "int"
        created_at: "timestamp"
    phase: "pre"
    # Business value: Type coercion before validation

  # Trim whitespace
  - trim:
      fields: ["email", "status"]
      side: "both"
    # Options: both, left, right
    phase: "pre"
    # Business value: Clean string data

  # Convert to lowercase
  - lower:
      fields: ["email", "status"]
    phase: "pre"
    # Business value: Normalize string values

  # Convert to uppercase
  - upper:
      fields: ["country_code"]
    phase: "pre"
    # Business value: Standardize codes

  # Coalesce multiple fields
  - coalesce:
      field: "email"
      sources: ["primary_email", "secondary_email", "backup_email"]
      default: "unknown@example.com"
      output: "email"
    phase: "pre"
    # Business value: Fill nulls from multiple sources

  # Split string into array
  - split:
      field: "tags"
      delimiter: ","
      output: "tag_array"
    phase: "pre"
    # Business value: Parse delimited strings

  # Explode array into rows
  - explode:
      field: "tag_array"
      output: "tag"
    phase: "pre"
    # Business value: Normalize nested data
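
    # Illustrative only: tags = "red,blue" splits into
    # tag_array = ["red", "blue"], and explode then yields one row
    # per element:
    #   (customer_id=1, tag='red'), (customer_id=1, tag='blue')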

  # Map values
  - map_values:
      field: "status"
      mapping:
        "A": "ACTIVE"
        "I": "INACTIVE"
        "P": "PENDING"
      default: "UNKNOWN"
      output: "status"
    phase: "pre"
    # Business value: Standardize code values

  # ──────────────────────────────────────────────────────
  # POST-PROCESSING (after quality checks, on good data)
  # ──────────────────────────────────────────────────────

  # Derive new fields
  - derive:
      field: "age_group"
      sql: "CASE WHEN age < 25 THEN 'young' WHEN age < 65 THEN 'adult' ELSE 'senior' END"
    phase: "post"
    # Business value: Calculated fields for analytics

  # Lookup/join dimension data
  - lookup:
      field: "country_name"
      reference: "dim_countries"
      on: "country_code"
      key: "code"
      value: "name"
      default_value: "Unknown"
    phase: "post"
    # Business value: Enrich with reference data

  # Full join with multiple fields
  - join:
      reference: "dim_products"
      on: "product_id"
      key: "id"
      fields: ["product_name", "category", "price"]
      type: "left"
      # Options: left, inner, right, full
      prefix: "product_"
      defaults:
        product_name: "Unknown Product"
        category: "Uncategorized"
    phase: "post"
    # Business value: Multi-field enrichment

  # SQL transformation
  - sql: |
      SELECT
        *,
        DATEDIFF(CURRENT_DATE, created_at) AS days_since_signup,
        CASE
          WHEN status = 'ACTIVE' AND age > 65 THEN 'senior_active'
          WHEN status = 'ACTIVE' THEN 'active'
          ELSE 'inactive'
        END AS segment
      FROM source
    phase: "post"
    # Business value: Complex transformations

  # Rollup/aggregation with lineage tracking
  - rollup:
      group_by: ["customer_segment", "country"]
      aggregations:
        total_customers: "COUNT(*)"
        avg_age: "AVG(age)"
        total_revenue: "SUM(lifetime_value)"
      keys: "customer_id"
      # Track which customer IDs rolled into each group
      rollup_keys_column: "_lakelogic_rollup_keys"
      rollup_keys_count_column: "_lakelogic_rollup_keys_count"
      upstream_run_id_column: "_upstream_run_id"
      upstream_run_ids_column: "_upstream_lakelogic_run_ids"
      distinct: true
    phase: "post"
    # Business value: Aggregation with full lineage

  # Pivot long metrics into wide columns
  - pivot:
      id_vars: ["customer_id"]
      pivot_col: "metric"
      value_cols: ["value"]
      values: ["clicks", "impressions"]
      # values list required for portable SQL pivot
      agg: "sum"
      name_template: "{pivot_alias}"
    phase: "post"
    # Business value: Wide analytics-ready metrics
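
    # Illustrative only: with the config above, long rows such as
    #   (customer_id=1, metric='clicks',      value=10)
    #   (customer_id=1, metric='impressions', value=200)
    # become one wide row roughly like
    #   (customer_id=1, clicks=10, impressions=200)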

  # Unpivot wide columns back to long form
  - unpivot:
      id_vars: ["customer_id"]
      value_vars: ["clicks", "impressions"]
      key_field: "metric"
      value_field: "value"
      include_nulls: false
    phase: "post"
    # Business value: Normalize wide metrics to long rows

  # Bucket numeric values into labelled bands
  - bucket:
      field: "price_band"
      source: "listing_price"
      bins:
        - lt: 250000
          label: "sub_250k"
        - lt: 500000
          label: "250k_500k"
        - lt: 1000000
          label: "500k_1m"
      default: "1m_plus"
    phase: "post"
    # Compiles to SQL CASE expression — identical across all engines
    # Business value: Categorize values into named segments
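
    # Illustrative only: the bins above compile to roughly
    #   CASE WHEN listing_price < 250000  THEN 'sub_250k'
    #        WHEN listing_price < 500000  THEN '250k_500k'
    #        WHEN listing_price < 1000000 THEN '500k_1m'
    #        ELSE '1m_plus' END AS price_band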

  # Extract scalar values from JSON string columns
  - json_extract:
      field: "latitude"
      source: "location_coordinates"
      path: "$.latitude"
      cast: "float"
    phase: "post"
    # Engine-agnostic: Polars/DuckDB/Spark each use native JSON ops
    # Business value: Parse nested JSON without full flattening

  # Compute date differences
  - date_diff:
      field: "listing_age_days"
      from_col: "creation_date"
      to_col: "event_date"
      unit: "days"
    # Options: days, hours, months
    phase: "post"
    # Business value: Calculate durations between events

  # Explode date ranges into one row per day
  - date_range_explode:
      output: "snapshot_date"
      start_col: "creation_date"
      end_col: "deleted_at"
      # end_col is nullable — defaults to today when null
      interval: "1d"
    phase: "post"
    # Business value: Create daily snapshots from validity ranges
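
    # Illustrative only: a row with creation_date = 2024-01-01 and
    # deleted_at = 2024-01-03 expands to roughly one row per day:
    #   snapshot_date = 2024-01-01, 2024-01-02, 2024-01-03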

# ============================================================
# 10. QUALITY RULES
# ============================================================
# OPTIONAL: Data quality validation rules
quality:
  enforce_required: true
  # If true, generate not_null rules for required fields
  # Business value: Automatic completeness checks

  # ──────────────────────────────────────────────────────
  # ROW-LEVEL RULES (quarantine individual bad rows)
  # ──────────────────────────────────────────────────────
  row_rules:
    # Simple not-null check
    - not_null: "email"
      # Business value: Ensure critical fields are populated

    # Not-null with custom config
    - not_null:
        field: "customer_id"
        name: "customer_id_required"
        category: "completeness"
        description: "Customer ID must be present"
        severity: "error"
      # Severity: error (quarantine), warning (log only), info

    # Multiple not-null checks
    - not_null:
        fields: ["email", "status", "created_at"]
        category: "completeness"

    # Accepted values (enum validation)
    - accepted_values:
        field: "status"
        values: ["ACTIVE", "INACTIVE", "PENDING", "SUSPENDED"]
        category: "consistency"
        description: "Status must be valid enum value"
      # Business value: Enforce controlled vocabularies

    # Regex pattern matching
    - regex_match:
        field: "email"
        pattern: "^[^@]+@[^@]+\\.[^@]+$"
        category: "correctness"
        description: "Email must be valid format"
      # Business value: Format validation

    # Numeric range validation
    - range:
        field: "age"
        min: 18
        max: 120
        inclusive: true
        category: "correctness"
        description: "Age must be between 18 and 120"
      # Business value: Plausibility checks

    # Referential integrity (foreign key)
    - referential_integrity:
        field: "country_code"
        reference: "dim_countries"
        key: "code"
        category: "consistency"
        description: "Country code must exist in reference table"
      # Business value: Prevent orphaned records

    # Lifecycle window validation (SCD Type 2)
    - lifecycle_window:
        event_ts: "order_date"
        event_key: "customer_id"
        reference: "dim_customers"
        reference_key: "customer_id"
        start_field: "valid_from"
        end_field: "valid_to"
        end_default: "9999-12-31"
        category: "consistency"
        description: "Order must fall within customer validity window"
      # Business value: Temporal referential integrity
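
      # Illustrative only: conceptually each order row must satisfy
      #   EXISTS (SELECT 1 FROM dim_customers d
      #           WHERE d.customer_id = source.customer_id
      #             AND source.order_date BETWEEN d.valid_from
      #                 AND COALESCE(d.valid_to, '9999-12-31'))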

    # Custom SQL rule
    - name: "email_domain_valid"
      sql: "email NOT LIKE '%@temp-mail.%' AND email NOT LIKE '%@disposable.%'"
      category: "validity"
      description: "Block disposable email domains"
      severity: "error"
      # Business value: Business-specific validation

  # ──────────────────────────────────────────────────────
  # DATASET-LEVEL RULES (aggregate checks on good data)
  # ──────────────────────────────────────────────────────
  dataset_rules:
    # Uniqueness check
    - unique: "customer_id"
      # Business value: Prevent duplicates

    # Uniqueness with custom config
    - unique:
        field: "email"
        name: "email_unique"
        category: "uniqueness"
        description: "Email addresses must be unique"
        severity: "error"

    # Null ratio threshold
    - null_ratio:
        field: "phone"
        max: 0.20
        # Max 20% null values allowed
        category: "completeness"
        description: "Phone number should be present for most customers"
      # Business value: Data completeness monitoring
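
      # Illustrative only: the check above passes when roughly
      #   SUM(CASE WHEN phone IS NULL THEN 1 ELSE 0 END)
      #     / COUNT(*) <= 0.20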

    # Row count validation
    - row_count_between:
        min: 1000
        max: 10000000
        category: "completeness"
        description: "Expected customer count range"
      # Business value: Detect missing or duplicate data

    # Custom SQL dataset rule
    - name: "active_customer_ratio"
      sql: "SELECT SUM(CASE WHEN status = 'ACTIVE' THEN 1 ELSE 0 END) / COUNT(*) FROM source"
      category: "validity"
      description: "At least 60% of customers should be active"
      must_be_greater_than: 0.60
      # Business value: Business metric validation

# ============================================================
# 11. QUARANTINE CONFIGURATION
# ============================================================
# OPTIONAL: Quarantine settings and notifications
quarantine:
  enabled: true
  # If false, pipeline fails on any quality rule failure
  # Business value: Fail-fast vs graceful degradation

  target: "s3://quarantine-bucket/customers"
  # Where to write quarantined records
  # Business value: Centralized bad data repository

  include_error_reason: true
  # If true, include _lakelogic_errors column
  # Business value: Root cause analysis

  strict_notifications: true
  # If true, fail pipeline if notification fails
  # Business value: Ensure alerts are delivered

  format: "parquet"
  # Options: parquet (default), csv, delta, json
  # Output format for file-based quarantine targets
  # Business value: Match quarantine format to your tooling

  write_mode: "append"
  # Options: append (default), overwrite
  # append: add bad records to existing quarantine data
  # overwrite: replace quarantine target on every run
  # Business value: Control quarantine growth vs freshness

  # ──────────────────────────────────────────────────────
  # NOTIFICATION CHANNELS
  # ──────────────────────────────────────────────────────
  notifications:
    # Default notification via Apprise (auto-detects channel from URL)
    - target: "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
      on_events: ["quarantine", "failure", "schema_drift"]
      subject_template: "[{{ event | upper }}] {{ contract.title }}"
      message_template: "Run={{ run_id }}\nMessage={{ message }}"
      # type defaults to "apprise" — auto-detects Slack, Teams, email etc.
      # Events: quarantine, failure, schema_drift, dataset_rule_failed
      # Business value: Real-time alerting

    # Microsoft Teams notification
    - type: "teams"
      channel: "https://outlook.office.com/webhook/YOUR/WEBHOOK/URL"
      on_events: ["quarantine", "failure"]

    # Email notification
    - type: "email"
      to: "data-platform@company.com"
      subject_template_file: "templates/alerts/failure_subject.j2"
      message_template_file: "templates/alerts/failure_body.j2"
      on_events: ["failure", "dataset_rule_failed"]
      # Requires SMTP configuration

    # Generic webhook
    - type: "webhook"
      url: "https://api.company.com/data-quality/alerts"
      on_events: ["quarantine", "failure", "schema_drift"]
      # Business value: Integration with custom systems

    # Multi-channel fan-out (send to multiple URLs at once)
    - targets:
        - "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
        - "https://outlook.office.com/webhook/YOUR/WEBHOOK/URL"
      on_events: ["failure"]
      # Business value: Broadcast critical alerts to multiple channels

# ============================================================
# 12. MATERIALIZATION
# ============================================================
# OPTIONAL: How to write output data
materialization:
  strategy: "merge"
  # Options: append, merge, scd2, overwrite
  # append: Add new rows (fact tables)
  # merge: Upsert based on primary key (SCD Type 1)
  # scd2: Slowly Changing Dimension Type 2 (history tracking)
  # overwrite: Replace all data (daily snapshots)
  # Business value: Choose appropriate write pattern
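
  # Illustrative only: strategy "merge" upserts on the primary key,
  # roughly like a Delta-style
  #   MERGE INTO target USING updates
  #   ON target.customer_id = updates.customer_id
  #   WHEN MATCHED THEN UPDATE SET *
  #   WHEN NOT MATCHED THEN INSERT *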

  partition_by:
    - "country"
    - "created_date"
  # Partition columns for performance
  # Business value: Query optimization

  cluster_by:
    - "customer_id"
  # Clustering columns (Delta/Iceberg)
  # Business value: Further query optimization

  reprocess_policy: "overwrite_partition"
  # Options: overwrite_partition, append, fail
  # How to handle re-running same partition
  # Business value: Idempotent pipeline execution

  target_path: "s3://silver-bucket/customers"
  # Override default output path
  # Business value: Explicit output control

  format: "delta"
  # Override default format
  # Options: parquet, delta, iceberg, csv
  # Business value: Format selection

  # SCD Type 2 specific configuration
  scd2:
    primary_key: "customer_id"
    # Business key for the entity

    timestamp_field: "updated_at"
    # Field to determine record version

    start_date_field: "valid_from"
    # Column to store validity start date

    end_date_field: "valid_to"
    # Column to store validity end date

    current_flag_field: "is_current"
    # Boolean flag for current record

    end_date_default: "9999-12-31"
    # Default value for open-ended records

    hash_fields: ["email", "status", "age"]
    # Fields to hash for change detection
    # Business value: Full history tracking
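
    # Illustrative only: with the config above, a changed email produces
    # history rows roughly like (exact date handling may differ):
    #   customer_id | email     | valid_from | valid_to   | is_current
    #   1           | a@old.com | 2023-01-01 | 2024-06-01 | false
    #   1           | a@new.com | 2024-06-01 | 9999-12-31 | true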

  # OPTIONAL: Soft-delete support (mark deleted instead of removing)
  soft_delete_column: "_lakelogic_is_deleted"
  # Column name for soft-delete boolean flag
  soft_delete_value: true
  # Value to set when record is deleted
  soft_delete_time_column: "_lakelogic_deleted_at"
  # Timestamp for when soft-delete occurred
  soft_delete_reason_column: "_lakelogic_delete_reason"
  # Reason for deletion (e.g. "GDPR request", "duplicate")
  # Business value: GDPR compliance — keep audit trail without hard deletes

  # OPTIONAL: External storage location for Unity Catalog tables
  location: "abfss://container@account.dfs.core.windows.net/silver/customers/"
  # Business value: Control physical storage for UC managed tables

  # OPTIONAL: Delta/Iceberg table properties
  table_properties:
    "delta.autoOptimize.optimizeWrite": "true"
    "delta.autoOptimize.autoCompact": "true"
  # Business value: Optimize table performance without manual maintenance

  # OPTIONAL: Auto-compaction and vacuum settings
  compaction:
    auto: true
    vacuum_retention_hours: 168
  # Business value: Automated storage optimization

# ============================================================
# 13. LINEAGE & OBSERVABILITY
# ============================================================
# OPTIONAL: Lineage capture configuration
lineage:
  enabled: true
  # If true, inject lineage columns
  # Business value: Data provenance tracking

  capture_source_path: true
  source_column_name: "_lakelogic_source"
  # Capture source file/table path

  capture_timestamp: true
  timestamp_column_name: "_lakelogic_processed_at"
  # Capture processing timestamp

  capture_run_id: true
  run_id_column_name: "_lakelogic_run_id"
  # Capture unique run identifier

  capture_domain: true
  domain_column_name: "_lakelogic_domain"
  # Capture domain from metadata

  capture_system: true
  system_column_name: "_lakelogic_system"
  # Capture source system from metadata

  preserve_upstream: ["_upstream_run_id", "_upstream_source"]
  # Preserve lineage columns from upstream datasets
  # Business value: Multi-hop lineage tracking

  upstream_prefix: "_upstream"
  # Prefix for preserved upstream columns

  run_id_source: "run_id"
  # Options: run_id, pipeline_run_id
  # Use pipeline_run_id for cross-contract correlation

  capture_contract_name: true
  contract_name_column_name: "_lakelogic_contract_name"
  # Inject contract title into every output row
  # Business value: Identify which contract produced each record

# ============================================================
# 14. SERVICE LEVEL OBJECTIVES
# ============================================================
# OPTIONAL: SLO definitions for monitoring
service_levels:
  freshness:
    threshold: "24h"
    # Data must be refreshed within 24 hours
    field: "updated_at"
    # Field to check for freshness
    description: "Customer data must be updated daily"
    # Business value: Timeliness monitoring

  availability: 99.9
  # Percentage uptime target
  # Business value: Reliability tracking

# ============================================================
# 15. EXTERNAL LOGIC
# ============================================================
# OPTIONAL: Custom Python/Notebook processing
external_logic:
  type: "python"
  # Options: python, notebook

  path: "./gold/build_customer_gold.py"
  # Path to Python file or Jupyter notebook

  entrypoint: "build_gold"
  # Function name to call (Python only)

  args:
    apply_ml_scoring: true
    model_path: "s3://models/churn_predictor.pkl"
    target_table: "gold_customers"
  # Custom arguments passed to function
  # Business value: ML scoring, complex business logic

  output_path: "s3://gold-bucket/customers"
  # Override output path

  output_format: "delta"
  # Override output format

  handles_output: false
  # If true, external logic writes output itself
  # If false, LakeLogic materializes the returned DataFrame

  kernel_name: "python3"
  # Jupyter kernel for notebook execution

# ============================================================
# 16. ORCHESTRATION & DEPENDENCIES
# ============================================================
# OPTIONAL: Pipeline orchestration metadata
upstream:
  - "bronze_crm_contacts"
  - "bronze_web_signups"
# List of upstream datasets this depends on
# Business value: DAG construction in orchestrators

schedule: "0 2 * * *"
# Cron expression for scheduling (here: daily at 02:00)
# Business value: Automated execution timing

# ============================================================
# 17. TIER / LAYER
# ============================================================
# OPTIONAL: Explicit medallion tier for single-contract mode
tier: "silver"
# Options: bronze, silver, gold, reference
# Also accepts synonyms: raw, landing, ingest → bronze
#                        stage, staging, cleansed, transform → silver
#                        curated, presentation, consumption → gold
#                        ref, seed, lookup, masterdata → reference
# Business value: Automatic tier-aware defaults and classification

# ============================================================
# 18. DOWNSTREAM CONSUMERS
# ============================================================
# OPTIONAL: Declare what uses this contract's output
downstream:
  - type: "dashboard"
    name: "Monthly Revenue Dashboard"
    platform: "power_bi"
    # Options: power_bi, tableau, looker, databricks_sql, metabase, grafana
    url: "https://app.powerbi.com/groups/.../dashboards/..."
    owner: "analytics-team"
    description: "Executive revenue dashboard"
    refresh: "daily 06:00 UTC"
    columns_used: ["customer_segment", "total_revenue", "country"]
    sla: "< 4 hours"
    # Business value: Know who consumes your data

  - type: "ml_model"
    name: "Churn Prediction"
    platform: "mlflow"
    owner: "data-science"
    # Business value: End-to-end lineage from source → gold → ML

  - type: "api"
    name: "Customer Lookup API"
    platform: "internal"
    url: "https://api.internal.com/v1/customers"
    # Types: dashboard, report, api, ml_model, application, notebook, export

# ============================================================
# 19. LLM EXTRACTION (Unstructured → Structured)
# ============================================================
# OPTIONAL: Extract structured data from unstructured text via LLM
extraction:
  provider: "openai"
  # ─── Cloud Providers (require API key via env var) ───
  #   openai       → OPENAI_API_KEY
  #   azure_openai → AZURE_OPENAI_API_KEY + AZURE_OPENAI_ENDPOINT
  #   anthropic    → ANTHROPIC_API_KEY
  #   google       → GOOGLE_API_KEY
  #   bedrock      → AWS credentials (boto3)
  #
  # ─── Local Providers (no API key required) ──────────
  #   ollama       → Local Ollama server (default: http://localhost:11434)
  #                  Override with OLLAMA_BASE_URL env var
  #                  Install Ollama: https://ollama.com
  #   local        → Direct HuggingFace Transformers (Phi-3-mini default)
  #                  No server needed, downloads to ~/.cache/huggingface/
  #                  Install with: pip install lakelogic[local]
  #
  # Global env var overrides:
  #   LAKELOGIC_AI_PROVIDER → default provider for all contracts
  #   LAKELOGIC_AI_MODEL    → default model for all contracts

  model: "gpt-4o-mini"
  # Cloud models: gpt-4o, gpt-4o-mini, claude-sonnet-4-20250514, gemini-2.0-flash
  # Ollama models: llama3.1, mistral, phi3, codellama (any pulled model)
  # Local models: auto (uses per-field extraction_task routing), or any HuggingFace model ID
  temperature: 0.1
  # Low temperature for deterministic extraction
  max_tokens: 1000
  response_format: "json"

  prompt_template: |
    Extract the following from this support ticket:
    {{ ticket_body }}
  system_prompt: "You are a data extraction assistant."

  text_column: "ticket_body"
  # Column containing text to extract from
  context_columns: ["customer_id", "ticket_date"]
  # Extra columns available in prompt template

  output_schema:
    - name: "sentiment"
      type: "string"
      accepted_values: ["positive", "neutral", "negative"]
      extraction_task: "classification"
    - name: "issue_category"
      type: "string"
      extraction_task: "classification"
      extraction_examples: ["billing", "technical", "account"]

  # Processing controls
  batch_size: 50
  concurrency: 5
  retry:
    max_attempts: 3
    backoff: "exponential"
    initial_delay: 1.0

  # Confidence scoring
  confidence:
    enabled: true
    method: "field_completeness"
    # Options: log_probs, self_assessment, consistency, field_completeness
    column: "_lakelogic_extraction_confidence"

  # Cost controls
  max_cost_per_run: 50.00
  max_rows_per_run: 10000
  # Business value: Budget safety for LLM API costs

  # Fallback model (cheaper/faster if primary fails)
  fallback_model: "gpt-4o-mini"
  fallback_provider: "openai"

  # PII safety
  redact_pii_before_llm: true
  pii_fields: ["email", "phone"]
  # Business value: Never send PII to external LLM providers

  # OPTIONAL: Preprocessing pipeline for raw files (PDF, image, audio, video)
  preprocessing:
    content_type: "pdf"
    # Options: pdf, image, video, audio, html, email, text
    ocr:
      enabled: true
      engine: "tesseract"
      # Options: tesseract, azure_di, textract, google_vision
      language: "eng"
    chunking:
      strategy: "page"
      # Options: page, paragraph, sentence, fixed_size
      max_chunk_tokens: 4000
      overlap_tokens: 200
    # Business value: Process PDFs, images, audio, video into structured data

Common Use Case Templates

Use Case 1: Bronze Ingestion (Capture Everything)

version: 1.0.0
info:
  title: "Bronze CRM Contacts"
  target_layer: "bronze"

source:
  type: "landing"
  path: "s3://landing/crm/*.csv"
  load_mode: "incremental"
  watermark_field: "file_modified_time"

server:
  type: "s3"
  path: "s3://bronze/crm_contacts"
  format: "parquet"
  mode: "ingest"
  cast_to_string: true
  schema_evolution: "append"
  allow_schema_drift: true

quality:
  row_rules:
    - name: "has_id"
      sql: "id IS NOT NULL"

materialization:
  strategy: "append"
  partition_by: ["ingestion_date"]

lineage:
  enabled: true
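
The `load_mode: "incremental"` / `watermark_field: "file_modified_time"` pair means each run picks up only files newer than the last recorded watermark. A sketch of that selection logic, with a hypothetical file listing (not a LakeLogic API):

```python
from datetime import datetime

# Hypothetical landing-zone listing: (path, modified_time) pairs.
files = [
    ("s3://landing/crm/a.csv", datetime(2024, 5, 1, 8, 0)),
    ("s3://landing/crm/b.csv", datetime(2024, 5, 2, 8, 0)),
    ("s3://landing/crm/c.csv", datetime(2024, 5, 3, 8, 0)),
]

def incremental_batch(files, watermark):
    """Select files modified strictly after the watermark, then advance it."""
    new = [(p, t) for p, t in files if t > watermark]
    new_watermark = max((t for _, t in new), default=watermark)
    return new, new_watermark

# Last run's watermark was midday on May 1st, so only b.csv and c.csv qualify.
batch, wm = incremental_batch(files, datetime(2024, 5, 1, 12, 0))
print([p for p, _ in batch])
```

The advanced watermark is persisted after a successful run, so re-running the contract does not re-ingest already-landed files.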

Use Case 2: Silver Validation (Quality Gate)

version: 1.0.0
info:
  title: "Silver Customers"
  target_layer: "silver"

dataset: "customers"
primary_key: ["customer_id"]

model:
  fields:
    - name: "customer_id"
      type: "long"
      required: true
    - name: "email"
      type: "string"
      required: true
      pii: true
    - name: "status"
      type: "string"
      required: true

transformations:
  - deduplicate:
      on: ["customer_id"]
      sort_by: ["updated_at"]
      order: "desc"
    phase: "pre"

quality:
  enforce_required: true
  row_rules:
    - regex_match:
        field: "email"
        pattern: "^[^@]+@[^@]+\\.[^@]+$"
    - accepted_values:
        field: "status"
        values: ["ACTIVE", "INACTIVE"]
  dataset_rules:
    - unique: "customer_id"

quarantine:
  enabled: true
  target: "s3://quarantine/customers"
  notifications:
    - type: "slack"
      target: "https://hooks.slack.com/..."
      on_events: ["quarantine"]

materialization:
  strategy: "merge"
  partition_by: ["country"]
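
With `quarantine.enabled: true`, rows that fail the row rules are diverted rather than failing the whole run. A sketch of that split, assuming the two rules above (the function name is illustrative, not LakeLogic's implementation):

```python
import re

EMAIL_RE = re.compile(r"^[^@]+@[^@]+\.[^@]+$")
ACCEPTED_STATUS = {"ACTIVE", "INACTIVE"}

def split_rows(rows):
    """Apply both row rules; failures go to quarantine, not to the output."""
    passed, quarantined = [], []
    for row in rows:
        ok = bool(EMAIL_RE.match(row["email"])) and row["status"] in ACCEPTED_STATUS
        (passed if ok else quarantined).append(row)
    return passed, quarantined

rows = [
    {"customer_id": 1, "email": "a@example.com", "status": "ACTIVE"},
    {"customer_id": 2, "email": "not-an-email", "status": "ACTIVE"},
    {"customer_id": 3, "email": "c@example.com", "status": "UNKNOWN"},
]
good, bad = split_rows(rows)
print(len(good), len(bad))  # → 1 2
```

Only customer 1 reaches the silver table; the other two land in `s3://quarantine/customers` and trigger the Slack notification.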

Use Case 3: Gold Aggregation (Analytics)

version: 1.0.0
info:
  title: "Gold Customer Metrics"
  target_layer: "gold"

dataset: "silver_customers"

transformations:
  - sql: |
      SELECT
        customer_segment,
        country,
        DATE_TRUNC('month', created_at) AS month,
        COUNT(*) AS customer_count,
        AVG(lifetime_value) AS avg_ltv,
        SUM(total_orders) AS total_orders
      FROM source
      WHERE status = 'ACTIVE'
      GROUP BY customer_segment, country, month
    phase: "post"

materialization:
  strategy: "overwrite"
  partition_by: ["month"]

lineage:
  enabled: true
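
The aggregation above can be exercised locally to check its shape. A sketch using an in-memory SQLite database with sample data (note SQLite has no `DATE_TRUNC`, so `strftime('%Y-%m', ...)` stands in for the month bucket):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE source (
    customer_segment TEXT, country TEXT, created_at TEXT,
    status TEXT, lifetime_value REAL, total_orders INTEGER)""")
conn.executemany(
    "INSERT INTO source VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("smb", "DE", "2024-03-05", "ACTIVE", 100.0, 4),
        ("smb", "DE", "2024-03-20", "ACTIVE", 300.0, 6),
        ("ent", "US", "2024-03-10", "INACTIVE", 900.0, 9),  # filtered out
    ],
)
rows = conn.execute("""
    SELECT customer_segment, country,
           strftime('%Y-%m', created_at) AS month,
           COUNT(*) AS customer_count,
           AVG(lifetime_value) AS avg_ltv,
           SUM(total_orders) AS total_orders
    FROM source
    WHERE status = 'ACTIVE'
    GROUP BY customer_segment, country, month
""").fetchall()
print(rows)  # [('smb', 'DE', '2024-03', 2, 200.0, 10)]
```

The inactive row is dropped by the `WHERE` clause before grouping, so only the two active smb/DE customers contribute to the March metrics.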

Use Case 4: SCD Type 2 (History Tracking)

version: 1.0.0
info:
  title: "Silver Customer History (SCD2)"
  target_layer: "silver"

dataset: "customers"
primary_key: ["customer_id"]

materialization:
  strategy: "scd2"
  scd2:
    primary_key: "customer_id"
    timestamp_field: "updated_at"
    start_date_field: "valid_from"
    end_date_field: "valid_to"
    current_flag_field: "is_current"
    end_date_default: "9999-12-31"
    hash_fields: ["email", "status", "address"]
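
The `hash_fields` list drives change detection: a new SCD2 version is opened only when one of those fields changes. A minimal sketch of that close-and-open step for a single key (illustrative helpers, not LakeLogic's merge logic):

```python
import hashlib
from datetime import date

HASH_FIELDS = ["email", "status", "address"]

def row_hash(row):
    """Hash only the tracked fields; a changed hash means a new version."""
    payload = "|".join(str(row[f]) for f in HASH_FIELDS)
    return hashlib.sha256(payload.encode()).hexdigest()

def apply_scd2(current, incoming, as_of):
    """Close the current row and open a new one when tracked fields changed."""
    if row_hash(current) == row_hash(incoming):
        return [current]  # no tracked change: keep the open row as-is
    closed = {**current, "valid_to": as_of, "is_current": False}
    opened = {**incoming, "valid_from": as_of,
              "valid_to": date(9999, 12, 31), "is_current": True}
    return [closed, opened]

current = {"customer_id": 1, "email": "a@x.com", "status": "ACTIVE",
           "address": "Old St 1", "valid_from": date(2024, 1, 1),
           "valid_to": date(9999, 12, 31), "is_current": True}
incoming = {"customer_id": 1, "email": "a@x.com", "status": "ACTIVE",
            "address": "New St 2"}  # only the address changed
history = apply_scd2(current, incoming, date(2024, 6, 1))
print(len(history))  # → 2 (old version closed, new version opened)
```

Columns outside `hash_fields` (e.g. a last-login timestamp) can change without spawning a new version, which keeps history tables compact.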

Quick Reference: When to Use What

| Feature | Bronze | Silver | Gold |
| --- | --- | --- | --- |
| tier | bronze | silver | gold |
| server.mode | ingest | validate | validate |
| server.cast_to_string | true | false | false |
| server.schema_evolution | append | strict | strict |
| source.flatten_nested | N/A | true / [cols] | true / [cols] |
| quality.enforce_required | false | true | true |
| quality.row_rules | Minimal | Full | Minimal |
| quality.dataset_rules | None | Yes | Yes |
| materialization.strategy | append | merge/scd2 | overwrite |
| lineage.enabled | true | true | true |
| downstream | N/A | Optional | Recommended |
| extraction | Optional | N/A | N/A |

For more examples, see the LakeLogic Examples directory.