Curso-lenguaje-python/catch-all/05_infra_test/02_rabbitmq
2024-08-05 23:37:04 +02:00
..
01_hello_world Add rabbitmq test 2024-07-31 23:27:53 +02:00
02_work_queues Add rabbitmq test 2024-07-31 23:27:53 +02:00
03_publish_subcribe Add rabbitmq test 2024-07-31 23:27:53 +02:00
04_routing Update rabbitmq test 2024-08-04 20:56:58 +02:00
05_topics Update rabbitmq test 2024-08-05 23:37:04 +02:00
docker-compose.yaml Update rabbitmq test 2024-08-04 20:56:58 +02:00
README.md Update rabbitmq test 2024-08-05 23:37:04 +02:00

Pruebas con rabbitmq

Índice de contenidos:

Despliegue rabbitmq con docker

Para desplegar RabbitMQ rápidamente, puedes usar Docker. Ejecuta el siguiente comando para iniciar un contenedor con RabbitMQ y su consola de gestión:

docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management

Si prefieres usar docker-compose, utiliza el archivo docker-compose.yaml con el siguiente comando:

docker compose up -d

Pruebas

Pruebas extraídas de los tutoriales de la documentación oficial de RabbitMQ.

Hello World

Lo más sencillo que hace algo.

Tenemos que diferenciar algunos conceptos:

  • Producer: es el que envía mensajes.
  • Queue: es donde se almacenan los mensajes.
  • Consumer: es el que recibe mensajes.

Vamos a programar un producer y un consumer en Python.

RabbitMQ habla múltiples protocolos. Este tutorial utiliza AMQP 0-9-1, que es un protocolo abierto de propósito general para mensajería.

Hay un gran número de clientes para RabbitMQ en muchos idiomas diferentes. En esta serie de tutoriales vamos a usar Pika 1.0.0, que es el cliente Python recomendado por el equipo de RabbitMQ. Para instalarlo puedes usar la herramienta de gestión de paquetes pip:

pip install pika --upgrade

Nuestro primer programa send.py será el producer que enviará un único mensaje a la cola. Este script también crea la cola hola.

El programa receive.py será el consumer que recibirá mensajes de la cola y los imprimirá en pantalla.

Desde la instalación de rabbitmq puedes ver qué colas tiene RabbitMQ y cuántos mensajes hay en ellas con rabbitmqctl:

sudo rabbitmqctl list_queues

Antes tendrás que entrar en el contenedor de rabbitmq:

docker exec -it rabbitmq-server bash

Ahora, para probarlo, ejecuta el producer y el consumer en dos terminales diferentes:

cd hello-world

python send.py

python receive.py

Work Queues

Reparto de tareas entre los trabajadores (el modelo de consumidores competidores).

Antes hemos enviado un mensaje que contenía ¡Hola Mundo!. Ahora enviaremos cadenas que representan tareas complejas. No tenemos una tarea del mundo real, como imágenes para ser redimensionadas o archivos pdf para ser renderizados, así que vamos a fingir que estamos ocupados usando la función time.sleep(). Tomaremos el número de puntos de la cadena como su complejidad; cada punto representará un segundo de «trabajo». Por ejemplo, una tarea falsa descrita por Hola... tardará tres segundos.

Vamos a modificar el anterior send.py para permitir el envío de mensajes arbitrarios desde la línea de comando. Le llamaremos new_task.py.

También modificaremos receive.py para simular un segundo trabajao por cada punto en el cuerpo del mensaje. Como sacará mensajes de la cola y realizará la tarea le llamaremos worker.py.

Ahora, si ejecutamos dos veces o más el script worker.py, veremos cómo se reparten las tareas entre los dos consumidores.

En dos terminales distintas:

cd 02work-queues

python worker.py

Y en la tercera terminal enviaremos trabajos:

python new_task.py Primer mensaje.
python new_task.py Segundo mensaje..
python new_task.py Tercer mensaje...
python new_task.py Cuarto mensaje....
python new_task.py Quinto mensaje.....

Por defecto, RabbitMQ enviará cada mensaje al siguiente consumidor, en secuencia. Por término medio, cada consumidor recibirá el mismo número de mensajes. Esta forma de distribuir mensajes se llama round-robin.

Para asegurarse de que un mensaje nunca se pierde, RabbitMQ soporta acuses de recibo de mensajes. Un ack(nowledgement) es enviado de vuelta por el consumidor para decirle a RabbitMQ que un mensaje en particular ha sido recibido, procesado y que RabbitMQ es libre de borrarlo.

Apunte: ack es una abreviatura de acknowledgement (reconocimiento). En el caso de que un consumidor muera (su conexión se cierre, por ejemplo) sin enviar un ack, RabbitMQ entenderá que no ha procesado el mensaje y lo reenviará a otro consumidor. Si hay otros consumidores conectados a la cola, se les enviará el mensaje.

Acuse de recibo olvidado

Es un error común olvidar el basic_ack. Los mensajes se volverán a entregar cuando tu cliente salga (lo que puede parecer una redistribución aleatoria), pero RabbitMQ consumirá cada vez más memoria ya que no será capaz de liberar ningún mensaje no empaquetado.

Para depurar este tipo de errores puedes usar rabbitmqctl para imprimir el campo messages_unacknowledged:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Publish/Subscribe

A diferencia de las colas de trabajo, donde cada tarea se entrega a un solo trabajador, este tutorial demuestra el patrón de publicación/suscripción, que entrega mensajes a múltiples consumidores.

El ejemplo es un sistema de registro con dos programas: uno que emite mensajes de registro y otro que los recibe y los imprime.

Cada instancia del programa receptor recibe todos los mensajes, permitiendo que los registros se dirijan al disco o se visualicen en pantalla.

Enfoque:

El ejemplo es un sistema de registro con dos programas: uno que emite mensajes de registro y otro que los recibe y los imprime.

Cada instancia del programa receptor recibe todos los mensajes, permitiendo que los registros se dirijan al disco o se visualicen en pantalla.

Exchange: En RabbitMQ, los productores envían mensajes a un intercambio, no directamente a una cola.

Un intercambio enruta los mensajes a las colas según las reglas definidas por su tipo.

Los tipos de intercambios incluyen directo, tópico, cabeceras y fanout. El tutorial se centra en fanout, que transmite mensajes a todas las colas conocidas.

Ejemplo de declaración de un intercambio fanout:

channel.exchange_declare(exchange='logs', exchange_type='fanout')

Colas Temporales: Las colas temporales se crean con nombres generados aleatoriamente, y se eliminan automáticamente cuando se cierra la conexión del consumidor. Ejemplo de declaración de una cola temporal:

result = channel.queue_declare(queue='', exclusive=True)

Código de Ejemplo:

Routing

En el tutorial anterior, creamos un sistema de registro simple que enviaba mensajes de log a múltiples receptores. En este tutorial, añadiremos la capacidad de suscribirse solo a un subconjunto de mensajes, permitiendo, por ejemplo, que solo los mensajes de error críticos se registren en un archivo, mientras que todos los mensajes de log se imprimen en la consola.

Enlaces

En ejemplos anteriores, ya creamos enlaces entre intercambios (exchanges) y colas (queues). Un enlace determina qué colas están interesadas en los mensajes de un intercambio. Los enlaces pueden incluir una clave de enrutamiento (routing key) que especifica qué mensajes de un intercambio deben ser enviados a una cola.

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

La clave de enlace depende del tipo de intercambio. En un intercambio de tipo fanout, esta clave es ignorada.

Intercambio Directo

Anteriormente, usamos un intercambio de tipo fanout que transmitía todos los mensajes a todos los consumidores sin distinción. Ahora utilizaremos un intercambio direct que permite filtrar mensajes basándose en su severidad. Así, los mensajes serán enviados solo a las colas que coincidan exactamente con la clave de enrutamiento del mensaje.

Por ejemplo, si un intercambio tiene dos colas con claves de enlace orange y black, un mensaje con clave de enrutamiento orange solo irá a la cola correspondiente a orange.

Múltiples Enlaces

Es posible vincular varias colas con la misma clave de enlace. En este caso, el intercambio direct actúa como un fanout, enviando el mensaje a todas las colas que tengan una clave de enlace coincidente.

Emisión de Logs

Usaremos este modelo para nuestro sistema de logs. En lugar de fanout, enviaremos mensajes a un intercambio direct, usando la severidad del log como clave de enrutamiento.

Primero, debemos declarar un intercambio:

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

Y luego podemos enviar un mensaje:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

Las severidades pueden ser 'info', 'warning' o 'error'.

Suscripción

Para recibir mensajes, crearemos un enlace para cada severidad de interés.

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

Código de Ejemplo

Ejemplos de Uso

  • Para guardar solo los mensajes de 'warning' y 'error' en un archivo:

    python receive_logs_direct.py warning error > logs_from_rabbit.log
    
  • Para ver todos los mensajes de log en pantalla:

    python receive_logs_direct.py info warning error
    
  • Para emitir un mensaje de error:

    python emit_log_direct.py error "Run. Run. Or it will explode."
    

Topics (Próximamente)

En el tutorial anterior, mejoramos nuestro sistema de registro utilizando un intercambio de tipo direct para recibir registros selectivamente, basado en criterios como la severidad del mensaje. Sin embargo, para mayor flexibilidad, podemos usar un intercambio de tipo topic, que permite el enrutamiento de mensajes basado en múltiples criterios.

¿Qué es un intercambio de temas?

  • routing_key: En un intercambio de tipo topic, los mensajes tienen una clave de enrutamiento (routing_key) que es una lista de palabras separadas por puntos. Ejemplos: "quick.orange.rabbit", "lazy.brown.fox".
  • binding_key: Las claves de enlace (binding_key) también tienen el mismo formato y determinan qué mensajes recibe cada cola.

Casos especiales de binding_key

  • * (asterisco): Sustituye exactamente una palabra.
  • # (almohadilla): Sustituye cero o más palabras.

Ejemplo de uso

Considera el siguiente escenario con dos colas (Q1 y Q2) y estas claves de enlace:

  • Q1: *.orange.* (recibe todos los mensajes sobre animales naranjas)
  • Q2: *.*.rabbit y lazy.# (recibe todos los mensajes sobre conejos y animales perezosos)

Ejemplos de mensajes:

  • "quick.orange.rabbit": Entregado a Q1 y Q2.
  • "lazy.orange.elephant": Entregado a Q1 y Q2.
  • "quick.orange.fox": Solo entregado a Q1.
  • "lazy.brown.fox": Solo entregado a Q2.

Mensajes con una o cuatro palabras, como "orange" o "quick.orange.new.rabbit", no coinciden con ningún enlace y se pierden.

Características del intercambio de temas

  • Puede comportarse como un intercambio fanout si se usa # como binding_key (recibe todos los mensajes).
  • Se comporta como un intercambio direct si no se utilizan * o # en las claves de enlace.

Implementación del sistema de registro

Usaremos un intercambio de temas para enrutar registros usando routing_key con el formato <facilidad>.<severidad>. El código para emitir y recibir registros es similar al de tutoriales anteriores.

Ejemplos de comandos:

  • Recibir todos los registros: python receive_logs_topic.py "#"
  • Recibir registros de "kern": python receive_logs_topic.py "kern.*"
  • Recibir solo registros "critical": python receive_logs_topic.py "*.critical"
  • Emitir un registro crítico de "kern": python emit_log_topic.py "kern.critical" "A critical kernel error"

El código es casi el mismo que en el tutorial anterior.

RPC (Próximamente)