Compare commits
2 Commits
78552c227a
...
898da84dc3
Author | SHA1 | Date | |
---|---|---|---|
898da84dc3 | |||
bba89794a2 |
114
catch-all/05_infra_test/02_rabbitmq/05_topics/emit_log_topic.py
Normal file
114
catch-all/05_infra_test/02_rabbitmq/05_topics/emit_log_topic.py
Normal file
@ -0,0 +1,114 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import pika
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
import random
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
# Configuración del logger
|
||||||
|
logging.basicConfig(level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
|
||||||
|
|
||||||
|
def parse_arguments():
|
||||||
|
"""
|
||||||
|
Analiza los argumentos de línea de comandos utilizando argparse.
|
||||||
|
Devuelve un objeto con los argumentos proporcionados por el usuario.
|
||||||
|
"""
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description='Enviar mensajes a un intercambio de tipo "topic" en RabbitMQ.')
|
||||||
|
parser.add_argument(
|
||||||
|
'routing_key', help='La clave de enrutamiento para el mensaje.')
|
||||||
|
parser.add_argument(
|
||||||
|
'message', nargs='*', default=['Hola', 'Mundo!'], help='El mensaje base a enviar.')
|
||||||
|
|
||||||
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
def establish_connection():
|
||||||
|
"""
|
||||||
|
Establece una conexión con RabbitMQ.
|
||||||
|
Retorna el objeto de conexión si es exitoso.
|
||||||
|
Salida del programa si hay un error de conexión.
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host='localhost'))
|
||||||
|
|
||||||
|
return connection
|
||||||
|
|
||||||
|
except pika.exceptions.AMQPConnectionError as e:
|
||||||
|
logging.error('\n[!] Error al conectar con RabbitMQ: %s', e)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def declare_exchange(channel):
|
||||||
|
"""
|
||||||
|
Declara el intercambio de tipo 'topic'.
|
||||||
|
Salida del programa si hay un error al declarar el intercambio.
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
|
||||||
|
|
||||||
|
except pika.exceptions.ChannelError as e:
|
||||||
|
logging.error('\n[!] Error al declarar el intercambio: %s', e)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_message(base_message):
|
||||||
|
"""
|
||||||
|
Genera un mensaje único que incluye un número aleatorio, fecha y hora actual.
|
||||||
|
"""
|
||||||
|
|
||||||
|
random_id = random.randint(
|
||||||
|
1000, 9999) # Genera un ID aleatorio de 4 dígitos
|
||||||
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Fecha y hora actual
|
||||||
|
|
||||||
|
return f"{timestamp} - ({random_id}): {base_message}"
|
||||||
|
|
||||||
|
|
||||||
|
def publish_message(channel, routing_key, message):
|
||||||
|
"""
|
||||||
|
Publica un mensaje en el intercambio declarado.
|
||||||
|
"""
|
||||||
|
|
||||||
|
channel.basic_publish(exchange='topic_logs',
|
||||||
|
routing_key=routing_key, body=message)
|
||||||
|
logging.info(' [+] Enviado %s:%s', routing_key, message)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""
|
||||||
|
Función principal que orquesta la ejecución del script.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Parsear los argumentos de línea de comandos
|
||||||
|
args = parse_arguments()
|
||||||
|
routing_key = args.routing_key
|
||||||
|
base_message = ' '.join(args.message)
|
||||||
|
|
||||||
|
# Establecer conexión y publicar mensaje cada 5 segundos
|
||||||
|
with establish_connection() as connection:
|
||||||
|
channel = connection.channel()
|
||||||
|
declare_exchange(channel)
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
# Generar y publicar el mensaje
|
||||||
|
message = generate_message(base_message)
|
||||||
|
publish_message(channel, routing_key, message)
|
||||||
|
# Espera de 5 segundos antes de enviar el siguiente mensaje
|
||||||
|
time.sleep(5)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logging.info("\n[!] Interrupción del usuario. Terminando...")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error("\n[!] Se produjo un error inesperado: %s", e)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
main()
|
@ -0,0 +1,122 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
import pika
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
|
||||||
|
# Configuración del logger
|
||||||
|
logging.basicConfig(level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
|
||||||
|
|
||||||
|
def parse_arguments():
|
||||||
|
"""
|
||||||
|
Analiza los argumentos de línea de comandos utilizando argparse.
|
||||||
|
Devuelve un objeto con los argumentos proporcionados por el usuario.
|
||||||
|
"""
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description='Recibe mensajes de un intercambio de tipo "topic" en RabbitMQ.')
|
||||||
|
parser.add_argument('binding_keys', nargs='+',
|
||||||
|
help='Lista de claves de enlace para filtrar los mensajes.')
|
||||||
|
|
||||||
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
def establish_connection():
|
||||||
|
"""
|
||||||
|
Establece una conexión con RabbitMQ.
|
||||||
|
Retorna el objeto de conexión si es exitoso.
|
||||||
|
Salida del programa si hay un error de conexión.
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host='localhost'))
|
||||||
|
return connection
|
||||||
|
|
||||||
|
except pika.exceptions.AMQPConnectionError as e:
|
||||||
|
logging.error('[!] Error al conectar con RabbitMQ: %s', e)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def declare_exchange_and_queue(channel):
|
||||||
|
"""
|
||||||
|
Declara el intercambio de tipo 'topic' y una cola exclusiva.
|
||||||
|
Retorna el nombre de la cola creada.
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
|
||||||
|
result = channel.queue_declare('', exclusive=True)
|
||||||
|
|
||||||
|
return result.method.queue
|
||||||
|
|
||||||
|
except pika.exceptions.ChannelError as e:
|
||||||
|
logging.error('[!] Error al declarar el intercambio o la cola: %s', e)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def bind_queue(channel, queue_name, binding_keys):
|
||||||
|
"""
|
||||||
|
Vincula la cola al intercambio con las claves de enlace proporcionadas.
|
||||||
|
"""
|
||||||
|
|
||||||
|
for binding_key in binding_keys:
|
||||||
|
try:
|
||||||
|
channel.queue_bind(exchange='topic_logs',
|
||||||
|
queue=queue_name, routing_key=binding_key)
|
||||||
|
logging.info(
|
||||||
|
' [i] Cola vinculada con clave de enlace: %s', binding_key)
|
||||||
|
|
||||||
|
except pika.exceptions.ChannelError as e:
|
||||||
|
logging.error(
|
||||||
|
'[!] Error al vincular la cola con la clave de enlace %s: %s', binding_key, e)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def callback(ch, method, properties, body):
|
||||||
|
"""
|
||||||
|
Función de callback que maneja los mensajes recibidos.
|
||||||
|
"""
|
||||||
|
|
||||||
|
logging.info(' [+] %s: %s', method.routing_key.upper(), body.decode())
|
||||||
|
|
||||||
|
|
||||||
|
def start_consuming(channel, queue_name):
|
||||||
|
"""
|
||||||
|
Inicia la recepción de mensajes desde la cola especificada.
|
||||||
|
"""
|
||||||
|
|
||||||
|
channel.basic_consume(
|
||||||
|
queue=queue_name, on_message_callback=callback, auto_ack=True)
|
||||||
|
logging.info(' [i] Esperando mensajes. Para salir presione CTRL+C')
|
||||||
|
|
||||||
|
try:
|
||||||
|
channel.start_consuming()
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logging.info(' [!] Interrupción del usuario. Terminando...')
|
||||||
|
channel.stop_consuming()
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""
|
||||||
|
Función principal que orquesta la ejecución del script.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Parsear los argumentos de línea de comandos
|
||||||
|
args = parse_arguments()
|
||||||
|
binding_keys = args.binding_keys
|
||||||
|
|
||||||
|
# Establecer conexión, declarar intercambio y cola, vincular y comenzar a consumir
|
||||||
|
with establish_connection() as connection:
|
||||||
|
channel = connection.channel()
|
||||||
|
queue_name = declare_exchange_and_queue(channel)
|
||||||
|
bind_queue(channel, queue_name, binding_keys)
|
||||||
|
start_consuming(channel, queue_name)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
main()
|
@ -16,6 +16,11 @@
|
|||||||
- [Código de Ejemplo](#código-de-ejemplo)
|
- [Código de Ejemplo](#código-de-ejemplo)
|
||||||
- [Ejemplos de Uso](#ejemplos-de-uso)
|
- [Ejemplos de Uso](#ejemplos-de-uso)
|
||||||
- [Topics (Próximamente)](#topics-próximamente)
|
- [Topics (Próximamente)](#topics-próximamente)
|
||||||
|
- [¿Qué es un intercambio de temas?](#qué-es-un-intercambio-de-temas)
|
||||||
|
- [Casos especiales de `binding_key`](#casos-especiales-de-binding_key)
|
||||||
|
- [Ejemplo de uso](#ejemplo-de-uso)
|
||||||
|
- [Características del intercambio de temas](#características-del-intercambio-de-temas)
|
||||||
|
- [Implementación del sistema de registro](#implementación-del-sistema-de-registro)
|
||||||
- [RPC (Próximamente)](#rpc-próximamente)
|
- [RPC (Próximamente)](#rpc-próximamente)
|
||||||
|
|
||||||
|
|
||||||
@ -137,12 +142,13 @@ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
|
|||||||
|
|
||||||
A diferencia de las colas de trabajo, donde cada tarea se entrega a un solo trabajador, este tutorial demuestra el patrón de publicación/suscripción, que entrega mensajes a múltiples consumidores.
|
A diferencia de las colas de trabajo, donde cada tarea se entrega a un solo trabajador, este tutorial demuestra el patrón de publicación/suscripción, que entrega mensajes a múltiples consumidores.
|
||||||
|
|
||||||
|
![](https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Fwww.myanglog.com%2Fstatic%2Fe63b9118f1113a569006637046857099%2F7842b%2FUntitled1.png)
|
||||||
|
|
||||||
El ejemplo es un sistema de registro con dos programas: uno que emite mensajes de registro y otro que los recibe y los imprime.
|
El ejemplo es un sistema de registro con dos programas: uno que emite mensajes de registro y otro que los recibe y los imprime.
|
||||||
|
|
||||||
Cada instancia del programa receptor recibe todos los mensajes, permitiendo que los registros se dirijan al disco o se visualicen en pantalla.
|
Cada instancia del programa receptor recibe todos los mensajes, permitiendo que los registros se dirijan al disco o se visualicen en pantalla.
|
||||||
|
|
||||||
**Enfoque:**
|
**Enfoque:**
|
||||||
A diferencia de las colas de trabajo, donde cada tarea se entrega a un solo trabajador, este tutorial demuestra el patrón de publicación/suscripción, que entrega mensajes a múltiples consumidores.
|
|
||||||
|
|
||||||
El ejemplo es un sistema de registro con dos programas: uno que emite mensajes de registro y otro que los recibe y los imprime.
|
El ejemplo es un sistema de registro con dos programas: uno que emite mensajes de registro y otro que los recibe y los imprime.
|
||||||
|
|
||||||
@ -168,12 +174,18 @@ Ejemplo de declaración de una cola temporal:
|
|||||||
```python
|
```python
|
||||||
result = channel.queue_declare(queue='', exclusive=True)
|
result = channel.queue_declare(queue='', exclusive=True)
|
||||||
```
|
```
|
||||||
|
**Código de Ejemplo:**
|
||||||
|
|
||||||
|
- [emit_log.py](./03_publish_subcribe/emit_log.py) para enviar mensajes de log.
|
||||||
|
- [receive_logs.py](./03_publish_subcribe/receive_logs.py) para recibir mensajes de log.
|
||||||
|
|
||||||
|
|
||||||
### Routing
|
### Routing
|
||||||
|
|
||||||
En el tutorial anterior, creamos un sistema de registro simple que enviaba mensajes de log a múltiples receptores. En este tutorial, añadiremos la capacidad de suscribirse solo a un subconjunto de mensajes, permitiendo, por ejemplo, que solo los mensajes de error críticos se registren en un archivo, mientras que todos los mensajes de log se imprimen en la consola.
|
En el tutorial anterior, creamos un sistema de registro simple que enviaba mensajes de log a múltiples receptores. En este tutorial, añadiremos la capacidad de suscribirse solo a un subconjunto de mensajes, permitiendo, por ejemplo, que solo los mensajes de error críticos se registren en un archivo, mientras que todos los mensajes de log se imprimen en la consola.
|
||||||
|
|
||||||
|
![](https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Froytuts.com%2Fwp-content%2Fuploads%2F2022%2F02%2Fimage-2.png&f=1&nofb=1&ipt=7fde4343c19d799a6854664651f05dba983d70b2da4179f5b1f9d1e045a941d9&ipo=images)
|
||||||
|
|
||||||
|
|
||||||
#### Enlaces
|
#### Enlaces
|
||||||
|
|
||||||
@ -234,64 +246,9 @@ for severity in severities:
|
|||||||
|
|
||||||
#### Código de Ejemplo
|
#### Código de Ejemplo
|
||||||
|
|
||||||
- **`emit_log_direct.py`**: Script para emitir mensajes de log.
|
- **[emit_log_direct.py](./04_routing/emit_log_direct.py)**: Script para emitir mensajes de log.
|
||||||
|
- **[receive_logs_direct.py](./04_routing/receive_logs_direct.py)**: Script para recibir mensajes de log.
|
||||||
|
|
||||||
```python
|
|
||||||
#!/usr/bin/env python
|
|
||||||
import pika
|
|
||||||
import sys
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
|
|
||||||
|
|
||||||
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
|
|
||||||
message = ' '.join(sys.argv[2:]) or 'Hello World!'
|
|
||||||
channel.basic_publish(
|
|
||||||
exchange='direct_logs', routing_key=severity, body=message)
|
|
||||||
print(f" [x] Sent {severity}:{message}")
|
|
||||||
connection.close()
|
|
||||||
```
|
|
||||||
|
|
||||||
- **`receive_logs_direct.py`**: Script para recibir mensajes de log.
|
|
||||||
|
|
||||||
```python
|
|
||||||
#!/usr/bin/env python
|
|
||||||
import pika
|
|
||||||
import sys
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
|
|
||||||
|
|
||||||
result = channel.queue_declare(queue='', exclusive=True)
|
|
||||||
queue_name = result.method.queue
|
|
||||||
|
|
||||||
severities = sys.argv[1:]
|
|
||||||
if not severities:
|
|
||||||
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
for severity in severities:
|
|
||||||
channel.queue_bind(
|
|
||||||
exchange='direct_logs', queue=queue_name, routing_key=severity)
|
|
||||||
|
|
||||||
print(' [*] Waiting for logs. To exit press CTRL+C')
|
|
||||||
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f" [x] {method.routing_key}:{body}")
|
|
||||||
|
|
||||||
|
|
||||||
channel.basic_consume(
|
|
||||||
queue=queue_name, on_message_callback=callback, auto_ack=True)
|
|
||||||
|
|
||||||
channel.start_consuming()
|
|
||||||
```
|
|
||||||
|
|
||||||
#### Ejemplos de Uso
|
#### Ejemplos de Uso
|
||||||
|
|
||||||
@ -313,13 +270,64 @@ channel.start_consuming()
|
|||||||
|
|
||||||
### Topics (Próximamente)
|
### Topics (Próximamente)
|
||||||
|
|
||||||
|
En el tutorial anterior, mejoramos nuestro sistema de registro utilizando un intercambio de tipo `direct` para recibir registros selectivamente, basado en criterios como la severidad del mensaje. Sin embargo, para mayor flexibilidad, podemos usar un intercambio de tipo `topic`, que permite el enrutamiento de mensajes basado en múltiples criterios.
|
||||||
|
|
||||||
|
![](https://miro.medium.com/v2/resize:fit:1400/0*gFwb04MsfqtVB5bY.png)
|
||||||
|
|
||||||
|
#### ¿Qué es un intercambio de temas?
|
||||||
|
|
||||||
|
- **`routing_key`**: En un intercambio de tipo `topic`, los mensajes tienen una clave de enrutamiento (`routing_key`) que es una lista de palabras separadas por puntos. Ejemplos: `"quick.orange.rabbit"`, `"lazy.brown.fox"`.
|
||||||
|
- **`binding_key`**: Las claves de enlace (`binding_key`) también tienen el mismo formato y determinan qué mensajes recibe cada cola.
|
||||||
|
|
||||||
|
#### Casos especiales de `binding_key`
|
||||||
|
|
||||||
|
- **`*` (asterisco)**: Sustituye exactamente una palabra.
|
||||||
|
- **`#` (almohadilla)**: Sustituye cero o más palabras.
|
||||||
|
|
||||||
|
#### Ejemplo de uso
|
||||||
|
|
||||||
|
Considera el siguiente escenario con dos colas (Q1 y Q2) y estas claves de enlace:
|
||||||
|
|
||||||
|
- Q1: `*.orange.*` (recibe todos los mensajes sobre animales naranjas)
|
||||||
|
- Q2: `*.*.rabbit` y `lazy.#` (recibe todos los mensajes sobre conejos y animales perezosos)
|
||||||
|
|
||||||
|
Ejemplos de mensajes:
|
||||||
|
|
||||||
|
- `"quick.orange.rabbit"`: Entregado a Q1 y Q2.
|
||||||
|
- `"lazy.orange.elephant"`: Entregado a Q1 y Q2.
|
||||||
|
- `"quick.orange.fox"`: Solo entregado a Q1.
|
||||||
|
- `"lazy.brown.fox"`: Solo entregado a Q2.
|
||||||
|
|
||||||
|
Mensajes con una o cuatro palabras, como `"orange"` o `"quick.orange.new.rabbit"`, no coinciden con ningún enlace y se pierden.
|
||||||
|
|
||||||
|
#### Características del intercambio de temas
|
||||||
|
|
||||||
|
- Puede comportarse como un intercambio `fanout` si se usa `#` como `binding_key` (recibe todos los mensajes).
|
||||||
|
- Se comporta como un intercambio `direct` si no se utilizan `*` o `#` en las claves de enlace.
|
||||||
|
|
||||||
|
#### Implementación del sistema de registro
|
||||||
|
|
||||||
|
Usaremos un intercambio de temas para enrutar registros usando `routing_key` con el formato `<facilidad>.<severidad>`. El código para emitir y recibir registros es similar al de tutoriales anteriores.
|
||||||
|
|
||||||
|
**Ejemplos de comandos:**
|
||||||
|
|
||||||
|
- Recibir todos los registros: `python receive_logs_topic.py "#"`
|
||||||
|
- Recibir registros de "kern": `python receive_logs_topic.py "kern.*"`
|
||||||
|
- Recibir solo registros "critical": `python receive_logs_topic.py "*.critical"`
|
||||||
|
- Emitir un registro crítico de "kern": `python emit_log_topic.py "kern.critical" "A critical kernel error"`
|
||||||
|
|
||||||
|
El código es casi el mismo que en el tutorial anterior.
|
||||||
|
|
||||||
|
- **[emit_log_topic.py](./05_topics/emit_log_topic.py)**
|
||||||
|
- **[receive_logs_topic.py](./05_topics/receive_logs_topic)**
|
||||||
|
|
||||||
|
|
||||||
### RPC (Próximamente)
|
### RPC (Próximamente)
|
||||||
|
|
||||||
|
|
||||||
|
![](https://alvaro-videla.com/images/RPC-OverRMQ.png)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user