Skip to content

Arquitectura 5.0

La Arquitectura 5.0 es un patrón de extracción de datos de la API de Syntage que permite procesar eventos de creación o actualización de invoices para realizar extracciones personalizadas (carta porte, tax_compliance, etc.) de manera desacoplada, escalable y con control de rate limits.

Descripción

Esta arquitectura implementa un flujo event-driven que desacopla la conexión a la API de Syntage del procesamiento de datos. Cuando se recibe un webhook de Syntage con un evento de invoice, se utiliza para hacer un llamado custom a la API y realizar extracciones específicas. El procesamiento de cartas porte utiliza Cloud Run con reintentos mediante Cloud Tasks, permitiendo manejar de manera eficiente la conexión a la API, el escalamiento del proceso de Dataflow, y el control de rate limits.

Valor clave: Permite generar extracciones asíncronas usando Cloud Tasks para encolar solicitudes a la API, mejorando la resiliencia y el manejo de carga.

Flujo de Datos

graph TB
    subgraph syntage["Syntage"]
        Webhook[Syntage Webhook<br/>Eventos de Invoice]
        API[Syntage API<br/>Carta Porte, Tax Compliance, etc.]
    end

    subgraph ingesta["Capa de Ingesta"]
        WebhookService[Cloud Run<br/>Webhook Syntage]
        PushTopic[Pub/Sub<br/>cp-api-push]
    end

    subgraph procesamiento["Capa de Procesamiento"]
        RetryService[Cloud Run<br/>cp-api-retry-events]
    end

    subgraph reintentos["Manejo de Reintentos"]
        CloudTasks[Cloud Tasks<br/>cp-api-retry-job]
    end

    subgraph exito["Ruta de Éxito"]
        SuccessTopic[Pub/Sub<br/>cp-api-success]
        DataflowJob[Dataflow<br/>cp-api-multischema]
    end

    subgraph error["Ruta de Error"]
        FailedTopic[Pub/Sub<br/>cp-api-failed]
        ErrorBucket[Cloud Storage<br/>errors bucket]
    end

    subgraph almacenamiento["Almacenamiento"]
        BigQuery[BigQuery<br/>solvento-data-prod]
    end

    Webhook -->|"HTTP POST<br/>Evento Invoice"| WebhookService
    WebhookService -->|"Publica evento"| PushTopic
    PushTopic -->|"Consume mensaje"| RetryService
    RetryService -->|"1. Request"| API
    API -->|"2. Response"| RetryService
    RetryService -->|"3. Si requiere reintento"| CloudTasks
    CloudTasks -->|"4. Reintento"| RetryService

    RetryService -->|"5. Éxito"| SuccessTopic
    RetryService -->|"5. Error"| FailedTopic

    SuccessTopic -->|"Consume mensajes"| DataflowJob
    FailedTopic -->|"Consume mensajes"| ErrorBucket

    DataflowJob -->|"Escribe datos raw"| BigQuery

Flujo Principal

  1. Recepción del Webhook: Syntage envía un webhook con evento de creación o actualización de invoice al servicio Cloud Run "Webhook Syntage"
  2. Publicación Inicial: El servicio Cloud Run publica el evento al topic Pub/Sub cp-api-push
  3. Procesamiento: El servicio Cloud Run cp-api-retry-events consume mensajes directamente del topic Pub/Sub
  4. Llamada a API: El servicio realiza una llamada custom a la API de Syntage para extraer el documento específico (carta porte, tax_compliance, etc.)
  5. Manejo de Reintentos: Si la llamada requiere reintento, se crea una tarea en Cloud Tasks cp-api-retry-job que reintenta la llamada. El tiempo máximo de espera entre reintentos es de 30 minutos
  6. Rutas de Resultado:
  7. Éxito: El resultado se publica al topic cp-api-success, que es consumido por el job de Dataflow cp-api-multischema
  8. Error: Los errores se publican al topic cp-api-failed, que escribe a un bucket de Cloud Storage "errors" con ventanas de procesamiento de 5 minutos
  9. Almacenamiento Final: Dataflow procesa los mensajes exitosos y escribe los datos raw a BigQuery en solvento-data-prod

Recursos

Cloud Run Services

Service Name URL Función
webhook-syntage-inline-prod https://webhook-syntage-inline-prod-gqomdmitoa-uc.a.run.app Recibe webhooks de Syntage y publica a Pub/Sub
cp-api-retry-events Por verificar Procesa eventos y realiza llamadas a API de Syntage con reintentos

Detalles de webhook-syntage-inline-prod: - Pub/Sub Topic publicado: cp-api-push (para eventos de carta porte) - Otros topics: También publica a invoices-webhook-prod, buro-webhook-prod, items-api-topic, tax-compliance-api-syntage-success - Base URL Syntage: https://api.syntage.com - Región: us-central1 - Proyecto: solvento-data-prod

Detalles de cp-api-retry-events: - Consume de: Pub/Sub topic cp-api-push - Publica a: - cp-api-success (eventos exitosos) - cp-api-failed (eventos con errores) - Integración con Cloud Tasks: Crea tareas de reintento en cp-api-retry-job que invocan directamente al servicio

Pub/Sub Topics

Topic Name Publisher Descripción Formato Mensaje Proyecto
cp-api-push Cloud Run (webhook-syntage-inline-prod) Recibe eventos de invoice que requieren extracción de carta porte JSON (evento de invoice) solvento-data-prod
cp-api-success Cloud Run (cp-api-retry-events) Eventos exitosos de extracción de carta porte JSON (documento procesado) solvento-data-prod
cp-api-failed Cloud Run (cp-api-retry-events) Eventos con errores en extracción de carta porte JSON (error details) solvento-data-prod

Pub/Sub Subscriptions

Subscription Name Topic Consumidor Ack Deadline Estado
cp-api-push-subscription cp-api-push Cloud Run (cp-api-retry-events) Por verificar Activa
cp-api-success-subscription cp-api-success Dataflow (cp-api-multischema) 60 segundos Activa
cp-api-failed-subscription cp-api-failed Cloud Storage (errors bucket) Por verificar Activa

Cloud Tasks

Queue Name Descripción Tiempo Máximo de Espera Target Estado
cp-api-retry-job Cola de reintentos para llamadas a API de Syntage 30 minutos Cloud Run (cp-api-retry-events) Activa

Configuración de Cloud Tasks: - Máximo de reintentos: Configurado según política de rate limiting - Tiempo de espera máximo: 30 minutos antes de reintentar - Target: Invoca directamente al servicio Cloud Run cp-api-retry-events

Dataflow Jobs

Job Name Pub/Sub Topic Subscription Tipo Estado
cp-api-multischema-prod-20251205-191037 cp-api-success cp-api-success-subscription Streaming Activo

Detalles del Job: - Función: Procesa mensajes exitosos de extracción de carta porte con soporte multi-schema - Destino: BigQuery en solvento-data-prod - Tipo: Streaming (procesamiento en tiempo real) - Nota: El nombre del job incluye timestamp, por lo que puede variar entre ejecuciones

Cloud Storage

Bucket Name Descripción Uso Proyecto
errors Almacena eventos fallidos de extracción de carta porte Recibe mensajes del topic cp-api-failed con ventanas de procesamiento de 5 minutos solvento-data-prod

Configuración del Bucket: - Ventanas de procesamiento: 5 minutos (agrupación de errores) - Formato: JSON con detalles de error - Retención: Por verificar

Configuraciones Relevantes

Rate Limiting

La arquitectura permite controlar el rate limiting mediante: - Cloud Tasks: Encola solicitudes para evitar exceder límites de la API - Tiempo de espera: Máximo 30 minutos entre reintentos - Procesamiento asíncrono: Permite manejar picos de carga sin saturar la API

Timeout y Reintentos

  • Timeout de llamadas API: Por verificar (configurado en Cloud Run service)
  • Máximo de reintentos: Configurado en Cloud Tasks cp-api-retry-job
  • Tiempo máximo de espera: 30 minutos antes de reintentar

Retry Logic

  • Estrategia: Reintentos exponenciales con backoff mediante Cloud Tasks
  • Condiciones de reintento: Errores transitorios, rate limits, timeouts
  • Máximo de intentos: Por verificar

Ventanas de Procesamiento de Errores

  • Duración: 5 minutos
  • Propósito: Agrupar errores para procesamiento batch y análisis
  • Destino: Cloud Storage bucket "errors"

Relaciones con Otros Componentes

Entrada de Datos

  • Syntage Webhook: Envía eventos de invoice que activan el flujo
  • Syntage API: Proporciona los documentos extraídos (carta porte, tax_compliance, etc.)

Procesamiento Intermedio

  • Pub/Sub: Actúa como bus de eventos, desacoplando la recepción de webhooks del procesamiento
  • Cloud Run: Procesa eventos y realiza llamadas a la API
  • Cloud Tasks: Maneja reintentos de manera asíncrona

Salida de Datos

  • Dataflow: Procesa eventos exitosos y escribe a BigQuery
  • Cloud Storage: Almacena errores para análisis posterior
  • BigQuery (solvento-data-prod): Destino final para datos raw procesados

Patrones de Uso

Extracciones Asíncronas

La arquitectura permite generar extracciones asíncronas usando Cloud Tasks para encolar solicitudes a la API, mejorando la resiliencia y permitiendo manejar picos de carga.

Desacoplamiento de API

El flujo desacopla la conexión a la API del procesamiento de datos, permitiendo: - Manejo independiente de rate limits - Escalamiento eficiente del procesamiento de Dataflow - Resiliencia ante fallos temporales de la API

Control de Rate Limits

Cloud Tasks permite controlar el rate limiting mediante: - Encolar solicitudes cuando se alcanzan límites - Reintentos con backoff exponencial - Procesamiento asíncrono que no bloquea el flujo principal

Procesamiento Multi-Schema

Dataflow procesa documentos con soporte multi-schema, permitiendo manejar diferentes tipos de documentos (carta porte, tax_compliance, etc.) en el mismo pipeline.

Monitoreo y Métricas

Métricas Clave

  • Webhook Reception Rate: Tasa de webhooks recibidos de Syntage
  • API Call Success Rate: Tasa de éxito de llamadas a API de Syntage
  • Retry Rate: Tasa de eventos que requieren reintento
  • Processing Latency: Tiempo desde recepción de webhook hasta procesamiento final
  • Dataflow Throughput: Mensajes procesados por segundo en Dataflow
  • Error Rate: Tasa de errores que terminan en Cloud Storage

Alertas Recomendadas

  • Alta tasa de errores: Cuando más del 5% de eventos terminan en error
  • Backlog en Pub/Sub: Cuando hay más de 1000 mensajes sin procesar
  • Cloud Tasks queue size: Cuando hay más de 500 tareas pendientes
  • Dataflow job failures: Cuando el job de Dataflow falla o se detiene
  • API rate limit exceeded: Cuando se detectan errores 429 (Too Many Requests)

Logs Importantes

  • Webhook reception: Logs de recepción de webhooks en Cloud Run
  • API calls: Logs de llamadas a API de Syntage (éxito/error)
  • Retry creation: Logs de creación de tareas de reintento
  • Dataflow processing: Logs de procesamiento en Dataflow
  • Error storage: Logs de escritura a Cloud Storage bucket de errores

Beneficios de la Arquitectura

Desacoplamiento

  • Separación de responsabilidades: La recepción de webhooks está separada del procesamiento
  • Escalabilidad independiente: Cada componente puede escalar según su carga
  • Resiliencia: Fallos en un componente no afectan directamente a otros

Control de Rate Limits

  • Manejo proactivo: Cloud Tasks permite encolar solicitudes cuando se alcanzan límites
  • Reintentos inteligentes: Backoff exponencial evita saturar la API
  • Procesamiento asíncrono: No bloquea el flujo principal

Escalabilidad

  • Dataflow auto-scaling: Se escala automáticamente según la carga
  • Cloud Run concurrency: Maneja múltiples requests simultáneamente
  • Pub/Sub buffering: Actúa como buffer durante picos de carga

Observabilidad

  • Trazabilidad completa: Cada evento puede ser rastreado a través del flujo
  • Métricas detalladas: Monitoreo en cada etapa del proceso
  • Manejo de errores: Errores almacenados para análisis posterior

Referencias