API Reference
DataProcessor
```python
from lakelogic import DataProcessor

processor = DataProcessor(
    engine="polars",
    contract="contract.yaml",
    stage="silver",
)
good_df, bad_df = processor.run(df)
```
Constructor
- `engine` (str): `polars`, `spark`, `snowflake`, or `bigquery`
- `contract` (str | Path | dict | DataContract): YAML path or dict
- `stage` (str | None): Apply contract stage overrides (e.g., `bronze`, `silver`)
run(df, source_path=None, materialize=False, materialize_target=None)
Executes the contract against a DataFrame.
Returns:
- ValidationResult object that unpacks to (good_df, bad_df)
- good_df: validated, transformed records
- bad_df: quarantined records with error reasons
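A minimal sketch of a run that also materializes its output (the paths are placeholders, and the effect of `source_path` is assumed to be ingestion/lineage metadata only):

```python
# Validate an in-memory DataFrame and write the good records in one call.
# Paths below are placeholders.
result = processor.run(
    df,
    source_path="landing/orders.parquet",
    materialize=True,
    materialize_target="silver/orders",
)
good_df, bad_df = result  # ValidationResult unpacks into (good_df, bad_df)
```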
run_source(path)
Loads data using the engine's native reader and runs the contract in one step.
Returns:
- ValidationResult object that unpacks to (good_df, bad_df)
- good_df: validated, transformed records
- bad_df: quarantined records with error reasons
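For example (the path is a placeholder):

```python
# Let the engine's native reader load the file, then validate it.
good_df, bad_df = processor.run_source("landing/orders.parquet")
```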
materialize(good_df, bad_df=None, target_path=None)
Writes validated data to the configured materialization target. If quarantine.target is set, bad records are also written.
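A minimal sketch; the explicit `target_path` is optional and shown with a placeholder value:

```python
# Write validated records (and quarantined ones, if quarantine.target is set).
good_df, bad_df = processor.run(df)
processor.materialize(good_df, bad_df=bad_df, target_path="silver/orders")
```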
last_report
After a run, processor.last_report contains a structured summary (counts, dataset rule outcomes, run id).
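For example:

```python
processor.run(df)
print(processor.last_report)  # counts, dataset rule outcomes, run id
```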
DataContract
Use a YAML file or a dict. Key sections:
- `model.fields`: schema and types
- `schema_policy`: behavior for unknown fields
- `quality.row_rules`: row-level checks
- `quality.dataset_rules`: aggregate checks
- `transformations`: SQL transformation steps (preferred) with optional `phase` (`pre` or `post`)
- `transformations` structured helpers: rename, derive, lookup, filter, deduplicate, select, drop, cast, trim, lower, upper, coalesce, split, explode, map_values, pivot, unpivot, join
- `links`: reference datasets (file path or table name)
- `links[].broadcast`: Spark-only hint to broadcast small lookup tables
- `source`: ingestion metadata for pipelines (`type`, `path`, `load_mode`, `pattern`, `watermark_field`, `cdc_op_field`, `cdc_delete_values`)
- `quarantine`: quarantine settings + notifications (supports file paths or `table:` targets)
- `lineage`: lineage injection settings (enabled, source path, timestamp, run id)
- `materialization`: append/merge/scd2/overwrite + partitioning (CSV/Parquet locally, Delta/Iceberg on Spark)
- `external_logic`: optional python/notebook hook for advanced processing (gold patterns)
- `metadata.run_log_path` or `metadata.run_log_dir`: write run logs to JSON
- `metadata.run_log_table`: append run logs to a table (Spark, DuckDB, or SQLite backend)
- `metadata.run_log_backend`: `spark` | `duckdb` | `sqlite`
- `metadata.run_log_database`: database path for DuckDB/SQLite backends
- `metadata.run_log_merge_on_run_id`: Spark-only idempotent merge by `run_id`
- `metadata.run_log_table_format`: Spark-only table format (default `delta`)
- `metadata.quarantine_table_backend`: `spark` | `duckdb` | `sqlite` | `snowflake` | `bigquery` (defaults to engine)
- `metadata.quarantine_table_database`: database path for DuckDB/SQLite backends
- `metadata.quarantine_format`: Quarantine file format (default `parquet`; Spark also supports `delta`, `iceberg`, `json`)
- `metadata.quarantine_table_format`: Spark-only table format (default `iceberg`)
- `metadata.quarantine_table_mode`: Spark-only write mode (default `append`)
- `metadata.snowflake_*`: Connection settings for Snowflake (`snowflake_account`, `snowflake_user`, `snowflake_password`, `snowflake_warehouse`, `snowflake_database`, `snowflake_schema`, `snowflake_role`)
- `metadata.bigquery_project`: Optional project override for BigQuery connections
- `metadata.source_table`: Optional default source table for warehouse adapters (or `snowflake_source_table` / `bigquery_source_table`)
- `server.schema_policy.evolution`: strict/append/merge/overwrite/compatible/allow behavior
- `server.schema_policy.unknown_fields`: quarantine/drop/allow behavior for undocumented fields
- `server.cast_to_string`: ingest all columns as strings (Bronze pattern)
- `service_levels`: freshness/availability SLO scoring
- `quality` helpers: not_null, accepted_values, regex_match, range, unique, null_ratio, row_count_between, referential_integrity
- `stages`: optional stage override map (e.g., `stages.bronze`, `stages.silver`) to reuse one contract across layers
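As a rough illustration of how a few of these sections fit together, a small contract can also be supplied inline as a dict (the nested keys under `model.fields` and the row-rule syntax are assumptions for illustration only; a real contract may spell them differently):

```python
from lakelogic import DataProcessor

# Illustrative inline contract: the top-level section names come from the
# list above, but the nested structure (field attributes, rule syntax) is
# an assumption and may not match the real contract schema exactly.
contract = {
    "model": {
        "fields": {
            "order_id": {"type": "string"},
            "amount": {"type": "double"},
        }
    },
    "quality": {
        "row_rules": [
            {"not_null": "order_id"},
        ]
    },
}

processor = DataProcessor(engine="polars", contract=contract)
```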
external_logic
Run dedicated Python modules or notebooks for advanced processing.
- `external_logic.type`: `python` or `notebook`
- `external_logic.path`: Path to the script/notebook (relative to contract)
- `external_logic.entrypoint`: Python function name (default `run`)
- `external_logic.args`: Dict of parameters injected into the logic
- `external_logic.output_path`: Optional output file path to read back in
- `external_logic.output_format`: `csv` or `parquet` (optional)
- `external_logic.handles_output`: If true, skip built-in materialize
- `external_logic.kernel_name`: Notebook kernel override
- Notebook params include `lakelogic_input_path` and `lakelogic_input_format` for validated data access.
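A sketch of the section as a dict fragment (only the key names are taken from the list above; the module path, args, and output path are placeholders):

```python
# Hypothetical external_logic section; values are placeholders.
external_logic = {
    "type": "python",
    "path": "logic/gold_metrics.py",
    "entrypoint": "run",
    "args": {"as_of_date": "2024-01-01"},
    "output_path": "tmp/gold_metrics.parquet",
    "output_format": "parquet",
    "handles_output": False,
}
```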
DataGenerator
Generate schema-aware synthetic data from any Data Contract YAML.
```python
from lakelogic import DataGenerator

gen = DataGenerator("contracts/bronze_orders.yaml", seed=42)
df = gen.generate(rows=1_000, invalid_ratio=0.1)
```
Constructor
- `contract_path` (str | Path): Path to the contract YAML file
- `seed` (int, optional): Random seed for reproducibility
- `use_faker` (bool): Use Faker for semantic field generation (default `True`)
Alternative Constructors
```python
# From an existing data file (schema inferred, no contract needed)
gen = DataGenerator.from_file("data/sample.csv", seed=42)

# From a dbt schema.yml
gen = DataGenerator.from_dbt("models/schema.yml", model="customers")
```
generate(rows, invalid_ratio, output_format, reference_data, window_start, window_end, ai)
Generate synthetic rows conforming to the contract schema.
- `rows` (int): Total rows to generate (default 100)
- `invalid_ratio` (float): Fraction of intentionally bad rows (default 0.0)
- `output_format` (str): `"polars"` or `"pandas"` (default `"polars"`)
- `reference_data` (dict): FK pools mapping column → list of valid parent PKs
- `window_start` (datetime): Constrain all timestamps to this time window
- `window_end` (datetime): End of window (defaults to `now()` if `window_start` is set)
- `ai` (bool): Enable LLM-powered realistic value generation (default False)
Returns: polars.DataFrame or pandas.DataFrame
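A sketch combining the window and foreign-key options (the column name and parent keys are placeholders):

```python
from datetime import datetime

# 10% of rows are intentionally invalid; foreign keys are drawn from the
# supplied pool; all timestamps fall inside a one-day window.
df = gen.generate(
    rows=5_000,
    invalid_ratio=0.1,
    output_format="polars",
    reference_data={"customer_id": ["C001", "C002", "C003"]},
    window_start=datetime(2024, 1, 1),
    window_end=datetime(2024, 1, 2),
)
```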
generate_stream(rows_per_batch, interval_minutes, batches, output_dir, ...)
Generate successive time-windowed batches, simulating a streaming source.
- `rows_per_batch` (int): Rows per batch (default 100)
- `interval_minutes` (int): Window duration per batch (default 5)
- `batches` (int): Total batches to generate (default 12)
- `output_dir` (str | Path): Auto-save to partitioned directories
- `format` (str): File format: `"parquet"`, `"csv"`, `"json"` (default `"parquet"`)
- `partition_template` (str): Partition directory pattern (default `yyyy={Y}/mm={m}/dd={d}/hh={H}/mi={M}`)
- `start_from` (datetime): Override starting timestamp
Yields: tuple[datetime, datetime, DataFrame] — (window_start, window_end, batch_df)
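A sketch that writes twelve five-minute batches to a partitioned landing directory (the directory is a placeholder):

```python
# Each iteration yields the window bounds and the generated batch;
# files are also written under output_dir using the partition template.
for window_start, window_end, batch_df in gen.generate_stream(
    rows_per_batch=200,
    interval_minutes=5,
    batches=12,
    output_dir="landing/orders",
    format="parquet",
):
    print(window_start, window_end, len(batch_df))
```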
generate_related(contracts, rows, seed, ai) — classmethod
Generate referentially consistent data across multiple related contracts.
- `contracts` (list): Paths to contract YAML files
- `rows` (dict | int): Rows per entity (or uniform count)
- `seed` (int): Random seed
- `ai` (bool): Enable LLM-powered realistic value generation
Returns: dict[str, DataFrame] — entity name → generated DataFrame
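A sketch with two related contracts (the contract paths and entity names are placeholders; the dict keys follow whatever entity names the contracts define):

```python
from lakelogic import DataGenerator

# Generate parent/child datasets whose foreign keys line up.
datasets = DataGenerator.generate_related(
    contracts=["contracts/customers.yaml", "contracts/orders.yaml"],
    rows={"customers": 100, "orders": 1_000},
    seed=42,
)
orders_df = datasets["orders"]  # keyed by entity name (placeholder here)
```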
save(df, output, format)
Save a DataFrame to disk.
- `df`: Polars or Pandas DataFrame
- `output` (str | Path): Destination file path
- `format` (str): `"parquet"`, `"csv"`, or `"json"`
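For example (the output path is a placeholder):

```python
gen.save(df, output="out/orders.parquet", format="parquet")
```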
save_partitioned(df, output_dir, filename_field, format)
Save one file per unique key value (e.g., one JSON per entity ID).
- `df`: Generated DataFrame
- `output_dir` (str | Path): Directory to write files into
- `filename_field` (str): Column whose value becomes the filename
- `format` (str): `"json"`, `"parquet"`, or `"csv"`
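A sketch that writes one JSON file per entity (the column name and directory are placeholders):

```python
# One file per distinct order_id value; the exact file naming is inferred
# from the description above.
gen.save_partitioned(
    df,
    output_dir="out/orders_by_id",
    filename_field="order_id",
    format="json",
)
```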