Terminal Uploader - ETL BigQuery a Supabase
Pipeline ETL automatizado que extrae datos incrementales de BigQuery y los carga a Supabase (PostgreSQL) para alimentar la aplicación web de Terminal construida con Lovable.
Nota: Este componente es parte del Proyecto Terminal, que incluye el pipeline ETL, la aplicación web y la integración completa con el Data Warehouse.
Descripción
El servicio terminal-uploader-prod es un proceso ETL que sincroniza datos de terminales de carga desde BigQuery (solvento-adv-analytics-prod) hacia Supabase, donde la aplicación web solvento-terminal (construida con Lovable) consume los datos para visualización y análisis.
Flujo de Datos
graph TB
subgraph bq["BigQuery<br/>solvento-adv-analytics-prod"]
ClientDBT[terminal.client_dbt<br/>Datos de Clientes Terminales]
MarketDBT[terminal.market_dbt<br/>Datos de Mercado Terminal]
end
subgraph gcp["Google Cloud Platform"]
Scheduler[Cloud Scheduler<br/>Cron: 10:30 AM diario]
CloudRun[Cloud Run<br/>terminal-uploader-prod]
SecretManager[Secret Manager<br/>supabase-db-url]
end
subgraph supabase["Supabase<br/>PostgreSQL"]
FreightShipments[freight_shipments<br/>Tabla de Envíos]
Market[market<br/>Tabla de Mercado]
ETLControl[etl_control<br/>Tabla de Control]
end
subgraph app["Aplicación Web"]
LovableApp[Solvento Terminal<br/>Lovable + React]
end
Scheduler -->|"HTTP GET /run"| CloudRun
SecretManager -->|"Connection String"| CloudRun
CloudRun -->|"Query incremental<br/>WHERE etl_dts > watermark"| ClientDBT
CloudRun -->|"Query incremental<br/>WHERE etl_dts > watermark"| MarketDBT
CloudRun -->|"UPSERT por batches"| FreightShipments
CloudRun -->|"UPSERT por batches"| Market
CloudRun -->|"Actualiza watermark"| ETLControl
FreightShipments -->|"Consulta datos"| LovableApp
Market -->|"Consulta datos"| LovableApp
Arquitectura de la Solución
Componentes Principales
1. Cloud Run Service (terminal-uploader-prod)
Responsabilidades: - Orquestar el flujo ETL completo - Extraer datos incrementales de BigQuery - Cargar datos a Supabase con UPSERT - Gestionar watermarks para carga incremental - Manejar errores y logging
Configuración:
- Runtime: Python 3.11
- Entry Point: main.py (Flask app)
- Endpoint: /run (GET) - Ejecuta el pipeline
- Endpoint: /health (GET) - Health check
- Trigger: Cloud Scheduler (HTTP GET)
- Repositorio: terminal-uploader (ubicado en /Users/solvento/Desktop/terminal-uploader)
2. BigQuery (Origen)
Proyecto: solvento-adv-analytics-prod
Dataset: terminal
Tablas Fuente:
- client_dbt: Datos de clientes terminales transformados por dbt
- Mapea a tabla Supabase: freight_shipments
- Columnas: uuid, usage, status, currency, issuedat, subtotal, createdat, issuer_rfc, issuer_name, paymenttype, receiver_rfc, paymentmethod, receiver_name, rfc_cartaporte, total_dist_rec, fecha_salida, estado_origen, municipio_origen, codigo_postal_origen, nombre_origen, fecha_llegada, estado_destino, municipio_destino, codigo_postal_destino, customer, autotransporte_remolques, placa_vehiculo, anio_modelo, cargo_type, total_peso_mercancias, total_cantidad_mercancias, distinct_dimensiones, distinct_unidad_peso, distinct_clave_unidad, has_material_peligroso, etl_dts
market_dbt: Datos de mercado terminal transformados por dbt- Mapea a tabla Supabase:
market - Columnas: uuid, fecha, subtotal_route_price, original_currency, emisor, receptor, total_dist_rec, peso_neto_total, tipo_de_comprobante, num_total_mercancias, autotransporte_perm_sct, autotransporte_config_vehicular, estado_origen, municipio_origen, estado_destino, municipio_destino, cargo_type, total_peso_mercancias, total_cantidad_mercancias, total_valor_mercancias, distinct_dimensiones, distinct_unidad_peso, distinct_clave_unidad, has_material_peligroso, etl_dts
Campo Clave: etl_dts (timestamp de ETL) - utilizado para carga incremental
3. Supabase (Destino)
Base de Datos: PostgreSQL (hosted en Supabase)
Tablas Destino:
- freight_shipments: Envíos de carga de clientes terminales
- Primary Key: uuid
- Constraint: UNIQUE en uuid (para UPSERT)
- Índices: uuid, createdat, customer
market: Datos agregados de mercado terminal- Primary Key:
uuid - Constraint: UNIQUE en
uuid(para UPSERT) -
Índices: uuid, fecha, emisor, receptor, estado_origen, estado_destino, cargo_type
-
etl_control: Tabla de control para gestión de watermarks - Columnas: table_name (PK), last_etl_dts, last_execution, status, records_processed, error_message
4. Cloud Scheduler
Job: Terminal_Uploader
Configuración:
- Schedule: 30 10 * * * (diario a las 10:30 AM)
- Time Zone: America/Buenos_Aires
- Target: terminal-uploader-prod (Cloud Run service)
- Endpoint: /run
- HTTP Method: GET
- Estado: ENABLED
5. Aplicación Web (Solvento Terminal)
Plataforma: Lovable (React + TypeScript + Supabase)
Repositorio: solvento-terminal (ubicado en /Users/solvento/Desktop/solvento-terminal)
Funcionalidad:
- Dashboard de analytics de terminales
- Visualización de datos de envíos (freight_shipments)
- Análisis de mercado terminal (market)
- Filtros y búsquedas interactivas
- Integración directa con Supabase para consultas en tiempo real
Proceso ETL
Flujo de Ejecución
- Trigger: Cloud Scheduler invoca el endpoint
/rundel servicio Cloud Run - Inicialización: El servicio carga configuración y credenciales desde Secret Manager
- Para cada tabla (
freight_shipments,market): a. Obtener Watermark: Leelast_etl_dtsdesdeetl_controlen Supabase b. Extraer Datos: Consulta BigQuery conWHERE etl_dts > last_etl_dtsc. Procesar en Batches: Divide los registros en lotes (configurables por tabla) d. Cargar a Supabase: Ejecuta UPSERT por batches usandouuidcomo clave e. Actualizar Watermark: Actualizalast_etl_dtscon el máximoetl_dtsprocesado f. Registrar Estado: Actualizaetl_controlcon status, records_processed, last_execution
Características del Pipeline
Carga Incremental
- Solo procesa registros nuevos o modificados usando el campo
etl_dts - Reduce significativamente el volumen de datos procesados
- Permite reprocesar histórico si cambia
etl_dts
Query Incremental:
SELECT col1, col2, ..., etl_dts
FROM `solvento-adv-analytics-prod.terminal.client_dbt`
WHERE etl_dts > TIMESTAMP('2024-01-01 00:00:00')
ORDER BY etl_dts ASC
UPSERT Logic
- Maneja actualizaciones de registros históricos automáticamente
- Evita duplicados en tabla destino
- Basado en campo
uuid(business key)
UPSERT Template:
INSERT INTO freight_shipments (uuid, col1, col2, ...)
VALUES (val1, val2, ...), ...
ON CONFLICT (uuid)
DO UPDATE SET
col1 = EXCLUDED.col1,
col2 = EXCLUDED.col2,
...
Procesamiento por Batches
- Divide datasets grandes en chunks manejables
- Reduce uso de memoria
- Permite procesamiento paralelo
Configuración de Batches: - freight_shipments: Batch size = 1,000 registros, Workers = 4 - market: Batch size = 250 registros, Workers = 1 (configuración especial por tamaño)
Connection Pooling
- Reutiliza conexiones entre batches
- Pool de 2-10 conexiones según carga
- Reduce overhead de establecer conexiones
Watermark Management
- Estado persistido en tabla
etl_controlen Supabase - Solo se actualiza después de carga exitosa
- Permite reintentos sin perder progreso
- Maneja errores y marca status
Configuración
Variables de Entorno
| Variable | Descripción | Default |
|---|---|---|
BQ_PROJECT_ID |
Proyecto de BigQuery | solvento-adv-analytics-prod |
BQ_DATASET_CLIENT |
Dataset para client_dbt | terminal |
BQ_DATASET_MARKET |
Dataset para market_dbt | terminal |
SUPABASE_SECRET_NAME |
Nombre del secreto en Secret Manager | supabase-db-url |
GCP_PROJECT_ID |
Proyecto GCP para Secret Manager | solvento-adv-analytics-prod |
BATCH_SIZE |
Tamaño de batch (freight_shipments) | 1000 |
BATCH_SIZE_MARKET |
Tamaño de batch (market) | 250 |
MAX_WORKERS |
Workers paralelos (freight_shipments) | 4 |
MAX_WORKERS_MARKET |
Workers paralelos (market) | 1 |
Secret Manager
El servicio requiere un secreto en GCP Secret Manager:
Nombre: supabase-db-url
Contenido: Connection string de PostgreSQL de Supabase
Monitoreo y Métricas
Logs
Los logs se almacenan en Cloud Logging y incluyen: - Inicio y fin de ejecución por tabla - Número de registros extraídos y cargados - Watermarks actualizados - Errores y excepciones - Tiempo de ejecución
Ver logs:
Estado del Pipeline
El estado se puede consultar directamente en Supabase:
Campos de estado:
- table_name: Nombre de la tabla procesada
- last_etl_dts: Último timestamp procesado
- last_execution: Fecha/hora de última ejecución
- status: Estado (success, error, running)
- records_processed: Número de registros procesados
- error_message: Mensaje de error si hubo falla
Métricas Clave
- Registros procesados por ejecución: Por tabla
- Tiempo de ejecución: Duración total del pipeline
- Errores y reintentos: Tasa de éxito
- Gap de datos: Diferencia entre BigQuery y Supabase (monitoreo de integridad)
Relaciones con Otros Componentes
Fuentes de Datos
- BigQuery (
solvento-adv-analytics-prod.terminal): Datos transformados por dbt desdesolvento-data-prod - dbt: Transforma datos raw hacia
solvento-adv-analytics-prod.terminal.client_dbtymarket_dbt
Destinos
- Supabase PostgreSQL: Almacenamiento para aplicación web
- Solvento Terminal (Lovable): Aplicación web que consume datos de Supabase
Orquestación
- Cloud Scheduler: Ejecuta el pipeline diariamente
- Cloud Run: Ejecuta el código ETL
Patrones de Uso
Carga Incremental Diaria
El pipeline se ejecuta diariamente a las 10:30 AM para sincronizar datos nuevos desde BigQuery hacia Supabase.
UPSERT para Actualizaciones
Los datos históricos que se actualizan en BigQuery se reflejan automáticamente en Supabase mediante UPSERT.
Append-Only en Supabase
El pipeline implementa UPSERT pero NO elimina registros. Si BigQuery elimina registros, Supabase los conserva para mantener historial completo.
Implicaciones: - ✓ Ventaja: Mantiene historial completo para auditoría - Trade-off: Los counts pueden no coincidir exactamente (gap < 1% es aceptable)
Troubleshooting
Error: "No new records to process"
Es normal si no hay datos nuevos desde la última ejecución. El watermark previene reprocesamiento innecesario.
Error: "Connection timeout"
Aumentar el timeout del servicio Cloud Run o verificar conectividad con Supabase.
Error: "Memory limit exceeded"
Reducir el BATCH_SIZE en la configuración o aumentar memoria del servicio Cloud Run.
Gap en los Datos
Si hay registros faltantes entre BigQuery y Supabase:
-
Verificar watermark:
-
Resetear watermark (solo si es necesario):
-
Ejecutar pipeline manualmente:
Tabla market con Problemas de Performance
La tabla market tiene configuración especial debido a su tamaño:
- Batch size: 250 (vs 1,000 default)
- Workers: 1 (vs 4 default)
- UNIQUE constraint en uuid requerido
Verificar constraint:
SELECT constraint_name, constraint_type
FROM information_schema.table_constraints
WHERE table_name = 'market' AND constraint_type = 'UNIQUE';
Referencias
- Repositorio ETL:
terminal-uploader(ubicado en/Users/solvento/Desktop/terminal-uploader) - Repositorio Aplicación:
solvento-terminal(ubicado en/Users/solvento/Desktop/solvento-terminal) - Cloud Run Service:
terminal-uploader-prod - Cloud Scheduler Job:
Terminal_Uploader - Documentación de Cloud Run
- Documentación de Supabase