Curso-lenguaje-python/catch-all/05_infra_test/03_kafka/producer-consumer-parse-recipes/producer-consumer-parse-recipes.py
2024-08-07 23:59:44 +02:00

166 lines
4.8 KiB
Python

import json
import logging
from time import sleep
from bs4 import BeautifulSoup
from kafka import KafkaConsumer, KafkaProducer
# Configuración del registro
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
def publish_message(producer_instance, topic_name, key, value):
"""
Publica un mensaje en el tema de Kafka especificado.
"""
try:
key_bytes = bytes(key, encoding='utf-8') # Convertir clave a bytes
value_bytes = bytes(value, encoding='utf-8') # Convertir valor a bytes
producer_instance.send( # Enviar mensaje
topic_name, key=key_bytes,
value=value_bytes
)
producer_instance.flush() # Asegurar que el mensaje ha sido enviado
logging.info('[i] Mensaje publicado con éxito.')
except Exception as ex:
logging.error('[!] Error al publicar mensaje', exc_info=True)
def connect_kafka_producer():
"""
Conecta y devuelve una instancia del productor de Kafka.
"""
try:
producer = KafkaProducer(
bootstrap_servers=['kafka:9092'], # Servidor Kafka
api_version=(0, 10) # Versión de la API de Kafka
)
logging.info('[i] Conectado con éxito al productor de Kafka.')
return producer
except Exception as ex:
logging.error('[!] Error al conectar con Kafka', exc_info=True)
return None
def parse(markup):
"""
Analiza el HTML y extrae la información de la receta.
"""
title = '-'
submit_by = '-'
description = '-'
calories = 0
ingredients = []
rec = {}
try:
soup = BeautifulSoup(markup, 'lxml') # Analizar HTML con BeautifulSoup
# Actualizar selectores CSS para el título, descripción, ingredientes y calorías
title_section = soup.select_one('h1.headline.heading-content') # Título
submitter_section = soup.select_one('span.author-name') # Autor
description_section = soup.select_one('div.recipe-summary > p') # Descripción
ingredients_section = soup.select('li.ingredients-item') # Ingredientes
calories_section = soup.select_one('span.calorie-count') # Calorías
# Extraer calorías
if calories_section:
calories = calories_section.get_text(strip=True).replace('cals', '').strip()
# Extraer ingredientes
if ingredients_section:
for ingredient in ingredients_section:
ingredient_text = ingredient.get_text(strip=True)
if 'Add all ingredients to list' not in ingredient_text and ingredient_text != '':
ingredients.append({'step': ingredient_text})
# Extraer descripción
if description_section:
description = description_section.get_text(strip=True)
# Extraer nombre del autor
if submitter_section:
submit_by = submitter_section.get_text(strip=True)
# Extraer título
if title_section:
title = title_section.get_text(strip=True)
# Crear diccionario con la información de la receta
rec = {
'title': title,
'submitter': submit_by,
'description': description,
'calories': calories,
'ingredients': ingredients
}
logging.info(f"[i] Receta extraída: {rec}")
except Exception as ex:
logging.error('[!] Error en parsing', exc_info=True)
return json.dumps(rec)
def main():
"""
Ejecuta el proceso de consumo y publicación de mensajes.
"""
topic_name = 'raw_recipes'
parsed_topic_name = 'parsed_recipes'
parsed_records = []
# Crear un consumidor de Kafka
consumer = KafkaConsumer(
topic_name,
auto_offset_reset='earliest',
bootstrap_servers=['kafka:9092'],
api_version=(0, 10),
consumer_timeout_ms=2000
)
logging.info('[i] Iniciando el consumidor para parsing...')
try:
for msg in consumer:
html = msg.value
result = parse(html) # Analizar el HTML
parsed_records.append(result)
consumer.close() # Cerrar el consumidor
logging.info('[i] Consumidor cerrado.')
if parsed_records:
logging.info('[+] Publicando registros...')
producer = connect_kafka_producer()
if producer:
for rec in parsed_records:
publish_message(producer, parsed_topic_name, 'parsed', rec)
producer.close() # Cerrar el productor
else:
logging.error('[!] El productor de Kafka no está disponible.')
except Exception as ex:
logging.error('[!] Error en el productor-consumer', exc_info=True)
if __name__ == '__main__':
main()