Section 15

Cloud Data System (Postgres / AWS)

S3 for landing raw extracts, ECS or Lambda for jobs, Postgres (RDS) for the warehouse.

Architecture

Source systems
   │
   ▼
S3  s3://co-data/bronze/<source>/<entity>/dt=YYYY-MM-DD/*.parquet
   │
   ▼
ECS task / Lambda  (Python: extract → land in S3)
   │
   ▼
Postgres (RDS)
   ├── bronze.*    (raw, append-only, partitioned by load_date)
   ├── silver.*    (cleaned, conformed)
   └── gold.*      (marts, materialized views)
   │
   ▼
Outputs: Metabase / Superset / FastAPI / reverse-ETL
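
The landing convention in the diagram can be captured in a small helper. A minimal sketch; the `bronze_prefix`/`s3_uri` names and the "billing" source are illustrative, not part of the system:

```python
from datetime import date

BUCKET = "co-data"  # bucket name from the diagram


def bronze_prefix(source: str, entity: str, d: date) -> str:
    """Build the bronze landing prefix: bronze/<source>/<entity>/dt=YYYY-MM-DD/."""
    return f"bronze/{source}/{entity}/dt={d.isoformat()}/"


def s3_uri(key: str) -> str:
    """Render a full S3 URI for a key in the data bucket."""
    return f"s3://{BUCKET}/{key}"


# Example: where a claims extract from a hypothetical "billing" source lands.
print(s3_uri(bronze_prefix("billing", "claims", date(2025, 1, 15))))
# s3://co-data/bronze/billing/claims/dt=2025-01-15/
```

Keeping the key template in one function means every job lands files in the same place, which the loaders below rely on.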

S3 layout

s3://co-data/
bronze/
  claims/
    dt=2025-01-15/part-0001.parquet
silver/
  claims_clean/
    dt=2025-01-15/part-0001.parquet
gold/
  claims_summary/
    dt=2025-01-15/part-0001.parquet
audit/
  pipeline_runs/
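
Jobs often need the newest `dt=` partition under a prefix. A dependency-free sketch; in practice the key list would come from a boto3 `list_objects_v2` paginator, elided here:

```python
import re
from typing import Iterable, Optional

# Matches the dt=YYYY-MM-DD path segment used throughout the layout.
DT_RE = re.compile(r"/dt=(\d{4}-\d{2}-\d{2})/")


def latest_partition(keys: Iterable[str]) -> Optional[str]:
    """Return the newest dt=YYYY-MM-DD value found among S3 keys, or None."""
    dates = {m.group(1) for k in keys for m in [DT_RE.search(k)] if m}
    # ISO dates sort correctly as strings, so max() gives the latest day.
    return max(dates) if dates else None


keys = [
    "bronze/claims/dt=2025-01-14/part-0001.parquet",
    "bronze/claims/dt=2025-01-15/part-0001.parquet",
]
print(latest_partition(keys))  # 2025-01-15
```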

Loading bronze from S3

load_bronze.sql
-- Postgres with the aws_s3 extension.
-- Note: table_import_from_s3 wraps COPY, which reads text/csv/binary only --
-- it cannot read Parquet. Land a CSV copy alongside the Parquet parts
-- (or convert before import). One call imports one object, not a prefix.
SELECT aws_s3.table_import_from_s3(
  'bronze.raw_claims',
  '',                             -- column list: empty = all columns
  '(format csv, header true)',
  aws_commons.create_s3_uri('co-data', 'bronze/claims/dt=2025-01-15/claims.csv', 'us-east-1')
);
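
The import is usually issued per partition from the job runner. A sketch that builds the statement for a given load date (psycopg execution elided; the `claims.csv` filename is an assumption, since aws_s3 wraps COPY and cannot read the Parquet parts directly):

```python
def bronze_import_sql(dt: str) -> str:
    """Build the aws_s3 import statement for one dt= partition.

    dt comes from the job scheduler, not user input, so plain string
    interpolation is acceptable here.
    """
    key = f"bronze/claims/dt={dt}/claims.csv"
    return (
        "SELECT aws_s3.table_import_from_s3("
        "'bronze.raw_claims', '', '(format csv, header true)', "
        f"aws_commons.create_s3_uri('co-data', '{key}', 'us-east-1'))"
    )


print(bronze_import_sql("2025-01-15"))
```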

Job runner (ECS task)

jobs/run_claims.py
import os
from datetime import date

import boto3
import psycopg

S3 = boto3.client("s3")
PG_DSN = os.environ["PG_DSN"]


def extract_to_s3(dt: str) -> None:
    """Pull from the source system and land Parquet under bronze/ (elided)."""
    ...


def main() -> None:
    today = date.today().isoformat()
    # 1. Extract from the source system and land files in S3.
    extract_to_s3(today)
    # 2. Run the SQL transforms inside Postgres, bronze -> silver -> gold.
    with psycopg.connect(PG_DSN) as cx, cx.cursor() as cur:
        cur.execute("CALL silver.refresh_claims_clean(%s)", [today])
        cur.execute("CALL gold.refresh_claims_summary(%s)", [today])
        # psycopg 3 commits on clean exit from the context manager;
        # the explicit commit just makes the intent obvious.
        cx.commit()


if __name__ == "__main__":
    main()
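
The audit/pipeline_runs prefix in the layout suggests each run should leave a record behind. A hedged sketch of what the job could write there; the field names and the per-job key scheme are assumptions:

```python
import json
from datetime import datetime, timezone


def run_record(job: str, dt: str, status: str, rows: int = 0) -> dict:
    """Build one pipeline-run audit record."""
    return {
        "job": job,
        "load_date": dt,
        "status": status,        # e.g. "ok" / "failed"
        "rows_loaded": rows,
        "recorded_at": datetime.now(timezone.utc).isoformat(),
    }


def audit_key(job: str, dt: str) -> str:
    """S3 key for the record, under the audit/pipeline_runs/ prefix."""
    return f"audit/pipeline_runs/dt={dt}/{job}.json"


rec = run_record("run_claims", "2025-01-15", "ok", rows=1234)
print(audit_key("run_claims", "2025-01-15"))
print(json.dumps(rec, indent=2))
```

In the real job this would be one `S3.put_object` call at the end of `main()`, in both the success and failure paths.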

Output layer

  • BI: Metabase or Superset connected directly to gold.*.
  • API: FastAPI service reading materialized views.
  • Reverse-ETL: Hightouch / Census back into Salesforce, etc.
Don't
Don't let BI tools query silver directly; always go through gold. The gold layer is the contract: it keeps schemas stable for consumers and query performance predictable.
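
One way to make the gold-only rule mechanical rather than a convention is schema-level privileges on the BI role. A sketch only: the `bi_reader` role name and these statements are assumptions, not the deployed policy:

```python
BI_ROLE = "bi_reader"  # hypothetical read-only role used by Metabase/Superset


def gold_only_grants(role: str) -> list[str]:
    """Postgres statements that expose gold.* and nothing else to a BI role."""
    return [
        f"REVOKE ALL ON SCHEMA bronze, silver FROM {role}",
        f"GRANT USAGE ON SCHEMA gold TO {role}",
        f"GRANT SELECT ON ALL TABLES IN SCHEMA gold TO {role}",
        # Cover tables created by future gold refreshes as well.
        f"ALTER DEFAULT PRIVILEGES IN SCHEMA gold GRANT SELECT ON TABLES TO {role}",
    ]


for stmt in gold_only_grants(BI_ROLE):
    print(stmt + ";")
```

With this in place, a dashboard pointed at silver fails with a permission error instead of silently coupling to an internal schema.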