Introducción y ejemplo sencillo de Java Message Service (JMS)

Escrito por picodotdev el , actualizado el .
java programacion software planeta-codigo
Comentarios

JMS es una especificación de Java que define en esta plataforma una forma comunicación entre aplicaciones basada en el intercambio de mensajes. Los mensajes permiten a las aplicaciones no conocerse entre sí y comunicarse de forma asíncrona pudiendo hacer que los mensajes de una cola solo sean consumidos por un único receptor o por varios suscriptores interesados en un determinado tema. En el código de ejemplo muestro tanto la comunicación con colas (queues) como con temas (topics)

Java

La plataforma Java EE (Java Enterprise Edition) pone a disposición de los desarrolladores varias especificaciones, estas especificaciones describen las funcionalidades y la API que deben proporcionar las implementaciones y proporcionan al desarrollador herramientas para facilitar, hacer mejor las aplicaciones y de forma estándar según estas especificaciones. Haciendo uso de una de estas especificaciones es posible cambiar de una implementación a otra de forma transparente y sin modificar ninguna línea de código de la aplicación (en teoría). Hay especificaciones para persistencia en base de datos (JPA), para transaccionalidad (JTA), para servicios web (JAX-WS ) y REST (JAX-RS) entre otras. En el siguiente enlace puede encontrarse el listado completo de especificaciones y sus versiones de JEE 7.

Una de ellas es el servicio de mensajería JMS (Java Message Service). JMS es un sistema de comunicación entre aplicaciones en base a mensajes. El usar mensajes como forma de comunicación entre aplicaciones tiene los siguientes beneficios o ventajas:

  • Integración de sistemas: las aplicaciones que se comunican intercambiando mensajes puede ser desarrolladas con tecnologías diferentes el único requisito es que cada una de ellas tenga una forma de enviar y recibir los mensajes.
  • Escalabilidad: en caso de necesitar más capacidad para procesar los mensajes se pueden añadir más procesadores de mensajes sin que los emisores tengan ningún conocimiento de ello.
  • Asincronía: los mensajes puede ser procesados de forma asíncrona de forma que si un mensaje desencadena un proceso largo en tiempo el emisor del mensaje no tiene que esperar a que el proceso termine, el emisor puede enviar el mensaje y olvidarse. Además, el emisor no necesita que un receptor exista para enviar el mensaje tampoco el receptor necesita que que el emisor exista para recibir el mensaje. Cuando haya un receptor este se encargará de procesar los mensajes que se hayan enviado y estén aún sin procesar.
  • No acoplamiento: el emisor y el receptor no se conocen directamente de forma que cada uno de ellos puede reemplazarse por una nueva implementación de forma transparente para el otro.

La comunicación puede realizarse de dos formas cada una con sus características:

  • Punto a punto (P2P): mediante esta comunicación el mensaje se garantiza que es procesado únicamente una vez independientemente del numero de posibles procesadores que podrían recibir el mensaje. El procesado del mensaje puede ser síncrono o asíncrono. En caso de que no haya un receptor disponible el mensaje se guarda hasta poder entregarse a un receptor. Se realiza mediante colas (Queue). En este modelo al emisor se le denomina Sender y al receptor Receiver.
  • Pub/Sub: en este modelo un mensaje es recibido por todos los receptores suscritos a un tema (Topic) de forma similar a una emisión broadcast. Al emisor se le denomina Publisher y al receptor Subscriber. El emisor y receptor están más desacoplados ya que el emisor no conoce cual de los receptores procesará el mensaje.

Esta comunicación de mensajes entre aplicaciones o entre diferentes partes de una aplicación tiene muchas posibilidades, podría ser utilizado para que un receptor enviase mensajes electrónicos en base a los mensajes enviados a una cola o para actualizar o precalcular datos de una base de datos que puede llevar un considerable tiempo y que de hacerlo en la misma petición de una aplicación web haría que el cliente estuviese esperando hasta que el proceso terminase, en ambos casos no es necesario que los procesos se hagan inmediatamente, son solo dos ejemplos de aplicación real. Esta es la misma funcionalidad que expliqué como hacerla con programación concurrente y el patrón de diseño Command pero pudiéndola implementar con JMS.

Los mensajes puede tener cabeceras (asignada automáticamente por JMS o por el desarrollador), atributos y y los datos (payload) que pueden transportar texto, un stream de objetos primitivos, … en función del tipo de mensaje. Cualquiera de estas cabeceras, atributos y datos puede utilizarse como información para procesar el mensaje.

A continuación pondré el código de una sencilla aplicación que se conecta al servicio JMS de un servidor de aplicaciones WildFly de forma remota y envía y recibe unos pocos mensajes de texto.

Primero el código de un modelo Pub/Sub. Como es propio de este modelo los mensajes se reciben por todos los receptores (los dos threads que escuchan en un topic que debemos crear), en este caso hay un publicador y dos suscriptores:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package io.github.picodotdev.bitix.jms;

import java.util.Properties;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;

/**
 * Ejemplo que muestra como como enviar y recibir mensajes JMS de un Topic de forma remota.
 */
public class Topic {

    /**
     * Antes de ejecutar este ejemplo, usando WildFly se ha de crear un usuario guest y clave guest con el 
     * script WILDFLY_HOME/bin/add-user.sh.
     */
    public static void main(String[] args) throws Exception {
        // Usuario y password para conectarse al servidor JNDI y al Topic
     String usuario = "guest";
        String contrasena = "guest";

        // Propiedades para crear el contexto: clase factoría, url del servidor JNDI y credenciales
     Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
        env.put(Context.PROVIDER_URL, "http-remoting://localhost:8080");
        env.put(Context.SECURITY_PRINCIPAL, usuario);
        env.put(Context.SECURITY_CREDENTIALS, contrasena);

        // El objeto InitialContext permite obtener la referencias de los objetos registrado en el ábol JNDI
     InitialContext ic = new InitialContext(env);

        // Objetos a obtener para usar JMS: 
     // - TopicConnectionFactory
     // - TopicConection
     // - Topic
     // - TopicSession
     // - TopicSubscriber
     // - TopicPublisher
     TopicConnectionFactory connectionFactory = (TopicConnectionFactory) ic.lookup("jms/RemoteConnectionFactory");
        TopicConnection connection = connectionFactory.createTopicConnection(usuario, contrasena);
        
        // Obtener el Topic en el cual se publicarán y del cual se recibirán los mensajes
     javax.jms.Topic topic = (javax.jms.Topic) ic.lookup("jms/topic/test");

        // Preparar el publicador y subscriptor al Topic
     Subscriber subscriber1 = new Subscriber(connection, topic);
        Subscriber subscriber2 = new Subscriber(connection, topic);
        Publisher publisher = new Publisher(connection, topic);
        
        // Inicializar la recepción y envío de los mensajes
     connection.start();

        // Empezar a publicar mensajes en el Topic (y a recibirlos)
     Thread thread = new Thread(publisher);      
        thread.start();
        // Esperar a que el publicador termine de enviar mensajes
     thread.join();

        // Parar el envío y recepción de mensajes
     connection.stop();
        
        // Terminar liberando los recursos
     subscriber1.close();
        subscriber2.close();
        publisher.close();     
        connection.close();
        ic.close();
    }
    
    private static class Subscriber implements MessageListener {
        
        private TopicSession session;
        private TopicSubscriber subscriber;
        
        public Subscriber(TopicConnection connection, javax.jms.Topic topic) throws Exception {
            this.session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            this.subscriber = this.session.createSubscriber(topic, null, false);
            this.subscriber.setMessageListener(this);
        }
        
        public void close() throws Exception  {
            subscriber.close();
            session.close();
        }
        
        @Override
        public void onMessage(Message message) {
            try {
                TextMessage text = (TextMessage) message;
                System.out.printf("Suscriptor (%s): El publicador dice: «%s»\n", this, text.getText());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    private static class Publisher implements Runnable {
        
        private TopicSession session;
        private TopicPublisher publisher;
        
        public Publisher(TopicConnection connection, javax.jms.Topic topic) throws Exception {
            this.session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            this.publisher = this.session.createPublisher(topic);
        }
        
        public void close() throws Exception  {
            publisher.close();
            session.close();
        }
        
        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; ++i) {
                    Message mensaje = session.createTextMessage(String.format("¡Hola mundo! (%d)", i));
                    publisher.publish(mensaje);
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

A continuación el código de utilizando un modelo punto a punto en el que vuelve a haber un emisor y dos receptores. En el resultado de la ejecución puede observarse que a pesar de haber dos receptores solo uno de los dos recibe cada mensaje:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package io.github.picodotdev.bitix.jms;

import java.util.Properties;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

/**
 * Ejemplo que muestra como como enviar y recibir mensajes JMS de un Queue de forma remota.
 */
public class Queue {

    /**
     * Antes de ejecutar este ejemplo, usando WildFly se ha de crear un usuario guest y clave guest con el 
     * script WILDFLY_HOME/bin/add-user.sh.
     */
    public static void main(String[] args) throws Exception {
        // Usuario y password para conectarse al servidor JNDI y al Queue
     String usuario = "guest";
        String contrasena = "guest";

        // Propiedades para crear el contexto: clase factoría, url del servidor JNDI y credenciales
     Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
        env.put(Context.PROVIDER_URL, "http-remoting://localhost:8080");
        env.put(Context.SECURITY_PRINCIPAL, usuario);
        env.put(Context.SECURITY_CREDENTIALS, contrasena);

        // El objeto InitialContext permite obtener la referencias de los objetos registrado en el ábol JNDI
     InitialContext ic = new InitialContext(env);

        // Objetos a obtener para usar JMS: 
     // - QueueConnectionFactory
     // - QueueConection
     // - Queue
     // - QueueSession
     // - QueueSubscriber
     // - QueuePublisher
     QueueConnectionFactory connectionFactory = (QueueConnectionFactory) ic.lookup("jms/RemoteConnectionFactory");
        QueueConnection connection = connectionFactory.createQueueConnection(usuario, contrasena);
        
        // Obtener el Queue en el cual se publicarán y del cual se recibirán los mensajes
     javax.jms.Queue queue = (javax.jms.Queue) ic.lookup("jms/queue/test");

        // Preparar el publicador y subscriptor al Queue
     Receiver receiver1 = new Receiver(connection, queue);
        Receiver receiver2 = new Receiver(connection, queue);
        Sender sender = new Sender(connection, queue);
        
        // Inicializar la recepción y envío de los mensajes
     connection.start();

        // Empezar a enviar mensajes en el Queue (y a recibirlos)
     Thread thread = new Thread(sender);     
        thread.start();
        // Esperar a que el enviador termine de enviar mensajes
     thread.join();

        // Parar el envío y recepción de mensajes
     connection.stop();
        
        // Terminar liberando los recursos
     receiver1.close();
        receiver2.close();
        sender.close();        
        connection.close();
        ic.close();
    }
    
    private static class Receiver implements MessageListener {
        
        private QueueSession session;
        private QueueReceiver receiver;
        
        public Receiver(QueueConnection connection, javax.jms.Queue queue) throws Exception {
            this.session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            this.receiver = this.session.createReceiver(queue);
            this.receiver.setMessageListener(this);
        }
        
        public void close() throws Exception  {
            receiver.close();
            session.close();
        }
        
        @Override
        public void onMessage(Message message) {
            try {
                TextMessage text = (TextMessage) message;
                System.out.printf("Receptor (%s): Un publicador dice: «%s»\n", this, text.getText());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    private static class Sender implements Runnable {
        
        private QueueSession session;
        private QueueSender sender;
        
        public Sender(QueueConnection connection, javax.jms.Queue queue) throws Exception {
            this.session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            this.sender = this.session.createSender(queue);
        }
        
        public void close() throws Exception  {
            sender.close();
            session.close();
        }
        
        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; ++i) {
                    Message mensaje = session.createTextMessage(String.format("¡Hola mundo! (%d)", i));
                    sender.send(mensaje);
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Comentar que los mensajes se procesan en serie por cada MessageListener, esto es, hasta que no termina el consumo de uno no se consume el siguiente. Esto se aplica por MensajeListener y sesión.

Si queremos probar los ejemplos deberemos disponer del servidor de aplicaciones WildFly. Para que los ejemplos funcionen deberemos configurarlo añadiendo un usuario «guest», de contraseña «guest» y de rol «guest», el añadirlo lo podemos hacer con la utilidad add-user.sh. También deberemos modificar el archivo standalone-full.xml añadiendo el topic y el queue en la sección de JMS e iniciar WildFly usando esa configuración:

1
2
$ ./standalone.sh --server-config=standalone-full.xml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
<subsystem xmlns="urn:jboss:domain:messaging:2.0">
    <hornetq-server>
        ...
        <jms-destinations>
            <jms-queue name="testQueue">
                <entry name="jms/queue/test"/>
                <entry name="java:jboss/exported/jms/queue/test"/>
            </jms-queue>
            <jms-topic name="testTopic">
                <entry name="jms/topic/test"/>
                <entry name="java:jboss/exported/jms/topic/test"/>
            </jms-topic>
        </jms-destinations>
    </hornetq-server>
</subsystem>

Para terminar y conocer más sobre JMS un buen libro es Java Message Service de O’Reilly, que explica de forma mucho más extensa y completa este tema.

El código fuente completo de estos ejemplos los puedes descargar de mi repositorio de GitHub.