Ingestion (Source Configuration)
The source: block defines what to read, from where, and how.
Think of it like a mail room. You tell LakeLogic which mailbox to open (file path or table), what kind of mail to expect (CSV, Parquet, JSON), and how often to check for new deliveries (full refresh vs incremental).
Source Types
Three ways to bring data into your lakehouse:
source:
  type: "landing"    # File-based (local, S3, ADLS, GCS)
  # type: "table"    # Catalog table (Unity Catalog, Hive)
  # type: "stream"   # Kafka / streaming
File-Based Ingestion (Landing)
For ingesting raw files (CSV, JSON, Parquet, etc.) from cloud storage or the local filesystem.
Example: CSV file ingestion from S3
source:
  type: "landing"
  path: "s3://bronze-bucket/customers/*.csv"
  format: "csv"               # parquet | csv | json | delta | avro | orc | xml
  pattern: "*.csv"            # File glob filter (must match the declared format)
  load_mode: "incremental"    # full | incremental | cdc
  # Reader options (passed to engine: Spark, Polars, DuckDB)
  options:
    header: "true"
    inferSchema: "true"
    delimiter: ","
    multiLine: "true"
    recursiveFileLookup: "true"
Date-Partitioned Landing
Limits file scanning to relevant date partitions — critical at scale to avoid scanning millions of files.
Example: Scan only the last 3 days of partitions
source:
  type: "landing"
  path: "s3://landing/events/"
  partition:
    format: "y_%Y/m_%m/d_%d"      # → events/y_2026/m_03/d_22/*.json
    lookback_days: 3              # Scan last 3 days (default: 1)
    # start_date: "2026-01-01"    # For explicit backfills
    # end_date: "2026-03-22"
    # file_pattern: "*.json"      # Auto-derived from source.format
Supported format patterns:
| Pattern | Directory Structure |
|---|---|
| y_%Y/m_%m/d_%d | events/y_2026/m_03/d_22/*.json |
| date=%Y%m%d | events/date=20260322/*.json |
| %Y/%m/%d | events/2026/03/22/*.json |
| year=%Y/month=%m | events/year=2026/month=03/*.json |
| dt=%Y-%m-%d/%H | events/dt=2026-03-22/17/*.json (hourly) |
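For an explicit backfill, the commented-out start_date and end_date fields shown earlier can replace lookback_days. The following is a minimal sketch under that assumption; the bucket path is a placeholder, and whether the date range is inclusive at both ends is not stated here, so verify against your LakeLogic version before relying on it.

```yaml
source:
  type: "landing"
  path: "s3://landing/events/"      # placeholder path
  format: "json"
  partition:
    format: "date=%Y%m%d"           # → events/date=20260322/*.json
    # Assumption: an explicit range is used instead of lookback_days
    start_date: "2026-01-01"
    end_date: "2026-03-22"
```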
Table-Based Ingestion
For reading from an existing catalog table (e.g., a Bronze table feeding Silver).
Example: Read from a Bronze Unity Catalog table
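A minimal sketch of what such a source might look like. The type value comes from the Source Types list above; the table key mirrors the one used for table-backed entries under links and is an assumption for the source: block, as are the catalog name and the updated_at column.

```yaml
source:
  type: "table"
  # Assumption: same key name as table-backed entries in `links`
  table: "catalog.bronze.customers"   # hypothetical three-part name
  load_mode: "incremental"
  watermark_field: "updated_at"       # hypothetical column
```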
CDC (Change Data Capture)
For source systems that provide insert/update/delete operation flags (e.g., Debezium, Oracle GoldenGate).
Example: CDC with operation flags
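A minimal sketch, assuming a Debezium-style feed landed as JSON files. The cdc_op_field and cdc_timestamp_field names come from the Load Mode Validation table; placing them directly under source: is an assumption, and the path is a placeholder. The values op and ts_ms are the operation and timestamp fields in Debezium's change-event envelope; substitute whatever your source system emits.

```yaml
source:
  type: "landing"
  path: "s3://landing/cdc/customers/"   # placeholder path
  format: "json"
  load_mode: "cdc"
  # At least one of the two fields below is required for cdc
  cdc_op_field: "op"                    # Debezium: c / u / d / r
  cdc_timestamp_field: "ts_ms"          # Debezium: event time in epoch millis
```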
Flatten Nested JSON
When Bronze tables contain JSON-string columns, flatten them in Silver to make the data queryable.
Example: Flatten specific nested columns
Reference Data Links
Register additional datasets for use in SQL transformations. This is how you join multiple datasets within a single contract — for example, enriching orders with customer details.
Why this matters: Without links, you'd need a separate pipeline step to join data. With links, you declare the reference table once and use it directly in SQL — keeping everything in one contract.
Example: Register reference tables for joins
links:
  - name: "dim_countries"
    path: "./reference/countries.parquet"
    type: "parquet"                        # parquet | csv | delta | table
    broadcast: true                        # Spark broadcast join for small tables
    columns: ["code", "name", "region"]    # Column projection
  - name: "dim_products"
    table: "catalog.reference.products"
    type: "table"
    broadcast: false
    columns: ["id", "product_name", "category", "price"]
Example: Using linked datasets in SQL transforms
Zero-Retention Architecture (Post-Ingestion Lifecycle)
After Bronze ingestion commits data to Delta, you may want to delete or archive the original landing zone files. This is critical for GDPR compliance — raw PII should not persist in unmanaged file storage.
The post_ingestion config controls what happens to source files after a successful Bronze commit.
Contract-Level (Recommended)
The simplest setup — declare post_ingestion directly on the source: block:
Example: Zero-retention on a single contract
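A minimal sketch using only the fields listed in the Configuration Reference below; the landing path and format are placeholders.

```yaml
source:
  type: "landing"
  path: "s3://landing/customers/"   # placeholder path
  format: "csv"
  post_ingestion:
    action: "delete"                # delete | archive | retain
    cleanup_is_blocking: false      # default: a failed cleanup only warns
```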
This is ideal for simple pipelines and single-contract setups — no server: block needed.
System-Level Default
For data mesh and multi-system pipelines, set a default for all Bronze contracts in _system.yaml:
Example: Zero-retention for the entire system
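A minimal sketch of the relevant fragment of _system.yaml, based on the server.post_ingestion location given in the Precedence table; any other keys your system file needs are omitted.

```yaml
# _system.yaml (fragment)
server:
  post_ingestion:
    action: "delete"   # applies to every Bronze contract without its own setting
```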
Individual contracts can then override the system default:
Example: Override system default to archive for audit-sensitive data
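A minimal sketch of a contract that opts out of a system-wide delete; the archive destination is a placeholder.

```yaml
source:
  type: "landing"
  path: "s3://landing/payments/"             # placeholder path
  post_ingestion:
    action: "archive"                        # overrides server.post_ingestion
    archive_path: "s3://archive/payments/"   # placeholder destination
```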
Precedence
Contract-level config always takes priority, consistent with how LakeLogic handles all other overrides:
| Level | Location | Priority |
|---|---|---|
| Contract | source.post_ingestion | Highest; overrides system default |
| System | server.post_ingestion in _system.yaml | Default for all Bronze contracts |
| None | Neither configured | retain (files stay in place) |
Actions
| Action | Behaviour | Use Case |
|---|---|---|
| delete | Remove landing files after Bronze commit | GDPR zero-retention, cost reduction |
| archive | Move files to archive_path | Regulatory audit trails, 7-year retention |
| retain | Leave files in place (default) | Development, debugging, replay scenarios |
Safety Guarantees
The cleanup engine follows strict safety rules:
- Cleanup only runs after a successful Bronze Delta commit: if ingestion fails, files are untouched
- Cleanup failures are non-blocking by default (cleanup_is_blocking: false): the pipeline succeeds with a warning
- Cleanup failures are logged: if a delete/archive fails, the warning is logged for manual intervention
- No double-counting risk: the watermark has advanced past cleaned files, so they won't be re-ingested
Failure Matrix
| Stage | Failure | Result |
|---|---|---|
| Read fails | File is malformed | Handled by quarantine: — file quarantined |
| Write fails | Bronze commit crashes | File untouched in landing, retried on next run |
| Delete/Archive fails | Permission / network error | Pipeline succeeds with warning, file stays in landing |
Configuration Reference
| Field | Type | Default | Description |
|---|---|---|---|
| action | string | "retain" | delete, archive, or retain |
| cleanup_is_blocking | bool | false | If true, cleanup failure fails the pipeline |
| archive_path | string | null | Destination path for archive action. Required when action: archive |
Archive path resolution: When action: archive, files are moved to the archive_path specified in post_ingestion. If not set at the contract level, the PipelineRunner falls back to storage.archive_path in _system.yaml. How long archived files are retained is controlled by your cloud provider's lifecycle policy (e.g., Azure Blob Lifecycle Management), not by LakeLogic.
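A minimal sketch of the fallback described above: the system file sets the archive action and a shared storage.archive_path, so contracts that do not declare their own archive_path pick it up. The container URL is a placeholder, and the exact nesting of other keys in _system.yaml is not shown.

```yaml
# _system.yaml (fragment)
server:
  post_ingestion:
    action: "archive"
storage:
  # Used when a contract's post_ingestion has no archive_path of its own
  archive_path: "abfss://archive@examplestorage.dfs.core.windows.net/landing/"   # placeholder
```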
Load Mode Validation
The pipeline validates required properties per load_mode:
| Mode | Requirements |
|---|---|
| full | None |
| incremental | watermark_field recommended (defaults to _lakelogic_processed_at) |
| cdc | At least ONE of: cdc_op_field, cdc_timestamp_field |
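A minimal sketch of an incremental source that sets the watermark explicitly instead of relying on the default. The path and the updated_at column are placeholders, and placing watermark_field directly under source: is an assumption.

```yaml
source:
  type: "landing"
  path: "s3://landing/orders/"      # placeholder path
  format: "parquet"
  load_mode: "incremental"
  # Omit to fall back to _lakelogic_processed_at
  watermark_field: "updated_at"     # hypothetical column
```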
See Watermark Strategies for incremental state tracking.