Introducción, conceptos y uso básico del broker de mensajes Apache Kafka

Escrito por el .
java planeta-codigo programacion software
Enlace permanente Comentarios

La comunicación mediante mensajes permite desacoplar las aplicaciones y procesar las peticiones de forma asíncrona. Apache Kafka es un broker de mensajes muy popular por su escalabilidad, tolerancia a fallos y adaptabilidad para ser usado en diferentes casos de uso. Es una herramienta compleja dado el número de casos de uso que soporta y la necesidad de ser distribuida para dotarse de escalabilidad y tolerancia a fallos.

Apache Kafka

Las aplicaciones tienen varias formas de comunicación, la más habitual es una comunicación petición y respuesta síncrona, otra opción es una comunicación mediante mensajes asíncrona. La comunicación mediante mensajes tiene varias ventajas al permitir que el productor del mensaje y consumidor no se conozcan lo que evita el acoplamiento entre servicios salvo por el esquema del mensaje que intercambian, la otra ventaja es que no es necesario que el productor y consumidor estén funcionando al mismo tiempo para comunicarse lo que mejora la fiabilidad y resiliencia de la aplicación.

Tecnologías de comunicación de mensajes hay varias en Java está JMS, dado que es una tecnología propia de Java limita las opciones del lenguaje en el que implementar los servicios quizá no es la más adecuada. Otra opción es RabbitMQ que proporciona un broker de mensajes más agnóstico del lenguaje de programación que JMS.

Siendo RabbitMQ una opción sencilla como cola de mensajes tiene unos límites en la escalabilidad y resiliencia. Apache Kafka es otro broker de mensajes más compleja que RabbitMQ pero que soporta más casos de uso en cuanto a resiliencia y escalabilidad para dar soporte a un gran volumen de mensajes.

Como cualquier herramienta Apache Kafka solo es una herramienta y es muy posible que en el futuro surjan nuevas que mejoren en algún aspecto Apache Kafka, una de esas herramientas puede que sea Memphis que se define a sí misma como tan simple como RabbitMQ y robusta como Apache Kafka.

En cualquier caso Apache Kafka es la opción dominante como broker de mensajes por su gran adaptabilidad a diferentes casos de usos, surgida en el año 2010 y siendo en el 2016 en el que empezó a ganar popularidad.

Introducción a Apache Kafka

Apache Kafka es un broker de mensajes altamente escalable al mismo tiempo que complejo. La complejidad en cierta medida es necesaria para ser resiliente y ofrecer tolerancia a fallos, la tolerancia a fallos y la escalabilidad se proporcionan creando un sistema distribuido que es el que añade complejidad a la herramienta como en la mayoría de herramientas distribuidas.

La documentación de Apache Kafka para conocer y entender los conceptos básicos es un buen sitio por donde empezar a aprender. Si se quiere algo más guiado y estructurado en el libro Apache Kafka: The Definitive Guide se explica de forma detallada todos los principales conceptos de forma detallada incluyendo los parámetros más importantes de configuración, un libro que me ha parecido muy detallado y recomiendo.

Por qué Apache Kafka

Kafka no es el único sistema de mensajería, las características que hacen de Kafka una buena opción son las siguientes:

  • Múltiples productores: varios productores pueden escribir en el mismo topic.
  • Múltiples consumidores: varios consumidores pueden leer del mismo topic y los mismos mensajes sin que estos se interfieran.
  • Durables y retención en disco: los mensajes son persistidos en disco para que ante un error no se pierdan y poder continuar su proceso una vez el error se resuelva. Los topics pueden definir una política de retención en base al tamaño en bytes o de tiempo, superado el límite definido los mensajes se descartan.
  • Escalable: la facilidad de particionar los topics y configurar consumidores permite procesar el volumen de mensajes que se tenga aunque sea muy grande, basta con añadir más brokers, particiones y consumidores.
  • Alto rendimiento: dada su escalabilidad el número de mensajes que puede entregar también es grande, y manteniendo latencias bajas en la entrega de mensajes.
  • Características de la plataforma: proporciona librerías y una API para extender el comportamiento de Kafka, integraciones para conectarlo a otros sistemas con Kafka Connect por ejemplo para recibir como mensajes los cambios en una base de datos y Kafka Streams permite procesar flujos constantes de datos.

Conceptos esenciales

Para usar Kafka de forma correcta conviene entender varios de los conceptos que maneja este broker de mensajes.

Para proporcionar escalabilidad y tolerancia a fallos es necesario utilizar varias instancias de servidor o brokers de Kafka que forman un cluster. En caso de que un broker falle por cualquier motivo el resto de brokers asumen su trabajo. Uno de los brokers es designado por el propio cluster como el controller y encargado de ciertas tareas administrativas del cluster.

Las aplicaciones tienen un rol de productor de mensajes, consumidor de mensajes o incluso ambos si lee y escribe al mismo tiempo. Las aplicaciones leen y escriben mensajes de Kafka a través de los topics, pudiendo haber múltiples productores y consumidores de un mismo topic al mismo tiempo.

Los producers envían los mensajes a los topics pero por escalabilidad y tolerancia a fallos los topics se dividen en particiones o partitions. Las particiones son consumidas por un único consumer pero un consumer puede tener asignados varias partitions de las que leer. Al crear el topic se especifica el número de peticiones que se desean para él y se distribuyen entre los diferentes brokers. Los mensajes son entregados a las particiones en base a una clave del mensaje en caso de que la tengan, esto permite entregar y agrupar ciertos mensajes en una misma partición para procesarlos en el orden que se producen por ejemplo todos los mensajes relacionados con un usuario.

Los mensajes son obtenidos por los consumidores de las particiones, cada partición es asignada a un consumer, solo un consumer lee de una partición y los mensajes de la partición se leen en el mismo orden en que fueron escritos. Por este motivo no tiene sentido tener más consumidores que particiones, y si se necesita escalar el consumo de mensajes con más consumidores es necesario aumentar el número de particiones. En caso de añadir nuevos consumidores se produce un rebalanceo en la asignación de las particiones distribuyendo la carga entre los diferentes consumers. Los consumers están agrupados en grupos de consumidores o consumers groups, dos consumer groups diferentes reciben la misma colección de mensajes que se envíen al topic. A cada mensaje se le asigna un número incremental u offset, la diferencia entre el offset producido en un partition y el offset consumido se denomina lag, estando cercano a cero significa que el consumidor está procesando los mensajes sin retardo y un lag con valores altos y que se incrementan significa que el consumidor no está siendo capaz de procesar el volumen de mensajes producidos.

Al crear el topic se define el número de particiones que se desea y se especifica el número de réplicas de cada partición. El número de particiones se determina en función del volumen de tráfico previsto del topic. Las particiones y las réplicas se distribuyen entre los diferentes brokers, para cada una de las particiones una es elegida como la partición leader y en la que los producer envían los mensajes. Las réplicas se mantienen sincronizadas con la partición leader, pudiendo definirse también el mínimo número de réplicas que han de estar sincronizadas. A las réplicas que están sincronizadas se les denomina ISR o in sync replica.

Los mensajes enviados a Kafka se escriben en disco y tienen la durabilidad definida según el periodo de retención definido para el topic. Los mensajes de las particiones escritas en el sistema de archivos se guardan en segmentos. Realmente la durabilidad aplica no a los mensajes individuales sino a los segmentos de las particiones.

En un sistema distribuido que se comunica por red hay fallos y que pueden ocasionar cosas inesperadas. Kafka proporciona la garantía que los mensajes se entrega en el mismo orden que se reciben, que los mensajes considerados commited, aquellos que se escriben en todas la réplicas sincronizadas, no se pierden y que los consumidores solo leen mensajes commited. Otras garantías que Kafka implementa son medidas de garantía en la entrega de mensajes o delivery garantees para que las aplicaciones se abstraigan de los posibles errores. Las garantías en la entrega de mensajes son como mucho uno o at most one con la que el mensaje solo se entrega como mucho una vez pudiendo perderse mensajes, con al menos uno o at least one se garantiza que se entrega una vez pero pudiendo recibir el mismo mensaje varias veces y finalmente exactamente uno o exactly once con la que el mensaje se entrega exactamente una vez. Sin estas garantías, puede haber casos en los que los mensajes no se entregan o se entregan varias veces.

Los mensajes tienen una clave, en algunos casos de uso solo interesa mantener el último mensaje de cada clave en un topic, a estos topics que solo mantienen la última versión de un mensaje según su clave se les denomina compacted topics.

Kafka no impone ninguna restricción en el contenido de los mensajes, los trata simplemente como una cantidad de bytes ya sean de texto, comprimidos o cifrados. Como formato se puede utilizar uno basado en texto como JSON o uno binario más eficiente como Avro y con un esquema.

Para algunas de tareas Kafka utiliza Zookeeper para la coordinación, consenso y almacenamiento de configuración. Es otro cluster de servidores que hay que mantener y añade complejidad. Los desarrolladores están trabajando para que Zookeeper no sea necesario y simplificar el uso de Kafka.

Propiedades de configuración

Kafka soporta numerosas propiedades de configuración entre las que están el número de particiones de los topics, la retención de los segmentos de los mensajes en base a tiempo o número de bytes, el número de réplicas a mantener de cada partición y otros parámetros de tiempos al procesar los mensajes y comunicación entre brokers.

Estas propiedades se configuran en función del volumen de tráfico generado por los productores, el volumen que es capaz de procesar cada consumidor y el volumen de tráfico o almacenamiento que es capaz de soportar un broker. El conjunto de los consumidores han de tener una capacidad de procesamiento similar a la de los productores, si los consumidores no fueran capaces de soportar el volumen de los productores y la situación se mantuviese de forma prolongada pasado un tiempo se acumularían demasiados mensajes sin procesar, el lag aumentaría, situación que podría generar malos comportamientos en las aplicaciones como pérdida de mensajes.

Un solo broker soporta un volumen grande de tráfico, pero en los casos de un uso intenso es necesario mayor número de instancias.

Descarga e inicio con contenedores Docker

Una opción sencilla para usar Apache Kafka es usándolo a través de un contenedor de Docker, basta instalar Docker y descargar el contenedor de Kafka.

Como imagen de contenedor de Kafka están las proporcionadas por la empresa Confluent que participa en el desarrollo de Kafka y proporciona varias herramientas complementarias de Kafka.

Este es un archivo de Docker Compose que permite arrancar varios contenedores a la vez, en el se muestra un cluster de tres instancias de Kafka y una única instancia de Zookeeper.

1
2
$ docker-compose up

docker-compose-up.sh
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker1:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker1
    ports:
      - "19092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker1:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2

  broker2:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker2
    ports:
      - "19093:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker2:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2

  broker3:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker3
    ports:
      - "19094:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker3:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
docker-compose.yml

Uso

Los producers y consumers se pueden implementar en diferentes lenguajes entre los que está Java. Kafka proporciona una API para la integración con Kafka y el proyecto Spring una integración para las aplicaciones que utilizan este framework para mayor facilidad de uso en el proyecto Spring for Apache Kafka.

No es necesario crear un cliente de Kafka con un lenguaje de programación, Kafka proporciona varias utilidades administrativas que permiten consultar información de configuración y enviar y recibir mensajes desde la línea de comandos a un determinado topic.

Con esta utilidades permite probar Kafka por ejemplo para realizar el balanceo de consumidores cuando se agregan y se quitan, probar el periodo de retención de los mensajes o probar que ocurre cuando un broker se cae o detiene.

Listar que topics hay en un cluster y crear un topic con las propiedades de número de particiones, réplicas y retención se realiza con las utilidades de línea de comandos para interactuar con el cluster.

1
2
$ docker exec -it broker1 kafka-topics --bootstrap-server broker1:9092 --create --partitions 2 --replication-factor 2 --topic quickstart

kafka-topics-create.sh
1
2
$ docker exec -it broker1 kafka-topics --bootstrap-server broker1:9092 --delete --topic quickstart

kafka-topics-delete.sh
1
2
$ docker exec -it broker1 kafka-topics --bootstrap-server broker1:9092 --list

kafka-topics-list.sh
1
2
__consumer_offsets
quickstart
kafka-topics-list.out

Como se ha indicado de forma explícita al crear el topic y por los valores indicados en las variables de entorno del archivo docker-compose.yml que indican la configuración por defecto si no se hubiese indicado de forma explícita el topic tiene dos particiones y cada partición dos réplicas. al describir el topic se indican que broker es el líder de la partición, en que brokers están las réplicas de cada patición y que brokers tienen su réplica sincronizada.

1
2
$ docker exec -it broker1 kafka-topics --bootstrap-server broker1:9092 --topic quickstart --describe

kafka-topics-describe.sh
1
2
3
Topic: quickstart	TopicId: cqmOrCwcSAesmGGeRawHSw	PartitionCount: 2	ReplicationFactor: 2	Configs: 
	Topic: quickstart	Partition: 0	Leader: 2	Replicas: 2,1	Isr: 2,1
	Topic: quickstart	Partition: 1	Leader: 3	Replicas: 3,2	Isr: 3,2
kafka-topics-describe-1.out

Las operaciones básicas son que el productor envíe mensajes y los consumidores los reciban. Las utilidades de línea de comandos permiten crear consumidores y grupos de consumidores.

1
2
$ docker exec -it broker1 kafka-console-producer --bootstrap-server broker1:9092 --topic quickstart

kafka-console-producer.sh
1
2
3
4
>1
>2
>3
>
kafka-console-producer.out

Diferentes formas de arrancar un consumidor, leyendo el primer mensaje del topic, leyendo solo los nuevos mensajes y dentro de un consumer group.

1
2
3
$ docker exec -it broker1 kafka-console-consumer --bootstrap-server broker1:9092 --topic quickstart --from-beginning
$ docker exec -it broker1 kafka-console-consumer --bootstrap-server broker1:9092 --topic quickstart
$ docker exec -it broker1 kafka-console-consumer --bootstrap-server broker1:9092 --group consumer-group-1 --topic quickstart
kafka-console-consumer.sh
1
2
3
1
2
3
kafka-console-consumer.out

Listar los consumers permite ver cuántos hay, que offset están consumiendo y que lag tienen. En esta salida el primer consumer-group hay un único consumer que está consumiendo de ambas particiones del topic, en el segundo consumer-group hay dos consumers diferentes cada uno consumiendo de una partición.

1
2
$ docker exec -it broker1 kafka-consumer-groups --bootstrap-server localhost:9092 --all-groups --describe

kafka-consumer-groups.sh
1
2
3
4
5
6
7
GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
console-consumer-82790 quickstart      0          -               0               -               console-consumer-0d7d3567-2bc2-4c7f-ab74-45a57b905e30 /172.24.0.3     console-consumer
console-consumer-82790 quickstart      1          -               3               -               console-consumer-0d7d3567-2bc2-4c7f-ab74-45a57b905e30 /172.24.0.3     console-consumer

GROUP            TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
consumer-group-1 quickstart      0          0               0               0               console-consumer-263103d2-61fa-454f-9989-a313be03de36 /172.24.0.3     console-consumer
consumer-group-1 quickstart      1          3               3               0               console-consumer-82ac7e50-7736-416a-8237-f2a582246053 /172.24.0.3     console-consumer
kafka-consumer-groups.out

Las réplicas de las particiones son distribuidas entre los brokers, si un broker que es leader de una partición deja de funcionar otro broker con una réplica sincronizada o isr se convertirá en el nuevo leader, este mecanismo es el que utiliza Kafka para tener la propiedad de tolerancia a fallos.

Aunque Kafka tiene tolerancia a fallos no se auto repara a si mismo en todas las situaciones. Las réplicas que tuviese un broker que ha fallado no se distribuyen en otros brokers para que el número de réplicas definido se mantenga constante. Kafka espera que si se cae un broker esté al cabo de un tiempo se recupere y mientras tanto el resto de brokers siguen procesando los mensajes. Las réplicas no se mueven de broker debido a que podría ser demasiado costoso ya que pueden alcanzar tamaños suficientemente grandes para que el tráfico de red fuese muy elevado y costoso en tiempo.

Al forzar la parada de un broker simulando un fallo sus réplicas se dejan de considerar sincronizadas.

1
2
$ docker stop broker2

docker-stop-broker2.sh
1
2
3
Topic: quickstart	TopicId: cqmOrCwcSAesmGGeRawHSw	PartitionCount: 2	ReplicationFactor: 2	Configs: 
	Topic: quickstart	Partition: 0	Leader: 1	Replicas: 2,1	Isr: 1
	Topic: quickstart	Partition: 1	Leader: 3	Replicas: 3,2	Isr: 3
kafka-topics-describe-2.out

Una vez el broker se vuelve a levantar quizá por una acción manual del administrador de sistemas o automáticamente por del orquestador de contenedores las réplicas que tenía asignadas el broker 2 se vuelven marcar como sincronizadas en la columna isr.

1
2
$ docker start broker2

docker-start-broker2.sh
1
2
3
Topic: quickstart	TopicId: cqmOrCwcSAesmGGeRawHSw	PartitionCount: 2	ReplicationFactor: 2	Configs: 
	Topic: quickstart	Partition: 0	Leader: 2	Replicas: 2,1	Isr: 1,2
	Topic: quickstart	Partition: 1	Leader: 3	Replicas: 3,2	Isr: 3,2
kafka-topics-describe-3.out

Tutorial

En YouTube hay varios tutoriales con varios capítulos acerca de Kafka con más información y mostrándolo en vídeo, basta con hacer un búsqueda.


Comparte el artículo: