Ejemplo de Reactive Streams en Java

Escrito por el .
java planeta-codigo
Comentarios

Java

Los streams representan un flujo de elementos producidos por un productor y consumidos por uno o más consumidores. Para procesar los elementos del stream se pueden emplear dos modelos, el modelo push donde el productor produce elementos para el consumidor que es avisado cuando hay un nuevo elemento disponible y el modelo pull en el que es el consumidor el que solicita al productor nuevos elementos que los genera bajo demanda. Ambos modelos presentan problemas cuando el productor y el consumidor no funcionan a la misma velocidad de elementos producidos o procesados. La solución es proporcionar un stream que se adapta a la velocidad de ambos. Los reactive streams son empleados cuando los elementos son producidos y consumidos en tiempo real como en sistemas de mensajes o peticiones HTTP en vez de un flujo constante como un dispositivo de almacenamiento.

Una estrategia es conocida como backpressure que consiste en que el suscriptor notifica al productor cuántos elementos puede procesar de modo que el productor solo envía el número de elementos solicitados. La implementación de la solución son los reactive stream que proporcionan un mecanismo estándar asíncrono para el stream con backpressure. Estos evitan que el productor se bloquee por no poder entregarlos al ser rápido produciendo elementos o el suscriptor tenga un buffer grande o descarte algunos elementos por ser lento al consumirlos o se bloquee esperando nuevos elementos si es rápido consumiéndolos.

En la API entre otras novedades de la versión 9 de Java se han añadido las siguientes clases para soportar reactive streams embebidas en la clase Flow, Flow.Publisher, Flow.Subscriber, Flow.Subscription, Flow.Processor y SubmissionPublisher en el paquete java.util.concurrent incluido en el módulo java.base.

Un Publisher publica elementos para los Subscriber basándose en sus demandas recibidas. Un suscriptor se subscribe a un productor. El productor proporciona un token de suscripción con el que el suscriptor puede solicitar N elementos al productor. Cuando los elementos están disponibles el productor envía N o menos elementos al suscriptor. Posteriormente el suscriptor puede solicitar más elementos.

En el ejemplo de código un productor produce y los consumidores procesan elementos a cierto ritmo, dependiendo de la velocidad relativa de cada uno se usará un modelo push o pull. La clase Flow.Processor permite procesar los elementos del productor para aplicarles alguna transformación antes de que sean entregados a los consumidores, actual como consumidor y productor. En este stream de números enteros se aplica la función elevarlos al cuadrado.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package io.github.picodotdev.blogbitix.reactivestreams;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.IntStream;

public class Main {

    private static class PrintSubscriber implements Flow.Subscriber<Integer> {

        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            System.out.println("Received item: " + item);
            subscription.request(1);
            Sleeper.sleep(1000);
        }

        @Override
        public void onError(Throwable error) {
            error.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("PrintSubscriber completed");
        }
    }

    public static class PowProcessor extends SubmissionPublisher<Integer> implements Flow.Processor<Integer, Integer> {

        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            submit(item * item);
            subscription.request(1);

        }

        @Override
        public void onError(Throwable error) {
            error.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("PowProcessor completed");
            close();
        }
    }

    private static class Sleeper {
        private static void sleep(int time) {
            try {
                Thread.sleep(time);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        Flow.Processor<Integer, Integer> processor = new PowProcessor();
        Flow.Subscriber<Integer> subscriber = new PrintSubscriber();

        publisher.subscribe(processor);
        processor.subscribe(subscriber);

        IntStream.range(0, 10).forEach(it -> {
            publisher.submit(it);
            Sleeper.sleep(2000);
        });

        publisher.close();
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
Received item: 0
Received item: 1
Received item: 4
Received item: 9
Received item: 16
Received item: 25
Received item: 36
Received item: 49
Received item: 64
Received item: 81
PowProcessor completed
PrintSubscriber completed

El código fuente completo del ejemplo puedes descargarlo del repositorio de ejemplos de Blog Bitix alojado en GitHub y probarlo en tu equipo ejecutando el comando ./gradlew run.