SolveConPython

Python Reto #23 — Pipeline ETL simple (leer → limpiar → validar → exportar)

Nivel: Intermedio+
Tema: ETL, CSV, validación, normalización, errores por fila, exportación, tests con pytest
Objetivo: Construir un mini pipeline de datos: leer un CSV “sucio”, limpiar y validar filas, separar válidas e inválidas, y exportar resultados.

Reto #23 Pipeline ETL simple (leer → limpiar → validar → exportar)
Reto #23 Pipeline ETL simple (leer → limpiar → validar → exportar)

Enunciado

Crea una función llamada:

etl_simple(ruta_entrada, ruta_salida_ok, ruta_salida_error)

que:

  1. Reciba tres rutas (strings):
    • ruta_entrada: CSV de entrada con cabecera
    • ruta_salida_ok: CSV de salida con filas válidas
    • ruta_salida_error: CSV de salida con filas inválidas + motivo
  2. Validación:
    • Si alguna ruta es NoneTypeError
    • Si alguna ruta no es strTypeError
    • Si ruta_entrada no existe → FileNotFoundError
  3. Debe procesar un CSV con estas columnas (cabecera obligatoria):
    • email
    • edad
    • pais
  4. Reglas de limpieza y validación por fila:
    • email: strip(), minúsculas; válido si contiene un solo @, sin espacios, y con . en el dominio
    • edad: strip(); válido si es entero entre 0 y 120 (incluidos)
    • pais: strip(); válido si no está vacío
  5. Salidas:
    • En ruta_salida_ok escribe las filas válidas con columnas: email,edad,pais (limpias)
    • En ruta_salida_error escribe filas inválidas con columnas: email,edad,pais,error donde error describe el motivo principal
  6. La función debe devolver un resumen:

{"ok": int, "error": int}

Nota: Si una fila tiene varios problemas, registra solo el primer error detectado para simplificar.

CSV de ejemplo (entrada)

usuarios.csv

email,edad,pais
USER@Email.com , 30 , España
bademail.com,25,México
ana@mail,22,Chile
luis@mail.com,200,Perú
maria@mail.com, ,Argentina
,40,Colombia
pedro@mail.com,40,

Salidas esperadas:

  • OK: solo filas válidas (email normalizado, edad limpia, país limpio)
  • ERROR: filas inválidas con explicación

Pistas

  1. Reutiliza el validador de email “simple” del Reto #11.
  2. Convierte edad con int(...) y valida rango.
  3. Para exportar CSV usa csv.DictWriter.
  4. Maneja archivos con newline="" para evitar líneas en blanco en Windows.

Solución explicada (paso a paso)

  1. Validar rutas y existencia del archivo de entrada.
  2. Leer con csv.DictReader.
  3. Por cada fila:
    • normalizar campos (strip, lower)
    • validar email → si falla, mandar a error
    • validar edad → si falla, mandar a error
    • validar país → si falla, mandar a error
    • si todo OK → escribir en salida OK
  4. Contar ok y error.
  5. Devolver resumen.

Python
import csv
def _email_valido_simple(email: str) -> bool:
if " " in email:
return False
if email.count("@") != 1:
return False
usuario, dominio = email.split("@")
if not usuario or not dominio:
return False
if "." not in dominio:
return False
return True
def etl_simple(ruta_entrada: str, ruta_salida_ok: str, ruta_salida_error: str) -> dict:
"""
Pipeline ETL simple:
- Lee CSV (email, edad, pais)
- Limpia y valida
- Exporta filas válidas e inválidas
- Devuelve conteos
"""
if ruta_entrada is None or ruta_salida_ok is None or ruta_salida_error is None:
raise TypeError("Las rutas no pueden ser None.")
if not isinstance(ruta_entrada, str) or not isinstance(ruta_salida_ok, str) or not isinstance(ruta_salida_error, str):
raise TypeError("Las rutas deben ser cadenas (str).")
ok = 0
error = 0
campos_salida_ok = ["email", "edad", "pais"]
campos_salida_error = ["email", "edad", "pais", "error"]
with open(ruta_entrada, mode="r", encoding="utf-8", newline="") as f_in, \
open(ruta_salida_ok, mode="w", encoding="utf-8", newline="") as f_ok, \
open(ruta_salida_error, mode="w", encoding="utf-8", newline="") as f_err:
reader = csv.DictReader(f_in)
# Validación mínima de cabecera
requeridas = {"email", "edad", "pais"}
if reader.fieldnames is None or not requeridas.issubset(set(reader.fieldnames)):
raise ValueError("El CSV de entrada debe tener cabecera con: email, edad, pais.")
w_ok = csv.DictWriter(f_ok, fieldnames=campos_salida_ok)
w_err = csv.DictWriter(f_err, fieldnames=campos_salida_error)
w_ok.writeheader()
w_err.writeheader()
for fila in reader:
raw_email = (fila.get("email") or "").strip()
raw_edad = (fila.get("edad") or "").strip()
raw_pais = (fila.get("pais") or "").strip()
email = raw_email.lower()
pais = raw_pais
# Validar email
if email == "" or not _email_valido_simple(email):
w_err.writerow({"email": email, "edad": raw_edad, "pais": pais, "error": "email_invalido"})
error += 1
continue
# Validar edad
try:
edad_int = int(raw_edad)
except ValueError:
w_err.writerow({"email": email, "edad": raw_edad, "pais": pais, "error": "edad_invalida"})
error += 1
continue
if edad_int < 0 or edad_int > 120:
w_err.writerow({"email": email, "edad": str(edad_int), "pais": pais, "error": "edad_fuera_de_rango"})
error += 1
continue
# Validar país
if pais == "":
w_err.writerow({"email": email, "edad": str(edad_int), "pais": pais, "error": "pais_vacio"})
error += 1
continue
# Fila OK
w_ok.writerow({"email": email, "edad": str(edad_int), "pais": pais})
ok += 1
return {"ok": ok, "error": error}

Python
import csv
import pytest
from reto_23_etl_simple import etl_simple
def leer_csv_a_lista(ruta):
with open(ruta, "r", encoding="utf-8", newline="") as f:
return list(csv.DictReader(f))
def test_etl_simple(tmp_path):
contenido = (
"email,edad,pais\n"
" USER@Email.com , 30 , España\n"
"bademail.com,25,México\n"
"ana@mail,22,Chile\n"
"luis@mail.com,200,Perú\n"
"maria@mail.com, ,Argentina\n"
",40,Colombia\n"
"pedro@mail.com,40,\n"
"ok@mail.com,0,Uruguay\n"
)
entrada = tmp_path / "usuarios.csv"
salida_ok = tmp_path / "ok.csv"
salida_err = tmp_path / "error.csv"
entrada.write_text(contenido, encoding="utf-8")
resumen = etl_simple(str(entrada), str(salida_ok), str(salida_err))
# Válidos: USER@email.com (30, España) y ok@mail.com (0, Uruguay) => 2
assert resumen["ok"] == 2
assert resumen["error"] == 6
filas_ok = leer_csv_a_lista(salida_ok)
assert filas_ok[0]["email"] == "user@email.com"
assert filas_ok[0]["edad"] == "30"
assert filas_ok[0]["pais"] == "España"
filas_err = leer_csv_a_lista(salida_err)
assert all("error" in f for f in filas_err)
assert any(f["error"] == "email_invalido" for f in filas_err)
assert any(f["error"] == "edad_fuera_de_rango" for f in filas_err)
assert any(f["error"] == "pais_vacio" for f in filas_err)
def test_archivo_no_existe():
with pytest.raises(FileNotFoundError):
etl_simple("no_existe.csv", "ok.csv", "err.csv")
def test_rutas_none():
with pytest.raises(TypeError):
etl_simple(None, "ok.csv", "err.csv")

Ejecuta:

  • pytest -q

Variantes para subir de nivel (opcional)

  1. Acumular múltiples errores por fila (lista de errores)
  2. Normalizar país (por ejemplo, title case)
  3. Soportar separador ; como parámetro
  4. Generar un resumen por país adicional
  5. Registrar estadísticas por tipo de error

Lo que aprendiste

  • Diseño de un pipeline ETL realista
  • Validación por fila sin romper el proceso
  • Exportación de resultados limpios vs. errores
  • Tests con archivos temporales y lecturas CSV

Accede al código completo y a los tests en GitHub para ejecutar y modificar la solución localmente.

El siguiente reto recomendado: Reto #24 — Cache simple (memoization) para acelerar funciones: rendimiento + diseño limpio + tests.