From 50513ff393944dbb115c4f7b416f2c92707f808d Mon Sep 17 00:00:00 2001 From: Manuel Vergara Date: Wed, 31 Jul 2024 23:27:53 +0200 Subject: [PATCH] Add rabbitmq test --- .../receive.py | 0 .../{01hello-world => 01_hello_world}/send.py | 0 .../new_task.py | 0 .../worker.py | 0 .../03_publish_subcribe/emit_log.py | 100 ++++++++++++++++++ .../03_publish_subcribe/receive_logs.py | 75 +++++++++++++ catch-all/05_infra_test/02_rabbitmq/README.md | 35 +++++- .../02_rabbitmq/docker-compose.yaml | 17 ++- 8 files changed, 223 insertions(+), 4 deletions(-) rename catch-all/05_infra_test/02_rabbitmq/{01hello-world => 01_hello_world}/receive.py (100%) rename catch-all/05_infra_test/02_rabbitmq/{01hello-world => 01_hello_world}/send.py (100%) rename catch-all/05_infra_test/02_rabbitmq/{02work-queues => 02_work_queues}/new_task.py (100%) rename catch-all/05_infra_test/02_rabbitmq/{02work-queues => 02_work_queues}/worker.py (100%) create mode 100644 catch-all/05_infra_test/02_rabbitmq/03_publish_subcribe/emit_log.py create mode 100644 catch-all/05_infra_test/02_rabbitmq/03_publish_subcribe/receive_logs.py diff --git a/catch-all/05_infra_test/02_rabbitmq/01hello-world/receive.py b/catch-all/05_infra_test/02_rabbitmq/01_hello_world/receive.py similarity index 100% rename from catch-all/05_infra_test/02_rabbitmq/01hello-world/receive.py rename to catch-all/05_infra_test/02_rabbitmq/01_hello_world/receive.py diff --git a/catch-all/05_infra_test/02_rabbitmq/01hello-world/send.py b/catch-all/05_infra_test/02_rabbitmq/01_hello_world/send.py similarity index 100% rename from catch-all/05_infra_test/02_rabbitmq/01hello-world/send.py rename to catch-all/05_infra_test/02_rabbitmq/01_hello_world/send.py diff --git a/catch-all/05_infra_test/02_rabbitmq/02work-queues/new_task.py b/catch-all/05_infra_test/02_rabbitmq/02_work_queues/new_task.py similarity index 100% rename from catch-all/05_infra_test/02_rabbitmq/02work-queues/new_task.py rename to catch-all/05_infra_test/02_rabbitmq/02_work_queues/new_task.py diff --git a/catch-all/05_infra_test/02_rabbitmq/02work-queues/worker.py b/catch-all/05_infra_test/02_rabbitmq/02_work_queues/worker.py similarity index 100% rename from catch-all/05_infra_test/02_rabbitmq/02work-queues/worker.py rename to catch-all/05_infra_test/02_rabbitmq/02_work_queues/worker.py diff --git a/catch-all/05_infra_test/02_rabbitmq/03_publish_subcribe/emit_log.py b/catch-all/05_infra_test/02_rabbitmq/03_publish_subcribe/emit_log.py new file mode 100644 index 0000000..a46d00b --- /dev/null +++ b/catch-all/05_infra_test/02_rabbitmq/03_publish_subcribe/emit_log.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +import argparse +import logging +import pika +import sys +import threading +import time +from datetime import datetime +from random import randint + + +def main(): + # Configuración de logging + logging.basicConfig(level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s') + + # Configuración de argparse para manejar argumentos de línea de comandos + parser = argparse.ArgumentParser( + description="Envía un mensaje al intercambio de logs en RabbitMQ." + ) + parser.add_argument( + 'message', + nargs='*', + help='El mensaje a enviar. Si no se especifica, se enviará "Traza de log"' + ) + parser.add_argument( + '--host', default='localhost', + help='El host de RabbitMQ (default: localhost)' + ) + parser.add_argument( + '--user', default='invent', + help='El usuario de RabbitMQ (default: invent)' + ) + parser.add_argument( + '--password', default='123456', + help='La contraseña de RabbitMQ (default: 123456)' + ) + args = parser.parse_args() + + # Crear el mensaje base + base_message = ' '.join(args.message) or "Traza de log" + + stop_sending = threading.Event() + + def send_messages(): + credentials = pika.PlainCredentials(args.user, args.password) + try: + # Establecer conexión con RabbitMQ + connection = pika.BlockingConnection( + pika.ConnectionParameters( + host=args.host, credentials=credentials) + ) + channel = connection.channel() + + # Declarar el intercambio de tipo 'fanout' + channel.exchange_declare(exchange='logs', exchange_type='fanout') + + while not stop_sending.is_set(): + # Crear mensaje con fecha y hora actual + current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + num_logs = randint(1, 1000) + message = f"{current_time} - CUSTOM_LOG - {base_message}: Número aleatorio: {num_logs}" + + # Publicar el mensaje en el intercambio + channel.basic_publish( + exchange='logs', routing_key='', body=message) + logging.info(f"[+] Sent {message}") + + # Esperar 5 segundos antes de enviar el siguiente mensaje + time.sleep(5) + + except pika.exceptions.AMQPConnectionError as e: + logging.error(f"[!] No se pudo conectar a RabbitMQ: {e}") + except Exception as e: + logging.error(f"[!] Ocurrió un error: {e}") + finally: + # Cerrar la conexión + if 'connection' in locals() and connection.is_open: + connection.close() + + # Iniciar el hilo que enviará mensajes + sender_thread = threading.Thread(target=send_messages) + sender_thread.start() + + try: + # Esperar a que el usuario introduzca 'q' para detener el envío de mensajes + while True: + user_input = input() + if user_input.strip().lower() == 'q': + stop_sending.set() + sender_thread.join() + break + except KeyboardInterrupt: + stop_sending.set() + sender_thread.join() + logging.info("Interrupción del usuario recibida. Saliendo...") + + +if __name__ == "__main__": + main() diff --git a/catch-all/05_infra_test/02_rabbitmq/03_publish_subcribe/receive_logs.py b/catch-all/05_infra_test/02_rabbitmq/03_publish_subcribe/receive_logs.py new file mode 100644 index 0000000..fcaf44f --- /dev/null +++ b/catch-all/05_infra_test/02_rabbitmq/03_publish_subcribe/receive_logs.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +import pika +import logging +import argparse + + +def main(): + # Configuración de logging + logging.basicConfig(level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s') + + # Configuración de argparse para manejar argumentos de línea de comandos + parser = argparse.ArgumentParser( + description="Escucha mensajes del intercambio de logs en RabbitMQ." + ) + parser.add_argument( + '--host', default='localhost', + help='El host de RabbitMQ (default: localhost)' + ) + parser.add_argument( + '--user', default='invent', + help='El usuario de RabbitMQ (default: invent)' + ) + parser.add_argument( + '--password', default='123456', + help='La contraseña de RabbitMQ (default: 123456)' + ) + args = parser.parse_args() + + credentials = pika.PlainCredentials(args.user, args.password) + + try: + # Establecer conexión con RabbitMQ + connection = pika.BlockingConnection( + pika.ConnectionParameters(host=args.host, credentials=credentials) + ) + channel = connection.channel() + + # Declarar el intercambio de tipo 'fanout' + channel.exchange_declare(exchange='logs', exchange_type='fanout') + + # Declarar una cola exclusiva para el consumidor + result = channel.queue_declare(queue='', exclusive=True) + queue_name = result.method.queue + + # Enlazar la cola al intercambio de logs + channel.queue_bind(exchange='logs', queue=queue_name) + + logging.info(' [*] Waiting for logs. To exit press CTRL+C') + + # Función de callback para manejar mensajes entrantes + def callback(ch, method, properties, body): + logging.info(f" [x] Received: {body.decode()}") + + # Configurar el consumidor + channel.basic_consume( + queue=queue_name, on_message_callback=callback, auto_ack=True) + + # Iniciar el bucle de consumo + channel.start_consuming() + + except pika.exceptions.AMQPConnectionError as e: + logging.error(f"[!] No se pudo conectar a RabbitMQ: {e}") + except KeyboardInterrupt: + logging.info("Interrupción del usuario recibida. Saliendo...") + except Exception as e: + logging.error(f"[!] Ocurrió un error: {e}") + finally: + # Cerrar la conexión si está abierta + if 'connection' in locals() and connection.is_open: + connection.close() + + +if __name__ == "__main__": + main() diff --git a/catch-all/05_infra_test/02_rabbitmq/README.md b/catch-all/05_infra_test/02_rabbitmq/README.md index 6df3abd..f5d4710 100644 --- a/catch-all/05_infra_test/02_rabbitmq/README.md +++ b/catch-all/05_infra_test/02_rabbitmq/README.md @@ -6,7 +6,7 @@ - [Pruebas](#pruebas) - [Hello World](#hello-world) - [Work Queues](#work-queues) - - [Publish/Subscribe (Próximamente)](#publishsubscribe-próximamente) + - [Publish/Subscribe](#publishsubscribe) - [Routing (Próximamente)](#routing-próximamente) - [Topics (Próximamente)](#topics-próximamente) - [RPC (Próximamente)](#rpc-próximamente) @@ -122,10 +122,41 @@ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged ``` -### Publish/Subscribe (Próximamente) +### Publish/Subscribe +A diferencia de las colas de trabajo, donde cada tarea se entrega a un solo trabajador, este tutorial demuestra el patrón de publicación/suscripción, que entrega mensajes a múltiples consumidores. +El ejemplo es un sistema de registro con dos programas: uno que emite mensajes de registro y otro que los recibe y los imprime. +Cada instancia del programa receptor recibe todos los mensajes, permitiendo que los registros se dirijan al disco o se visualicen en pantalla. + +**Enfoque:** +A diferencia de las colas de trabajo, donde cada tarea se entrega a un solo trabajador, este tutorial demuestra el patrón de publicación/suscripción, que entrega mensajes a múltiples consumidores. + +El ejemplo es un sistema de registro con dos programas: uno que emite mensajes de registro y otro que los recibe y los imprime. + +Cada instancia del programa receptor recibe todos los mensajes, permitiendo que los registros se dirijan al disco o se visualicen en pantalla. + +**Exchange:** +En RabbitMQ, los productores envían mensajes a un intercambio, no directamente a una cola. + +Un intercambio enruta los mensajes a las colas según las reglas definidas por su tipo. + +Los tipos de intercambios incluyen directo, tópico, cabeceras y fanout. El tutorial se centra en fanout, que transmite mensajes a todas las colas conocidas. + +Ejemplo de declaración de un intercambio fanout: + +```python +channel.exchange_declare(exchange='logs', exchange_type='fanout') +``` + +**Colas Temporales:** +Las colas temporales se crean con nombres generados aleatoriamente, y se eliminan automáticamente cuando se cierra la conexión del consumidor. +Ejemplo de declaración de una cola temporal: + +```python +result = channel.queue_declare(queue='', exclusive=True) +``` ### Routing (Próximamente) diff --git a/catch-all/05_infra_test/02_rabbitmq/docker-compose.yaml b/catch-all/05_infra_test/02_rabbitmq/docker-compose.yaml index fd46120..1aa341f 100644 --- a/catch-all/05_infra_test/02_rabbitmq/docker-compose.yaml +++ b/catch-all/05_infra_test/02_rabbitmq/docker-compose.yaml @@ -7,10 +7,23 @@ services: - 5672:5672 - 15672:15672 volumes: - - ~/.docker-conf/rabbitmq/data/:/var/lib/rabbitmq/ - - ~/.docker-conf/rabbitmq/log/:/var/log/rabbitmq + - ./rabbitmq/data/:/var/lib/rabbitmq/ + - ./rabbitmq/log/:/var/log/rabbitmq + environment: + RABBITMQ_DEFAULT_USER: invent + RABBITMQ_DEFAULT_PASS: 123456 + RABBITMQ_ERLANG_COOKIE: 'randomcookievalue' networks: - rabbitmq_go_net + restart: unless-stopped + labels: + description: "RabbitMQ Server in container docker" + maintainer: "manuelver" + healthcheck: + test: ["CMD", "rabbitmqctl", "status"] + interval: 10s + timeout: 5s + retries: 3 networks: rabbitmq_go_net: