Add rabbitmq test

This commit is contained in:
Manuel Vergara 2024-07-31 23:27:53 +02:00
parent 0daba91bbb
commit 50513ff393
8 changed files with 223 additions and 4 deletions

View File

@ -0,0 +1,100 @@
#!/usr/bin/env python
import argparse
import logging
import pika
import sys
import threading
import time
from datetime import datetime
from random import randint
def main():
# Configuración de logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
# Configuración de argparse para manejar argumentos de línea de comandos
parser = argparse.ArgumentParser(
description="Envía un mensaje al intercambio de logs en RabbitMQ."
)
parser.add_argument(
'message',
nargs='*',
help='El mensaje a enviar. Si no se especifica, se enviará "Traza de log"'
)
parser.add_argument(
'--host', default='localhost',
help='El host de RabbitMQ (default: localhost)'
)
parser.add_argument(
'--user', default='invent',
help='El usuario de RabbitMQ (default: invent)'
)
parser.add_argument(
'--password', default='123456',
help='La contraseña de RabbitMQ (default: 123456)'
)
args = parser.parse_args()
# Crear el mensaje base
base_message = ' '.join(args.message) or "Traza de log"
stop_sending = threading.Event()
def send_messages():
credentials = pika.PlainCredentials(args.user, args.password)
try:
# Establecer conexión con RabbitMQ
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=args.host, credentials=credentials)
)
channel = connection.channel()
# Declarar el intercambio de tipo 'fanout'
channel.exchange_declare(exchange='logs', exchange_type='fanout')
while not stop_sending.is_set():
# Crear mensaje con fecha y hora actual
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
num_logs = randint(1, 1000)
message = f"{current_time} - CUSTOM_LOG - {base_message}: Número aleatorio: {num_logs}"
# Publicar el mensaje en el intercambio
channel.basic_publish(
exchange='logs', routing_key='', body=message)
logging.info(f"[+] Sent {message}")
# Esperar 5 segundos antes de enviar el siguiente mensaje
time.sleep(5)
except pika.exceptions.AMQPConnectionError as e:
logging.error(f"[!] No se pudo conectar a RabbitMQ: {e}")
except Exception as e:
logging.error(f"[!] Ocurrió un error: {e}")
finally:
# Cerrar la conexión
if 'connection' in locals() and connection.is_open:
connection.close()
# Iniciar el hilo que enviará mensajes
sender_thread = threading.Thread(target=send_messages)
sender_thread.start()
try:
# Esperar a que el usuario introduzca 'q' para detener el envío de mensajes
while True:
user_input = input()
if user_input.strip().lower() == 'q':
stop_sending.set()
sender_thread.join()
break
except KeyboardInterrupt:
stop_sending.set()
sender_thread.join()
logging.info("Interrupción del usuario recibida. Saliendo...")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,75 @@
#!/usr/bin/env python
import pika
import logging
import argparse
def main():
# Configuración de logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
# Configuración de argparse para manejar argumentos de línea de comandos
parser = argparse.ArgumentParser(
description="Escucha mensajes del intercambio de logs en RabbitMQ."
)
parser.add_argument(
'--host', default='localhost',
help='El host de RabbitMQ (default: localhost)'
)
parser.add_argument(
'--user', default='invent',
help='El usuario de RabbitMQ (default: invent)'
)
parser.add_argument(
'--password', default='123456',
help='La contraseña de RabbitMQ (default: 123456)'
)
args = parser.parse_args()
credentials = pika.PlainCredentials(args.user, args.password)
try:
# Establecer conexión con RabbitMQ
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=args.host, credentials=credentials)
)
channel = connection.channel()
# Declarar el intercambio de tipo 'fanout'
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# Declarar una cola exclusiva para el consumidor
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Enlazar la cola al intercambio de logs
channel.queue_bind(exchange='logs', queue=queue_name)
logging.info(' [*] Waiting for logs. To exit press CTRL+C')
# Función de callback para manejar mensajes entrantes
def callback(ch, method, properties, body):
logging.info(f" [x] Received: {body.decode()}")
# Configurar el consumidor
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
# Iniciar el bucle de consumo
channel.start_consuming()
except pika.exceptions.AMQPConnectionError as e:
logging.error(f"[!] No se pudo conectar a RabbitMQ: {e}")
except KeyboardInterrupt:
logging.info("Interrupción del usuario recibida. Saliendo...")
except Exception as e:
logging.error(f"[!] Ocurrió un error: {e}")
finally:
# Cerrar la conexión si está abierta
if 'connection' in locals() and connection.is_open:
connection.close()
if __name__ == "__main__":
main()

View File

@ -6,7 +6,7 @@
- [Pruebas](#pruebas) - [Pruebas](#pruebas)
- [Hello World](#hello-world) - [Hello World](#hello-world)
- [Work Queues](#work-queues) - [Work Queues](#work-queues)
- [Publish/Subscribe (Próximamente)](#publishsubscribe-próximamente) - [Publish/Subscribe](#publishsubscribe)
- [Routing (Próximamente)](#routing-próximamente) - [Routing (Próximamente)](#routing-próximamente)
- [Topics (Próximamente)](#topics-próximamente) - [Topics (Próximamente)](#topics-próximamente)
- [RPC (Próximamente)](#rpc-próximamente) - [RPC (Próximamente)](#rpc-próximamente)
@ -122,10 +122,41 @@ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
``` ```
### Publish/Subscribe (Próximamente) ### Publish/Subscribe
A diferencia de las colas de trabajo, donde cada tarea se entrega a un solo trabajador, este tutorial demuestra el patrón de publicación/suscripción, que entrega mensajes a múltiples consumidores.
El ejemplo es un sistema de registro con dos programas: uno que emite mensajes de registro y otro que los recibe y los imprime.
Cada instancia del programa receptor recibe todos los mensajes, permitiendo que los registros se dirijan al disco o se visualicen en pantalla.
**Enfoque:**
A diferencia de las colas de trabajo, donde cada tarea se entrega a un solo trabajador, este tutorial demuestra el patrón de publicación/suscripción, que entrega mensajes a múltiples consumidores.
El ejemplo es un sistema de registro con dos programas: uno que emite mensajes de registro y otro que los recibe y los imprime.
Cada instancia del programa receptor recibe todos los mensajes, permitiendo que los registros se dirijan al disco o se visualicen en pantalla.
**Exchange:**
En RabbitMQ, los productores envían mensajes a un intercambio, no directamente a una cola.
Un intercambio enruta los mensajes a las colas según las reglas definidas por su tipo.
Los tipos de intercambios incluyen directo, tópico, cabeceras y fanout. El tutorial se centra en fanout, que transmite mensajes a todas las colas conocidas.
Ejemplo de declaración de un intercambio fanout:
```python
channel.exchange_declare(exchange='logs', exchange_type='fanout')
```
**Colas Temporales:**
Las colas temporales se crean con nombres generados aleatoriamente, y se eliminan automáticamente cuando se cierra la conexión del consumidor.
Ejemplo de declaración de una cola temporal:
```python
result = channel.queue_declare(queue='', exclusive=True)
```
### Routing (Próximamente) ### Routing (Próximamente)

View File

@ -7,10 +7,23 @@ services:
- 5672:5672 - 5672:5672
- 15672:15672 - 15672:15672
volumes: volumes:
- ~/.docker-conf/rabbitmq/data/:/var/lib/rabbitmq/ - ./rabbitmq/data/:/var/lib/rabbitmq/
- ~/.docker-conf/rabbitmq/log/:/var/log/rabbitmq - ./rabbitmq/log/:/var/log/rabbitmq
environment:
RABBITMQ_DEFAULT_USER: invent
RABBITMQ_DEFAULT_PASS: 123456
RABBITMQ_ERLANG_COOKIE: 'randomcookievalue'
networks: networks:
- rabbitmq_go_net - rabbitmq_go_net
restart: unless-stopped
labels:
description: "RabbitMQ Server in container docker"
maintainer: "manuelver"
healthcheck:
test: ["CMD", "rabbitmqctl", "status"]
interval: 10s
timeout: 5s
retries: 3
networks: networks:
rabbitmq_go_net: rabbitmq_go_net: