Skip to content

Terminal Uploader - ETL BigQuery a Supabase

Pipeline ETL automatizado que extrae datos incrementales de BigQuery y los carga a Supabase (PostgreSQL) para alimentar la aplicación web de Terminal construida con Lovable.

Nota: Este componente es parte del Proyecto Terminal, que incluye el pipeline ETL, la aplicación web y la integración completa con el Data Warehouse.

Descripción

El servicio terminal-uploader-prod es un proceso ETL que sincroniza datos de terminales de carga desde BigQuery (solvento-adv-analytics-prod) hacia Supabase, donde la aplicación web solvento-terminal (construida con Lovable) consume los datos para visualización y análisis.

Flujo de Datos

graph TB
    subgraph bq["BigQuery<br/>solvento-adv-analytics-prod"]
        ClientDBT[terminal.client_dbt<br/>Datos de Clientes Terminales]
        MarketDBT[terminal.market_dbt<br/>Datos de Mercado Terminal]
    end

    subgraph gcp["Google Cloud Platform"]
        Scheduler[Cloud Scheduler<br/>Cron: 10:30 AM diario]
        CloudRun[Cloud Run<br/>terminal-uploader-prod]
        SecretManager[Secret Manager<br/>supabase-db-url]
    end

    subgraph supabase["Supabase<br/>PostgreSQL"]
        FreightShipments[freight_shipments<br/>Tabla de Envíos]
        Market[market<br/>Tabla de Mercado]
        ETLControl[etl_control<br/>Tabla de Control]
    end

    subgraph app["Aplicación Web"]
        LovableApp[Solvento Terminal<br/>Lovable + React]
    end

    Scheduler -->|"HTTP GET /run"| CloudRun
    SecretManager -->|"Connection String"| CloudRun

    CloudRun -->|"Query incremental<br/>WHERE etl_dts > watermark"| ClientDBT
    CloudRun -->|"Query incremental<br/>WHERE etl_dts > watermark"| MarketDBT

    CloudRun -->|"UPSERT por batches"| FreightShipments
    CloudRun -->|"UPSERT por batches"| Market
    CloudRun -->|"Actualiza watermark"| ETLControl

    FreightShipments -->|"Consulta datos"| LovableApp
    Market -->|"Consulta datos"| LovableApp

Arquitectura de la Solución

Componentes Principales

1. Cloud Run Service (terminal-uploader-prod)

Responsabilidades: - Orquestar el flujo ETL completo - Extraer datos incrementales de BigQuery - Cargar datos a Supabase con UPSERT - Gestionar watermarks para carga incremental - Manejar errores y logging

Configuración: - Runtime: Python 3.11 - Entry Point: main.py (Flask app) - Endpoint: /run (GET) - Ejecuta el pipeline - Endpoint: /health (GET) - Health check - Trigger: Cloud Scheduler (HTTP GET) - Repositorio: terminal-uploader (ubicado en /Users/solvento/Desktop/terminal-uploader)

2. BigQuery (Origen)

Proyecto: solvento-adv-analytics-prod

Dataset: terminal

Tablas Fuente: - client_dbt: Datos de clientes terminales transformados por dbt - Mapea a tabla Supabase: freight_shipments - Columnas: uuid, usage, status, currency, issuedat, subtotal, createdat, issuer_rfc, issuer_name, paymenttype, receiver_rfc, paymentmethod, receiver_name, rfc_cartaporte, total_dist_rec, fecha_salida, estado_origen, municipio_origen, codigo_postal_origen, nombre_origen, fecha_llegada, estado_destino, municipio_destino, codigo_postal_destino, customer, autotransporte_remolques, placa_vehiculo, anio_modelo, cargo_type, total_peso_mercancias, total_cantidad_mercancias, distinct_dimensiones, distinct_unidad_peso, distinct_clave_unidad, has_material_peligroso, etl_dts

  • market_dbt: Datos de mercado terminal transformados por dbt
  • Mapea a tabla Supabase: market
  • Columnas: uuid, fecha, subtotal_route_price, original_currency, emisor, receptor, total_dist_rec, peso_neto_total, tipo_de_comprobante, num_total_mercancias, autotransporte_perm_sct, autotransporte_config_vehicular, estado_origen, municipio_origen, estado_destino, municipio_destino, cargo_type, total_peso_mercancias, total_cantidad_mercancias, total_valor_mercancias, distinct_dimensiones, distinct_unidad_peso, distinct_clave_unidad, has_material_peligroso, etl_dts

Campo Clave: etl_dts (timestamp de ETL) - utilizado para carga incremental

3. Supabase (Destino)

Base de Datos: PostgreSQL (hosted en Supabase)

Tablas Destino: - freight_shipments: Envíos de carga de clientes terminales - Primary Key: uuid - Constraint: UNIQUE en uuid (para UPSERT) - Índices: uuid, createdat, customer

  • market: Datos agregados de mercado terminal
  • Primary Key: uuid
  • Constraint: UNIQUE en uuid (para UPSERT)
  • Índices: uuid, fecha, emisor, receptor, estado_origen, estado_destino, cargo_type

  • etl_control: Tabla de control para gestión de watermarks

  • Columnas: table_name (PK), last_etl_dts, last_execution, status, records_processed, error_message

4. Cloud Scheduler

Job: Terminal_Uploader

Configuración: - Schedule: 30 10 * * * (diario a las 10:30 AM) - Time Zone: America/Buenos_Aires - Target: terminal-uploader-prod (Cloud Run service) - Endpoint: /run - HTTP Method: GET - Estado: ENABLED

5. Aplicación Web (Solvento Terminal)

Plataforma: Lovable (React + TypeScript + Supabase)

Repositorio: solvento-terminal (ubicado en /Users/solvento/Desktop/solvento-terminal)

Funcionalidad: - Dashboard de analytics de terminales - Visualización de datos de envíos (freight_shipments) - Análisis de mercado terminal (market) - Filtros y búsquedas interactivas - Integración directa con Supabase para consultas en tiempo real

Proceso ETL

Flujo de Ejecución

  1. Trigger: Cloud Scheduler invoca el endpoint /run del servicio Cloud Run
  2. Inicialización: El servicio carga configuración y credenciales desde Secret Manager
  3. Para cada tabla (freight_shipments, market): a. Obtener Watermark: Lee last_etl_dts desde etl_control en Supabase b. Extraer Datos: Consulta BigQuery con WHERE etl_dts > last_etl_dts c. Procesar en Batches: Divide los registros en lotes (configurables por tabla) d. Cargar a Supabase: Ejecuta UPSERT por batches usando uuid como clave e. Actualizar Watermark: Actualiza last_etl_dts con el máximo etl_dts procesado f. Registrar Estado: Actualiza etl_control con status, records_processed, last_execution

Características del Pipeline

Carga Incremental

  • Solo procesa registros nuevos o modificados usando el campo etl_dts
  • Reduce significativamente el volumen de datos procesados
  • Permite reprocesar histórico si cambia etl_dts

Query Incremental:

SELECT col1, col2, ..., etl_dts
FROM `solvento-adv-analytics-prod.terminal.client_dbt`
WHERE etl_dts > TIMESTAMP('2024-01-01 00:00:00')
ORDER BY etl_dts ASC

UPSERT Logic

  • Maneja actualizaciones de registros históricos automáticamente
  • Evita duplicados en tabla destino
  • Basado en campo uuid (business key)

UPSERT Template:

INSERT INTO freight_shipments (uuid, col1, col2, ...)
VALUES (val1, val2, ...), ...
ON CONFLICT (uuid)
DO UPDATE SET
    col1 = EXCLUDED.col1,
    col2 = EXCLUDED.col2,
    ...

Procesamiento por Batches

  • Divide datasets grandes en chunks manejables
  • Reduce uso de memoria
  • Permite procesamiento paralelo

Configuración de Batches: - freight_shipments: Batch size = 1,000 registros, Workers = 4 - market: Batch size = 250 registros, Workers = 1 (configuración especial por tamaño)

Connection Pooling

  • Reutiliza conexiones entre batches
  • Pool de 2-10 conexiones según carga
  • Reduce overhead de establecer conexiones

Watermark Management

  • Estado persistido en tabla etl_control en Supabase
  • Solo se actualiza después de carga exitosa
  • Permite reintentos sin perder progreso
  • Maneja errores y marca status

Configuración

Variables de Entorno

Variable Descripción Default
BQ_PROJECT_ID Proyecto de BigQuery solvento-adv-analytics-prod
BQ_DATASET_CLIENT Dataset para client_dbt terminal
BQ_DATASET_MARKET Dataset para market_dbt terminal
SUPABASE_SECRET_NAME Nombre del secreto en Secret Manager supabase-db-url
GCP_PROJECT_ID Proyecto GCP para Secret Manager solvento-adv-analytics-prod
BATCH_SIZE Tamaño de batch (freight_shipments) 1000
BATCH_SIZE_MARKET Tamaño de batch (market) 250
MAX_WORKERS Workers paralelos (freight_shipments) 4
MAX_WORKERS_MARKET Workers paralelos (market) 1

Secret Manager

El servicio requiere un secreto en GCP Secret Manager:

Nombre: supabase-db-url

Contenido: Connection string de PostgreSQL de Supabase

postgresql://postgres:PASSWORD@db.PROJECT_REF.supabase.co:5432/postgres

Monitoreo y Métricas

Logs

Los logs se almacenan en Cloud Logging y incluyen: - Inicio y fin de ejecución por tabla - Número de registros extraídos y cargados - Watermarks actualizados - Errores y excepciones - Tiempo de ejecución

Ver logs:

gcloud run services logs read terminal-uploader-prod --region=us-central1 --limit=50

Estado del Pipeline

El estado se puede consultar directamente en Supabase:

SELECT * FROM etl_control ORDER BY last_execution DESC;

Campos de estado: - table_name: Nombre de la tabla procesada - last_etl_dts: Último timestamp procesado - last_execution: Fecha/hora de última ejecución - status: Estado (success, error, running) - records_processed: Número de registros procesados - error_message: Mensaje de error si hubo falla

Métricas Clave

  • Registros procesados por ejecución: Por tabla
  • Tiempo de ejecución: Duración total del pipeline
  • Errores y reintentos: Tasa de éxito
  • Gap de datos: Diferencia entre BigQuery y Supabase (monitoreo de integridad)

Relaciones con Otros Componentes

Fuentes de Datos

  • BigQuery (solvento-adv-analytics-prod.terminal): Datos transformados por dbt desde solvento-data-prod
  • dbt: Transforma datos raw hacia solvento-adv-analytics-prod.terminal.client_dbt y market_dbt

Destinos

  • Supabase PostgreSQL: Almacenamiento para aplicación web
  • Solvento Terminal (Lovable): Aplicación web que consume datos de Supabase

Orquestación

  • Cloud Scheduler: Ejecuta el pipeline diariamente
  • Cloud Run: Ejecuta el código ETL

Patrones de Uso

Carga Incremental Diaria

El pipeline se ejecuta diariamente a las 10:30 AM para sincronizar datos nuevos desde BigQuery hacia Supabase.

UPSERT para Actualizaciones

Los datos históricos que se actualizan en BigQuery se reflejan automáticamente en Supabase mediante UPSERT.

Append-Only en Supabase

El pipeline implementa UPSERT pero NO elimina registros. Si BigQuery elimina registros, Supabase los conserva para mantener historial completo.

Implicaciones: - ✓ Ventaja: Mantiene historial completo para auditoría - Trade-off: Los counts pueden no coincidir exactamente (gap < 1% es aceptable)

Troubleshooting

Error: "No new records to process"

Es normal si no hay datos nuevos desde la última ejecución. El watermark previene reprocesamiento innecesario.

Error: "Connection timeout"

Aumentar el timeout del servicio Cloud Run o verificar conectividad con Supabase.

Error: "Memory limit exceeded"

Reducir el BATCH_SIZE en la configuración o aumentar memoria del servicio Cloud Run.

Gap en los Datos

Si hay registros faltantes entre BigQuery y Supabase:

  1. Verificar watermark:

    SELECT * FROM etl_control WHERE table_name = 'freight_shipments';
    

  2. Resetear watermark (solo si es necesario):

    UPDATE etl_control 
    SET last_etl_dts = '2023-01-01 00:00:00+00'
    WHERE table_name = 'freight_shipments';
    

  3. Ejecutar pipeline manualmente:

    curl https://terminal-uploader-prod-gqomdmitoa-uc.a.run.app/run
    

Tabla market con Problemas de Performance

La tabla market tiene configuración especial debido a su tamaño: - Batch size: 250 (vs 1,000 default) - Workers: 1 (vs 4 default) - UNIQUE constraint en uuid requerido

Verificar constraint:

SELECT constraint_name, constraint_type 
FROM information_schema.table_constraints 
WHERE table_name = 'market' AND constraint_type = 'UNIQUE';

Referencias

  • Repositorio ETL: terminal-uploader (ubicado en /Users/solvento/Desktop/terminal-uploader)
  • Repositorio Aplicación: solvento-terminal (ubicado en /Users/solvento/Desktop/solvento-terminal)
  • Cloud Run Service: terminal-uploader-prod
  • Cloud Scheduler Job: Terminal_Uploader
  • Documentación de Cloud Run
  • Documentación de Supabase