Curso-lenguaje-python/catch-all/05_infra_test/04_elastic_stack/app/main.py

115 lines
3.5 KiB
Python

import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
# Configura la conexión a Elasticsearch
es = Elasticsearch("http://elasticsearch:9200")
def create_index():
"""
Crea un índice en Elasticsearch con el nombre 'movies' si no existe.
Define el mapeo del índice para los campos de los documentos.
"""
# Define el mapeo del índice 'movies'
mappings = {
"properties": {
# Campo para el título de la película
"title": {"type": "text", "analyzer": "english"},
# Campo para la etnicidad
"ethnicity": {"type": "text", "analyzer": "standard"},
# Campo para el director
"director": {"type": "text", "analyzer": "standard"},
# Campo para el elenco
"cast": {"type": "text", "analyzer": "standard"},
# Campo para el género
"genre": {"type": "text", "analyzer": "standard"},
# Campo para el argumento de la película
"plot": {"type": "text", "analyzer": "english"},
# Campo para el año de lanzamiento
"year": {"type": "integer"},
# Campo para la página de Wikipedia
"wiki_page": {"type": "keyword"}
}
}
# Verifica si el índice 'movies' ya existe
if not es.indices.exists(index="movies"):
# Crea el índice 'movies' si no existe
es.indices.create(index="movies", mappings=mappings)
print("\n[+] Índice 'movies' creado.")
else:
print("\n[!] El índice 'movies' ya existe.")
def load_data():
"""
Carga datos desde un archivo CSV a Elasticsearch.
"""
try:
# Lee el archivo CSV
df = pd.read_csv("/app/wiki_movie_plots_deduped.csv", quoting=1)
# Verifica el número de filas en el DataFrame
num_rows = len(df)
sample_size = min(5000, num_rows)
# Elimina filas con valores nulos y toma una muestra
df = df.dropna().sample(sample_size, random_state=42).reset_index(drop=True)
except Exception as e:
print(f"\n[!] Error al leer el archivo CSV: {e}")
return
# Prepara los datos para la carga en Elasticsearch
bulk_data = [
{
"_index": "movies", # Nombre del índice en Elasticsearch
"_id": i, # ID del documento en Elasticsearch
"_source": {
"title": row["Title"], # Título de la película
"ethnicity": row["Origin/Ethnicity"], # Etnicidad
"director": row["Director"], # Director
"cast": row["Cast"], # Elenco
"genre": row["Genre"], # Género
"plot": row["Plot"], # Argumento
"year": row["Release Year"], # Año de lanzamiento
"wiki_page": row["Wiki Page"], # Página de Wikipedia
}
}
for i, row in df.iterrows() # Itera sobre cada fila del DataFrame
]
try:
# Carga los datos en Elasticsearch en bloques
bulk(es, bulk_data)
print("\n[+] Datos cargados en Elasticsearch.")
except Exception as e:
print(f"\n[!] Error al cargar datos en Elasticsearch: {e}")
def main():
"""
Función principal que crea el índice y carga los datos.
"""
create_index() # Crea el índice en Elasticsearch
load_data() # Carga los datos en Elasticsearch
if __name__ == "__main__":
# Ejecuta la función principal si el script se ejecuta directamente
main()