Arquitectura 5.0
La Arquitectura 5.0 es un patrón de extracción de datos de la API de Syntage que permite procesar eventos de creación o actualización de invoices para realizar extracciones personalizadas (carta porte, tax_compliance, etc.) de manera desacoplada, escalable y con control de rate limits.
Descripción
Esta arquitectura implementa un flujo event-driven que desacopla la conexión a la API de Syntage del procesamiento de datos. Cuando se recibe un webhook de Syntage con un evento de invoice, se utiliza para hacer un llamado custom a la API y realizar extracciones específicas. El procesamiento de cartas porte utiliza Cloud Run con reintentos mediante Cloud Tasks, permitiendo manejar de manera eficiente la conexión a la API, el escalamiento del proceso de Dataflow, y el control de rate limits.
Valor clave: Permite generar extracciones asíncronas usando Cloud Tasks para encolar solicitudes a la API, mejorando la resiliencia y el manejo de carga.
Flujo de Datos
graph TB
subgraph syntage["Syntage"]
Webhook[Syntage Webhook<br/>Eventos de Invoice]
API[Syntage API<br/>Carta Porte, Tax Compliance, etc.]
end
subgraph ingesta["Capa de Ingesta"]
WebhookService[Cloud Run<br/>Webhook Syntage]
PushTopic[Pub/Sub<br/>cp-api-push]
end
subgraph procesamiento["Capa de Procesamiento"]
RetryService[Cloud Run<br/>cp-api-retry-events]
end
subgraph reintentos["Manejo de Reintentos"]
CloudTasks[Cloud Tasks<br/>cp-api-retry-job]
end
subgraph exito["Ruta de Éxito"]
SuccessTopic[Pub/Sub<br/>cp-api-success]
DataflowJob[Dataflow<br/>cp-api-multischema]
end
subgraph error["Ruta de Error"]
FailedTopic[Pub/Sub<br/>cp-api-failed]
ErrorBucket[Cloud Storage<br/>errors bucket]
end
subgraph almacenamiento["Almacenamiento"]
BigQuery[BigQuery<br/>solvento-data-prod]
end
Webhook -->|"HTTP POST<br/>Evento Invoice"| WebhookService
WebhookService -->|"Publica evento"| PushTopic
PushTopic -->|"Consume mensaje"| RetryService
RetryService -->|"1. Request"| API
API -->|"2. Response"| RetryService
RetryService -->|"3. Si requiere reintento"| CloudTasks
CloudTasks -->|"4. Reintento"| RetryService
RetryService -->|"5. Éxito"| SuccessTopic
RetryService -->|"5. Error"| FailedTopic
SuccessTopic -->|"Consume mensajes"| DataflowJob
FailedTopic -->|"Consume mensajes"| ErrorBucket
DataflowJob -->|"Escribe datos raw"| BigQuery
Flujo Principal
- Recepción del Webhook: Syntage envía un webhook con evento de creación o actualización de invoice al servicio Cloud Run "Webhook Syntage"
- Publicación Inicial: El servicio Cloud Run publica el evento al topic Pub/Sub
cp-api-push - Procesamiento: El servicio Cloud Run
cp-api-retry-eventsconsume mensajes directamente del topic Pub/Sub - Llamada a API: El servicio realiza una llamada custom a la API de Syntage para extraer el documento específico (carta porte, tax_compliance, etc.)
- Manejo de Reintentos: Si la llamada requiere reintento, se crea una tarea en Cloud Tasks
cp-api-retry-jobque reintenta la llamada. El tiempo máximo de espera entre reintentos es de 30 minutos - Rutas de Resultado:
- Éxito: El resultado se publica al topic
cp-api-success, que es consumido por el job de Dataflowcp-api-multischema - Error: Los errores se publican al topic
cp-api-failed, que escribe a un bucket de Cloud Storage "errors" con ventanas de procesamiento de 5 minutos - Almacenamiento Final: Dataflow procesa los mensajes exitosos y escribe los datos raw a BigQuery en
solvento-data-prod
Recursos
Cloud Run Services
| Service Name | URL | Función |
|---|---|---|
webhook-syntage-inline-prod |
https://webhook-syntage-inline-prod-gqomdmitoa-uc.a.run.app |
Recibe webhooks de Syntage y publica a Pub/Sub |
cp-api-retry-events |
Por verificar | Procesa eventos y realiza llamadas a API de Syntage con reintentos |
Detalles de webhook-syntage-inline-prod:
- Pub/Sub Topic publicado: cp-api-push (para eventos de carta porte)
- Otros topics: También publica a invoices-webhook-prod, buro-webhook-prod, items-api-topic, tax-compliance-api-syntage-success
- Base URL Syntage: https://api.syntage.com
- Región: us-central1
- Proyecto: solvento-data-prod
Detalles de cp-api-retry-events:
- Consume de: Pub/Sub topic cp-api-push
- Publica a:
- cp-api-success (eventos exitosos)
- cp-api-failed (eventos con errores)
- Integración con Cloud Tasks: Crea tareas de reintento en cp-api-retry-job que invocan directamente al servicio
Pub/Sub Topics
| Topic Name | Publisher | Descripción | Formato Mensaje | Proyecto |
|---|---|---|---|---|
cp-api-push |
Cloud Run (webhook-syntage-inline-prod) | Recibe eventos de invoice que requieren extracción de carta porte | JSON (evento de invoice) | solvento-data-prod |
cp-api-success |
Cloud Run (cp-api-retry-events) | Eventos exitosos de extracción de carta porte | JSON (documento procesado) | solvento-data-prod |
cp-api-failed |
Cloud Run (cp-api-retry-events) | Eventos con errores en extracción de carta porte | JSON (error details) | solvento-data-prod |
Pub/Sub Subscriptions
| Subscription Name | Topic | Consumidor | Ack Deadline | Estado |
|---|---|---|---|---|
cp-api-push-subscription |
cp-api-push |
Cloud Run (cp-api-retry-events) | Por verificar | Activa |
cp-api-success-subscription |
cp-api-success |
Dataflow (cp-api-multischema) | 60 segundos | Activa |
cp-api-failed-subscription |
cp-api-failed |
Cloud Storage (errors bucket) | Por verificar | Activa |
Cloud Tasks
| Queue Name | Descripción | Tiempo Máximo de Espera | Target | Estado |
|---|---|---|---|---|
cp-api-retry-job |
Cola de reintentos para llamadas a API de Syntage | 30 minutos | Cloud Run (cp-api-retry-events) | Activa |
Configuración de Cloud Tasks:
- Máximo de reintentos: Configurado según política de rate limiting
- Tiempo de espera máximo: 30 minutos antes de reintentar
- Target: Invoca directamente al servicio Cloud Run cp-api-retry-events
Dataflow Jobs
| Job Name | Pub/Sub Topic | Subscription | Tipo | Estado |
|---|---|---|---|---|
cp-api-multischema-prod-20251205-191037 |
cp-api-success |
cp-api-success-subscription |
Streaming | Activo |
Detalles del Job:
- Función: Procesa mensajes exitosos de extracción de carta porte con soporte multi-schema
- Destino: BigQuery en solvento-data-prod
- Tipo: Streaming (procesamiento en tiempo real)
- Nota: El nombre del job incluye timestamp, por lo que puede variar entre ejecuciones
Cloud Storage
| Bucket Name | Descripción | Uso | Proyecto |
|---|---|---|---|
errors |
Almacena eventos fallidos de extracción de carta porte | Recibe mensajes del topic cp-api-failed con ventanas de procesamiento de 5 minutos |
solvento-data-prod |
Configuración del Bucket: - Ventanas de procesamiento: 5 minutos (agrupación de errores) - Formato: JSON con detalles de error - Retención: Por verificar
Configuraciones Relevantes
Rate Limiting
La arquitectura permite controlar el rate limiting mediante: - Cloud Tasks: Encola solicitudes para evitar exceder límites de la API - Tiempo de espera: Máximo 30 minutos entre reintentos - Procesamiento asíncrono: Permite manejar picos de carga sin saturar la API
Timeout y Reintentos
- Timeout de llamadas API: Por verificar (configurado en Cloud Run service)
- Máximo de reintentos: Configurado en Cloud Tasks
cp-api-retry-job - Tiempo máximo de espera: 30 minutos antes de reintentar
Retry Logic
- Estrategia: Reintentos exponenciales con backoff mediante Cloud Tasks
- Condiciones de reintento: Errores transitorios, rate limits, timeouts
- Máximo de intentos: Por verificar
Ventanas de Procesamiento de Errores
- Duración: 5 minutos
- Propósito: Agrupar errores para procesamiento batch y análisis
- Destino: Cloud Storage bucket "errors"
Relaciones con Otros Componentes
Entrada de Datos
- Syntage Webhook: Envía eventos de invoice que activan el flujo
- Syntage API: Proporciona los documentos extraídos (carta porte, tax_compliance, etc.)
Procesamiento Intermedio
- Pub/Sub: Actúa como bus de eventos, desacoplando la recepción de webhooks del procesamiento
- Cloud Run: Procesa eventos y realiza llamadas a la API
- Cloud Tasks: Maneja reintentos de manera asíncrona
Salida de Datos
- Dataflow: Procesa eventos exitosos y escribe a BigQuery
- Cloud Storage: Almacena errores para análisis posterior
- BigQuery (solvento-data-prod): Destino final para datos raw procesados
Patrones de Uso
Extracciones Asíncronas
La arquitectura permite generar extracciones asíncronas usando Cloud Tasks para encolar solicitudes a la API, mejorando la resiliencia y permitiendo manejar picos de carga.
Desacoplamiento de API
El flujo desacopla la conexión a la API del procesamiento de datos, permitiendo: - Manejo independiente de rate limits - Escalamiento eficiente del procesamiento de Dataflow - Resiliencia ante fallos temporales de la API
Control de Rate Limits
Cloud Tasks permite controlar el rate limiting mediante: - Encolar solicitudes cuando se alcanzan límites - Reintentos con backoff exponencial - Procesamiento asíncrono que no bloquea el flujo principal
Procesamiento Multi-Schema
Dataflow procesa documentos con soporte multi-schema, permitiendo manejar diferentes tipos de documentos (carta porte, tax_compliance, etc.) en el mismo pipeline.
Monitoreo y Métricas
Métricas Clave
- Webhook Reception Rate: Tasa de webhooks recibidos de Syntage
- API Call Success Rate: Tasa de éxito de llamadas a API de Syntage
- Retry Rate: Tasa de eventos que requieren reintento
- Processing Latency: Tiempo desde recepción de webhook hasta procesamiento final
- Dataflow Throughput: Mensajes procesados por segundo en Dataflow
- Error Rate: Tasa de errores que terminan en Cloud Storage
Alertas Recomendadas
- Alta tasa de errores: Cuando más del 5% de eventos terminan en error
- Backlog en Pub/Sub: Cuando hay más de 1000 mensajes sin procesar
- Cloud Tasks queue size: Cuando hay más de 500 tareas pendientes
- Dataflow job failures: Cuando el job de Dataflow falla o se detiene
- API rate limit exceeded: Cuando se detectan errores 429 (Too Many Requests)
Logs Importantes
- Webhook reception: Logs de recepción de webhooks en Cloud Run
- API calls: Logs de llamadas a API de Syntage (éxito/error)
- Retry creation: Logs de creación de tareas de reintento
- Dataflow processing: Logs de procesamiento en Dataflow
- Error storage: Logs de escritura a Cloud Storage bucket de errores
Beneficios de la Arquitectura
Desacoplamiento
- Separación de responsabilidades: La recepción de webhooks está separada del procesamiento
- Escalabilidad independiente: Cada componente puede escalar según su carga
- Resiliencia: Fallos en un componente no afectan directamente a otros
Control de Rate Limits
- Manejo proactivo: Cloud Tasks permite encolar solicitudes cuando se alcanzan límites
- Reintentos inteligentes: Backoff exponencial evita saturar la API
- Procesamiento asíncrono: No bloquea el flujo principal
Escalabilidad
- Dataflow auto-scaling: Se escala automáticamente según la carga
- Cloud Run concurrency: Maneja múltiples requests simultáneamente
- Pub/Sub buffering: Actúa como buffer durante picos de carga
Observabilidad
- Trazabilidad completa: Cada evento puede ser rastreado a través del flujo
- Métricas detalladas: Monitoreo en cada etapa del proceso
- Manejo de errores: Errores almacenados para análisis posterior