Update rabbitmq test

This commit is contained in:
Manuel Vergara 2024-08-04 20:56:58 +02:00
parent 9a60b44822
commit 78552c227a
4 changed files with 381 additions and 8 deletions

View File

@ -0,0 +1,141 @@
#!/usr/bin/env python
import pika
import sys
import argparse
import time
import random
import datetime
import threading
import signal
def establish_connection(host: str, port: int):
"""Establece la conexión con RabbitMQ."""
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=host, port=port)
)
return connection
except pika.exceptions.AMQPConnectionError as e:
print(f"\n[!] Error al conectar con RabbitMQ: {e}")
sys.exit(1)
def publish_message(channel, exchange: str, severity: str, message: str):
"""Publica un mensaje en el intercambio especificado."""
try:
channel.basic_publish(
exchange=exchange,
routing_key=severity,
body=message
)
print(f"[i] Sent {severity}:{message}")
except Exception as e:
print(f"\n[!] Error al enviar mensaje: {e}")
sys.exit(1)
def send_messages_periodically(channel, exchange_name, severity, base_message):
"""Envía mensajes periódicamente cada 5 segundos hasta que se detenga."""
try:
while not stop_event.is_set():
# Generar número aleatorio para identificar el envío
random_number = random.randint(1000, 9999)
# Obtener la fecha y hora actual
current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Crear el mensaje con la fecha, hora y número aleatorio
message = f"{base_message} [{random_number}] at {current_time}"
# Publicar el mensaje
publish_message(channel, exchange_name, severity, message)
# Esperar 5 segundos antes de enviar el siguiente mensaje
# Comprobación del evento de parada durante la espera
for _ in range(50):
if stop_event.is_set():
break
time.sleep(0.1)
except KeyboardInterrupt:
stop_event.set()
def user_input_thread():
"""Hilo para capturar la entrada del usuario."""
while not stop_event.is_set():
user_input = input()
if user_input.strip().lower() == 'q':
stop_event.set()
def signal_handler(sig, frame):
"""Manejador de señal para terminar el programa."""
stop_event.set()
def main():
parser = argparse.ArgumentParser(
description='Envía mensajes a RabbitMQ usando un intercambio directo.')
parser.add_argument('--host', type=str, default='localhost',
help='El host de RabbitMQ (por defecto: localhost)')
parser.add_argument('--port', type=int, default=5672,
help='El puerto de RabbitMQ (por defecto: 5672)')
parser.add_argument('severity', type=str, nargs='?',
default='info', help='La severidad del mensaje')
parser.add_argument('message', type=str, nargs='*',
default=['Hello', 'World!'], help='El mensaje a enviar')
args = parser.parse_args()
# Establecer conexión
connection = establish_connection(args.host, args.port)
channel = connection.channel()
# Declarar intercambio
exchange_name = 'direct_logs'
channel.exchange_declare(exchange=exchange_name, exchange_type='direct')
# Mensaje base
base_message = ' '.join(args.message)
# Crear un hilo para el envío periódico de mensajes
send_thread = threading.Thread(target=send_messages_periodically, args=(
channel, exchange_name, args.severity, base_message))
send_thread.start()
# Crear un hilo para capturar la entrada del usuario
input_thread = threading.Thread(target=user_input_thread)
input_thread.start()
print("Presiona 'q' para detener el envío de mensajes.")
# Esperar a que los hilos terminen antes de cerrar la conexión
send_thread.join()
input_thread.join()
# Cerrar conexión
connection.close()
print("Conexión cerrada. Programa terminado.")
if __name__ == '__main__':
# Crear un evento para detener el envío de mensajes
stop_event = threading.Event()
# Configurar el manejador de señales para terminar el programa
signal.signal(signal.SIGINT, signal_handler)
main()

View File

@ -0,0 +1,86 @@
#!/usr/bin/env python
import pika
import sys
import argparse
import signal
def establish_connection(host: str, port: int):
"""Establece la conexión con RabbitMQ."""
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=host, port=port)
)
return connection
except pika.exceptions.AMQPConnectionError as e:
print(f"\n[!] Error al conectar con RabbitMQ: {e}")
sys.exit(1)
def declare_exchange_and_queue(channel, exchange_name: str, severities: list):
"""Declara el intercambio y las colas necesarias."""
channel.exchange_declare(exchange=exchange_name, exchange_type='direct')
# Crear una cola exclusiva
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Vincular la cola al intercambio para cada severidad especificada
for severity in severities:
channel.queue_bind(
exchange=exchange_name, queue=queue_name, routing_key=severity)
return queue_name
def callback(ch, method, properties, body):
"""Función de callback para procesar mensajes recibidos."""
print(f" [i] {method.routing_key}:{body.decode()}")
def main():
parser = argparse.ArgumentParser(
description='Recibe mensajes de RabbitMQ usando un intercambio directo.')
parser.add_argument('--host', type=str, default='localhost',
help='El host de RabbitMQ (por defecto: localhost)')
parser.add_argument('--port', type=int, default=5672,
help='El puerto de RabbitMQ (por defecto: 5672)')
parser.add_argument('severities', metavar='S', type=str, nargs='+',
help='Lista de severidades a recibir (info, warning, error)')
args = parser.parse_args()
# Establecer conexión
connection = establish_connection(args.host, args.port)
channel = connection.channel()
# Declarar el intercambio y las colas
exchange_name = 'direct_logs'
queue_name = declare_exchange_and_queue(
channel, exchange_name, args.severities)
print('\n[!] Esperando logs. Para salir presionar CTRL+C')
# Iniciar el consumo de mensajes
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
# Manejar Ctrl+C para detener el programa
try:
channel.start_consuming()
except KeyboardInterrupt:
print("\nInterrupción recibida. Cerrando conexión...")
connection.close()
print("\n[!] Conexión cerrada. Programa terminado.")
if __name__ == '__main__':
main()

View File

@ -7,18 +7,27 @@
- [Hello World](#hello-world)
- [Work Queues](#work-queues)
- [Publish/Subscribe](#publishsubscribe)
- [Routing (Próximamente)](#routing-próximamente)
- [Routing](#routing)
- [Enlaces](#enlaces)
- [Intercambio Directo](#intercambio-directo)
- [Múltiples Enlaces](#múltiples-enlaces)
- [Emisión de Logs](#emisión-de-logs)
- [Suscripción](#suscripción)
- [Código de Ejemplo](#código-de-ejemplo)
- [Ejemplos de Uso](#ejemplos-de-uso)
- [Topics (Próximamente)](#topics-próximamente)
- [RPC (Próximamente)](#rpc-próximamente)
## Despliegue rabbitmq con docker
Para desplegar RabbitMQ rápidamente, puedes usar Docker. Ejecuta el siguiente comando para iniciar un contenedor con RabbitMQ y su consola de gestión:
```bash
docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management
```
Con [docker-compose.yaml](./docker-compose.yaml):
Si prefieres usar docker-compose, utiliza el archivo [docker-compose.yaml](./docker-compose.yaml) con el siguiente comando:
```bash
docker compose up -d
@ -41,7 +50,9 @@ Tenemos que diferenciar algunos conceptos:
Vamos a programar un producer y un consumer en Python.
RabbitMQ habla múltiples protocolos. Este tutorial utiliza AMQP 0-9-1, que es un protocolo abierto de propósito general para mensajería. Hay un gran número de clientes para RabbitMQ en muchos idiomas diferentes. En esta serie de tutoriales vamos a usar Pika 1.0.0, que es el cliente Python recomendado por el equipo de RabbitMQ. Para instalarlo puedes usar la herramienta de gestión de paquetes pip:
RabbitMQ habla múltiples protocolos. Este tutorial utiliza AMQP 0-9-1, que es un protocolo abierto de propósito general para mensajería.
Hay un gran número de clientes para RabbitMQ en muchos idiomas diferentes. En esta serie de tutoriales vamos a usar Pika 1.0.0, que es el cliente Python recomendado por el equipo de RabbitMQ. Para instalarlo puedes usar la herramienta de gestión de paquetes pip:
```bash
pip install pika --upgrade
@ -159,10 +170,145 @@ result = channel.queue_declare(queue='', exclusive=True)
```
### Routing (Próximamente)
### Routing
En el tutorial anterior, creamos un sistema de registro simple que enviaba mensajes de log a múltiples receptores. En este tutorial, añadiremos la capacidad de suscribirse solo a un subconjunto de mensajes, permitiendo, por ejemplo, que solo los mensajes de error críticos se registren en un archivo, mientras que todos los mensajes de log se imprimen en la consola.
#### Enlaces
En ejemplos anteriores, ya creamos enlaces entre intercambios (exchanges) y colas (queues). Un enlace determina qué colas están interesadas en los mensajes de un intercambio. Los enlaces pueden incluir una clave de enrutamiento (routing key) que especifica qué mensajes de un intercambio deben ser enviados a una cola.
```python
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
```
La clave de enlace depende del tipo de intercambio. En un intercambio de tipo `fanout`, esta clave es ignorada.
#### Intercambio Directo
Anteriormente, usamos un intercambio de tipo `fanout` que transmitía todos los mensajes a todos los consumidores sin distinción. Ahora utilizaremos un intercambio `direct` que permite filtrar mensajes basándose en su severidad. Así, los mensajes serán enviados solo a las colas que coincidan exactamente con la clave de enrutamiento del mensaje.
Por ejemplo, si un intercambio tiene dos colas con claves de enlace `orange` y `black`, un mensaje con clave de enrutamiento `orange` solo irá a la cola correspondiente a `orange`.
#### Múltiples Enlaces
Es posible vincular varias colas con la misma clave de enlace. En este caso, el intercambio `direct` actúa como un `fanout`, enviando el mensaje a todas las colas que tengan una clave de enlace coincidente.
#### Emisión de Logs
Usaremos este modelo para nuestro sistema de logs. En lugar de `fanout`, enviaremos mensajes a un intercambio `direct`, usando la severidad del log como clave de enrutamiento.
Primero, debemos declarar un intercambio:
```python
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
```
Y luego podemos enviar un mensaje:
```python
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
```
Las severidades pueden ser `'info'`, `'warning'` o `'error'`.
#### Suscripción
Para recibir mensajes, crearemos un enlace para cada severidad de interés.
```python
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
```
#### Código de Ejemplo
- **`emit_log_direct.py`**: Script para emitir mensajes de log.
```python
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='direct_logs', routing_key=severity, body=message)
print(f" [x] Sent {severity}:{message}")
connection.close()
```
- **`receive_logs_direct.py`**: Script para recibir mensajes de log.
```python
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(
exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
```
#### Ejemplos de Uso
- Para guardar solo los mensajes de `'warning'` y `'error'` en un archivo:
```bash
python receive_logs_direct.py warning error > logs_from_rabbit.log
```
- Para ver todos los mensajes de log en pantalla:
```bash
python receive_logs_direct.py info warning error
```
- Para emitir un mensaje de error:
```bash
python emit_log_direct.py error "Run. Run. Or it will explode."
```
### Topics (Próximamente)

View File

@ -9,10 +9,10 @@ services:
volumes:
- ./rabbitmq/data/:/var/lib/rabbitmq/
- ./rabbitmq/log/:/var/log/rabbitmq
environment:
RABBITMQ_DEFAULT_USER: invent
RABBITMQ_DEFAULT_PASS: 123456
RABBITMQ_ERLANG_COOKIE: 'randomcookievalue'
# environment:
# RABBITMQ_DEFAULT_USER: invent
# RABBITMQ_DEFAULT_PASS: 123456
# RABBITMQ_ERLANG_COOKIE: 'randomcookievalue'
networks:
- rabbitmq_go_net
restart: unless-stopped