Ejemplo de RabbitMQ con Java para enviar y recibir mensajes

Escrito por el .
java planeta-codigo programacion
Enlace permanente Comentarios

Entre las ventajas de integrar dos aplicaciones mediante el envío de mensajes están que evita que estén acopladas y la comunicación es asíncrona. Con RabbitMQ también podremos implementar cada uno de ellas con el lenguaje de programación que prefiramos de entre las varias posibilidades para las que ofrece clientes y por esto último podemos preferir usarlo en vez de las especificación JMS propia de Java EE que nos obligaría a usar un servidor de aplicaciones que lo implemente, posiblemente JBoss/Wildfly o Weblogic en vez de Tomcat o Jetty. En el artículo incluyo un ejemplo para el lenguaje Java mostrando el envío y recepción de mensajes junto con la aplicación de administración que nos proporcionará información útil.

RabbitMQ

Java

Las aplicaciones que se integran mediante el envío y recepción de mensajes evitan el acoplamiento y sincronía junto con la posibilidad de implementar cada una de ellas con diferentes lenguajes o plataformas. Entre las especificaciones que componen Java EE está <abbr title=”Java Message Service”>JMS pero tanto la aplicación que envía como la que recibe mensajes deben estar programadas en el lenguaje Java, a menos que incluyamos un adaptador que permita a la aplicación no Java interactuar con JMS.

RabbitMQ es un software de servidor que actúa como intermediario o broker de mensajería entre dos o más aplicaciones que se comunican o envían notificaciones con mensajes, proporciona una funcionalidad similar a JMS pero con la ventaja que ofrece soporte para los lenguajes más populares incluido Java y JVM, Ruby, Python, .NET, PHP, Node.js, Go y varios más. Usa varios conceptos similares a los presentes en JMS como que el emisor envía los mensajes a una cola y el receptor los lee.

La comunicación con mensajes entre aplicaciones es útil porque ni el emisor ni receptor se conocen directamente ni han de estar funcionando simultáneamente para comunicarse consiguiendo de este modo el desacoplamiento entre las aplicaciones. Además la comunicación puede ser de uno a varios, los mensajes son leídos de las colas con la posibilidad de que cada mensaje sea recibido por un único receptor o por cada uno de ellos.

Realmente en RabbitMQ los mensajes no son enviados directamente por el emisor a las colas sino que se envían a un exchange que finalmente lo enruta y encola en la cola destino. Los exchanges pueden ser directos basando su lógica de encolado según el valor del binding key enviada junto con el mensaje y un routing key asociada con la cola o basados en temas en los que se usa una cadena formada por una lista de palabras separada por puntos, la lógica de enrutado se toma según si el binding key cumple el patrón del routing key que puede contener sustituidores de palabras, siendo un * una palabra exacta y # varias palabras contiguas.

En la documentación de RabbitMQ hay 6 tutoriales en diferentes lenguajes para el envío y recepción de mensajes.

Basándome en estos ejemplos he creado un proyecto uno muy similar al Tutorial 1, ejecutable más fácilmente con Docker y Gradle.

Tanto en el emisor como en el receptor deberemos declarar las colas que van a usar (si una no existe se creará y si existe se usará), a la cola se le asigna un nombre y el receptor define un manejador para recibir los mensajes según se envían.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package io.github.picodotdev.blogbitix.holamundorabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class Send {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        try {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            for (int i = 0; i < 10; ++i) {
                String message = String.format("Hello World at %s", LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(String.format("Sent «%s»", message));

                Thread.sleep(1500);
            }
        } finally {
            channel.close();
            connection.close();
        }
    }
}
Send.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package io.github.picodotdev.blogbitix.holamundorabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Receive {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        try {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(String.format("Received  «%s»", message));
                }
            };

            channel.basicConsume(QUEUE_NAME, true, consumer);

            Thread.sleep(20000);
        } finally {
            channel.close();
            connection.close();
        }
    }
}
Receive.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
description = 'HolaMundoRabbitMQ'
version = '0.1'

apply plugin: 'java'

repositories {
    mavenCentral()
}

dependencies {
    compile 'com.rabbitmq:amqp-client:4.0.2'
}

task receive(type: JavaExec) {
    main = 'io.github.picodotdev.blogbitix.holamundorabbitmq.Receive'
    classpath = sourceSets.main.runtimeClasspath
}

task send(type: JavaExec) {
    main = 'io.github.picodotdev.blogbitix.holamundorabbitmq.Send'
    classpath = sourceSets.main.runtimeClasspath
}
build.gradle

Para ejecutar el ejemplo usaré el contenedor de Docker para RabbitMQ iniciándolo con Docker Compose y el siguiente archivo descriptor. Puedes consultar la serie de artículos sobre Docker que escribí para conocer cómo usarlo.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
version: "3"
services:
  rabbitmq:
    image: rabbitmq:management-alpine
    volumes:
      - data:/var/lib/rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      - RABBITMQ_NODENAME=rabbitmq

volumes:
  data:
docker-compose.yml
1
2
docker-compose up

docker-run.sh

Una vez iniciado el contenedor y con el código fuente del ejemplo, iniciamos en cualquier orden la parte receptora de los mensajes y la parte emisora de mensajes con los comandos ./gradlew receive y ./gradlew send respectivamente, momento en el cual veremos que en la consola salen las notificaciones de recepción y envío.

Aplicación de ejemplo enviando mensajes Aplicación de ejemplo recibiendo mensajes

En la comunicación con RabbitMQ se puede usar TLS/SSL así como mecanismos de autenticación y autorización para mayor seguridad. Usando confirmaciones si el receptor falla en el procesado el mensaje no se pierde ya que no se habrá declarado como acknowledge aún así si RabbitMQ falla los mensajes se perderán a menos que las colas se declaren como persistentes las cuales se guardarán en disco perdurando a una catástrofe.

RabbitMQ posee un plugin para la administración con el que podemos administrar permisos, tener una vista global, ver ratios de mensajes, estadísticas, colas, exchanges y más información, nos da información muy interesante sobre el estado del procesamiento de mensajes. Es accesible mediante el navegador y la URL http://localhost:15672/. En la captura del estado de la cola hello hay 10 mensajes encolados pendientes de entregar a algún receptor.

Aplicación web de administración de RabbitMQ Información de estado de una cola

Para profundizar más en las aplicaciones basadas en mensajes con RabbitMQ dos buenos libros son Learning RabbitMQ y Matering RabbitMQ cubriendo temas más avanzados como clustering, alta disponibilidad, arquitectura, patrones de diseño, seguridad y rendimiento.

Terminal

El código fuente completo del ejemplo puedes descargarlo del repositorio de ejemplos de Blog Bitix alojado en GitHub y probarlo en tu equipo ejecutando siguiente comando:
docker-compose up, ./gradlew receive, ./gradle send


Comparte el artículo: