Section 20
Validation & Data Quality
Every pipeline writes to the same audit tables. Fail loud, log everything.
Audit tables
audit.pipeline_runs
-- Run log: one row per recorded pipeline run.
-- run_id is the key that audit.validation_results.run_id references.
CREATE TABLE audit.pipeline_runs (
    run_id      BIGINT    GENERATED ALWAYS AS IDENTITY PRIMARY KEY,  -- assigned by the database, never by the writer
    job         VARCHAR   NOT NULL,
    status      VARCHAR   NOT NULL,                                  -- restricted by the CHECK below
    rows        BIGINT,                                              -- nullable; presumably the run's row count (confirm)
    duration_ms BIGINT,                                              -- nullable; presumably milliseconds, per the name (confirm)
    error       TEXT,                                                -- nullable
    ran_at      TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,        -- defaults to insert time when omitted
    -- Only these three lifecycle states are accepted.
    CHECK (status IN ('running', 'success', 'failure'))
);
-- Outcome of individual validation rules, each tied to a row in audit.pipeline_runs.
-- No primary key or uniqueness constraint is declared on this table.
CREATE TABLE audit.validation_results (
    run_id       BIGINT    NOT NULL,
    table_name   VARCHAR   NOT NULL,                            -- table the rule was evaluated against
    rule         VARCHAR   NOT NULL,                            -- rule identifier, e.g. 'pk_unique'
    passed       BOOLEAN   NOT NULL,
    failed_count BIGINT,                                        -- nullable
    sample_keys  JSON,                                          -- nullable; presumably example offending keys (confirm)
    checked_at   TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,  -- defaults to insert time when omitted
    -- Every result must belong to an existing pipeline run.
    FOREIGN KEY (run_id) REFERENCES audit.pipeline_runs (run_id)
);
Validation rules
validate_silver_claims.sql
-- Rule 1: primary keys are unique
-- Always writes exactly one result row for :run_id.
-- failed_count is the number of claim_id values that occur more than once
-- (duplicated keys, not surplus rows); the rule passes when that number is zero.
-- sample_keys is not in the column list, so it is left unset.
WITH duplicated_keys AS (
    SELECT claim_id
    FROM silver.claims_clean
    GROUP BY claim_id
    HAVING COUNT(*) > 1
)
INSERT INTO audit.validation_results (run_id, table_name, rule, passed, failed_count)
SELECT
    :run_id               AS run_id,
    'silver.claims_clean' AS table_name,
    'pk_unique'           AS rule,
    COUNT(*) = 0          AS passed,
    COUNT(*)              AS failed_count
FROM duplicated_keys;
-- Rule 2: amounts are non-negative
-- Always writes exactly one result row for :run_id.
-- failed_count is the number of rows where either amount is below zero.
-- A NULL amount does not satisfy "< 0", so such rows are not counted here.
INSERT INTO audit.validation_results (run_id, table_name, rule, passed, failed_count)
SELECT
    :run_id               AS run_id,
    'silver.claims_clean' AS table_name,
    'amount_non_negative' AS rule,
    COUNT(*) = 0          AS passed,
    COUNT(*)              AS failed_count
FROM silver.claims_clean AS c
WHERE c.amount_billed < 0
   OR c.amount_paid < 0;
-- Rule 3: every silver row traces back to bronze
-- Always writes exactly one result row for :run_id.
-- failed_count is the number of silver rows whose claim_id has no matching
-- row in bronze.raw_claims; the rule passes when there are none.
INSERT INTO audit.validation_results (run_id, table_name, rule, passed, failed_count)
SELECT
    :run_id               AS run_id,
    'silver.claims_clean' AS table_name,
    'lineage_to_bronze'   AS rule,
    COUNT(*) = 0          AS passed,
    COUNT(*)              AS failed_count
FROM silver.claims_clean AS s
WHERE NOT EXISTS (
    SELECT 1
    FROM bronze.raw_claims AS b
    WHERE b.claim_id = s.claim_id
);
Fail the pipeline
Don't just log — make the run fail when a critical rule breaks. Critical rules: PK uniqueness, NOT NULL on business keys, lineage gaps.