Implementar un bus de eventos de dominio en Java

Escrito por el , actualizado el .
java planeta-codigo programacion
Enlace permanente Comentarios

Un bus de eventos es el mecanismo por el que los eventos de dominio de DDD son publicados, son tratados y enviados a sus receptores de forma directa, mediante un middleware u de otra forma. El concepto bus de eventos para eventos de dominio se materializa de forma muy sencilla en código, simplemente una interfaz con un método. Cambiando la implementación de la interfaz un bus de eventos envía los eventos a un sistema de mensajería como RabbitMQ, persiste los eventos en base de datos como parte del outbox pattern o simplemente los imprime en la consola como en el ejemplo del artículo.

Java

En la teoría de Domain Driven Design o DDD se mencionan los eventos de dominio. Un evento de dominio es una notificación de algo que ha sucedido en el sistema en lo que algunas partes del mismo están interesadas en ser notificadas. Dado que un evento es algo que ha sucedido suelen tener un nombre con un verbo en pasado como OrderCreated, otra de sus propiedades es que no indican que acción se ha de realizar como sería el caso de un nombre con SendBuyerOrderEmail. Las mismas características de las aplicaciones que se comunican con mensajes se aplican a las aplicaciones que generan eventos de dominio, la comunicación en el sistema se realiza de forma desacoplada, el emisor y el receptor no se conocen y de forma desincronizada ni el emisor ni el receptor necesitan de la disponibilidad del otro si se realiza con un middleware.

En la teoría de Domain Driven Design esta forma de comunicación es útil para comunicar diferentes bounded context. Un bounded context se encarga de una área funcional de la aplicación con alta cohesión y su propio lenguaje de dominio. El lenguaje de dominio son los conceptos que aplican en un bounded context y cuales son las propiedades relevantes dentro del mismo, el término usuario en un supuesto bounded context de registro y adminisitración de la cuenta es probable no sea el mismo que en el bounded context de órdenes de compra, en el primero el usuario puede tener varias direcciones y en el segundo solo interesa una, por otro lado en el bounded context de inventario ni siquiera exista el término usuario o signifique otra cosa totalmente diferente.

Los bounded context y eventos de dominio se adaptan especialmente bien a las aplicaciones con arquitectura basada en microservicios. Cada microservicio puede ser uno o varios bounded context con un área funcional bien definida y estos comunicarse mediante eventos con eventos de dominio. La forma de emitir estos eventos de dominio es con un bus de eventos. La implementación un bus de comandos y consultas para CQRS es muy similar a la implementación de un bus de eventos.

Transaccionalidad y eventos de dominio

En las aplicaciones un concepto importante es la transaccionalidad, es un requerimiento que se impone al sistema para su buen funcionamiento. Una transacción consiste en que un conjunto de acciones o cambios funciona todo de forma completa o no funciona nada pero nunca de forma parcial. Si la lógica de una aplicación emite varios eventos según se ejecuta y en la mitad del proceso algo falla las acciones desencadenadas por los eventos emitidos deben deshacerse o de lo contrario posiblemente se produzca un mal funcionamiento o una inconsistencia de datos.

Las aplicaciones suelen trabajar con bases de datos y estas son las que proporcionan la transaccionalidad pero si los eventos se envían con un middleware de mensajería la transaccionalidad ya no afecta a un único sistema sino que afecta a dos, la base de datos relacional y el middleware de mensajería. Alguna solución a la transaccionalidad de dos sistemas diferentes como una transacción en dos fases tiene sus propios problemas en cuanto a escalabilidad y complejidad.

Una solución es utilizar únicamente la base de datos. Los eventos se guardan en una tabla en la misma transacción que el resto de datos, de tal modo que si la transacción finaliza correctamente los eventos generados están almacenados y si la transacción falla no se guarda ninguno. Una vez los eventos están guardados otro proceso se encarga de emitirlos. Nuevamente este proceso que emite los eventos puede fallar y ocasionar que un evento sea lanzado dos veces pero al menos garantiza que si se ha generado un evento se emita al menos una vez en el sistema. Este es el patón outbox pattern.

Outbox Pattern

Outbox Pattern

Deduplicación de eventos

Para resolver el problema de eventos duplicados se suele optar por hacer el tratamiento del evento idempotente o deduplicando de eventos. La deduplicación se suele hacer asignando a cada evento un identificador único y luego en la parte receptora comprobar si ese evento ya ha sido procesado.

Consistencia eventual

Otro problema es que todos los cambios que origina una petición no se aplican al mismo tiempo, un bounded context o microservicio hace los cambios de su ámbito y emite un evento que origina otros cambios en otras entidades, bounded context o microservicios. Esto hace que el sistema por un tiempo más o menos largo está en un estado inconsistente. Pero en un sistema distribuido si es posible esto es más sencillo que utilizar un transacción en dos fases de dos sistemas diferentes.

Implementar un bus de eventos en Java

Un bus de eventos no es nada complicado, esta interfaz es lo que define un bus de eventos, básicamente consiste en un método para publicar un mensaje con un argumento que representa los datos del evento. Los otros dos métodos son de utilidad, en este caso se implementan en la interfaz como métodos default permitido con las novedades de Java 8.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
package io.github.picodotdev.blogbitix.eventbus.domain.shared.eventbus;

...

public interface EventBus {

    void publish(Event e);

    default void publish(Collection<Event> events) {
        events.stream().forEach(this::publish);
    }

    default void publish(EventCollection collection) {
        collection.publish(this);
    }

    default void publish(AggregateRoot aggregate) {
        publish(aggregate.getEvents());
    }
}
EventBus.java

En una arquitectura hexagonal que separa el dominio de los detalles de implementación externos la interfaz del bus de eventos junto con las clases de los eventos están en la capa de dominio y la implementación del bus de eventos está en la capa de infraestructura. La implementación del bus de eventos es la que determina si el evento se publica utilizando eventos con Spring, eventos con Guava, utilizar RabbitMQ o los guarda en base de datos para que sean emitidos por otro elemento sea el que los publique en RabbitMQ con el outbox pattern, el proceso publicador en RabbitMQ puede ser una tarea programada con Quartz ejecutada de forma periódica que busca y publica los eventos pendientes de publicar. En este código la implementación simplemente los imprime en la terminal y en otra implementación se utiliza el mecanismo de envío de eventos en la misma aplicación de Spring.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
package io.github.picodotdev.blogbitix.eventbus.infrastructure;

...

@Component("ConsoleEventBus")
public class ConsoleEventBus implements EventBus {

    @Override
    public void publish(Event event) {
        System.out.printf("%s %s %s%n", event.getClass().getName(), event.getId().getValue(), event.getDate().format(DateTimeFormatter.ISO_DATE_TIME));
    }
}
ConsoleEventBus.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package io.github.picodotdev.blogbitix.eventbus.infrastructure;

...

@Component("SpringEventBus")
@Primary
public class SpringEventBus implements EventBus {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    public void publish(Event event) {
        System.out.printf("%s %s %s%n", event.getClass().getName(), event.getId().getValue(), event.getDate().format(DateTimeFormatter.ISO_DATE_TIME));
        applicationEventPublisher.publishEvent(event);
    }
}
SpringEventBus.java

Como generar y lanzar un evento de dominio

En Domain Driven Design un agregado es una entidad que se encarga de mantener la consistencia e invariantes de negocio con las otras entidades con las que está relacionada y que mantiene. Por ejemplo, en una orden de compra compuesta de varios productos las líneas de compra están gestionadas por el agregado de la orden de compra, las lineas de compra no tienen sentido por si mismas fuera de una orden de compra. Si existe una regla de negocio que no permita más de tres lineas de compra o con un importe mayor de 3000€, el agregado de orden de compra o un servicio de dominio se encargaría de que su estado cumpla las reglas de negocio.

El agregado al contener lógica de negocio puede generar eventos de dominio. Una de las soluciones que se suele optar es que el agregado almacene los eventos de dominio que ha generado y posteriormente sean recolectados para ser emitidos en el bus de eventos. Esto facilita las pruebas unitarias y los eventos no se emiten inmediatamente lo que permite implementar el patrón outbox pattern.

Esta es la implementación de un agregado que representa una orden de compra con varias líneas de compra. Al crearse una orden se crea el agregado y emite un evento indicado el suceso, este evento es de interés para un bounded context de inventario para mantener el stock de productos actualizado.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package io.github.picodotdev.blogbitix.eventbus.domain.order;

...

public class Order implements AggregateRoot {

    private static final BigDecimal MAX_AMOUNT = new BigDecimal("3000.00");

    private OrderId id;
    private LocalDateTime date;
    private List<Item> items;

    private EventCollection events;

    protected Order(OrderId id) {
        this.id = id;
        this.items = new ArrayList<>();
        this.date = LocalDateTime.now();
        this.events = new EventCollection();
    }

    public static Order create(OrderId id) throws Exception {
        return create(id, new ArrayList<>());
    }

    public static Order create(OrderId id, List<Item> items) throws Exception {
        Order order = new Order(id);
        order.getItems().addAll(items);
        if (!order.isValidAmount()) {
            throw new Exception("Invalid order amount");
        }
        order.getEvents().add(new OrderCreated(order.getId()));
        return order;
    }

    public OrderId getId() {
        return id;
    }

    public void addItem(Item item) {
        items.add(item);
    }

    public List<Item> getItems() {
        return items;
    }

    public BigDecimal getAmount() {
        return items.stream()
                .map(Item::getAmount)
                .reduce(new BigDecimal("0.00"), (a, b) -> new BigDecimal("0.00").add(a).add(b));
    }

    private boolean isValidAmount() {
        return getAmount().compareTo(MAX_AMOUNT) == -1;
    }

    @Override
    public EventCollection getEvents() {
        return events;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Order order = (Order) o;
        return id.equals(order.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }
}
Order.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package io.github.picodotdev.blogbitix.eventbus.domain.order;

...

public class OrderCreated extends Event {

    private OrderId orderId;

    public OrderCreated(OrderId orderId) {
        this.orderId = orderId;
    }

    public OrderId getOrderId() {
        return orderId;
    }
}
OrderCreated.java

Las clases de evento de dominio heredan de una clase que representa a todos los eventos de dominio.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package io.github.picodotdev.blogbitix.eventbus.domain.shared.eventbus;

...

public class Event {

    private EventId id;
    private LocalDateTime date;

    public Event() {
        this(Collections.emptyMap());
    }

    public Event(Map<String, Object> data) {
        this.id = new EventId(UUID.randomUUID());
        this.date = LocalDateTime.now(ZoneId.of("UTC"));
    }

    public EventId getId() {
        return id;
    }

    public LocalDateTime getDate() {
        return date;
    }
}
Event.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package io.github.picodotdev.blogbitix.eventbus.domain.shared.eventbus;

...

public class EventId {

    private UUID id;

    public EventId(UUID id) {
        this.id = id;
    }

    public String getValue() {
        return id.toString();
    }
}
EventId.java

La siguiente clase es el contenedor que almacena los eventos, los agregados crean una instancia de esta clase. Otra alternativa de implementación es mediante herencia en vez de composición como en este ejemplo. Con el método publish se publican los eventos que contiene en el bus, una vez publicados los eventos se eliminan de la colección.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package io.github.picodotdev.blogbitix.eventbus.domain.shared.eventbus;

...

public class EventCollection {

    private List<Event> events;

    public EventCollection() {
        this.events = new ArrayList<>();
    }

    public List<Event> getAll() {
        return events;
    }

    public void add(Event event) {
        events.add(event);
    }

    public void clear() {
        events.clear();
    }

    public void publish(EventBus eventBus) {
        eventBus.publish(events);
        clear();
    }
}
EventCollection.java
1
2
3
4
5
6
7
8
package io.github.picodotdev.blogbitix.eventbus.domain.shared.aggregateroot;

...

public interface AggregateRoot {

    EventCollection getEvents();
}
AggregateRoot.java

Los servicios de dominio contienen la funcionalidad que requiere dependencias o que engloba a varios agregados. Cada agregado suele tener asociado una clase repositorio que se encarga de las operaciones de persistencia, la implementación de los repositorios se realiza en la capa de infraestructura lo que permite cambiar el sistema de persistencia sin afectar a la capa de dominio. Los servicios hacen uso de los repositorios que necesiten. El servicio de dominio de órdenes de compra se encarga de invocar la operación de persistencia del agregado y de la operación de emitir los eventos.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package io.github.picodotdev.blogbitix.eventbus.domain.order;

...

@Component
public class OrderService {

    private OrderRepository orderRepository;
    private EventBus eventBus;

    public OrderService(OrderRepository orderRepository, EventBus eventBus) {
        this.orderRepository = orderRepository;
        this.eventBus = eventBus;
    }

    public OrderId generateId() {
        return orderRepository.generateId();
    }

    public void create(OrderId id, List<Item> items) throws Exception {
        Order order = Order.create(id, items);
        orderRepository.save(order);
        eventBus.publish(order);
    }
}
OrderService.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
package io.github.picodotdev.blogbitix.eventbus.domain.order;

...

public interface OrderRepository {

    OrderId generateId();

    void save(Order order);

    Order findById(OrderId id);
    Collection<Order> findAll();
}
OrderRepository.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package io.github.picodotdev.blogbitix.eventbus.infrastructure;

...

@Component
public class MemoryOrderRepository implements OrderRepository {

    private Map<OrderId, Order> orders;

    public MemoryOrderRepository() {
        this.orders = new HashMap<>();
    }

    @Override
    public OrderId generateId() {
        return new OrderId(UUID.randomUUID());
    }

    @Override
    public void save(Order order) {
        orders.put(order.getId(), order);
    }

    @Override
    public Order findById(OrderId id) {
        return orders.get(id);
    }

    @Override
    public Collection<Order> findAll() {
        return orders.values();
    }
}
MemoryOrderRepository.java

Con la implementación del bus de eventos que imprime en la consola se muestra un mensaje cuando se crear una orden de compra.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package io.github.picodotdev.blogbitix.eventbus;

...

@SpringBootApplication
public class Main implements CommandLineRunner {

    @Autowired
    private QueryBus queryBus;

    @Autowired
    private CommandBus commandBus;

    @Autowired
    private ProductRepository productRepository;

    @Autowired
    private OrderRepository orderRepository;

    @Override
    public void run(String... args) throws Exception {
        Product product = productRepository.findAll().stream().findFirst().orElse(null);
        System.out.println("Stock:" + product.getStock());

        OrderId orderId = orderRepository.generateId();
        commandBus.handle(new CreateOrderCommand(orderId, List.of(new Item(product.getId(), product.getPrice(), 2, new BigDecimal("0.21")))));

        System.out.println("Stock: " + product.getStock());
    }

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Main.class, args);
    }
}
Main.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
package io.github.picodotdev.blogbitix.eventbus.application.order;

...

@Component
public class CreateOrderCommandHandler implements CommandHandler<CreateOrderCommand> {

    private OrderService orderService;

    public CreateOrderCommandHandler(OrderService orderService) {
        this.orderService = orderService;
    }

    @Override
    public void handle(CreateOrderCommand command) throws Exception {
        OrderId orderId = command.getOrderId();
        List<Item> items = command.getItems();
        orderService.create(orderId, items);
    }
}
CreateOrderCommandHandler.java
1
2
io.github.picodotdev.blogbitix.eventbus.domain.purchase.OrderCreated 762e0fd6-612b-4ac4-b098-5773aa93dbdc 2020-10-09T14:38:16.907031

System.out

Otras preguntas

Si un evento incluye el identificativo de la entidad este ha de ser generado con anterioridad. Habitualmente se delega en la base de datos el generar el identificativo con un autonumérico en el momento de inserción, hacerlo en el momento de la persistencia impide conocer su identificativo con antelación. La solución es generar su identificativo antes de crear la entidad.

En este punto se llegan a otras preguntas:

  • ¿Qué eventos hay que lanzar? Esto requiere análisis, hay que descubrirlo con el experto de dominio.
  • ¿Cómo se deduplican los eventos? Asignando a cada evento un identificador único el receptor sabe si ha procesado ese eventos guardando los identificadores de eventos que ha procesado. Si un evento procesado se recibe de nuevo se descarta. Los identificadores de los eventos pueden ser un UUID y los consumidores pasado un tiempo eliminar o archivar los eventos procesados considerando que pasado ese tiempo de horas, días o semanas ya no va a llegar duplicado.
  • ¿Qué información han de incluir los eventos? Por lo menos las propiedades que permitan conocer las entidades de su causa.
  • ¿Qué ocurre con los eventos fuera de orden? Los middleware de mensajería como RabbitMQ ofrecen algún tipo de mecanismo para que los mensajes se procesen en orden que son recibidos aún con alguna limitación.
  • ¿Qué ocurre si la consistencia eventual no es posible?
  • ¿Qué casos son idempotentes? ¿aunque sean idempotentes les afecta el fuera de orden?
  • ¿Qué ocurre si en el futuro se modifican los eventos existentes o se añaden nuevos?

Dependiendo de la lógica de negocio los problemas con mensajes pueden ser difíciles de reproducir, analizar y corregir ya que al igual que en los casos de concurrencia intervienen factores como el orden de los hechos.

De Domain Driven Design hay varios libros, el libro de referencia sobre la teoría de DDD son Domain-Driven Design: Tackling Complexity in the Heart of Software, Domain-Driven Design Distilled, otros más prácticos son Implementing Domain-Driven Design y Domain-Driven Design in PHP: A Highly Practical Guide.

Terminal

El código fuente completo del ejemplo puedes descargarlo del repositorio de ejemplos de Blog Bitix alojado en GitHub y probarlo en tu equipo ejecutando siguiente comando:
./gradlew run


Comparte el artículo: