Section 15
Cloud Data System (Postgres / AWS)
S3 for landing, ECS or Lambda for jobs, Postgres for the warehouse.
Architecture
flow
Source systems
│
▼
S3 s3://co-data/bronze/&lt;entity&gt;/dt=YYYY-MM-DD/*.parquet
│
▼
ECS task / Lambda (Python: extract → land in S3)
│
▼
Postgres (RDS)
├── bronze.* (raw, append-only, partitioned by load_date)
├── silver.* (cleaned, conformed)
└── gold.* (marts, materialized views)
│
▼
Outputs: Metabase / Superset / FastAPI / reverse-ETL

S3 layout
s3://co-data/
bronze/
claims/
dt=2025-01-15/part-0001.parquet
silver/
claims_clean/
dt=2025-01-15/part-0001.parquet
gold/
claims_summary/
dt=2025-01-15/part-0001.parquet
audit/
pipeline_runs/

Loading bronze from S3
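The import targets a single dt= partition, and the same Hive-style prefix scheme is used by every layer. A small helper for building the prefix passed to aws_commons.create_s3_uri — the function name is hypothetical, not part of the stack above:

```python
from datetime import date


def partition_prefix(layer: str, entity: str, dt: date) -> str:
    # Hive-style partition prefix, e.g. "bronze/claims/dt=2025-01-15/".
    return f"{layer}/{entity}/dt={dt.isoformat()}/"
```

Because dt= keys sort lexicographically by date, listing a date range with list_objects_v2 stays straightforward.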
load_bronze.sql
-- Postgres with the aws_s3 extension (RDS / Aurora).
-- NOTE: table_import_from_s3 goes through COPY, which reads text, csv,
-- or binary -- not Parquet. Land a CSV copy for this path, or load the
-- Parquet files from the Python job instead.
SELECT aws_s3.table_import_from_s3(
    'bronze.raw_claims',   -- target table
    '',                    -- column list ('' = all columns)
    '(format csv)',
    aws_commons.create_s3_uri('co-data', 'bronze/claims/dt=2025-01-15/', 'us-east-1')
);

Job runner (ECS task)
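Re-runs are worth planning for: if an ECS task retries, the same partition gets loaded twice. Clearing the target load date before importing makes the bronze load idempotent. A sketch — the helper name and the load_date column are assumptions, not taken from the schema above:

```python
def idempotent_load_stmts(table: str, load_date: str) -> list[str]:
    # Clear the partition, then re-import it; safe to run any number of times.
    # (In real code, pass load_date as a bound parameter, not inline.)
    return [
        f"DELETE FROM {table} WHERE load_date = '{load_date}'",
        f"-- then re-run aws_s3.table_import_from_s3 for dt={load_date}",
    ]
```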
jobs/run_claims.py
import os
from datetime import date

import boto3
import psycopg

S3 = boto3.client("s3")
PG_DSN = os.environ["PG_DSN"]


def main() -> None:
    today = date.today().isoformat()

    # 1. Extract from the source systems and land files in S3
    #    (extract_to_s3 is defined elsewhere in the job module).
    extract_to_s3(today)

    # 2. Trigger the SQL transforms in Postgres: silver first, then gold.
    with psycopg.connect(PG_DSN) as cx, cx.cursor() as cur:
        cur.execute("CALL silver.refresh_claims_clean(%s)", [today])
        cur.execute("CALL gold.refresh_claims_summary(%s)", [today])
        cx.commit()


if __name__ == "__main__":
    main()

Output layer
- BI: Metabase or Superset connected directly to gold.*.
- API: FastAPI service reading materialized views.
- Reverse-ETL: Hightouch / Census back into Salesforce, etc.
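For the FastAPI path, a thin read-only layer that only knows gold.* keeps the contract narrow. A sketch — the mart whitelist and function name are assumptions, not part of the stack above:

```python
ALLOWED_MARTS = {"claims_summary"}


def mart_query(mart: str, limit: int = 100) -> str:
    # Whitelist the identifier: only known gold marts are queryable, and the
    # schema is hard-coded, so callers can never reach silver or bronze.
    if mart not in ALLOWED_MARTS:
        raise ValueError(f"unknown mart: {mart}")
    return f"SELECT * FROM gold.{mart} LIMIT {int(limit)}"
```

Whitelisting the identifier matters because table and schema names cannot be bound as query parameters.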
Don't
Don't let BI tools query silver directly. Always go through gold: it enforces the contract and keeps performance predictable.
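The gold-only rule is easier to keep when the database enforces it rather than convention. A sketch that locks a BI role out of everything but gold — the role name bi_reader is an assumption:

```python
# Statements to run once per environment (role name is hypothetical).
BI_GRANTS = [
    "REVOKE ALL ON SCHEMA bronze, silver FROM bi_reader",
    "GRANT USAGE ON SCHEMA gold TO bi_reader",
    "GRANT SELECT ON ALL TABLES IN SCHEMA gold TO bi_reader",
    "ALTER DEFAULT PRIVILEGES IN SCHEMA gold GRANT SELECT ON TABLES TO bi_reader",
]


def apply_bi_grants(cur) -> None:
    # cur is any DB-API cursor (e.g. psycopg); each statement is idempotent.
    for stmt in BI_GRANTS:
        cur.execute(stmt)
```

Point Metabase / Superset at the database as bi_reader and the contract holds even if someone hand-writes SQL.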