Cómo deduplicar eventos de dominio

Escrito por picodotdev el .
java planeta-codigo programacion
Enlace permanente Comentarios

Las aplicaciones distribuidas utilizan la comunicación de mensajes para notificar de la ocurrencia de ciertos eventos en el sistema que los interesados reciben. En el envío y recepción de mensajes pueden ocurrir dos situaciones que hay que manejar, una es garantizar que cada mensaje se envíe al menos una vez para lo que se emplea el patrón outbox pattern y la segunda es no procesar un evento recibido por duplicado para lo que se emplea deduplicación de mensajes.

El patrón outbox pattern garantiza que los eventos de dominio se envíen al menos una vez, pero que se envíe una vez no impide que sean enviados varias veces. La deduplicación de eventos permite evitar procesar el mismo evento varias veces si se envía repetido. Una forma de conseguir de duplicación de eventos es asignando a cada evento un identificativo único y que la parte receptora de los eventos compruebe si el evento recibido ha sido ya procesado. La parte receptora para determinar si un evento ha sido ya procesado guarda los identificativos de los ya procesados.

Los servicios de una aplicación pueden utilizar comunicación con mensajes mediante RabbitMQ. La parte receptora lee los mensajes y los procesa en la misma transacción que el resto de operaciones guardando en la base de datos el identificativo del evento procesado, de modo que si es recibido varias veces la parte receptora lo deduplica. Si la transacción falla el evento no se marca como recibido y el sistema de mensajería lo mantiene para enviarlo de nuevo hasta que se procese correctamente, si la transacción se completa pero el mensaje del evento no se notifica como procesado correctamente en el sistema de mensajería el sistema de mensajería lo enviará de nuevo pero la parte receptora lo deduplica. Si la parte receptora completa la transacción y notifica como procesado el mensaje en el sistema de mensajería el sistema de mensajería ya no lo enviará de nuevo.

Con el paso del tiempo y dependiendo del volumen de eventos procesados el número de eventos marcados como procesados en la base de datos si es tan grande como para suponer un problema de rendimiento para saber si un evento ya ha sido procesado se puede eliminar de forma periódica aquellos que ya se estimen que ya no van a volver a llegar pasado un tiempo, puede ser tan simple como eliminar todos los eventos ya procesados de hace más de un mes o la fecha más adecuada que se estime.

Ejemplo de implementación de deduplicación de eventos de dominio

Para asignar un identificativo a cada mensaje se puede utilizar un identificador único universal, en Java estos se generan con la clase UUID. El mensaje además de los datos que incluya incluye este identificativo del mensaje.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package io.github.picodotdev.blogbitix.eventbus.domain.shared.eventbus;

...

public class Event {

    private EventId id;
    private LocalDateTime date;
    private Map<String, Object> data;

    public Event() {
        this(Collections.emptyMap());
    }

    public Event(Map<String, Object> data) {
        this.id = new EventId(UUID.randomUUID());
        this.date = LocalDateTime.now(ZoneId.of("UTC"));
        this.data = data;
    }

    public EventId getId() {
        return id;
    }

    public LocalDateTime getDate() {
        return date;
    }

    public Map<String, Object> getData() {
        return data;
    }
}
Event.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package io.github.picodotdev.blogbitix.eventbus.domain.order;

...

public class OrderCreated extends Event {

    private OrderId orderId;

    public OrderCreated(OrderId orderId) {
        this.orderId = orderId;
    }

    public OrderId getOrderId() {
        return orderId;
    }
}
OrderCreated.java

En el ejemplo de los artículos relacionados eventos de dominio en agregados y bus de comandos y consultas cuando se realiza una orden de compra se emite un mensaje para notificar a otros servicios, en el ejemplo el evento de orden de compra creada se utiliza para actualizar el inventario de productos.

Si en el contexto del inventario se recibe por duplicado un mensaje de orden creada resulta en que el inventario de los productos se resta en cada recepción de evento provocando un error en la información de inventario. Para evitarlo hay que implementar deduplicación de mensajes.

Para deduplicar los mensajes creo un repositorio que almacena los eventos ya procesados correctamente y permite comprobar si un mensaje recibido ya se ha procesado, esta implementación almacena los mensajes en memoria pero a partir de la interfaz cualquier otra implementación sería posible como una base de datos relacional que persista los eventos en la misma transacción que los cambios que provoca el mensaje.

1
2
3
4
5
6
7
8
9
package io.github.picodotdev.blogbitix.eventbus.domain.shared.repository;

...

public interface EventRepository {

    void add(Event event);
    boolean exists(Event event);
}
EventRepository.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package io.github.picodotdev.blogbitix.eventbus.infrastructure;

...

@Component
public class MemoryEventRepository implements EventRepository {

    private Map<EventId, Event> store;

    public MemoryEventRepository() {
        store = new HashMap<>();
    }

    @Override
    public void add(Event event) {
        store.put(event.getId(), event);
    }

    @Override
    public boolean exists(Event event) {
        return store.containsKey(event.getId());
    }
}
MemoryEventRepository.java

El servicio de aplicación que recibe los mensajes de orden creada obtiene el identificativo del mensaje recibido, comprueba si ya se ha procesado, si ya se ha procesado no se realiza ninguna acción, si no se ha procesado con anterioridad se procesa y se guarda el identificativo del mensaje para no procesarlo de nuevo.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package io.github.picodotdev.blogbitix.eventbus.application.inventory;

...

@Component
public class OrderCreatedCommandHandler implements CommandHandler<OrderCreatedCommand> {

    private ProductRepository productRepository;
    private OrderRepository orderRepository;
    private EventRepository eventRepository;
    private EventBus eventBus;

    public OrderCreatedCommandHandler(ProductRepository productRepository, OrderRepository orderRepository, EventRepository eventRepository, EventBus eventBus) {
        this.productRepository = productRepository;
        this.orderRepository = orderRepository;
        this.eventRepository = eventRepository;
        this.eventBus = eventBus;
    }

    @Override
    public void handle(OrderCreatedCommand command) {
        OrderCreated event = command.getEvent();

        if (eventRepository.exists(event)) {
            System.out.printf("Duplicated event %s%n", event.getId().getValue());
            return;
        }
        
        ...

        eventRepository.add(event);
    }
}
OrderCreatedCommandHandler.java

Implementada la deduplicación para simular en el ejemplo en envío por duplicado el mensaje en el bus de eventos de dominio se realiza la operación de envío de eventos dos veces de modo que cada mensaje se envía por duplicado. En la salida del programa con la deduplicación implementada se observa que el contexto de inventario deduplica el segundo mensaje y emite una traza en la consola.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package io.github.picodotdev.blogbitix.eventbus.infrastructure;

...

@Component("SpringEventBus")
@Primary
public class SpringEventBus implements EventBus {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    public void publish(Event event) {
        System.out.printf("%s %s %s%n", event.getClass().getName(), event.getId().getValue(), event.getDate().format(DateTimeFormatter.ISO_DATE_TIME));
        applicationEventPublisher.publishEvent(event);
        System.out.printf("%s %s %s%n", event.getClass().getName(), event.getId().getValue(), event.getDate().format(DateTimeFormatter.ISO_DATE_TIME));
        applicationEventPublisher.publishEvent(event);
    }
}
SpringEventBus.java

Sin deduplicación de mensajes si un evento se recibe por duplicado se procesa dos veces, en este caso el inventario se reduce dos veces.

1
2
3
4
5
Stock: 5
io.github.picodotdev.blogbitix.eventbus.domain.order.OrderCreated 382e81de-445f-45a9-ba77-4c5275f661d9 2020-10-23T13:54:41.667705
io.github.picodotdev.blogbitix.eventbus.domain.order.OrderCreated 382e81de-445f-45a9-ba77-4c5275f661d9 2020-10-23T13:54:41.667705
OrderId: io.github.picodotdev.blogbitix.eventbus.domain.order.OrderId@fde4110d, Items: 1
Stock: 1
System.out-1

Implementando deduplicación de mensajes los mensajes duplicados se detectan y se ignoran, el inventario solo se reduce una vez.

1
2
3
4
5
6
Stock: 5
io.github.picodotdev.blogbitix.eventbus.domain.order.OrderCreated 1bda82d5-d70b-4f48-ad19-b342350fa6c9 2020-10-23T14:04:51.732244
io.github.picodotdev.blogbitix.eventbus.domain.order.OrderCreated 1bda82d5-d70b-4f48-ad19-b342350fa6c9 2020-10-23T14:04:51.732244
Duplicated event 1bda82d5-d70b-4f48-ad19-b342350fa6c9
OrderId: io.github.picodotdev.blogbitix.eventbus.domain.order.OrderId@b8757a45, Items: 1
Stock: 3
System.out-2

De Domain Driven Design hay varios libros, el libro de referencia sobre la teoría de DDD son Domain-Driven Design: Tackling Complexity in the Heart of Software, Domain-Driven Design Distilled, Patterns, Principles, and Practices of Domain-Driven Design otros más prácticos son Implementing Domain-Driven Design y Domain-Driven Design in PHP: A Highly Practical Guide.

El código fuente completo del ejemplo puedes descargarlo del repositorio de ejemplos de Blog Bitix alojado en GitHub y probarlo en tu equipo ejecutando siguiente comando:
./gradlew run

Comparte el artículo: