From 89959d29eee2daf17d9767a9006678f0ff9ca586 Mon Sep 17 00:00:00 2001 From: Manuel Vergara Date: Tue, 6 Aug 2024 22:05:20 +0200 Subject: [PATCH] Update rabbitmq test --- .../02_rabbitmq/06_rpc/rpc_client.py | 113 +++++++++++++++ .../02_rabbitmq/06_rpc/rpc_server.py | 92 ++++++++++++ catch-all/05_infra_test/02_rabbitmq/README.md | 133 +++++++++++++++++- 3 files changed, 335 insertions(+), 3 deletions(-) create mode 100644 catch-all/05_infra_test/02_rabbitmq/06_rpc/rpc_client.py create mode 100644 catch-all/05_infra_test/02_rabbitmq/06_rpc/rpc_server.py diff --git a/catch-all/05_infra_test/02_rabbitmq/06_rpc/rpc_client.py b/catch-all/05_infra_test/02_rabbitmq/06_rpc/rpc_client.py new file mode 100644 index 0000000..852691e --- /dev/null +++ b/catch-all/05_infra_test/02_rabbitmq/06_rpc/rpc_client.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python +import pika +import uuid +import sys +import argparse + + +class FibonacciRpcClient: + def __init__(self, host='localhost'): + # Establece la conexión con RabbitMQ + try: + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host=host) + ) + except pika.exceptions.AMQPConnectionError as e: + print(f"[!] Error al conectar con RabbitMQ: {e}") + sys.exit(1) + + # Crea un canal de comunicación + self.channel = self.connection.channel() + + # Declara una cola exclusiva para recibir las respuestas del servidor RPC + result = self.channel.queue_declare(queue='', exclusive=True) + self.callback_queue = result.method.queue + + # Configura el canal para consumir mensajes de la cola de respuestas + self.channel.basic_consume( + queue=self.callback_queue, + on_message_callback=self.on_response, + auto_ack=True + ) + + # Inicializa las variables para manejar la respuesta RPC + self.response = None + self.corr_id = None + + def on_response(self, ch, method, props, body): + """Callback ejecutado al recibir una respuesta del servidor RPC.""" + # Verifica si el ID de correlación coincide con el esperado + if self.corr_id == props.correlation_id: + self.response = body + + def call(self, n): + """ + Envía una solicitud RPC para calcular el número de Fibonacci de n. + + :param n: Número entero no negativo para calcular su Fibonacci. + :return: Resultado del cálculo de Fibonacci. + :raises ValueError: Si n no es un número entero no negativo. + """ + # Verifica que la entrada sea un número entero no negativo + if not isinstance(n, int) or n < 0: + raise ValueError("[!] El argumento debe ser un entero no negativo.") + + # Resetea la respuesta y genera un nuevo ID de correlación único + self.response = None + self.corr_id = str(uuid.uuid4()) + + # Publica un mensaje en la cola 'rpc_queue' con las propiedades necesarias + self.channel.basic_publish( + exchange='', + routing_key='rpc_queue', + properties=pika.BasicProperties( + reply_to=self.callback_queue, # Cola de retorno para recibir la respuesta + correlation_id=self.corr_id, # ID único para correlacionar la respuesta + ), + body=str(n) + ) + + # Espera hasta que se reciba la respuesta del servidor + while self.response is None: + self.connection.process_data_events(time_limit=None) + + # Devuelve la respuesta convertida a un entero + return int(self.response) + + def close(self): + """Cierra la conexión con el servidor de RabbitMQ.""" + self.connection.close() + + +if __name__ == "__main__": + # Configura el analizador de argumentos para recibir el número de Fibonacci + parser = argparse.ArgumentParser( + description="Calcula números de Fibonacci mediante RPC.") + parser.add_argument( + "number", + type=int, + help="Número entero no negativo para calcular su Fibonacci." + ) + args = parser.parse_args() + + # Inicializa el cliente RPC de Fibonacci + fibonacci_rpc = FibonacciRpcClient() + + # Número para el cual se desea calcular el Fibonacci + n = args.number + + try: + print(f" [+] Solicitando fib({n})") + # Realiza la llamada RPC y obtiene el resultado + response = fibonacci_rpc.call(n) + print(f" [+] Resultado fib({n}) = {response}") + + except ValueError as e: + print(f"[!] Error de valor: {e}") + + except Exception as e: + print(f"[!] Error inesperado: {e}") + + finally: + # Cierra la conexión del cliente RPC + fibonacci_rpc.close() diff --git a/catch-all/05_infra_test/02_rabbitmq/06_rpc/rpc_server.py b/catch-all/05_infra_test/02_rabbitmq/06_rpc/rpc_server.py new file mode 100644 index 0000000..a5ab97f --- /dev/null +++ b/catch-all/05_infra_test/02_rabbitmq/06_rpc/rpc_server.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +import pika +from functools import lru_cache +import time + +# Establecemos la conexión a RabbitMQ + + +def create_connection(): + + try: + connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost') + ) + + return connection + + except pika.exceptions.AMQPConnectionError as e: + print(f"[!] Error al conectar a RabbitMQ: {e}") + + return None + + +# Conexión a RabbitMQ +connection = create_connection() + +if not connection: + exit(1) + +# Canal de comunicación con RabbitMQ +channel = connection.channel() + +# Declaración de la cola 'rpc_queue' +channel.queue_declare(queue='rpc_queue') + +# Implementación mejorada de Fibonacci con memoización + + +@lru_cache(maxsize=None) +def fib(n): + + if n < 0: + raise ValueError("\n[!] El número no puede ser negativo.") + + elif n == 0: + return 0 + + elif n == 1: + return 1 + + else: + return fib(n - 1) + fib(n - 2) + +# Callback para procesar solicitudes RPC + + +def on_request(ch, method, props, body): + + try: + n = int(body) + print(f" [+] Calculando fib({n})") + + # Simula un tiempo de procesamiento de 2 segundos + time.sleep(2) + + response = fib(n) + print(f" [+] Resultado: {response}") + + except ValueError as e: + response = f"\n[!] Error: {str(e)}" + + except Exception as e: + response = f"\n[!] Error inesperado: {str(e)}" + + # Publicar la respuesta al cliente + ch.basic_publish( + exchange='', + routing_key=props.reply_to, + properties=pika.BasicProperties(correlation_id=props.correlation_id), + body=str(response) + ) + + # Confirmar la recepción del mensaje + ch.basic_ack(delivery_tag=method.delivery_tag) + + +# Configuración de calidad de servicio y consumo de mensajes +channel.basic_qos(prefetch_count=1) +channel.basic_consume(queue='rpc_queue', on_message_callback=on_request) + +print("\n[i] Esperando solicitudes RPC") +channel.start_consuming() diff --git a/catch-all/05_infra_test/02_rabbitmq/README.md b/catch-all/05_infra_test/02_rabbitmq/README.md index 5b26ccd..2c0a8e2 100644 --- a/catch-all/05_infra_test/02_rabbitmq/README.md +++ b/catch-all/05_infra_test/02_rabbitmq/README.md @@ -21,7 +21,12 @@ - [Ejemplo de uso](#ejemplo-de-uso) - [Características del intercambio de temas](#características-del-intercambio-de-temas) - [Implementación del sistema de registro](#implementación-del-sistema-de-registro) - - [RPC (Próximamente)](#rpc-próximamente) + - [RPC](#rpc) + - [Interfaz del cliente](#interfaz-del-cliente) + - [Cola de retorno (*Callback queue*)](#cola-de-retorno-callback-queue) + - [ID de correlación (*Correlation id*)](#id-de-correlación-correlation-id) + - [Resumen](#resumen) + - [Poniéndolo todo junto](#poniéndolo-todo-junto) ## Despliegue rabbitmq con docker @@ -322,14 +327,136 @@ El código es casi el mismo que en el tutorial anterior. - **[receive_logs_topic.py](./05_topics/receive_logs_topic)** - ### RPC (Próximamente) +### RPC + +En el segundo tutorial aprendimos a usar *Work Queues* para distribuir tareas que consumen tiempo entre múltiples trabajadores. + +Pero, ¿qué pasa si necesitamos ejecutar una función en una computadora remota y esperar el resultado? Eso es una historia diferente. Este patrón es comúnmente conocido como *Remote Procedure Call* o RPC. + +![](https://alvaro-videla.com/images/RPC-OverRMQ.png) + +En este tutorial vamos a usar RabbitMQ para construir un sistema RPC: un cliente y un servidor RPC escalable. Como no tenemos tareas que consuman tiempo que valga la pena distribuir, vamos a crear un servicio RPC ficticio que devuelva números de Fibonacci. - ![](https://alvaro-videla.com/images/RPC-OverRMQ.png) +#### Interfaz del cliente + +Para ilustrar cómo se podría usar un servicio RPC, vamos a crear una clase de cliente simple. Va a exponer un método llamado `call` que envía una solicitud RPC y se bloquea hasta que se recibe la respuesta: + +```python +fibonacci_rpc = FibonacciRpcClient() +result = fibonacci_rpc.call(4) +print(f"fib(4) is {result}") +``` + +**Una nota sobre RPC** + +Aunque RPC es un patrón bastante común en informática, a menudo se critica. Los problemas surgen cuando un programador no está al tanto de si una llamada a función es local o si es un RPC lento. Confusiones como esas resultan en un sistema impredecible y añaden una complejidad innecesaria a la depuración. En lugar de simplificar el software, el uso indebido de RPC puede resultar en un código enredado e inmantenible. + +Teniendo esto en cuenta, considere los siguientes consejos: + +- Asegúrese de que sea obvio qué llamada a función es local y cuál es remota. +- Documente su sistema. Haga claras las dependencias entre componentes. +- Maneje los casos de error. ¿Cómo debería reaccionar el cliente cuando el servidor RPC está caído por mucho tiempo? + +En caso de duda, evite RPC. Si puede, debe usar un canal asincrónico: en lugar de un bloqueo estilo RPC, los resultados se envían asincrónicamente a la siguiente etapa de computación. +#### Cola de retorno (*Callback queue*) + +En general, hacer RPC sobre RabbitMQ es fácil. Un cliente envía un mensaje de solicitud y un servidor responde con un mensaje de respuesta. Para recibir una respuesta, el cliente necesita enviar una dirección de una cola de retorno (*callback queue*) con la solicitud. Vamos a intentarlo: + +```python +result = channel.queue_declare(queue='', exclusive=True) +callback_queue = result.method.queue + +channel.basic_publish(exchange='', + routing_key='rpc_queue', + properties=pika.BasicProperties( + reply_to = callback_queue, + ), + body=request) + +# ... y algo de código para leer un mensaje de respuesta de la callback_queue ... +``` + +**Propiedades del mensaje** + +El protocolo AMQP 0-9-1 predefine un conjunto de 14 propiedades que acompañan un mensaje. La mayoría de las propiedades rara vez se utilizan, con la excepción de las siguientes: + +- `delivery_mode`: Marca un mensaje como persistente (con un valor de 2) o transitorio (cualquier otro valor). Puede recordar esta propiedad del segundo tutorial. +- `content_type`: Se utiliza para describir el tipo MIME de la codificación. Por ejemplo, para la codificación JSON a menudo utilizada, es una buena práctica establecer esta propiedad en: `application/json`. +- `reply_to`: Comúnmente utilizado para nombrar una cola de retorno (*callback queue*). +- `correlation_id`: Útil para correlacionar respuestas RPC con solicitudes. +#### ID de correlación (*Correlation id*) + +En el método presentado anteriormente, sugerimos crear una cola de retorno para cada solicitud RPC. Eso es bastante ineficiente, pero afortunadamente hay una mejor manera: crear una única cola de retorno por cliente. + +Eso plantea un nuevo problema: al recibir una respuesta en esa cola, no está claro a qué solicitud pertenece la respuesta. Es entonces cuando se usa la propiedad `correlation_id`. Vamos a configurarla a un valor único para cada solicitud. Más tarde, cuando recibamos un mensaje en la cola de retorno, miraremos esta propiedad, y con base en eso podremos coincidir una respuesta con una solicitud. Si vemos un valor de `correlation_id` desconocido, podemos descartar el mensaje de manera segura: no pertenece a nuestras solicitudes. + +Puede preguntar, ¿por qué deberíamos ignorar los mensajes desconocidos en la cola de retorno en lugar de fallar con un error? Se debe a la posibilidad de una condición de carrera en el lado del servidor. Aunque es poco probable, es posible que el servidor RPC muera justo después de enviarnos la respuesta, pero antes de enviar un mensaje de confirmación para la solicitud. Si eso sucede, el servidor RPC reiniciado procesará la solicitud nuevamente. Es por eso que, en el cliente, debemos manejar las respuestas duplicadas de manera prudente, y el RPC idealmente debería ser idempotente. +#### Resumen + +Nuestro RPC funcionará así: + +- Cuando el cliente se inicia, crea una cola de retorno anónima exclusiva. +- Para una solicitud RPC, el cliente envía un mensaje con dos propiedades: `reply_to`, que se establece en la cola de retorno, y `correlation_id`, que se establece en un valor único para cada solicitud. +- La solicitud se envía a una cola llamada `rpc_queue`. +- El trabajador RPC (también conocido como servidor) está esperando solicitudes en esa cola. Cuando aparece una solicitud, hace el trabajo y envía un mensaje con el resultado de vuelta al cliente, usando la cola del campo `reply_to`. +- El cliente espera datos en la cola de retorno. Cuando aparece un mensaje, verifica la propiedad `correlation_id`. Si coincide con el valor de la solicitud, devuelve la respuesta a la aplicación. + + +#### Poniéndolo todo junto + +- [rpc_server.py](./06_rpc/rpc_client.py) + +El código del servidor es bastante sencillo: + +- Como de costumbre, comenzamos estableciendo la conexión y declarando la cola `rpc_queue`. +- Declaramos nuestra función de Fibonacci. Asume solo una entrada de entero positivo válida. (No espere que funcione para números grandes, probablemente sea la implementación recursiva más lenta posible). +- Declaramos un *callback* `on_request` para `basic_consume`, el núcleo del servidor RPC. Se ejecuta cuando se recibe la solicitud. Hace el trabajo y envía la respuesta de vuelta. +- Podríamos querer ejecutar más de un proceso de servidor. Para distribuir la carga de manera equitativa entre varios servidores, necesitamos establecer el ajuste `prefetch_count`. + +[rpc_client.py](-/06_rpc/rpc_client.py) + +El código del cliente es un poco más complejo: + +- Establecemos una conexión, un canal y declaramos una `callback_queue` exclusiva para las respuestas. +- Nos suscribimos a la `callback_queue`, para que podamos recibir respuestas RPC. +- El *callback* `on_response` que se ejecuta en cada respuesta está haciendo un trabajo muy simple: para cada mensaje de respuesta, verifica si el `correlation_id` es el que estamos buscando. Si es así, guarda la respuesta en `self.response` y sale del bucle de consumo. +- A continuación, definimos nuestro método principal `call`: realiza la solicitud RPC real. +- En el método `call`, generamos un número de `correlation_id` único y lo guardamos: la función de *callback* `on_response` usará este valor para capturar + + la respuesta apropiada. +- También en el método `call`, publicamos el mensaje de solicitud, con dos propiedades: `reply_to` y `correlation_id`. +- Al final, esperamos hasta que llegue la respuesta adecuada y devolvemos la respuesta al usuario. + +Nuestro servicio RPC ya está listo. Podemos iniciar el servidor: + +```bash +python rpc_server.py +``` + +Para solicitar un número de Fibonacci, ejecute el cliente con el número como argumento: + +```bash +python rpc_client.py +``` + +El diseño presentado no es la única implementación posible de un servicio RPC, pero tiene algunas ventajas importantes: + +- Si el servidor RPC es demasiado lento, puede escalar simplemente ejecutando otro. Intente ejecutar un segundo `rpc_server.py` en una nueva consola. +- En el lado del cliente, el RPC requiere enviar y recibir solo un mensaje. No se requieren llamadas sincrónicas como `queue_declare`. Como resultado, el cliente RPC necesita solo un viaje de ida y vuelta por la red para una única solicitud RPC. + +Nuestro código sigue siendo bastante simplista y no intenta resolver problemas más complejos (pero importantes), como: + +- ¿Cómo debería reaccionar el cliente si no hay servidores en funcionamiento? +- ¿Debería un cliente tener algún tipo de tiempo de espera para el RPC? +- Si el servidor funciona mal y genera una excepción, ¿debería enviarse al cliente? +- Proteger contra mensajes entrantes no válidos (por ejemplo, verificando límites) antes de procesar. + +---