SolveConPython

Python Reto #21 — Parsear logs y extraer métricas

Nivel: Intermedio+
Tema: Parsing de texto, manejo de archivos, validación robusta, agregaciones, tests con pytest
Objetivo: Leer un archivo de logs (texto), parsear líneas con formato consistente y generar métricas útiles, ignorando líneas inválidas sin romper el proceso.

Reto #21 Parsear logs y extraer métricas
Reto #21 Parsear logs y extraer métricas

Enunciado

Crea una función llamada metricas_logs(ruta_log) que:

  1. Reciba ruta_log (string) a un archivo .log o .txt.
  2. Si ruta_log es None o no es str, lance TypeError.
  3. Si el archivo no existe, lance FileNotFoundError.
  4. Lea el archivo línea por línea y procese entradas con este formato:

YYYY-MM-DD HH:MM:SS | LEVEL | mensaje

Donde:

  • LEVEL puede ser: INFO, WARNING, ERROR

Ejemplo de líneas válidas:

2026-01-10 09:00:00 | INFO | Inicio del servicio
2026-01-10 09:01:02 | WARNING | Respuesta lenta
2026-01-10 09:02:10 | ERROR | Fallo al conectar a la base de datos

5. Debe ignorar cualquier línea inválida (por ejemplo, formato incorrecto, campos faltantes, LEVEL desconocido).

6. Debe devolver un diccionario con esta estructura:

{
"total": int,
"por_level": {"INFO": int, "WARNING": int, "ERROR": int},
"primer_timestamp": "YYYY-MM-DD HH:MM:SS" | None,
"ultimo_timestamp": "YYYY-MM-DD HH:MM:SS" | None,
}

Notas:

  • total cuenta solo líneas válidas.
  • primer_timestamp y ultimo_timestamp se calculan sobre líneas válidas.
  • Si no hay líneas válidas, ambos timestamps deben ser None.

Ejemplos

Si el log contiene 2 INFO, 1 WARNING y 3 ERROR válidos:

  • total → 6
  • por_level["ERROR"] → 3

Pistas

  1. Divide cada línea por "|" y luego aplica strip() para limpiar espacios.
  2. Valida que existan exactamente 3 partes: timestamp, level, mensaje.
  3. Para validar el timestamp sin complicarte, usa datetime.strptime.
  4. Si falla el parseo, esa línea es inválida → se ignora.

Solución explicada (paso a paso)

  1. Validar tipos y existencia del archivo.
  2. Inicializar contadores: total y por nivel.
  3. Recorrer el archivo línea por línea:
    • limpiar \n y espacios
    • separar por |
    • validar estructura y level permitido
    • validar timestamp con datetime.strptime
    • si todo OK: contar y actualizar primer/último timestamp
  4. Construir el diccionario final y devolverlo.

Python
from datetime import datetime
FORMAT_TS = "%Y-%m-%d %H:%M:%S"
LEVELS_VALIDOS = {"INFO", "WARNING", "ERROR"}
def metricas_logs(ruta_log: str) -> dict:
"""
Lee un archivo de logs y devuelve métricas básicas.
Formato esperado por línea:
YYYY-MM-DD HH:MM:SS | LEVEL | mensaje
Reglas:
- ruta_log debe ser str (no None)
- archivo inexistente -> FileNotFoundError
- ignora líneas inválidas
"""
if ruta_log is None or not isinstance(ruta_log, str):
raise TypeError("El parámetro 'ruta_log' debe ser una cadena (str) no vacía.")
total = 0
por_level = {"INFO": 0, "WARNING": 0, "ERROR": 0}
primer_ts_dt = None
ultimo_ts_dt = None
with open(ruta_log, mode="r", encoding="utf-8") as f:
for linea in f:
linea = linea.strip()
if not linea:
continue
partes = [p.strip() for p in linea.split("|")]
if len(partes) != 3:
continue
ts_str, level, _mensaje = partes
if level not in LEVELS_VALIDOS:
continue
try:
ts_dt = datetime.strptime(ts_str, FORMAT_TS)
except ValueError:
continue
# Línea válida
total += 1
por_level[level] += 1
if primer_ts_dt is None or ts_dt < primer_ts_dt:
primer_ts_dt = ts_dt
if ultimo_ts_dt is None or ts_dt > ultimo_ts_dt:
ultimo_ts_dt = ts_dt
return {
"total": total,
"por_level": por_level,
"primer_timestamp": primer_ts_dt.strftime(FORMAT_TS) if primer_ts_dt else None,
"ultimo_timestamp": ultimo_ts_dt.strftime(FORMAT_TS) if ultimo_ts_dt else None,
}

Python
import pytest
from reto_21_metricas_logs import metricas_logs
def test_metricas_logs_con_lineas_validas_e_invalidas(tmp_path):
contenido = (
"2026-01-10 09:00:00 | INFO | Inicio\n"
"2026-01-10 09:01:02 | WARNING | Lento\n"
"2026-01-10 09:02:10 | ERROR | Fallo DB\n"
"LINEA INVALIDA\n"
"2026-01-10 09:03:00 | DEBUG | No permitido\n"
"2026-01-10 xx:yy:zz | INFO | Timestamp malo\n"
"\n"
"2026-01-10 09:10:00 | ERROR | Otro error\n"
)
archivo = tmp_path / "app.log"
archivo.write_text(contenido, encoding="utf-8")
res = metricas_logs(str(archivo))
assert res["total"] == 4
assert res["por_level"]["INFO"] == 1
assert res["por_level"]["WARNING"] == 1
assert res["por_level"]["ERROR"] == 2
assert res["primer_timestamp"] == "2026-01-10 09:00:00"
assert res["ultimo_timestamp"] == "2026-01-10 09:10:00"
def test_archivo_no_existe():
with pytest.raises(FileNotFoundError):
metricas_logs("no_existe.log")
def test_ruta_none_lanza_typeerror():
with pytest.raises(TypeError):
metricas_logs(None)
def test_ruta_no_str_lanza_typeerror():
with pytest.raises(TypeError):
metricas_logs(123)

Ejecuta:

  • pytest -q

Variantes para subir de nivel (opcional)

  1. Top N mensajes de ERROR (por frecuencia)
  2. Métricas por hora (conteos agrupados por YYYY-MM-DD HH)
  3. Exportar resumen a CSV/JSON
  4. Soportar múltiples formatos de log (configurable)
  5. Streaming (procesar logs enormes sin cargar todo)

Lo que aprendiste

  • Parsing robusto con validación real
  • Ignorar errores sin ocultar fallos críticos (archivo inexistente)
  • Agregaciones útiles para observabilidad básica
  • Tests con archivos temporales

Accede al código completo y a los tests en GitHub para ejecutar y modificar la solución localmente.

Si quieres seguir en esta línea “muy real”, el próximo reto recomendado es:
Reto #22 — Resumir un CSV por grupo (group-by sin pandas) (agrupación + métricas + archivos).