Introducción y uso de las especificaciones de CloudEvents y AsyncAPI

Escrito por picodotdev el .
programacion planeta-codigo
Enlace permanente Comentarios

En los sistemas distribuidos, la integración entre servicios no se limita a las APIs REST. La comunicación asíncrona mediante mensajes es igualmente habitual, y también requiere un contrato claro entre las partes. Para cubrir esta necesidad existen dos especificaciones complementarias. Una CloudEvents, que estandariza el formato del sobre de cualquier menssaje o event, y otra AsyncAPI, que describe la API completa de un servicio orientado a mensajes, de forma análoga a lo que hace OpenAPI para REST. En este artículo exploramos ambas especificaciones y las ponemos en práctica con una aplicación Spring Boot que produce y consume eventos Kafka encapsulados como CloudEvents, acompañada de su documento AsyncAPI correspondiente.

AsyncAPI

CloudEvents

En una arquitectura de microservicios o sistemas distribuidos es necesario conocer la API de esos servicios para poder integrar otros servicios con ellos. En los servicios que exponene una API con REST la definición de las APIs se puede especificar con OpenAPI.

Pero es habitual como otra forma de integrar servicios hacerlo mediante mensajes de forma asíncrona. Aunque la comunicación sea asíncrona y desacoplada también existe un contrato o interfaz entre los servicios, este es el formato del mensaje y los datos que incluye.

Para la definición de las APIs de un servicio que produce o consume mensajes están CloudEvents y AsyncAPI. Ambas especificaciones son complementarias pero independientes. AsyncAPI describe la API (canales, operaciones, esquemas) mientras CloudEvents define el sobre del mensaje que viaja por esos canales.

CloudEvents

CloudEvents simplemente estandaríza el formato del mensaje y define algunos campos a incluir como el payload de datos del mensaje y el tipo del formato de ese payload.

CloudEvents es una especificación para el evento en sí mismo, un formato de sobre estándar bajo la CNCF. Define un conjunto de atributos obligatorios y opcionales (id, source, specversion, type, más opcionales como subject, time, datacontenttype, etc.) que cualquier evento debería incluir, independientemente de si viaja por Kafka, HTTP, AMQP, NATS o Pub/Sub. El payload en sí sigue siendo específico del servicio, CloudEvents simplemente estandariza el envoltorio de metadatos y cómo se serializa sobre cada protocolo (modo binario vs. modo estructurado).

Esto permite tener un formato de mensajes común entre varios servicios, equipos, sistemas de mensajería o integraciones con terceras partes.

Modos de transporte

CloudEvents soporta diferentes protocolos de transporte y dos modos de funcionamiento:

  • Modo estructurado: El evento completo, metadatos y payload, viaja en un único bloque serializado, normalmente JSON. En Kafka eso significa que todo va en el value del mensaje.
  • Modo binario: Los atributos de CloudEvents se mapean a los headers del protocolo de transporte, y el value del mensaje contiene únicamente el payload de negocio.

Un ejemplo en modo estructurado.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
{
  "specversion": "1.0",
  "type": "com.mystore.order.placed",
  "source": "/orders/service",
  "id": "a1b2c3d4",
  "time": "2024-01-15T10:30:00Z",
  "datacontenttype": "application/json",
  "data": {
    "orderId": "1",
    "customerId": "42",
    "total": "150.30"
  }
}
cloudevents-1.json

Un ejemplo en modo binario. Estas son las cabeceras en el mensaje de Kafka.

1
2
3
4
5
ce_type: "com.mystore.order.placed"
ce_source: "/orders/service"
ce_id: "a1b2c3d4"
ce_time: "2024-01-15T10:30:00Z"
content-type: "application/json"
cloudevents-2-headers.json

El payload de Kafka.

1
2
3
4
5
{
  "orderId": "1",
  "customerId": "42",
  "total": "150.30"
}
cloudevents-2-body.json

Qué modo usar y por qué

El modo binario es el recomendado en la mayoría de casos productivos por varias razones prácticas:

  • Rendimiento: los consumidores que solo necesitan enrutar o filtrar mensajes pueden leer los headers sin deserializar el payload completo. En un sistema con alto volumen de mensajes esto tiene impacto real.
  • Compatibilidad: el payload de negocio queda aislado en el value sin que CloudEvents lo contamine, lo que facilita la convivencia con consumidores que usan y no usan CloudEvents.
  • Procesamiento en infraestructura: herramientas intermedias como Kafka Streams, conectores de Kafka Connect o proxies HTTP pueden inspeccionar y enrutar por los metadatos sin tocar el payload.

El modo estructurado es más sencillo de implementar y depurar porque todo está en un sitio, pero acopla el formato CloudEvents al consumidor. Cualquier lector del mensaje necesita entender el sobre para extraer el payload.

La forma de usar un modo u otro es la siguiente. El serializador se encarga de mover los atributos a headers o al JSON según el modo, de forma transparente para el resto del código.

1
2
3
4
5
6
7
// Modo binario
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
props.put(CloudEventSerializer.ENCODING_CONFIG, Encoding.BINARY);

// Modo estructurado
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
props.put(CloudEventSerializer.ENCODING_CONFIG, Encoding.STRUCTURED);
cloudevents-3.java

AsyncAPI

AsyncAPI es una especificación para describir una API orientada a eventos, como OpenAPI, pero para sistemas asíncronos. La especificación es un documento yaml que describe los canales de tu servicio (topics o colas), las operaciones que publica/suscribe, los esquemas de los mensajes, los bindings del servidor (clúster de Kafka, configuración del broker) y la seguridad.

A partir de ese documento al igual que con OpenAPI se puede generar documentación, stubs de código y teses de contrato. Los conceptos clave de AsyncAPI, siguiendo la estructura del documento de especificación:

  • Info y servers: info contiene los metadatos de la API. Título, versión, descripción y licencia, igual que en OpenAPI. servers define los brokers con los que trabaja la aplicación: la URL de conexión, el protocolo (kafka, amqp, mqtt, ws…) y la configuración de seguridad.
  • Channels: Un channel es el canal de comunicación, el topic de Kafka, la cola de RabbitMQ o el subject de NATS. Es el concepto central de AsyncAPI, define dónde viajan los mensajes. Cada channel tiene un nombre y puede tener parámetros dinámicos en el path, igual que las rutas REST.
  • Operations: Una opración define qué hace tu aplicación con un channel, send (produce mensajes) o receive (los consume). A diferencia de OpenAPI donde un endpoint suele hacer una sola cosa, un mismo channel puede tener operaciones de envío y recepción en servicios distintos.
  • Messages: Un message define la estructura del mensaje que viaja por el channel: sus headers, el content type y el payload. En el payload es donde encaja CloudEvents: los atributos ce_type, ce_source, etc. se pueden modelar como headers del mensaje en AsyncAPI.
  • Components: El bloque components es el almacén de elementos reutilizables, exactamente igual que en OpenAPI. schemas para los modelos de datos, messages para los mensajes, securitySchemes para los mecanismos de autenticación y serverVariables para parametrizar los servers. Todo se referencia con $ref.
  • Bindings: Los bindings son el concepto más específico de AsyncAPI y uno de los más potentes. Permiten añadir configuración propia del protocolo en cualquier nivel, server, channel, operation o message. En Kafka por ejemplo puedes definir el número de particiones, el factor de replicación o el groupId del consumidor, cosas que no tienen equivalente en un protocolo genérico.

En el ejemplo uso la siguiente definción de la API con AsyncAPI del productor y consumidor de la aplicación.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
asyncapi: 3.0.0
info:
  title: Place Order API
  version: 1.0.0
  description: The API notifies you whenever a new order is placed.
servers:
  centralKafkaServer:
    host: kafka.acme.com:8092
    description: Kafka broker
    protocol: kafka
operations:
  onPlaceOrder:
    action: send
    channel:
      $ref: '#/channels/placeOrder'
channels:
  placeOrder:
    description: A message when there is a new place order event.
    address: place-order
    messages:
      placeOrder:
        $ref: '#/components/messages/placeOrder'
components:
  messages:
    placeOrder:
      name: PlaceOrder
      title: Place Order
      payload:
        $ref: '#/components/schemas/PlaceOrderPayload'
  schemas:
    PlaceOrderPayload:
      type: object
      title: PlaceOrderPayload
      properties:
        listingId:
          type: string
          description: Listing id
        userId:
          type: string
          description: User id
orders.yaml

Diagrama de flujo

Diagrama de flujo

Línea de comandos

AsyncAPI ofrece una herramienta de linea de comandos con la que validar la especificación y generar artfactos al igual que se puede hacer con el documento de especificación OpenAPI. Por ejemplo, generar los clienteso documentación de la API.

1
2
3
4
5
# Validar la especificación
asyncapi validate orders.yaml

# Generar documentación HTML
asyncapi generate fromTemplate orders.yaml @asyncapi/html-template
asyncapi.sh

Cuando tiene sentido adoptar CloudEvents y AsyncAPI

Uno de los escenarios donde más se nota la ausencia de AsyncAPI es cuando un evento evoluciona (campos nuevos, obsolescencia de campos, breaking changes). Tener el contrato escrito permite obtener las diferencias entre versiones y comunicar cambios.

Tiene sentido adoptarlas juntas cuando se dan una o varias de estas situaciones.

Múltiples equipos o servicios consumiendo los mismos eventos

Cuando un evento lo consumen tres servicios distintos mantenidos por equipos diferentes, CloudEvents garantiza que todos hablan el mismo idioma en cuanto al sobre del mensaje, y AsyncAPI documenta el contrato de forma que cualquier equipo puede integrarse sin preguntar. Sin estas especificaciones el contrato existe igualmente, pero está en la cabeza de alguien o en un Confluence desactualizado.

Heterogeneidad de sistemas de mensajería

Si hoy usas Kafka pero mañana puede aparecer RabbitMQ, un bus de eventos cloud como EventBridge o una integración con un tercero vía HTTP, CloudEvents te da portabilidad del formato del evento independientemente del transporte. AsyncAPI describe los bindings específicos de cada protocolo sin cambiar el resto del documento.

Integración con terceros o sistemas externos

Es el caso donde más valor aportan. Cuando expones eventos a un partner externo o consumes eventos de una plataforma SaaS, tener un documento AsyncAPI es el equivalente a publicar una API REST con OpenAPI. El otro equipo puede leerlo, validarlo y generar código sin necesidad de reuniones. CloudEvents añade la garantía de que el formato del mensaje es estándar y reconocible.

Equipos que ya usan OpenAPI para sus APIs REST

Si el equipo ya tiene cultura de API-first con OpenAPI, AsyncAPI es la extensión natural para los servicios asíncronos. El salto conceptual es pequeño y las herramientas son similares. CloudEvents complementa esto estandarizando lo que OpenAPI no cubre, el formato del mensaje en tránsito.

Cuándo no tiene tanto sentido

Si tienes un sistema pequeño con un único productor y un único consumidor en el mismo equipo y repositorio, el overhead de mantener un documento AsyncAPI y ajustarse a CloudEvents puede no compensar. El contrato en ese caso es el código compartido, un record de Kotlin o un DTO de Java con Jackson. Adoptar estas especificaciones tiene sentido cuando el coste de la descoordinación entre partes empieza a ser real, no como práctica preventiva en proyectos pequeños.

La regla práctica es que si el evento cruza una frontera de equipo, de sistema o de organización, las dos especificaciones juntas valen la inversión. Si el evento es interno a un servicio o a un equipo muy pequeño, probablemente es sobreingenieria.

Ejemplo usando CloudEvents y AsyncAPI

En este ejemplo de aplicación Java hay un servicio REST que recibe una petición de y genera un mensaje que es enviado a una cola de Kafka, el mensaje es encapsulado en un mensaje de CloudEvents usando el modo recomendado binario.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package io.github.picodotdev.blogbitix.asyncapicloudevents;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/events")
public class EventsController {

    private final EventsProducer producer;

    public EventsController(EventsProducer producer) {
        this.producer = producer;
    }

    @PostMapping
    public ResponseEntity<Void> send(@RequestBody EventPayload payload) {
        producer.send(payload);
        return ResponseEntity.accepted().build();
    }
}
EventsController.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package io.github.picodotdev.blogbitix.asyncapicloudevents;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.net.URI;
import java.time.OffsetDateTime;
import java.util.UUID;

@Component
public class EventsProducer {

    private static final Logger log = LogManager.getLogger(EventsProducer.class);

    private final KafkaTemplate<String, CloudEvent> kafkaTemplate;
    private final ObjectMapper objectMapper;
    private final String topic;

    public EventsProducer(KafkaTemplate<String, CloudEvent> kafkaTemplate,
                          ObjectMapper objectMapper,
                          @Value("${app.topic}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
        this.topic = topic;
    }

    public void send(EventPayload payload) {
        try {
            byte[] data = objectMapper.writeValueAsBytes(payload);

            CloudEvent event = CloudEventBuilder.v1()
                    .withId(UUID.randomUUID().toString())
                    .withSource(URI.create("io.github.picodotdev.blogbitix.asyncapicloudevents/events"))
                    .withType("io.github.picodotdev.blogbitix.asyncapicloudevents.Event")
                    .withTime(OffsetDateTime.now())
                    .withDataContentType("application/json")
                    .withData(data)
                    .build();
            kafkaTemplate.send(topic, event.getId(), event);
            log.info("Published event event (id={}, orderId={}, customerId={}, total={})", event.getId(), payload.orderId(), payload.customerId(), payload.total());
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize EventPayload", e);
        }
    }
}
EventsProducer.java
1
2
3
4
5
6
package io.github.picodotdev.blogbitix.asyncapicloudevents;

import java.math.BigDecimal;

public record EventPayload(String orderId, String customerId, BigDecimal total) {
}
EventPayload.java

La configuración de Kafka para la aplicación de Spring Boot es la siguiente, junto con el archivo de construcción con las dependencias.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
spring:
  application:
    name: events-service
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.cloudevents.kafka.CloudEventSerializer
    consumer:
      group-id: events-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.cloudevents.kafka.CloudEventDeserializer
      auto-offset-reset: earliest

server:
  port: 8080

logging:
  level:
    root: "info"
    io.github.picodotdev.blogbitix: "info"
  pattern:
    level: "%-5level"

    console: "%d{DEFAULT} %X{uuid} %-5level %X{correlation} %60.60logger %msg%n"

app:
  topic: events
application.yml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
plugins {
    id("application")
    id("org.springframework.boot") version "3.3.5"
    id("io.spring.dependency-management") version "1.1.6"
}

repositories {
    mavenCentral()
}

configurations.all {
  exclude(group = "org.springframework.boot", module = "spring-boot-starter-logging")
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-log4j2")
    implementation("org.springframework.kafka:spring-kafka")

    implementation("io.cloudevents:cloudevents-core:3.0.0")
    implementation("io.cloudevents:cloudevents-kafka:3.0.0")
    implementation("io.cloudevents:cloudevents-json-jackson:3.0.0")
}

java {
  toolchain {
    languageVersion = JavaLanguageVersion.of(21)
  }
}

application {
  mainClass = "io.github.picodotdev.blogbitix.asyncapicloudevents.Main"
}
build.gradle.kts

El archivo de Docker Compose para iniciar el contenedor de Kafka.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
services:
  broker:
    image: apache/kafka:latest
    container_name: broker
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://localhost:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
docker-compose.yml

El consumidor del mensaje está en la misma aplicación que simplemente emite un mensaje en la salida del sistema.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package io.github.picodotdev.blogbitix.asyncapicloudevents;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class EventsConsumer {

    private static final Logger log = LogManager.getLogger(EventsConsumer.class);

    private final ObjectMapper objectMapper;

    public EventsConsumer(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @KafkaListener(topics = "${app.topic}")
    public void onEvent(CloudEvent event) {
        if (event.getData() == null) {
            log.warn("Received CloudEvent {} with no data, skipping", event.getId());
            return;
        }
        try {
            EventPayload payload = objectMapper.readValue(event.getData().toBytes(), EventPayload.class);
            log.info("Received event (id={}, type={}, orderId={}, customerId={}, total={})", event.getId(), event.getType(), payload.orderId(),
                     payload.customerId(), payload.total());
        } catch (IOException e) {
            log.error("Failed to deserialize CloudEvent (id={})", event.getId(), e);
        }
    }
}
EventsConsumer.java
1
2
curl -v -H "Content-Type: application/json" -d '{"orderId": "1", "customerId": "42", "total": "150.30"}' http://localhost:8080/events

curl.sh
Terminal

El código fuente completo del ejemplo puedes descargarlo del repositorio de ejemplos de Blog Bitix alojado en GitHub y probarlo en tu equipo ejecutando siguiente comando:
./gradlew run


Comparte el artículo: