Section 07
End-to-End Example: Driver Compliance System
A complete flow from input files to final Excel report.
Data flow
text
claims.csv + mvr.csv + training.csv
↓
bronze.raw_claims / bronze.raw_mvr / bronze.raw_training
↓
silver.claims_clean / silver.mvr_clean / silver.training_clean
↓
gold.driver_compliance
↓
driver_compliance.xlsxFolder structure
text
driver-compliance-system/
data/
input/
claims.csv
mvr.csv
training.csv
output/
sql/
bronze.sql
silver.sql
gold.sql
main.py
requirements.txtAlternative breakdown (advanced use)
For larger systems you may split
main.py into extract.py, load.py, transform.py, and export.py. This is optional — the canonical entry point is still python main.py.Join logic
The system assumes:
driver_id = employee_idacross all datasets
This simplifies joining but may not be true in real systems.
requirements.txt
txt
duckdb
pandas
openpyxl
requestsSample Data
Place these files in data/input/ before running the system.
claims.csv
claim_id,employee_id,status,amount
1,101,open,500
2,102,closed,1200
3,101,closed,300mvr.csv
driver_id,violation_count,license_status
101,1,Active
102,0,Activetraining.csv
employee_id,defensive_driving_completed,completion_date
101,true,2024-01-01
102,false,Bronze SQL
sql/bronze.sql
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;
CREATE SCHEMA IF NOT EXISTS audit;
CREATE OR REPLACE TABLE bronze.raw_claims AS
SELECT *, current_timestamp AS _loaded_at
FROM read_csv_auto('data/input/claims.csv');
CREATE OR REPLACE TABLE bronze.raw_mvr AS
SELECT *, current_timestamp AS _loaded_at
FROM read_csv_auto('data/input/mvr.csv');
CREATE OR REPLACE TABLE bronze.raw_training AS
SELECT *, current_timestamp AS _loaded_at
FROM read_csv_auto('data/input/training.csv');Silver SQL
sql/silver.sql
CREATE OR REPLACE TABLE silver.claims_clean AS
SELECT
claim_id,
employee_id,
status,
amount,
_loaded_at
FROM bronze.raw_claims
WHERE claim_id IS NOT NULL;
CREATE OR REPLACE TABLE silver.mvr_clean AS
SELECT
driver_id,
violation_count,
license_status,
_loaded_at
FROM bronze.raw_mvr
WHERE driver_id IS NOT NULL;
CREATE OR REPLACE TABLE silver.training_clean AS
SELECT
employee_id,
defensive_driving_completed,
completion_date,
_loaded_at
FROM bronze.raw_training
WHERE employee_id IS NOT NULL;Gold SQL
sql/gold.sql
CREATE OR REPLACE TABLE gold.driver_compliance AS
SELECT
m.driver_id,
m.license_status,
m.violation_count,
COALESCE(t.defensive_driving_completed, false) AS defensive_driving_completed,
COUNT(c.claim_id) AS claim_count,
SUM(COALESCE(c.amount, 0)) AS total_claim_amount,
CASE
WHEN m.license_status <> 'Active' THEN 'Non-Compliant'
WHEN m.violation_count > 0 THEN 'Review Needed'
WHEN COALESCE(t.defensive_driving_completed, false) = false THEN 'Training Missing'
ELSE 'Compliant'
END AS compliance_status
FROM silver.mvr_clean m
LEFT JOIN silver.training_clean t
ON m.driver_id = t.employee_id
LEFT JOIN silver.claims_clean c
ON m.driver_id = c.employee_id
GROUP BY
m.driver_id,
m.license_status,
m.violation_count,
t.defensive_driving_completed;Python runner
main.py
import duckdb
import requests
DB_PATH = "driver_compliance.duckdb"
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"
def notify_slack(message):
requests.post(SLACK_WEBHOOK_URL, json={"text": message})
def run_sql_file(con, path):
with open(path, "r") as file:
con.execute(file.read())
def main():
con = duckdb.connect(DB_PATH)
run_sql_file(con, "sql/bronze.sql")
run_sql_file(con, "sql/silver.sql")
run_sql_file(con, "sql/gold.sql")
df = con.execute("SELECT * FROM gold.driver_compliance").df()
df.to_excel("data/output/driver_compliance.xlsx", index=False)
print("Export complete: data/output/driver_compliance.xlsx")
if __name__ == "__main__":
try:
main()
notify_slack("Pipeline succeeded")
except Exception as e:
notify_slack(f"Pipeline failed: {str(e)}")
raiseRun it
bash
pip install -r requirements.txt
python main.pyExpected Output
One row from gold.driver_compliance:
text
driver_id: 101
license_status: Active
violation_count: 1
defensive_driving_completed: true
claim_count: 2
total_claim_amount: 800
compliance_status: Review Needed