Implementar un bus de comandos y consultas en Java

Escrito por el .
java planeta-codigo programacion
Enlace permanente Comentarios

Un bus de comandos y consultas permite separar en una aplicación las operaciones de modificación y operaciones de obtención de datos. Esto permite si es requerido dos bases de datos diferentes utilizando CQRS, una base de datos para operaciones de modificación y una base de datos para operaciones de consulta. Aún teniendo solo una base de datos para ambas operaciones un bus de comandos y eventos permite independizar a la aplicación de las interfaces con las que se use ya sea REST, GraphQL, línea de comandos o mensajería como RabbitQM y crear manejadores de operaciones siguiendo los principios SOLID de diseño.

Java

Las aplicaciones entre sus tareas están la de realizar operaciones de modificación y operaciones de lectura de la base de datos. Un comando representa la solicitud de una operación de modificación, tiene la característica de que no devuelven datos pero modifican datos. Las consultas representan la solicitud de información de la base de datos, a diferencia de los comandos devuelven datos pero no realizan cambios en la base de datos de modo que son idempotentes y se pueden repetir cuantas veces se quiera.

A medida que una aplicación crece necesita nuevos comandos y consultas, estando en una o varias clases de servicio estas requieren modificarse al añadir nuevos comandos o consultas, al mismo tiempo las clases de servicio tendrán el conjunto completo de todas las dependencias que necesiten todas las operaciones cuando muchas de las operaciones solo necesitan un pequeño conjunto de dependencias. La organización del código con servicios suele originar clases con múltiples responsabilidades convirtiéndose en un potencial problema de mantenimiento.

Separar los comandos y consultas permite aplicar CQRS, en el que las operaciones de consulta se lanzan contra una base de datos especializada en consultas y los comandos se lanzan contra otra base de datos. Tener dos base de datos permite escalar a cada base de datos de forma independiente según sus necesidades pero añade una gran complejidad al sistema ya que la base de datos que recibe los comandos ha de ser replicada en la base de datos de consultas. Una forma de replicar los datos en las bases de datos es mediante eventos de dominio con un bus de eventos y consistencia eventual e implementando deduplicación de eventos de dominio.

Un bus de comandos y consultas es una infraestructura que permite añadir nuevos comandos y consultas aplicando dos de los principios SOLID. La S de responsabilidad única haciendo que cada comando y consulta tenga una única responsabilidad y la O de abierto a extensión y cerrado a modificación.

Interfaces del bus de comandos y consultas

Un bus de comandos y un bus de eventos son simplemente la definición de esta interfaz que tiene un único método a implementar. La interfaz del bus de comandos no devuelve datos y la interfaz del bus de consultas si devuelve datos.

1
2
3
4
5
6
package io.github.picodotdev.blogbitix.eventbus.domain.shared.commandbus;

public interface CommandBus {

    void handle(Command command) throws Exception;
}
CommandBus.java
1
2
3
4
5
6
package io.github.picodotdev.blogbitix.eventbus.domain.shared.querybus;

public interface QueryBus {

    <T> T handle(Query<T> query) throws Exception;
}
QueryBus.java

Ambas interfaces reciben un argumento que contiene los datos necesarios para ejecutar el comando y consulta. Todos los comandos y argumentos heredan de estas clases. Estas clases hacen de objeto de transferencia de datos o DTO entre la capa de interfaz de la infraestructura y la capa de aplicación de dominio.

1
2
3
4
package io.github.picodotdev.blogbitix.eventbus.domain.shared.commandbus;

public class Command {
}
Command.java
1
2
3
4
package io.github.picodotdev.blogbitix.eventbus.domain.shared.querybus;

public class Query<T> {
}
Query.java

Los comandos y consultas permiten independizar a la aplicación de la interfaz que se use para acceder a la aplicación. La aplicación puede ser accedida a través de una interfaz REST, una interfaz GraphQL, con RabbitMQ, línea de comandos. Esta independencia de la interfaz con la que se accede a la aplicación permite soportar varias interfaces de acceso o cambiar a otra en el futuro sin requerir grandes cambios o ninguno en la capa de aplicación ni de dominio.

Implementación de bus de comandos y consultas

La implementación de la interfaz del bus de comandos y consultas reciben clases concretas Command y Query, para aplicar los principios SOLID se necesita un manejador por cada clase Command y Query admitido por los buses. Esta clase manejador es la que contiene la lógica de dominio para proporcionar la funcionalidad del comando y consulta, contiene las dependencias de los servicios de dominio o repositorios de las entidades y hace uso de los métodos de las dependencias que necesita.

1
2
3
4
5
6
package io.github.picodotdev.blogbitix.eventbus.domain.shared.commandbus;

public interface CommandHandler<T extends Command> {

    void handle(T command) throws Exception;
}
CommandHandler.java
1
2
3
4
5
6
package io.github.picodotdev.blogbitix.eventbus.domain.shared.querybus;

public interface QueryHandler<T,U extends Query<T>> {

    T handle(U query) throws Exception;
}
QueryHandler.java

Las clases de DTO para los comandos y consultas que contienen los datos y sirve para el envío de la solicitud de la operación al bus.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package io.github.picodotdev.blogbitix.eventbus.application.order;

...

public class CreateOrderCommand extends Command {

    private OrderId orderId;
    private List<Item> items;

    public CreateOrderCommand(OrderId orderId, List<Item> items) {
        this.orderId = orderId;
        this.items = items;
    }

    public OrderId getOrderId() {
        return orderId;
    }

    public List<Item> getItems() {
        return items;
    }
}
CreateOrderCommand.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package io.github.picodotdev.blogbitix.eventbus.application.order;

...

public class GetOrderQuery extends Query<Order> {

    private OrderId orderId;

    public GetOrderQuery(OrderId orderId) {
        this.orderId = orderId;
    }

    public OrderId getOrderId() {
        return orderId;
    }
}
GetOrderQuery.java

Las clases manejadores de consultas y comandos tienen la ventaja de seguir los principios SOLID, pero al mismo tiempo, si se puede considerar un inconveniente, es que en una aplicación grande el número de comandos y consultas es grande lo que requiere un gran número de manejadores, cada operación requiere dos clases, la del comando o consulta y la del manejador en vez de simplemente una llamada a un método con sus argumentos.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
package io.github.picodotdev.blogbitix.eventbus.application.order;

...

@Component
public class CreateOrderCommandHandler implements CommandHandler<CreateOrderCommand> {

    private OrderService orderService;

    public CreateOrderCommandHandler(OrderService orderService) {
        this.orderService = orderService;
    }

    @Override
    public void handle(CreateOrderCommand command) throws Exception {
        OrderId orderId = command.getOrderId();
        List<Item> items = command.getItems();
        orderService.create(orderId, items);
    }
}
CreateOrderCommandHandler.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package io.github.picodotdev.blogbitix.eventbus.application.order;

...

@Component
public class GetOrderQueryHandler implements QueryHandler<Order,GetOrderQuery> {

    private OrderRepository orderRepository;

    public GetOrderQueryHandler(OrderRepository orderRepository) {
        this.orderRepository = orderRepository;
    }

    @Override
    public Order handle(GetOrderQuery query) throws Exception {
        return orderRepository.findById(query.getOrderId());
    }
}
GetOrderQueryHandler.java

Con la interfaz del bus de comandos y consultas, las clases concretas de comandos y consultas y los manejadores de cada comando y consulta, el bus de comandos y consultas consiste en tener una relación entre clase concreta de comando o consulta y manejador de esa clase de comando o consulta.

Utilizando la inyección de dependencias de Spring se permite recibir en el constructor una lista de clases que heredan de una clase o implementan una interfaz, Spring busca estas clases que además están anotadas con la anotación @Component. El constructor guarda en un mapa la relación de manejadores con su clase que maneja, buscando por reflexión qué clase de comando o consulta maneja. El método que implementa la interfaz del bus simplemente busca en el mapa el manejador de la clase recibida y le delega su tratamiento.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package io.github.picodotdev.blogbitix.eventbus.infrastructure;

...

@Component
@Primary
public class SpringCommandBus implements CommandBus {

    private Map<Class, CommandHandler> handlers;

    public SpringCommandBus(List<CommandHandler> commandHandlerImplementations) {
        this.handlers = new HashMap<>();
        commandHandlerImplementations.forEach(commandHandler -> {
            Class<?> commandClass = getCommandClass(commandHandler);
            handlers.put(commandClass, commandHandler);
        });
    }


    @Override
    public void handle(Command command) throws Exception {
        if (!handlers.containsKey(command.getClass())) {
            throw new Exception(String.format("No handler for %s", command.getClass().getName()));
        }
        handlers.get(command.getClass()).handle(command);
    }

    private Class<?> getCommandClass(CommandHandler handler) {
        Type commandInterface = ((ParameterizedType) handler.getClass().getGenericInterfaces()[0]).getActualTypeArguments()[0];
        return getClass(commandInterface.getTypeName());
    }

    private Class<?> getClass(String name) {
        try {
            return Class.forName(name);
        } catch (Exception e) {
            return null;
        }
    }
}
SpringCommandBus.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package io.github.picodotdev.blogbitix.eventbus.infrastructure;

...

@Component
@Primary
public class SpringQueryBus implements QueryBus {

    private Map<Class, QueryHandler> handlers;

    public SpringQueryBus(List<QueryHandler> queryHandlerImplementations) {
        this.handlers = new HashMap<>();
        queryHandlerImplementations.forEach(queryHandler -> {
            Class queryClass = getQueryClass(queryHandler);
            handlers.put(queryClass, queryHandler);
        });
    }

    @Override
    public <T> T handle(Query<T> query) throws Exception {
        if (!handlers.containsKey(query.getClass())) {
            throw new Exception(String.format("No handler for %s", query.getClass().getName()));
        }
        return (T) handlers.get(query.getClass()).handle(query);
    }

    private Class<?> getQueryClass(QueryHandler handler) {
        Type commandInterface = ((ParameterizedType) handler.getClass().getGenericInterfaces()[0]).getActualTypeArguments()[1];
        return getClass(commandInterface.getTypeName());
    }

    private Class<?> getClass(String name) {
        try {
            return Class.forName(name);
        } catch (Exception e) {
            return null;
        }
    }
}
SpringQueryBus.java

El siguiente código envía un comando y una consulta al bus de consultas y eventos. El comando crea una orden de compra y el segundo obtiene la orden de compra creada.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package io.github.picodotdev.blogbitix.eventbus;

...

@SpringBootApplication
public class Main implements CommandLineRunner {

    @Autowired
    private QueryBus queryBus;

    @Autowired
    private CommandBus commandBus;

    @Autowired
    private ProductRepository productRepository;

    @Autowired
    private OrderRepository orderRepository;

    @Override
    public void run(String... args) throws Exception {
        Product product = productRepository.findAll().stream().findFirst().orElse(null);
        System.out.println("Stock: " + product.getStock());

        OrderId orderId = orderRepository.generateId();
        commandBus.handle(new CreateOrderCommand(orderId, List.of(new Item(product.getId(), product.getPrice(), 2, new BigDecimal("0.21")))));

        Order order = queryBus.handle(new GetOrderQuery(orderId));
        System.out.printf("OrderId: %s, Items: %s%n", orderId, order.getItems().size());

        System.out.println("Stock: " + product.getStock());
    }

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Main.class, args);
    }
}
Main.java

En la salida del programa se observa como se procesa el comando de creación de la orden, la creación de la orden provoca el lanzamiento de un evento de dominio OrderCreated, el manejador de este evento de dominio en el dominio de inventario realiza la actualización del stock de los productos de la orden, en caso de no haber suficiente stock se emite un evento de dominio OrderOversold, el manejador de evento de dominio OrderOversoldCommandHandler podría marcar la orden como sobrevendida o realizar algún proceso con ella. Este lanzamiento de eventos de dominio muestra como funciona la consistencia eventual con el inventario de los productos.

1
2
3
4
Stock: 5
io.github.picodotdev.blogbitix.eventbus.domain.order.OrderCreated ceea5523-158e-4eb1-96d1-9aef60107ce2 2020-10-16T15:09:29.164497
OrderId: io.github.picodotdev.blogbitix.eventbus.domain.order.OrderId@63686af2, Items: 1
Stock: 3
System.out

Estas son las clases que manejan los eventos de dominio que son de interés para el bounded context de inventario.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
package io.github.picodotdev.blogbitix.eventbus.application.inventory;

...

@Component
public class InventorySpringEventBusListener {

    private CommandBus commandBus;

    private InventorySpringEventBusListener(CommandBus commandBus) {
        this.commandBus = commandBus;
    }

    @EventListener
    public void onOrderCreated(OrderCreated orderCreated) throws Exception {
        OrderCreatedCommand command = new OrderCreatedCommand(orderCreated);
        commandBus.handle(command);
    }
}
InventorySpringEventBusListener.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package io.github.picodotdev.blogbitix.eventbus.application.inventory;

...

public class OrderCreatedCommand extends Command {

    private OrderCreated event;

    public OrderCreatedCommand(OrderCreated event) {
        this.event = event;
    }

    public OrderCreated getEvent() {
        return event;
    }
}
OrderCreatedCommand.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package io.github.picodotdev.blogbitix.eventbus.application.inventory;

...

@Component
public class OrderCreatedCommandHandler implements CommandHandler<OrderCreatedCommand> {

    private ProductRepository productRepository;
    private OrderRepository orderRepository;
    private EventBus eventBus;

    public OrderCreatedCommandHandler(ProductRepository productRepository, OrderRepository orderRepository, EventBus eventBus) {
        this.productRepository = productRepository;
        this.orderRepository = orderRepository;
        this.eventBus = eventBus;
    }

    @Override
    public void handle(OrderCreatedCommand command) {
        OrderCreated event = command.getEvent();
        OrderId orderId = event.getOrderId();
        Order order = orderRepository.findById(orderId);

        List<ProductId> oversoldProductIds = order.getItems().stream().filter(it -> {
            Product product = productRepository.findById(it.getProductId());
            return !product.hasStock(it.getQuantity());
        }).map(Item::getProductId).collect(Collectors.toList());

        order.getItems().forEach(it -> {
            Product product = productRepository.findById(it.getProductId());
            product.subtractStock(it.getQuantity());
            eventBus.publish(product);
        });

        if (!oversoldProductIds.isEmpty()) {
            eventBus.publish(new OrderOversold(orderId, oversoldProductIds));
        }
    }
}
OrderCreatedCommandHandler.java

De Domain Driven Design hay varios libros, el libro de referencia sobre la teoría de DDD son Domain-Driven Design: Tackling Complexity in the Heart of Software, Domain-Driven Design Distilled, otros más prácticos son Implementing Domain-Driven Design y Domain-Driven Design in PHP: A Highly Practical Guide.

Terminal

El código fuente completo del ejemplo puedes descargarlo del repositorio de ejemplos de Blog Bitix alojado en GitHub y probarlo en tu equipo ejecutando siguiente comando:
./gradlew run


Comparte el artículo: