From 898da84dc34ea54c88354a17c1004b0ed4ccef79 Mon Sep 17 00:00:00 2001 From: Manuel Vergara Date: Mon, 5 Aug 2024 23:37:04 +0200 Subject: [PATCH] Update rabbitmq test --- .../02_rabbitmq/05_topics/emit_log_topic.py | 114 ++++++++++++++++ .../05_topics/receive_logs_topic.py | 122 +++++++++++++++++ catch-all/05_infra_test/02_rabbitmq/README.md | 124 ++++++++++-------- 3 files changed, 302 insertions(+), 58 deletions(-) create mode 100644 catch-all/05_infra_test/02_rabbitmq/05_topics/emit_log_topic.py create mode 100644 catch-all/05_infra_test/02_rabbitmq/05_topics/receive_logs_topic.py diff --git a/catch-all/05_infra_test/02_rabbitmq/05_topics/emit_log_topic.py b/catch-all/05_infra_test/02_rabbitmq/05_topics/emit_log_topic.py new file mode 100644 index 0000000..90555d6 --- /dev/null +++ b/catch-all/05_infra_test/02_rabbitmq/05_topics/emit_log_topic.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python +import pika +import argparse +import logging +import time +import random +from datetime import datetime + +# Configuración del logger +logging.basicConfig(level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s') + + +def parse_arguments(): + """ + Analiza los argumentos de línea de comandos utilizando argparse. + Devuelve un objeto con los argumentos proporcionados por el usuario. + """ + + parser = argparse.ArgumentParser( + description='Enviar mensajes a un intercambio de tipo "topic" en RabbitMQ.') + parser.add_argument( + 'routing_key', help='La clave de enrutamiento para el mensaje.') + parser.add_argument( + 'message', nargs='*', default=['Hola', 'Mundo!'], help='El mensaje base a enviar.') + + return parser.parse_args() + + +def establish_connection(): + """ + Establece una conexión con RabbitMQ. + Retorna el objeto de conexión si es exitoso. + Salida del programa si hay un error de conexión. + """ + + try: + connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) + + return connection + + except pika.exceptions.AMQPConnectionError as e: + logging.error('\n[!] Error al conectar con RabbitMQ: %s', e) + sys.exit(1) + + +def declare_exchange(channel): + """ + Declara el intercambio de tipo 'topic'. + Salida del programa si hay un error al declarar el intercambio. + """ + + try: + channel.exchange_declare(exchange='topic_logs', exchange_type='topic') + + except pika.exceptions.ChannelError as e: + logging.error('\n[!] Error al declarar el intercambio: %s', e) + sys.exit(1) + + +def generate_message(base_message): + """ + Genera un mensaje único que incluye un número aleatorio, fecha y hora actual. + """ + + random_id = random.randint( + 1000, 9999) # Genera un ID aleatorio de 4 dígitos + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Fecha y hora actual + + return f"{timestamp} - ({random_id}): {base_message}" + + +def publish_message(channel, routing_key, message): + """ + Publica un mensaje en el intercambio declarado. + """ + + channel.basic_publish(exchange='topic_logs', + routing_key=routing_key, body=message) + logging.info(' [+] Enviado %s:%s', routing_key, message) + + +def main(): + """ + Función principal que orquesta la ejecución del script. + """ + + # Parsear los argumentos de línea de comandos + args = parse_arguments() + routing_key = args.routing_key + base_message = ' '.join(args.message) + + # Establecer conexión y publicar mensaje cada 5 segundos + with establish_connection() as connection: + channel = connection.channel() + declare_exchange(channel) + + try: + while True: + # Generar y publicar el mensaje + message = generate_message(base_message) + publish_message(channel, routing_key, message) + # Espera de 5 segundos antes de enviar el siguiente mensaje + time.sleep(5) + except KeyboardInterrupt: + logging.info("\n[!] Interrupción del usuario. Terminando...") + except Exception as e: + logging.error("\n[!] Se produjo un error inesperado: %s", e) + + +if __name__ == '__main__': + + main() diff --git a/catch-all/05_infra_test/02_rabbitmq/05_topics/receive_logs_topic.py b/catch-all/05_infra_test/02_rabbitmq/05_topics/receive_logs_topic.py new file mode 100644 index 0000000..e8cfdd2 --- /dev/null +++ b/catch-all/05_infra_test/02_rabbitmq/05_topics/receive_logs_topic.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python +import pika +import argparse +import logging +import sys + +# Configuración del logger +logging.basicConfig(level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s') + + +def parse_arguments(): + """ + Analiza los argumentos de línea de comandos utilizando argparse. + Devuelve un objeto con los argumentos proporcionados por el usuario. + """ + + parser = argparse.ArgumentParser( + description='Recibe mensajes de un intercambio de tipo "topic" en RabbitMQ.') + parser.add_argument('binding_keys', nargs='+', + help='Lista de claves de enlace para filtrar los mensajes.') + + return parser.parse_args() + + +def establish_connection(): + """ + Establece una conexión con RabbitMQ. + Retorna el objeto de conexión si es exitoso. + Salida del programa si hay un error de conexión. + """ + + try: + connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) + return connection + + except pika.exceptions.AMQPConnectionError as e: + logging.error('[!] Error al conectar con RabbitMQ: %s', e) + sys.exit(1) + + +def declare_exchange_and_queue(channel): + """ + Declara el intercambio de tipo 'topic' y una cola exclusiva. + Retorna el nombre de la cola creada. + """ + + try: + channel.exchange_declare(exchange='topic_logs', exchange_type='topic') + result = channel.queue_declare('', exclusive=True) + + return result.method.queue + + except pika.exceptions.ChannelError as e: + logging.error('[!] Error al declarar el intercambio o la cola: %s', e) + sys.exit(1) + + +def bind_queue(channel, queue_name, binding_keys): + """ + Vincula la cola al intercambio con las claves de enlace proporcionadas. + """ + + for binding_key in binding_keys: + try: + channel.queue_bind(exchange='topic_logs', + queue=queue_name, routing_key=binding_key) + logging.info( + ' [i] Cola vinculada con clave de enlace: %s', binding_key) + + except pika.exceptions.ChannelError as e: + logging.error( + '[!] Error al vincular la cola con la clave de enlace %s: %s', binding_key, e) + sys.exit(1) + + +def callback(ch, method, properties, body): + """ + Función de callback que maneja los mensajes recibidos. + """ + + logging.info(' [+] %s: %s', method.routing_key.upper(), body.decode()) + + +def start_consuming(channel, queue_name): + """ + Inicia la recepción de mensajes desde la cola especificada. + """ + + channel.basic_consume( + queue=queue_name, on_message_callback=callback, auto_ack=True) + logging.info(' [i] Esperando mensajes. Para salir presione CTRL+C') + + try: + channel.start_consuming() + + except KeyboardInterrupt: + logging.info(' [!] Interrupción del usuario. Terminando...') + channel.stop_consuming() + + +def main(): + """ + Función principal que orquesta la ejecución del script. + """ + + # Parsear los argumentos de línea de comandos + args = parse_arguments() + binding_keys = args.binding_keys + + # Establecer conexión, declarar intercambio y cola, vincular y comenzar a consumir + with establish_connection() as connection: + channel = connection.channel() + queue_name = declare_exchange_and_queue(channel) + bind_queue(channel, queue_name, binding_keys) + start_consuming(channel, queue_name) + + +if __name__ == '__main__': + + main() diff --git a/catch-all/05_infra_test/02_rabbitmq/README.md b/catch-all/05_infra_test/02_rabbitmq/README.md index 06228e6..5b26ccd 100644 --- a/catch-all/05_infra_test/02_rabbitmq/README.md +++ b/catch-all/05_infra_test/02_rabbitmq/README.md @@ -16,6 +16,11 @@ - [Código de Ejemplo](#código-de-ejemplo) - [Ejemplos de Uso](#ejemplos-de-uso) - [Topics (Próximamente)](#topics-próximamente) + - [¿Qué es un intercambio de temas?](#qué-es-un-intercambio-de-temas) + - [Casos especiales de `binding_key`](#casos-especiales-de-binding_key) + - [Ejemplo de uso](#ejemplo-de-uso) + - [Características del intercambio de temas](#características-del-intercambio-de-temas) + - [Implementación del sistema de registro](#implementación-del-sistema-de-registro) - [RPC (Próximamente)](#rpc-próximamente) @@ -137,12 +142,13 @@ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged A diferencia de las colas de trabajo, donde cada tarea se entrega a un solo trabajador, este tutorial demuestra el patrón de publicación/suscripción, que entrega mensajes a múltiples consumidores. +![](https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Fwww.myanglog.com%2Fstatic%2Fe63b9118f1113a569006637046857099%2F7842b%2FUntitled1.png) + El ejemplo es un sistema de registro con dos programas: uno que emite mensajes de registro y otro que los recibe y los imprime. Cada instancia del programa receptor recibe todos los mensajes, permitiendo que los registros se dirijan al disco o se visualicen en pantalla. **Enfoque:** -A diferencia de las colas de trabajo, donde cada tarea se entrega a un solo trabajador, este tutorial demuestra el patrón de publicación/suscripción, que entrega mensajes a múltiples consumidores. El ejemplo es un sistema de registro con dos programas: uno que emite mensajes de registro y otro que los recibe y los imprime. @@ -168,12 +174,18 @@ Ejemplo de declaración de una cola temporal: ```python result = channel.queue_declare(queue='', exclusive=True) ``` +**Código de Ejemplo:** + +- [emit_log.py](./03_publish_subcribe/emit_log.py) para enviar mensajes de log. +- [receive_logs.py](./03_publish_subcribe/receive_logs.py) para recibir mensajes de log. ### Routing En el tutorial anterior, creamos un sistema de registro simple que enviaba mensajes de log a múltiples receptores. En este tutorial, añadiremos la capacidad de suscribirse solo a un subconjunto de mensajes, permitiendo, por ejemplo, que solo los mensajes de error críticos se registren en un archivo, mientras que todos los mensajes de log se imprimen en la consola. +![](https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Froytuts.com%2Fwp-content%2Fuploads%2F2022%2F02%2Fimage-2.png&f=1&nofb=1&ipt=7fde4343c19d799a6854664651f05dba983d70b2da4179f5b1f9d1e045a941d9&ipo=images) + #### Enlaces @@ -234,64 +246,9 @@ for severity in severities: #### Código de Ejemplo -- **`emit_log_direct.py`**: Script para emitir mensajes de log. +- **[emit_log_direct.py](./04_routing/emit_log_direct.py)**: Script para emitir mensajes de log. +- **[receive_logs_direct.py](./04_routing/receive_logs_direct.py)**: Script para recibir mensajes de log. -```python -#!/usr/bin/env python -import pika -import sys - -connection = pika.BlockingConnection( - pika.ConnectionParameters(host='localhost')) -channel = connection.channel() - -channel.exchange_declare(exchange='direct_logs', exchange_type='direct') - -severity = sys.argv[1] if len(sys.argv) > 1 else 'info' -message = ' '.join(sys.argv[2:]) or 'Hello World!' -channel.basic_publish( - exchange='direct_logs', routing_key=severity, body=message) -print(f" [x] Sent {severity}:{message}") -connection.close() -``` - -- **`receive_logs_direct.py`**: Script para recibir mensajes de log. - -```python -#!/usr/bin/env python -import pika -import sys - -connection = pika.BlockingConnection( - pika.ConnectionParameters(host='localhost')) -channel = connection.channel() - -channel.exchange_declare(exchange='direct_logs', exchange_type='direct') - -result = channel.queue_declare(queue='', exclusive=True) -queue_name = result.method.queue - -severities = sys.argv[1:] -if not severities: - sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) - sys.exit(1) - -for severity in severities: - channel.queue_bind( - exchange='direct_logs', queue=queue_name, routing_key=severity) - -print(' [*] Waiting for logs. To exit press CTRL+C') - - -def callback(ch, method, properties, body): - print(f" [x] {method.routing_key}:{body}") - - -channel.basic_consume( - queue=queue_name, on_message_callback=callback, auto_ack=True) - -channel.start_consuming() -``` #### Ejemplos de Uso @@ -313,13 +270,64 @@ channel.start_consuming() ### Topics (Próximamente) +En el tutorial anterior, mejoramos nuestro sistema de registro utilizando un intercambio de tipo `direct` para recibir registros selectivamente, basado en criterios como la severidad del mensaje. Sin embargo, para mayor flexibilidad, podemos usar un intercambio de tipo `topic`, que permite el enrutamiento de mensajes basado en múltiples criterios. +![](https://miro.medium.com/v2/resize:fit:1400/0*gFwb04MsfqtVB5bY.png) +#### ¿Qué es un intercambio de temas? + +- **`routing_key`**: En un intercambio de tipo `topic`, los mensajes tienen una clave de enrutamiento (`routing_key`) que es una lista de palabras separadas por puntos. Ejemplos: `"quick.orange.rabbit"`, `"lazy.brown.fox"`. +- **`binding_key`**: Las claves de enlace (`binding_key`) también tienen el mismo formato y determinan qué mensajes recibe cada cola. + +#### Casos especiales de `binding_key` + +- **`*` (asterisco)**: Sustituye exactamente una palabra. +- **`#` (almohadilla)**: Sustituye cero o más palabras. + +#### Ejemplo de uso + +Considera el siguiente escenario con dos colas (Q1 y Q2) y estas claves de enlace: + +- Q1: `*.orange.*` (recibe todos los mensajes sobre animales naranjas) +- Q2: `*.*.rabbit` y `lazy.#` (recibe todos los mensajes sobre conejos y animales perezosos) + +Ejemplos de mensajes: + +- `"quick.orange.rabbit"`: Entregado a Q1 y Q2. +- `"lazy.orange.elephant"`: Entregado a Q1 y Q2. +- `"quick.orange.fox"`: Solo entregado a Q1. +- `"lazy.brown.fox"`: Solo entregado a Q2. + +Mensajes con una o cuatro palabras, como `"orange"` o `"quick.orange.new.rabbit"`, no coinciden con ningún enlace y se pierden. + +#### Características del intercambio de temas + +- Puede comportarse como un intercambio `fanout` si se usa `#` como `binding_key` (recibe todos los mensajes). +- Se comporta como un intercambio `direct` si no se utilizan `*` o `#` en las claves de enlace. + +#### Implementación del sistema de registro + +Usaremos un intercambio de temas para enrutar registros usando `routing_key` con el formato `.`. El código para emitir y recibir registros es similar al de tutoriales anteriores. + +**Ejemplos de comandos:** + +- Recibir todos los registros: `python receive_logs_topic.py "#"` +- Recibir registros de "kern": `python receive_logs_topic.py "kern.*"` +- Recibir solo registros "critical": `python receive_logs_topic.py "*.critical"` +- Emitir un registro crítico de "kern": `python emit_log_topic.py "kern.critical" "A critical kernel error"` + +El código es casi el mismo que en el tutorial anterior. + +- **[emit_log_topic.py](./05_topics/emit_log_topic.py)** +- **[receive_logs_topic.py](./05_topics/receive_logs_topic)** ### RPC (Próximamente) + ![](https://alvaro-videla.com/images/RPC-OverRMQ.png) + +