Iniciación a la programación concurrente en Java

Escrito por el .
java planeta-codigo programacion
Enlace permanente Comentarios

Java proporciona en su API numerosas primitivas para realizar programación concurrente. La programación concurrente permite realizar varias tareas simultáneamente aprovechando los múltiples núcleos de los procesadores modernos con un tiempo de ejecución total para un conjunto de tareas significativamente menor. Dos de los problemas de concurrencia más conocidos son el problema de los filósofos y del barbero que en este artículo muestro como implementar usando varias de las primitivas ofrecidas por Java.

Java

En todo el tiempo que llevo programando en Java no he tenido necesidad de conocer en detalle las primitivas de concurrencia que ofrece el lenguaje y la API. Java desde sus primeras versiones ya ofrecía el soporte básico para la programación concurrente con las clases Thread y Runnable y algunas primitivas de sincronización como la palabra clave reservada syncrhonized, los locks intrínsecos de los objetos y algunos métodos de la clase Thread como sleep, wait y join. Entre la documentación de Java está el siguiente tutorial sobre la concurrencia en Java que es muy recomendable leer.

Las computadoras realizan varias tareas de forma concurrente con la ayuda del sistema operativo que permite compartir el procesador para realizar diferentes tareas (navegar por internet, editar un documento, escuchar música, …) cambiando cada muy poco tiempo (medido en ms) entre procesos, con los procesadores de varios núcleos las tareas se ejecutan silmultáneamente en diferentes núcleos. Los threads en Java se comunican principalmente compartiendo referencias a objetos, este tipo de comunicación es eficiente pero posibilita dos tipos de errores, interferencias entre threads y errores de consistencia, la herramienta para evitarlos es la sincronización. Sin embargo, la sincronización introduce contención cuando dos o más hilos intentan acceder al mismo recurso simultáneamente y provocan una pérdida de rendimiento. El bloqueo mutuo o deadlock, la inanición o starvation y un bloqueo vivo o livelock son problemas de la sincronización. Seguramente te suenen los objetos inmutables, en la programación concurrente son especialmente útiles dado que su estado no cambia y no pueden corromperse ni quedar en un estado inconsistente por la interferencia entre threads evitando de esta manera errores que suelen ser difíciles de depurar por ofrecer un comportamiento errático.

En vez de usar los locks implícitos de los objetos la API de Java para concurrencia ofrece varios tipos más con propiedades adicionales como la habilidad de salir si el intento de adquirir el lock falla. En el paquete java.util.concurrent.locks está listados. Otro tipo de primitivas de sincronización para threads son los Semaphore, CyclicBarrier y CountDownLatch entre otros como Phaser y Exchanger. En el paquete java.util.concurrent.atomic hay varios tipos de datos básicos que realizan sus operaciones de forma atómica como por ejemplo contadores.

Con los Executors y ExecutorService no hace falta que manejemos los hilos a bajo nivel, es posible obtener un pool de threads de una tamaño específico y enviar clases Callable o Runnable que devuelven un resultado para que se ejecuten con un thread del pool cuando esté libre. Con la clase ScheduledExecutorService se programa la ejecución de tareas de forma periódica. En los streams añadidos a Java 8 el procesamiento se puede realizar de forma paralela aprovechando los microprocesadores multinúcleo sin tener que usar de forma explícita ninguna de las utilidades anteriores, internamente usa el Fork/Join.

El soporte para la programación concurrente ofrecido en Java es suficiente para la mayoría de tareas que podamos necesitar y ha mejorado bastante desde las primeras versiones.

El primer ejemplo que muestro es usando concurrencia ejecutar varias tareas y como realizándolas de forma secuencial el tiempo total empleado es la suma del tiempo de las tareas individuales y como usando concurrencia es la suma de la tarea que más tarda. El ejemplo se trata de 8 tareas que de forma secuencial tardan aproximadamente 24 segundos ya que cada tarea emplea 3 segundos, en el caso empleando concurrencia el tiempo es de aproximadamente 6 segundos ya se se emplea en pool de threads de 4 de capacidad con lo que las primeras 4 tareas tardan 3 segundos y el siguiente lote de 4 tareas tarda otros 3 segundos para un total de 6 segundos.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package io.github.picodotdev.blogbitix.javaconcurrency.threadpool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;

public class Main {

    private static Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws Exception {
        List<Callable<UUID>> tasks = new ArrayList();
        for (int i = 0; i < 8; ++i) {
            tasks.add(newTask());
        }

        {
            long start = System.currentTimeMillis();
            logger.info("Executing secuential...");

            List<UUID> results = new ArrayList<>();
            for (Callable<UUID> task : tasks) {
                 UUID uuid = task.call();
                results.add(uuid);
            };
            for(UUID uuid : results) {
                System.out.println(uuid);
            }

            long end = System.currentTimeMillis();
            logger.info("Secuential time ({} ms)...", end - start);
        }

        {
            long start = System.currentTimeMillis();
            logger.info("Executing concurrent...");

            ExecutorService executor = Executors.newFixedThreadPool(4);
            List<Future<UUID>> results = executor.invokeAll(tasks);
            executor.shutdown();
            for(Future<UUID> uuid : results) {
                System.out.println(uuid.get());
            }

            long end = System.currentTimeMillis();
            logger.info("Concurrent time ({} ms)...", end - start);
        }
    }

    private static Callable<UUID> newTask() {
        return new Callable<UUID>() {
            @Override
            public UUID call() throws Exception {
                UUID uuid = UUID.randomUUID();
                logger.info("Starting task {}", uuid);
                Thread.sleep(3000);
                return uuid;
            }
        };
    }
}
Main.java

Dos de los problemas más conocidos en la programación concurrente son el de La cena de los filósofos y el de El barbero durmiente. Usando algunas de las primitivas comentadas en este artículo este sería el código para para resolver ambos problemas en Java.

En este código del problema de los filósofos la clase Table crea los filósofos asignándoles los Fork que tienen que compartir para comer después de estar un tiempo pensando. En la ejecución se observa que el primer filósofo que intenta comer puede hacerlo ya que sus tenedores adyacentes está libres pero posteriormente se observa que en algunas ocasiones algún filósofo no puede hacerlo porque sus tenedores están siendo usados por alguno de sus compañeros adyacentes.

Esta implementación de los filósofos no es del todo correcta debido a que un filósofo podría quedarse sin comer o quedarse sin comer duramente mucho tiempo. En el artículo [El problema de concurrencia de la cena de los filósofos resuelto con Java][blogbitix-302] expongo otra solución sin este problema y resuelto correctamente.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
package io.github.picodotdev.blogbitix.javaconcurrency.philosophers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PhilosophersMain {

    private static Logger logger = LoggerFactory.getLogger(PhilosophersMain.class);

    public static void main(String[] args) throws InterruptedException {
        logger.info("Setuping dinner...");
        Table table = new Table(5);
        Thread dinner = new Thread(table);

        logger.info("Starting dinner...");
        dinner.run();
        dinner.join();
    }
}
PhilosophersMain.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package io.github.picodotdev.blogbitix.javaconcurrency.philosophers;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Table implements Runnable {

    private List<Philosopher> philosophers;

    public Table(int numPhilosophers) {
        if (numPhilosophers < 2) {
            throw new IllegalArgumentException("There should be more than one philosopher");
        }

        this.philosophers = new ArrayList<>();

        Fork leftFork = new Fork();
        for (int i = 0; i < numPhilosophers; ++i) {
            boolean isLastPhilosopher = (i == numPhilosophers -1);
            Fork rightFork = (isLastPhilosopher) ? philosophers.get(0).getLeftFork() : new Fork();

            Philosopher philosopher = new Philosopher("Philosopher " + (i + 1), leftFork, rightFork);
            philosophers.add(philosopher);
            leftFork = rightFork;
	}
    }

    @Override
    public void run() {
        ExecutorService executorService = Executors.newFixedThreadPool(philosophers.size());
        for (Philosopher p : philosophers) {
            executorService.submit(p);
        }
    }
}
Table.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package io.github.picodotdev.blogbitix.javaconcurrency.philosophers;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Fork {

    private ReentrantLock lock;

    public Fork() {
        this.lock = new ReentrantLock();
    }

    public boolean take() throws InterruptedException {
        return lock.tryLock(250, TimeUnit.MILLISECONDS);
    }

    public boolean isHeld() {
        return lock.isHeldByCurrentThread();
    }

    public void drop() throws InterruptedException {
        if (isHeld()) {
            lock.unlock();
        }
    }
}
Fork.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package io.github.picodotdev.blogbitix.javaconcurrency.philosophers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Random;

public class Philosopher implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(Philosopher.class);

    private String name;
    private Fork leftFork;
    private Fork rightFork;

    private Iterator<Long> times;

    public Philosopher(String name, Fork leftFork, Fork rightFork) {
        this.name = name;
        this.leftFork = leftFork;
        this.rightFork = rightFork;

        this.times = new Random().longs(2000, 7000).iterator();
    }

    public void eat() throws InterruptedException {
        try {
            logger.info("{} trying eat...", name);
            boolean leftTaked = leftFork.take();
            if (leftTaked) {
                boolean rightTaked = rightFork.take();
                if (rightTaked) {
                    long time = getTime();
                    logger.info("{} eating during {}ms", name, time);
                    spendTime(time);
                }
            }
            if (!leftFork.isHeld() || !rightFork.isHeld()) {
                logger.info("{} cannot eat", name);
            }
        } finally {
            leftFork.drop();
            rightFork.drop();
        }
    }

    public void think() throws InterruptedException {
        long time = getTime();
        logger.info("{} thinkink during {}ms", name, time);
        spendTime(time);
    }


    public Fork getLeftFork() {
        return leftFork;
    }

    public Fork getRightFork() {
        return rightFork;
    }

    @Override
    public void run() {
        while (true) {
            try {
                think();
                eat();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    private long getTime() throws InterruptedException {
        return times.next();
    }

    private void spendTime(long time) throws InterruptedException {
        Thread.sleep(time);
    }
}
Philosopher.java

En el caso de ejemplo del barbero cuando solo hay un barbero los clientes se acumulan ya que estos entran en la tienda a razón de 1 entre 1500 y 3500ms y el barbero tarda afeitar un cliente entre 2000 y 7000ms. Poniendo en la barbería dos barberos los clientes ya no se acumulan en la sala de espera.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package io.github.picodotdev.blogbitix.javaconcurrency.barber;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BarberShopMain {

    private static Logger logger = LoggerFactory.getLogger(BarberShopMain.class);

    public static void main(String[] args) throws InterruptedException {
        BarberShop shop = new BarberShop(5, 2);
        List<Barber> barbers = shop.getBarbers();
        Street street = new Street(shop);

        ExecutorService executorService = Executors.newFixedThreadPool(barbers.size() + 1);
        executorService.submit(street);
        barbers.forEach(executorService::submit);
    }
}
BarberShopMain.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package io.github.picodotdev.blogbitix.javaconcurrency.barber;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;

public class BarberShop {

    private static Logger logger = LoggerFactory.getLogger(BarberShop.class);

    private ArrayBlockingQueue<Client> clients;
    private Semaphore chairs;
    private List<Barber> barbers;

    private Iterator<Long> times;

    public BarberShop(int capacity, int numBarbers) {
        this.clients = new ArrayBlockingQueue<Client>(capacity);
        this.chairs = new Semaphore(numBarbers);

        this.barbers = new ArrayList<>();
        for (int i = 0; i < numBarbers; ++i) {
            Barber barber = new Barber("Barber " + (i + 1), this);
            barbers.add(barber);
        }

        this.times = new Random().longs(1000, 4000).iterator();
    }

    public List<Barber> getBarbers() {
        return barbers;
    }

    public boolean enter(Client client) throws InterruptedException {
        boolean entered = clients.offer(client, 1, TimeUnit.SECONDS);
        if (!entered) {
            return false;
        }
        logger.info("{} awaiting to barber ({})", client.getName(), clients.size());
        chairs.acquire();
        client.awaitShave();
        return true;
    }

    public Client next() throws InterruptedException {
        return clients.take();
    }

    private long getTime() throws InterruptedException {
        return times.next();
    }

    private void spendTime(long time) throws InterruptedException {
        Thread.sleep(time);
    }
}
BarberShop.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package io.github.picodotdev.blogbitix.javaconcurrency.barber;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

public class Street implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(Street.class);

    private AtomicLong counter;
    private BarberShop shop;

    private Iterator<Long> times;

    public Street(BarberShop shop) {
        this.counter = new AtomicLong();
        this.shop = shop;

        this.times = new Random().longs(1500, 3500).iterator();
    }

    @Override
    public void run() {
        while (true) {
            try {
                long i = counter.incrementAndGet();
                Client client = new Client("Client " + i, shop);
                logger.info("New client {}", client.getName());
                new Thread(client).start();

                spendTime(getTime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    private long getTime() throws InterruptedException {
        return times.next();
    }

    private void spendTime(long time) throws InterruptedException {
        Thread.sleep(time);
    }
}
Street.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package io.github.picodotdev.blogbitix.javaconcurrency.barber;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Barber implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(Barber.class);

    private String name;
    private BarberShop shop;

    private Iterator<Long> times;

    public Barber(String name, BarberShop shop) {
        this.name = name;
        this.shop = shop;

        this.times = new Random().longs(2000, 7000).iterator();
    }

    public void shave(Client client) throws InterruptedException {
        long time = getTime();
        logger.info("{} shaving {} during {}ms", name, client.getName(), time);
        spendTime(time);
        logger.info("{} shaved {}", name, client.getName());
        client.shaved();
    }

    @Override
    public void run() {
        while (true) {
            try {
                logger.info("{} awaiting client", name);
                Client client = shop.next();
                shave(client);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    private long getTime() throws InterruptedException {
        return times.next();
    }

    private void spendTime(long time) throws InterruptedException {
        Thread.sleep(time);
    }
}
Barber.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package io.github.picodotdev.blogbitix.javaconcurrency.barber;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Semaphore;

public class Client implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(Client.class);

    private String name;
    private BarberShop shop;

    private Iterator<Long> times;

    private Semaphore shave;

    public Client(String name, BarberShop shop) {
        this.name = name;
        this.shop = shop;

        this.shave = new Semaphore(0);
        this.times = new Random().longs(2000, 7000).iterator();
    }

    public String getName() {
        return name;
    }

    public void awaitShave() throws InterruptedException {
        shave.acquire();
    }

    public void shaved() {
        shave.release();
    }

    @Override
    public void run() {
        try {
            boolean entered = shop.enter(this);
            if (!entered) {
                logger.info("{} exited, barbershop at full capacity", name);
            } else {
                logger.info("{} exited", name);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
Client.java

Estos no son los únicos ejemplos clásicos otro es el del agente y los fumadores.

Terminal

El código fuente completo del ejemplo puedes descargarlo del repositorio de ejemplos de Blog Bitix alojado en GitHub y probarlo en tu equipo ejecutando siguiente comando:
./gradlew run


Comparte el artículo: