diff --git a/catch-all/05_infra_test/02_rabbitmq/04_routing/emit_log_direct.py b/catch-all/05_infra_test/02_rabbitmq/04_routing/emit_log_direct.py new file mode 100644 index 0000000..be81de8 --- /dev/null +++ b/catch-all/05_infra_test/02_rabbitmq/04_routing/emit_log_direct.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python +import pika +import sys +import argparse +import time +import random +import datetime +import threading +import signal + + +def establish_connection(host: str, port: int): + """Establece la conexión con RabbitMQ.""" + + try: + connection = pika.BlockingConnection( + pika.ConnectionParameters(host=host, port=port) + ) + + return connection + + except pika.exceptions.AMQPConnectionError as e: + print(f"\n[!] Error al conectar con RabbitMQ: {e}") + sys.exit(1) + + +def publish_message(channel, exchange: str, severity: str, message: str): + """Publica un mensaje en el intercambio especificado.""" + + try: + channel.basic_publish( + exchange=exchange, + routing_key=severity, + body=message + ) + print(f"[i] Sent {severity}:{message}") + + except Exception as e: + print(f"\n[!] Error al enviar mensaje: {e}") + sys.exit(1) + + +def send_messages_periodically(channel, exchange_name, severity, base_message): + """Envía mensajes periódicamente cada 5 segundos hasta que se detenga.""" + + try: + while not stop_event.is_set(): + # Generar número aleatorio para identificar el envío + random_number = random.randint(1000, 9999) + + # Obtener la fecha y hora actual + current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + # Crear el mensaje con la fecha, hora y número aleatorio + message = f"{base_message} [{random_number}] at {current_time}" + + # Publicar el mensaje + publish_message(channel, exchange_name, severity, message) + + # Esperar 5 segundos antes de enviar el siguiente mensaje + # Comprobación del evento de parada durante la espera + for _ in range(50): + if stop_event.is_set(): + break + + time.sleep(0.1) + + except KeyboardInterrupt: + stop_event.set() + + +def user_input_thread(): + """Hilo para capturar la entrada del usuario.""" + + while not stop_event.is_set(): + user_input = input() + if user_input.strip().lower() == 'q': + stop_event.set() + + +def signal_handler(sig, frame): + """Manejador de señal para terminar el programa.""" + + stop_event.set() + + +def main(): + + parser = argparse.ArgumentParser( + description='Envía mensajes a RabbitMQ usando un intercambio directo.') + parser.add_argument('--host', type=str, default='localhost', + help='El host de RabbitMQ (por defecto: localhost)') + parser.add_argument('--port', type=int, default=5672, + help='El puerto de RabbitMQ (por defecto: 5672)') + parser.add_argument('severity', type=str, nargs='?', + default='info', help='La severidad del mensaje') + parser.add_argument('message', type=str, nargs='*', + default=['Hello', 'World!'], help='El mensaje a enviar') + + args = parser.parse_args() + + # Establecer conexión + connection = establish_connection(args.host, args.port) + channel = connection.channel() + + # Declarar intercambio + exchange_name = 'direct_logs' + channel.exchange_declare(exchange=exchange_name, exchange_type='direct') + + # Mensaje base + base_message = ' '.join(args.message) + + # Crear un hilo para el envío periódico de mensajes + send_thread = threading.Thread(target=send_messages_periodically, args=( + channel, exchange_name, args.severity, base_message)) + send_thread.start() + + # Crear un hilo para capturar la entrada del usuario + input_thread = threading.Thread(target=user_input_thread) + input_thread.start() + + print("Presiona 'q' para detener el envío de mensajes.") + + # Esperar a que los hilos terminen antes de cerrar la conexión + send_thread.join() + input_thread.join() + + # Cerrar conexión + connection.close() + print("Conexión cerrada. Programa terminado.") + + +if __name__ == '__main__': + + # Crear un evento para detener el envío de mensajes + stop_event = threading.Event() + + # Configurar el manejador de señales para terminar el programa + signal.signal(signal.SIGINT, signal_handler) + + main() diff --git a/catch-all/05_infra_test/02_rabbitmq/04_routing/receive_logs_direct.py b/catch-all/05_infra_test/02_rabbitmq/04_routing/receive_logs_direct.py new file mode 100644 index 0000000..8e9e7ba --- /dev/null +++ b/catch-all/05_infra_test/02_rabbitmq/04_routing/receive_logs_direct.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python +import pika +import sys +import argparse +import signal + + +def establish_connection(host: str, port: int): + """Establece la conexión con RabbitMQ.""" + + try: + connection = pika.BlockingConnection( + pika.ConnectionParameters(host=host, port=port) + ) + + return connection + + except pika.exceptions.AMQPConnectionError as e: + print(f"\n[!] Error al conectar con RabbitMQ: {e}") + sys.exit(1) + + +def declare_exchange_and_queue(channel, exchange_name: str, severities: list): + """Declara el intercambio y las colas necesarias.""" + + channel.exchange_declare(exchange=exchange_name, exchange_type='direct') + + # Crear una cola exclusiva + result = channel.queue_declare(queue='', exclusive=True) + queue_name = result.method.queue + + # Vincular la cola al intercambio para cada severidad especificada + for severity in severities: + channel.queue_bind( + exchange=exchange_name, queue=queue_name, routing_key=severity) + + return queue_name + + +def callback(ch, method, properties, body): + """Función de callback para procesar mensajes recibidos.""" + + print(f" [i] {method.routing_key}:{body.decode()}") + + +def main(): + + parser = argparse.ArgumentParser( + description='Recibe mensajes de RabbitMQ usando un intercambio directo.') + parser.add_argument('--host', type=str, default='localhost', + help='El host de RabbitMQ (por defecto: localhost)') + parser.add_argument('--port', type=int, default=5672, + help='El puerto de RabbitMQ (por defecto: 5672)') + parser.add_argument('severities', metavar='S', type=str, nargs='+', + help='Lista de severidades a recibir (info, warning, error)') + + args = parser.parse_args() + + # Establecer conexión + connection = establish_connection(args.host, args.port) + channel = connection.channel() + + # Declarar el intercambio y las colas + exchange_name = 'direct_logs' + queue_name = declare_exchange_and_queue( + channel, exchange_name, args.severities) + + print('\n[!] Esperando logs. Para salir presionar CTRL+C') + + # Iniciar el consumo de mensajes + channel.basic_consume( + queue=queue_name, on_message_callback=callback, auto_ack=True) + + # Manejar Ctrl+C para detener el programa + try: + channel.start_consuming() + + except KeyboardInterrupt: + print("\nInterrupción recibida. Cerrando conexión...") + connection.close() + print("\n[!] Conexión cerrada. Programa terminado.") + + +if __name__ == '__main__': + + main() diff --git a/catch-all/05_infra_test/02_rabbitmq/README.md b/catch-all/05_infra_test/02_rabbitmq/README.md index f5d4710..06228e6 100644 --- a/catch-all/05_infra_test/02_rabbitmq/README.md +++ b/catch-all/05_infra_test/02_rabbitmq/README.md @@ -7,18 +7,27 @@ - [Hello World](#hello-world) - [Work Queues](#work-queues) - [Publish/Subscribe](#publishsubscribe) - - [Routing (Próximamente)](#routing-próximamente) + - [Routing](#routing) + - [Enlaces](#enlaces) + - [Intercambio Directo](#intercambio-directo) + - [Múltiples Enlaces](#múltiples-enlaces) + - [Emisión de Logs](#emisión-de-logs) + - [Suscripción](#suscripción) + - [Código de Ejemplo](#código-de-ejemplo) + - [Ejemplos de Uso](#ejemplos-de-uso) - [Topics (Próximamente)](#topics-próximamente) - [RPC (Próximamente)](#rpc-próximamente) ## Despliegue rabbitmq con docker +Para desplegar RabbitMQ rápidamente, puedes usar Docker. Ejecuta el siguiente comando para iniciar un contenedor con RabbitMQ y su consola de gestión: + ```bash docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management ``` -Con [docker-compose.yaml](./docker-compose.yaml): +Si prefieres usar docker-compose, utiliza el archivo [docker-compose.yaml](./docker-compose.yaml) con el siguiente comando: ```bash docker compose up -d @@ -41,7 +50,9 @@ Tenemos que diferenciar algunos conceptos: Vamos a programar un producer y un consumer en Python. -RabbitMQ habla múltiples protocolos. Este tutorial utiliza AMQP 0-9-1, que es un protocolo abierto de propósito general para mensajería. Hay un gran número de clientes para RabbitMQ en muchos idiomas diferentes. En esta serie de tutoriales vamos a usar Pika 1.0.0, que es el cliente Python recomendado por el equipo de RabbitMQ. Para instalarlo puedes usar la herramienta de gestión de paquetes pip: +RabbitMQ habla múltiples protocolos. Este tutorial utiliza AMQP 0-9-1, que es un protocolo abierto de propósito general para mensajería. + +Hay un gran número de clientes para RabbitMQ en muchos idiomas diferentes. En esta serie de tutoriales vamos a usar Pika 1.0.0, que es el cliente Python recomendado por el equipo de RabbitMQ. Para instalarlo puedes usar la herramienta de gestión de paquetes pip: ```bash pip install pika --upgrade @@ -159,10 +170,145 @@ result = channel.queue_declare(queue='', exclusive=True) ``` -### Routing (Próximamente) +### Routing + +En el tutorial anterior, creamos un sistema de registro simple que enviaba mensajes de log a múltiples receptores. En este tutorial, añadiremos la capacidad de suscribirse solo a un subconjunto de mensajes, permitiendo, por ejemplo, que solo los mensajes de error críticos se registren en un archivo, mientras que todos los mensajes de log se imprimen en la consola. +#### Enlaces +En ejemplos anteriores, ya creamos enlaces entre intercambios (exchanges) y colas (queues). Un enlace determina qué colas están interesadas en los mensajes de un intercambio. Los enlaces pueden incluir una clave de enrutamiento (routing key) que especifica qué mensajes de un intercambio deben ser enviados a una cola. + +```python +channel.queue_bind(exchange=exchange_name, + queue=queue_name, + routing_key='black') +``` + +La clave de enlace depende del tipo de intercambio. En un intercambio de tipo `fanout`, esta clave es ignorada. + +#### Intercambio Directo + +Anteriormente, usamos un intercambio de tipo `fanout` que transmitía todos los mensajes a todos los consumidores sin distinción. Ahora utilizaremos un intercambio `direct` que permite filtrar mensajes basándose en su severidad. Así, los mensajes serán enviados solo a las colas que coincidan exactamente con la clave de enrutamiento del mensaje. + +Por ejemplo, si un intercambio tiene dos colas con claves de enlace `orange` y `black`, un mensaje con clave de enrutamiento `orange` solo irá a la cola correspondiente a `orange`. + +#### Múltiples Enlaces + +Es posible vincular varias colas con la misma clave de enlace. En este caso, el intercambio `direct` actúa como un `fanout`, enviando el mensaje a todas las colas que tengan una clave de enlace coincidente. + +#### Emisión de Logs + +Usaremos este modelo para nuestro sistema de logs. En lugar de `fanout`, enviaremos mensajes a un intercambio `direct`, usando la severidad del log como clave de enrutamiento. + +Primero, debemos declarar un intercambio: + +```python +channel.exchange_declare(exchange='direct_logs', + exchange_type='direct') +``` + +Y luego podemos enviar un mensaje: + +```python +channel.basic_publish(exchange='direct_logs', + routing_key=severity, + body=message) +``` + +Las severidades pueden ser `'info'`, `'warning'` o `'error'`. + +#### Suscripción + +Para recibir mensajes, crearemos un enlace para cada severidad de interés. + +```python +result = channel.queue_declare(queue='', exclusive=True) +queue_name = result.method.queue + +for severity in severities: + channel.queue_bind(exchange='direct_logs', + queue=queue_name, + routing_key=severity) +``` + +#### Código de Ejemplo + +- **`emit_log_direct.py`**: Script para emitir mensajes de log. + +```python +#!/usr/bin/env python +import pika +import sys + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.exchange_declare(exchange='direct_logs', exchange_type='direct') + +severity = sys.argv[1] if len(sys.argv) > 1 else 'info' +message = ' '.join(sys.argv[2:]) or 'Hello World!' +channel.basic_publish( + exchange='direct_logs', routing_key=severity, body=message) +print(f" [x] Sent {severity}:{message}") +connection.close() +``` + +- **`receive_logs_direct.py`**: Script para recibir mensajes de log. + +```python +#!/usr/bin/env python +import pika +import sys + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.exchange_declare(exchange='direct_logs', exchange_type='direct') + +result = channel.queue_declare(queue='', exclusive=True) +queue_name = result.method.queue + +severities = sys.argv[1:] +if not severities: + sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) + sys.exit(1) + +for severity in severities: + channel.queue_bind( + exchange='direct_logs', queue=queue_name, routing_key=severity) + +print(' [*] Waiting for logs. To exit press CTRL+C') + + +def callback(ch, method, properties, body): + print(f" [x] {method.routing_key}:{body}") + + +channel.basic_consume( + queue=queue_name, on_message_callback=callback, auto_ack=True) + +channel.start_consuming() +``` + +#### Ejemplos de Uso + +- Para guardar solo los mensajes de `'warning'` y `'error'` en un archivo: + ```bash + python receive_logs_direct.py warning error > logs_from_rabbit.log + ``` + +- Para ver todos los mensajes de log en pantalla: + ```bash + python receive_logs_direct.py info warning error + ``` + +- Para emitir un mensaje de error: + ```bash + python emit_log_direct.py error "Run. Run. Or it will explode." + ``` ### Topics (Próximamente) diff --git a/catch-all/05_infra_test/02_rabbitmq/docker-compose.yaml b/catch-all/05_infra_test/02_rabbitmq/docker-compose.yaml index 1aa341f..f807b0c 100644 --- a/catch-all/05_infra_test/02_rabbitmq/docker-compose.yaml +++ b/catch-all/05_infra_test/02_rabbitmq/docker-compose.yaml @@ -9,10 +9,10 @@ services: volumes: - ./rabbitmq/data/:/var/lib/rabbitmq/ - ./rabbitmq/log/:/var/log/rabbitmq - environment: - RABBITMQ_DEFAULT_USER: invent - RABBITMQ_DEFAULT_PASS: 123456 - RABBITMQ_ERLANG_COOKIE: 'randomcookievalue' + # environment: + # RABBITMQ_DEFAULT_USER: invent + # RABBITMQ_DEFAULT_PASS: 123456 + # RABBITMQ_ERLANG_COOKIE: 'randomcookievalue' networks: - rabbitmq_go_net restart: unless-stopped