Pipeline Resilience
Think of a data pipeline like a commercial flight. A well-engineered plane doesn't crash because of one engine hiccup — it retries the ignition, alerts the crew, and if things get really bad, safely diverts to the nearest airport. Your data pipeline should work the same way.
Production data pipelines face transient failures daily — cloud storage throttling, network timeouts, credential expiries, and service outages. LakeLogic builds resilience directly into pipeline.run() so your pipelines recover automatically without custom orchestration code.
Feature Overview
| Feature | What it does | Analogy |
|---|---|---|
| Retry | Re-attempts failed entities with exponential backoff | A delivery driver tries a second time if nobody answers the door |
| Timeout | Kills entities that run longer than expected | A chef pulls a burnt dish off the stove instead of waiting forever |
| Circuit Breaker | Stops trying when infrastructure is clearly down | A fire alarm evacuates the building instead of checking each room |
| Checkpointing | Resumes from where a previous run left off | A bookmark lets you continue reading instead of starting from page 1 |
All four features are opt-in with sensible defaults — existing pipelines see zero behaviour change.
A "Best-Effort" Philosophy
LakeLogic's pipeline runner is deliberately a "best-effort" wave mechanism: entity processing continues independently of individual timeouts or errors.
If table_a hits an unexpected extraction error or exhausts its entity_timeout_minutes, the runner catches the exception, records a failure in the run summary, and moves straight on to the next contract in the queue (table_b). The pipeline only stops entirely if the infrastructure Circuit Breaker you configured trips.
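The catch-and-continue behaviour can be sketched in a few lines. This is illustrative only, not LakeLogic's internal runner: `run_wave`, the contract callables, and the summary dict are all assumed names.

```python
# Minimal sketch of a "best-effort" wave: process every contract,
# record per-entity failures, and keep going. Names are illustrative.
def run_wave(contracts):
    summary = {"succeeded": [], "failed": []}
    for name, process in contracts:
        try:
            process()
            summary["succeeded"].append(name)
        except Exception as exc:
            # Register the failure and move on to the next contract.
            summary["failed"].append((name, repr(exc)))
    return summary

def ok():
    pass

def boom():
    raise OSError("429 Too Many Requests")

result = run_wave([("table_a", boom), ("table_b", ok)])
# table_a fails, yet table_b is still processed
```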
Retry with Exponential Backoff
Business value: A single Azure 429 (throttling) error on one entity no longer fails your entire pipeline run. The entity retries automatically while the rest of the pipeline continues.
pipeline.run(
layers="bronze,silver",
retry_attempts=3, # Try each entity up to 3 times
retry_base_wait_seconds=30, # 30s → 60s → 120s between attempts
)
How It Works
sequenceDiagram
participant P as Pipeline
participant E as Entity (events)
participant A as Azure Storage
P->>E: Process entity
E->>A: Read source data
A-->>E: 429 Too Many Requests
Note over E: ⚠️ Attempt 1/3 failed
Note over E: Wait 30s...
E->>A: Retry read
A-->>E: 200 OK
E->>P: ✅ Succeeded on attempt 2/3
What You See in Logs
⚠️ Attempt 1/3 failed for events (OSError): 429 Too Many Requests. Retrying in 30s...
Traceback (most recent call last):
File "runner.py", line 1325, in _process_single_contract
...
OSError: 429 Too Many Requests
✅ events succeeded on attempt 2/3
Every failed attempt logs the full exception type, message, and traceback. Nothing is hidden or wrapped — unlike some retry libraries that obscure the original error behind a generic RetryError.
Key Design Decisions
| Decision | Rationale |
|---|---|
| Per-entity, not per-pipeline | Only failed entities retry. Succeeded entities don't re-run. |
| Exponential backoff | Azure throttling penalises fixed-interval retries. Doubling the wait each time gives the service time to recover. |
| Default retry_attempts=1 | Backward compatible. Existing pipelines are unaffected. |
| No external dependency | Pure stdlib — zero additional packages required. |
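The backoff schedule above can be written down explicitly. The doubling formula (wait = base × 2^(attempt − 1)) is assumed from the parameter descriptions; note that with 3 attempts only the first two waits actually occur, since there is no wait after the final attempt.

```python
# Backoff schedule sketch, assuming wait = base * 2**(attempt - 1).
# One wait follows each failed attempt except the last.
def backoff_waits(attempts, base_wait_seconds):
    return [base_wait_seconds * 2 ** n for n in range(attempts - 1)]

print(backoff_waits(4, 30))  # four attempts -> three waits
print(backoff_waits(3, 30))  # three attempts -> two waits
```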
Entity Timeout
Business value: A single runaway Spark query scanning terabytes of unpartitioned data can block your entire pipeline for hours. Timeouts ensure no single entity holds the pipeline hostage.
How Timeouts Trigger Retries
Each entity runs inside a monitored thread. If it exceeds the timeout, LakeLogic raises an EntityTimeoutError — which then triggers the retry mechanism if configured:
# Combined: timeout + retry
pipeline.run(
layers="bronze,silver",
entity_timeout_minutes=10, # 10 min per entity attempt
retry_attempts=2, # Retry once if timed out
retry_base_wait_seconds=60,
)
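A monitored-thread timeout can be sketched with the standard library. This is an assumption about the general technique, not LakeLogic's implementation; `run_with_timeout` and this `EntityTimeoutError` class are illustrative. One known limitation of the approach is visible in the code: a pure-Python thread cannot be force-killed, so the watcher only stops waiting for it.

```python
import threading
import time

class EntityTimeoutError(Exception):
    """Raised when an entity attempt exceeds its time budget (sketch)."""

def run_with_timeout(fn, timeout_seconds):
    # Run fn in a worker thread; if it is still alive after the
    # deadline, raise so a retry layer can take over. The worker
    # itself cannot be force-killed in pure Python.
    errors = []

    def target():
        try:
            fn()
        except Exception as exc:
            errors.append(exc)

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(timeout_seconds)
    if worker.is_alive():
        raise EntityTimeoutError(f"entity exceeded {timeout_seconds}s")
    if errors:
        raise errors[0]

try:
    run_with_timeout(lambda: time.sleep(5), timeout_seconds=0.1)
except EntityTimeoutError as exc:
    caught = str(exc)
```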
Timeout Recommendations
| Scenario | Recommended timeout |
|---|---|
| Bronze (file ingestion) | 300–600s (5–10 min) |
| Silver (transformations) | 600–1800s (10–30 min) |
| Gold (aggregations/SCD2) | 1200–3600s (20–60 min) |
| Development / testing | 120s (2 min) — fail fast |
Tip
Start with generous timeouts and tighten them as you learn your pipeline's normal execution profile. The run log captures execution duration per entity, giving you the data to set informed thresholds.
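One way to turn the per-entity durations in the run log into a threshold is a rule of thumb such as "twice the p95 duration". The helper below is hypothetical (not a LakeLogic API) and just illustrates the arithmetic on a list of observed durations in seconds.

```python
import math

# Hypothetical helper: derive a timeout from observed durations.
# Rule of thumb assumed here: factor * the p95 duration.
def suggested_timeout_seconds(durations, factor=2.0, quantile=0.95):
    ordered = sorted(durations)
    # Nearest-rank quantile index, clamped to the last element.
    idx = min(len(ordered) - 1, math.ceil(quantile * len(ordered)) - 1)
    return ordered[idx] * factor

# e.g. the last 10 runs of one entity, in seconds
recent = [110, 95, 130, 120, 105, 98, 140, 115, 100, 125]
print(suggested_timeout_seconds(recent))
```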
Circuit Breaker
Business value: When Azure Storage is completely down, a 50-entity data mesh pipeline without a circuit breaker would spend 50 × 3 retries × 2 min backoff = 5 hours discovering what you already know after the third failure: the infrastructure is down.
pipeline.run(
layers="bronze,silver,gold",
retry_attempts=3,
retry_base_wait_seconds=30,
max_consecutive_failures=3, # Stop after 3 entities fail in a row
)
Circuit Breaker Flow
flowchart LR
A[events] -->|❌ Failed| B[sessions]
B -->|❌ Failed| C[pageviews]
C -->|❌ Failed| D{Circuit Breaker}
D -->|🔴 Tripped| E[users ⏭️ SKIPPED]
D -->|🔴 Tripped| F[products ⏭️ SKIPPED]
D -->|🔴 Tripped| G[orders ⏭️ SKIPPED]
The failure counter resets to zero on any success. So if events fails but sessions succeeds, the counter goes back to 0. The circuit breaker only trips on consecutive failures — indicating a systemic infrastructure problem, not an entity-specific bug.
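The reset-on-success rule can be sketched as a simple counter over a sequence of outcomes. `run_with_breaker` and the (name, succeeded) tuples are illustrative, not the real runner.

```python
# Sketch of the consecutive-failure rule: any success resets the
# counter; the breaker trips only on an unbroken run of failures.
def run_with_breaker(results, max_consecutive_failures):
    consecutive = 0
    processed, skipped = [], []
    tripped = False
    for name, succeeded in results:
        if tripped:
            skipped.append(name)
            continue
        processed.append(name)
        if succeeded:
            consecutive = 0  # success resets the counter
        else:
            consecutive += 1
            if consecutive >= max_consecutive_failures:
                tripped = True
    return processed, skipped

processed, skipped = run_with_breaker(
    [("events", False), ("sessions", True), ("pageviews", False),
     ("users", False), ("products", False), ("orders", True)],
    max_consecutive_failures=3,
)
# events fails but sessions succeeds, so the counter resets;
# pageviews/users/products fail consecutively -> orders is skipped
```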
Circuit Breaker Log Output
❌ All 3 attempts exhausted for events (OSError): 403 Forbidden
❌ All 3 attempts exhausted for sessions (OSError): 403 Forbidden
❌ All 3 attempts exhausted for pageviews (OSError): 403 Forbidden
🔴 Circuit breaker tripped: 3 consecutive failures. Skipping remaining entities to avoid wasted retries.
Data Mesh Scale
In a data mesh with multiple domains, the circuit breaker becomes essential:
| Domain | Entities | Without circuit breaker | With circuit breaker (3) |
|---|---|---|---|
| Marketing | 4 | 4 × 3 retries = 12 attempts | 3 attempts → abort |
| Sales | 8 | 8 × 3 retries = 24 attempts | 3 attempts → abort |
| Finance | 12 | 12 × 3 retries = 36 attempts | 3 attempts → abort |
| Total | 24 | ~72 retry cycles | ~9 retry cycles |
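The totals in the table follow directly: without a breaker every entity burns its full retry budget, while with max_consecutive_failures=3 each domain's run aborts after three attempts.

```python
# Arithmetic behind the table above.
domains = {"Marketing": 4, "Sales": 8, "Finance": 12}
retries = 3

without_breaker = sum(n * retries for n in domains.values())
with_breaker = 3 * len(domains)  # breaker trips after 3 failures per run

print(without_breaker, with_breaker)  # 72 9
```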
Checkpointing (Resume from Failed Run)
Business value: A 49-entity pipeline fails on entity #25 after 45 minutes of processing. Without checkpointing, you re-run the entire pipeline — wasting 45 minutes re-processing 24 entities that already succeeded. With checkpointing, you resume from entity #25.
# First run fails partway through
summary = pipeline.run(layers="bronze,silver")
# summary.run_id = "abc123-def456-..."
# Resume from where it failed — skip already-succeeded entities
summary = pipeline.run(
layers="bronze,silver",
resume_from_run="abc123-def456-...",
)
Checkpoint Sequence
sequenceDiagram
participant R as Run Log (Delta)
participant P as Pipeline
P->>R: Query: pipeline_run_id = "abc123" AND stage = "succeeded"
R-->>P: {bronze:events ✅, bronze:sessions ✅}
Note over P: Skip events, sessions
P->>P: Process pageviews (failed last time)
P->>P: Process users (never attempted)
- The pipeline queries the run log for entities that succeeded in the previous run
- Builds a set of "layer:entity" keys (e.g., "bronze:events")
- Skips any entity that already has stage=succeeded for that run
- Processes only failed and never-attempted entities
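The skip-set logic described above can be sketched as a filter over the work queue. The run-log rows here are illustrative dicts, not the real Delta schema.

```python
# Sketch of resume logic: collect "layer:entity" keys for everything
# that succeeded in the previous run, then filter the work queue.
run_log = [
    {"pipeline_run_id": "abc123", "layer": "bronze", "entity": "events", "stage": "succeeded"},
    {"pipeline_run_id": "abc123", "layer": "bronze", "entity": "sessions", "stage": "succeeded"},
    {"pipeline_run_id": "abc123", "layer": "bronze", "entity": "pageviews", "stage": "failed"},
]

def entities_to_process(queue, run_log, resume_from_run):
    done = {
        f"{row['layer']}:{row['entity']}"
        for row in run_log
        if row["pipeline_run_id"] == resume_from_run
        and row["stage"] == "succeeded"
    }
    return [(layer, entity) for layer, entity in queue
            if f"{layer}:{entity}" not in done]

queue = [("bronze", "events"), ("bronze", "sessions"),
         ("bronze", "pageviews"), ("bronze", "users")]
todo = entities_to_process(queue, run_log, "abc123")
# only pageviews (failed) and users (never attempted) remain
```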
Checkpoint Log Output
🔄 Resuming from run abc123-def456: skipping 2 already-succeeded entities (bronze:events, bronze:sessions)
⏭️ Skipping events [bronze] — already succeeded in run abc123-def456
⏭️ Skipping sessions [bronze] — already succeeded in run abc123-def456
📄 [bronze] pageviews | Contract: Google Analytics Pageviews v1.0
✅ Materialized 1,200 rows for pageviews
Why This Is Safe
| Layer | On re-run without checkpoint | Risk |
|---|---|---|
| Bronze (append) | Re-appends the same data → duplicates | Low risk — duplicates are a by-design audit signal |
| Silver (merge) | Re-merges → idempotent (same result) | No risk — merge deduplicates by primary key |
| Gold (overwrite) | Re-computes → idempotent | No risk — last write wins |
Checkpointing is a time optimisation, not a correctness requirement. LakeLogic's medallion architecture is inherently idempotent — but skipping 24 already-succeeded entities saves real compute cost and wall-clock time.
Combining All Four
For production data mesh pipelines, use all four together:
pipeline.run(
layers="bronze,silver,gold",
# Retry: recover from transient failures
retry_attempts=3,
retry_base_wait_seconds=30,
# Timeout: prevent runaway entities
entity_timeout_minutes=10,
# Circuit breaker: fast-fail on infrastructure outage
max_consecutive_failures=3,
# Checkpoint: resume from last failure
resume_from_run=last_failed_run_id, # or None for fresh run
)
Execution Flow
flowchart TD
A[Start Entity] --> B{Checkpoint: Already succeeded?}
B -->|Yes| C[⏭️ Skip]
B -->|No| D{Circuit Breaker: Too many failures?}
D -->|Yes| E[🔴 Skip remaining]
D -->|No| F[Process Entity]
F --> G{Succeeded?}
G -->|Yes| H[Reset failure counter]
G -->|No| I{Timeout?}
I -->|Yes| J[⏱️ EntityTimeoutError]
I -->|No| K[Exception]
J --> L{Retries left?}
K --> L
L -->|Yes| M[Wait exponential backoff]
M --> F
L -->|No| N[❌ Mark failed, increment failure counter]
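The flowchart's decision order can be collapsed into one function: checkpoint skip first, then circuit-breaker skip, then the attempt loop with exponential backoff. Everything here is an illustrative stand-in for the real runner, including `process()`, `already_succeeded()`, and the shared `state` dict.

```python
# Compact sketch of the flowchart's decision order.
def run_entity(entity, process, already_succeeded,
               retry_attempts, base_wait, state):
    if already_succeeded(entity):
        return "skipped-checkpoint"
    if state["tripped"]:
        return "skipped-breaker"
    for attempt in range(1, retry_attempts + 1):
        try:
            process(entity)           # may raise, incl. a timeout error
            state["consecutive"] = 0  # any success resets the counter
            return "succeeded"
        except Exception:
            if attempt < retry_attempts:
                backoff = base_wait * 2 ** (attempt - 1)  # would sleep here
    state["consecutive"] += 1
    if state["consecutive"] >= state["max_failures"]:
        state["tripped"] = True
    return "failed"

def always_down(entity):
    raise OSError("infrastructure down")

state = {"consecutive": 0, "tripped": False, "max_failures": 2}
outcomes = [run_entity(e, always_down, lambda _: False, 2, 30, state)
            for e in ["a", "b", "c"]]
# a and b exhaust their retries; the breaker trips, so c is skipped
```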
Reusable Retry Utility
The retry logic is available as a standalone utility for any part of your codebase — not just pipeline runs:
from lakelogic.core.retry import retry_call, with_retry
# Functional style — wrap any call
result = retry_call(
requests.get,
args=("https://api.example.com/data",),
attempts=3,
base_wait_seconds=10,
retry_on=(ConnectionError, TimeoutError), # Only retry these
label="API fetch",
)
# Decorator style
@with_retry(attempts=3, base_wait_seconds=5)
def upload_to_storage(data, path):
...
Parameter Reference
| Parameter | Type | Default | Description |
|---|---|---|---|
| retry_attempts | int | 1 (no retry) | Max attempts per entity. Set to 3 for production. |
| retry_base_wait_seconds | int | 30 | Base wait between retries. Doubles each attempt: 30s → 60s → 120s. |
| entity_timeout_minutes | int | None (no limit) | Timeout per entity attempt. Raises EntityTimeoutError. |
| max_consecutive_failures | int | 0 (disabled) | Circuit breaker threshold. Skips remaining entities after N consecutive failures. |
| resume_from_run | str | None (disabled) | Pipeline run ID to resume from. Skips already-succeeded entities. |
What's Next?
- Observability — Run logs, SLO validation, and alerting
- Reprocessing — Backfills, partition overwrites, and idempotent replays
- Pipelines & Parallelism — Multi-entity orchestration and dependency waves