Section 07

End-to-End Example: Driver Compliance System

A complete flow from input files to final Excel report.

Data flow

text
claims.csv + mvr.csv + training.csv
        ↓
bronze.raw_claims / bronze.raw_mvr / bronze.raw_training
        ↓
silver.claims_clean / silver.mvr_clean / silver.training_clean
        ↓
gold.driver_compliance
        ↓
driver_compliance.xlsx

Folder structure

text
driver-compliance-system/
  data/
    input/
      claims.csv
      mvr.csv
      training.csv
    output/
  sql/
    bronze.sql
    silver.sql
    gold.sql
  main.py
  requirements.txt

Alternative breakdown (advanced use)

For larger systems, you may split main.py into extract.py, load.py, transform.py, and export.py. This is optional; the canonical entry point is still python main.py.

Join logic

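The system assumes that a driver is identified by the same value in every dataset, even though the column name differs:

  • mvr.csv identifies drivers by driver_id
  • claims.csv and training.csv identify them by employee_id

The joins therefore match driver_id = employee_id directly. This simplifies joining but may not hold in real systems, where driver and employee IDs can come from different sources and need a mapping table.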
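
One way to test this assumption before trusting the joins is to compare the key sets of the three files. The sketch below uses only the standard library and inlines the sample rows from this section so it runs on its own; the helper name key_set is hypothetical, and in a real run you would read the files under data/input/ instead.

```python
import csv
import io

# Sample rows from this section, inlined so the sketch is self-contained.
CLAIMS = (
    "claim_id,employee_id,status,amount\n"
    "1,101,open,500\n"
    "2,102,closed,1200\n"
    "3,101,closed,300\n"
)
MVR = (
    "driver_id,violation_count,license_status\n"
    "101,1,Active\n"
    "102,0,Active\n"
)
TRAINING = (
    "employee_id,defensive_driving_completed,completion_date\n"
    "101,true,2024-01-01\n"
    "102,false,\n"
)


def key_set(csv_text, column):
    """Return the distinct non-empty values of one column (hypothetical helper)."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {row[column] for row in reader if row[column]}


drivers = key_set(MVR, "driver_id")
claim_employees = key_set(CLAIMS, "employee_id")
training_employees = key_set(TRAINING, "employee_id")

# IDs that a LEFT JOIN starting from the mvr table would silently drop.
unmatched_claims = claim_employees - drivers
unmatched_training = training_employees - drivers

print("claims without a driver:", sorted(unmatched_claims))
print("training rows without a driver:", sorted(unmatched_training))
```

With the sample data both lists are empty. A non-empty list would mean the driver_id = employee_id assumption fails for those IDs.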

requirements.txt

txt
duckdb
pandas
openpyxl
requests

Sample Data

Place these files in data/input/ before running the system.

claims.csv
claim_id,employee_id,status,amount
1,101,open,500
2,102,closed,1200
3,101,closed,300

mvr.csv
driver_id,violation_count,license_status
101,1,Active
102,0,Active

training.csv
employee_id,defensive_driving_completed,completion_date
101,true,2024-01-01
102,false,

Bronze SQL

sql/bronze.sql
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;
CREATE SCHEMA IF NOT EXISTS audit;

CREATE OR REPLACE TABLE bronze.raw_claims AS
SELECT *, current_timestamp AS _loaded_at
FROM read_csv_auto('data/input/claims.csv');

CREATE OR REPLACE TABLE bronze.raw_mvr AS
SELECT *, current_timestamp AS _loaded_at
FROM read_csv_auto('data/input/mvr.csv');

CREATE OR REPLACE TABLE bronze.raw_training AS
SELECT *, current_timestamp AS _loaded_at
FROM read_csv_auto('data/input/training.csv');

Silver SQL

sql/silver.sql
CREATE OR REPLACE TABLE silver.claims_clean AS
SELECT
  claim_id,
  employee_id,
  status,
  amount,
  _loaded_at
FROM bronze.raw_claims
WHERE claim_id IS NOT NULL;

CREATE OR REPLACE TABLE silver.mvr_clean AS
SELECT
  driver_id,
  violation_count,
  license_status,
  _loaded_at
FROM bronze.raw_mvr
WHERE driver_id IS NOT NULL;

CREATE OR REPLACE TABLE silver.training_clean AS
SELECT
  employee_id,
  defensive_driving_completed,
  completion_date,
  _loaded_at
FROM bronze.raw_training
WHERE employee_id IS NOT NULL;
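
The silver layer only removes rows with a missing key; it does not deduplicate. Because the gold query joins on these keys, a driver who appears twice in the mvr or training table would multiply that driver's claim rows. A minimal sketch of a duplicate-key check, assuming the rows have already been fetched as dictionaries. The function name duplicate_keys and the repeated row for employee 101 are illustrative and not part of the sample data:

```python
from collections import Counter


def duplicate_keys(rows, key):
    """Return the key values that occur more than once, sorted."""
    counts = Counter(row[key] for row in rows)
    return sorted(value for value, n in counts.items() if n > 1)


# Hypothetical training rows: employee 101 was loaded twice.
training_rows = [
    {"employee_id": 101, "defensive_driving_completed": True},
    {"employee_id": 102, "defensive_driving_completed": False},
    {"employee_id": 101, "defensive_driving_completed": True},
]

print(duplicate_keys(training_rows, "employee_id"))  # [101]
```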

Gold SQL

sql/gold.sql
CREATE OR REPLACE TABLE gold.driver_compliance AS
SELECT
  m.driver_id,
  m.license_status,
  m.violation_count,
  -- A driver with no training row counts as not completed.
  COALESCE(t.defensive_driving_completed, false) AS defensive_driving_completed,
  -- COUNT skips the NULL claim_id produced for drivers without claims.
  COUNT(c.claim_id) AS claim_count,
  SUM(COALESCE(c.amount, 0)) AS total_claim_amount,
  CASE
    WHEN m.license_status <> 'Active' THEN 'Non-Compliant'
    WHEN m.violation_count > 0 THEN 'Review Needed'
    WHEN COALESCE(t.defensive_driving_completed, false) = false THEN 'Training Missing'
    ELSE 'Compliant'
  END AS compliance_status
-- Start from mvr so every driver appears, even without training or claims.
FROM silver.mvr_clean m
LEFT JOIN silver.training_clean t
  ON m.driver_id = t.employee_id
LEFT JOIN silver.claims_clean c
  ON m.driver_id = c.employee_id
GROUP BY
  m.driver_id,
  m.license_status,
  m.violation_count,
  t.defensive_driving_completed;
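
The CASE expression is evaluated top to bottom and the first matching branch wins, so an inactive license yields 'Non-Compliant' regardless of violations or training. The same precedence can be sketched in Python to make it easy to test. The function name compliance_status and the 'Suspended' license value are illustrations, not part of the pipeline:

```python
def compliance_status(license_status, violation_count, training_completed):
    """Mirror the CASE branches of gold.driver_compliance; first match wins."""
    # In SQL a comparison with NULL is not true, so None falls through.
    if license_status is not None and license_status != "Active":
        return "Non-Compliant"
    if violation_count is not None and violation_count > 0:
        return "Review Needed"
    # COALESCE(..., false): a missing training record counts as not completed.
    if not training_completed:
        return "Training Missing"
    return "Compliant"


print(compliance_status("Active", 1, True))      # Review Needed (driver 101)
print(compliance_status("Active", 0, False))     # Training Missing (driver 102)
print(compliance_status("Suspended", 3, False))  # Non-Compliant (hypothetical)
print(compliance_status("Active", 0, None))      # Training Missing (no training row)
```

Note that driver 101 has completed training but is still flagged 'Review Needed', because the violation branch is checked before the training branch.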

Python runner

main.py
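import os

import duckdb
import requests

DB_PATH = "driver_compliance.duckdb"
OUTPUT_PATH = "data/output/driver_compliance.xlsx"
# Placeholder: replace with your own Slack incoming-webhook URL.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

def notify_slack(message):
    """Post a status message to Slack; a failed notification is only logged."""
    try:
        requests.post(SLACK_WEBHOOK_URL, json={"text": message}, timeout=10)
    except requests.RequestException as exc:
        # Never let a notification problem hide the pipeline's own result.
        print(f"Slack notification failed: {exc}")

def run_sql_file(con, path):
    """Execute every statement in a SQL file on the open connection."""
    with open(path, "r", encoding="utf-8") as file:
        con.execute(file.read())

def main():
    con = duckdb.connect(DB_PATH)
    try:
        # Build the layers in dependency order: bronze, then silver, then gold.
        run_sql_file(con, "sql/bronze.sql")
        run_sql_file(con, "sql/silver.sql")
        run_sql_file(con, "sql/gold.sql")

        df = con.execute("SELECT * FROM gold.driver_compliance").df()
    finally:
        con.close()

    # Create the output folder if it is missing, then write the report.
    os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True)
    df.to_excel(OUTPUT_PATH, index=False)

    print(f"Export complete: {OUTPUT_PATH}")

if __name__ == "__main__":
    try:
        main()
        notify_slack("Pipeline succeeded")
    except Exception as e:
        notify_slack(f"Pipeline failed: {e}")
        raise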

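Run it

Run both commands from the driver-compliance-system/ folder, because main.py and the SQL files refer to data/ and sql/ by relative path.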

bash
pip install -r requirements.txt
python main.py

Expected Output

One row from gold.driver_compliance:

text
driver_id: 101
license_status: Active
violation_count: 1
defensive_driving_completed: true
claim_count: 2
total_claim_amount: 800
compliance_status: Review Needed
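
The claim figures in this row can be checked by hand against the sample data. A rough sketch in plain Python, with the rows from claims.csv inlined. Driver 103 is a hypothetical driver with no claims, added only to show what the LEFT JOIN produces in that case:

```python
# (claim_id, employee_id, amount) rows from claims.csv
claims = [(1, 101, 500), (2, 102, 1200), (3, 101, 300)]
# Drivers 101 and 102 come from mvr.csv; 103 is hypothetical, with no claims.
driver_ids = [101, 102, 103]

summary = {}
for driver_id in driver_ids:
    amounts = [amount for _, employee_id, amount in claims if employee_id == driver_id]
    summary[driver_id] = {
        "claim_count": len(amounts),         # COUNT(c.claim_id)
        "total_claim_amount": sum(amounts),  # SUM(COALESCE(c.amount, 0))
    }

for driver_id, figures in summary.items():
    print(driver_id, figures)
# 101 {'claim_count': 2, 'total_claim_amount': 800}
# 102 {'claim_count': 1, 'total_claim_amount': 1200}
# 103 {'claim_count': 0, 'total_claim_amount': 0}
```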