Nivel: Avanzado (práctico)
Tema: Colas, workers, reintentos con backoff, diseño de clases, control de flujo, tests deterministas
Objetivo: Construir una cola de tareas sencilla que ejecute “jobs” con uno o más workers, reintente fallos temporales y exponga métricas.
Para mantenerlo testeable y sin concurrencia real, este reto simula “workers” procesando en pasos (ticks). No usamos threads.
Enunciado
Implementa una clase TaskQueue que soporte:
1) Crear la cola
TaskQueue(max_reintentos=3, delay_inicial=0.1, factor=2.0, ahora_fn=None)
Reglas:
max_reintentosdebe serint>= 0delay_inicialnumérico >= 0factornumérico > 0ahora_fncallable opcional (defaulttime.time)- La cola debe ser determinista en tests usando
ahora_fn
2) Encolar tareas
enqueue(func, *, task_id=None, excepciones=(Exception,)) -> str
funcdebe ser callable y se ejecuta comofunc()task_idsi no se pasa, se genera automáticamente (por ejemplo"t1","t2", …)excepcioneses la tupla de excepciones consideradas “reintentables”- Devuelve el
task_id
3) Procesar (worker step)
tick() -> None
En cada tick():
- Si hay tareas “ready” (listas para correr), ejecuta una (simula 1 worker por tick)
- Si la tarea tiene
next_run_aten el futuro, no se ejecuta aún - Si
func()termina bien → la tarea pasa aDONE - Si lanza una excepción en
excepciones:- si aún quedan reintentos → pasa a
RETRYy se reprograma con backoff exponencial - si ya no quedan → pasa a
FAILEDy guarda el error final
- si aún quedan reintentos → pasa a
- Si lanza una excepción que no está en
excepciones:- pasa a
FAILEDinmediatamente (sin reintentos)
- pasa a
4) Consultar estado
status(task_id) -> dict
Debe devolver:
{ "task_id": str, "state": "PENDING" | "RETRY" | "DONE" | "FAILED", "attempts": int, # intentos ejecutados (incluye fallidos) "retries_left": int, "next_run_at": float | None, "last_error": str | None,}
5) Métricas
metrics() -> dict
Debe devolver:
{"pending": int, "retry": int, "done": int, "failed": int, "total": int}
Estados y reglas clave
- Una tarea recién encolada empieza en
PENDING, conattempts=0. - Cada ejecución incrementa
attempts. - Backoff: delays por reintento:
- intento 1 fallido → espera
delay_inicial - intento 2 fallido → espera
delay_inicial * factor - intento 3 fallido → espera
delay_inicial * factor^2
- intento 1 fallido → espera
next_run_at = ahora + delay- La cola debe procesar tareas en orden de inserción, pero respetando
next_run_at.
Pistas
- Representa cada tarea como dict interno:
func,state,attempts,retries_left,next_run_at,last_error,excepciones
- En
tick():- busca la primera tarea con estado
PENDINGoRETRYcuyanext_run_atseaNoneo <= ahora
- busca la primera tarea con estado
- Para tests, crea un
RelojFakeconavanzar().
import timeclass TaskQueue: def __init__(self, max_reintentos=3, delay_inicial=0.1, factor=2.0, ahora_fn=None): if max_reintentos is None or not isinstance(max_reintentos, int): raise TypeError("'max_reintentos' debe ser int.") if max_reintentos < 0: raise ValueError("'max_reintentos' debe ser >= 0.") if delay_inicial is None or not isinstance(delay_inicial, (int, float)): raise TypeError("'delay_inicial' debe ser numérico.") if delay_inicial < 0: raise ValueError("'delay_inicial' debe ser >= 0.") if factor is None or not isinstance(factor, (int, float)): raise TypeError("'factor' debe ser numérico.") if factor <= 0: raise ValueError("'factor' debe ser > 0.") if ahora_fn is not None and not callable(ahora_fn): raise TypeError("'ahora_fn' debe ser callable si se proporciona.") self.max_reintentos = max_reintentos self.delay_inicial = float(delay_inicial) self.factor = float(factor) self.ahora_fn = ahora_fn or time.time self._tasks = [] # lista de dicts, preserva orden self._id_counter = 0 def _new_id(self) -> str: self._id_counter += 1 return f"t{self._id_counter}" def _validar_excepciones(self, excepciones): if not isinstance(excepciones, tuple) or not excepciones: raise TypeError("'excepciones' debe ser una tupla no vacía de excepciones.") for exc in excepciones: if not isinstance(exc, type) or not issubclass(exc, BaseException): raise TypeError("'excepciones' debe contener solo clases de excepción.") def enqueue(self, func, *, task_id=None, excepciones=(Exception,)) -> str: if not callable(func): raise TypeError("El parámetro 'func' debe ser callable.") self._validar_excepciones(excepciones) if task_id is None: task_id = self._new_id() elif not isinstance(task_id, str) or task_id.strip() == "": raise TypeError("'task_id' debe ser str no vacío.") # Evitar ids duplicados if any(t["task_id"] == task_id for t in self._tasks): raise ValueError("task_id ya existe en la cola.") self._tasks.append( { "task_id": task_id, "func": func, "excepciones": excepciones, "state": "PENDING", "attempts": 0, "retries_left": self.max_reintentos, "next_run_at": None, "last_error": None, } ) return task_id def _find_task(self, task_id): for t in self._tasks: if t["task_id"] == task_id: return t return None def status(self, task_id) -> dict: t = self._find_task(task_id) if t is None: raise KeyError("task_id no encontrado.") return { "task_id": t["task_id"], "state": t["state"], "attempts": t["attempts"], "retries_left": t["retries_left"], "next_run_at": t["next_run_at"], "last_error": t["last_error"], } def metrics(self) -> dict: counts = {"PENDING": 0, "RETRY": 0, "DONE": 0, "FAILED": 0} for t in self._tasks: counts[t["state"]] += 1 return { "pending": counts["PENDING"], "retry": counts["RETRY"], "done": counts["DONE"], "failed": counts["FAILED"], "total": len(self._tasks), } def tick(self) -> None: ahora = float(self.ahora_fn()) # Buscar primera tarea elegible (PENDING/RETRY) cuyo next_run_at <= ahora elegible = None for t in self._tasks: if t["state"] not in ("PENDING", "RETRY"): continue nra = t["next_run_at"] if nra is None or nra <= ahora: elegible = t break if elegible is None: return # nada que ejecutar ahora t = elegible t["attempts"] += 1 t["next_run_at"] = None # se intenta ejecutar ya try: t["func"]() except t["excepciones"] as e: # error reintentable t["last_error"] = f"{type(e).__name__}: {e}" if t["retries_left"] > 0: # calcular delay según número de fallos ya ocurridos (attempts-1) fallos = t["attempts"] - 1 delay = self.delay_inicial * (self.factor ** max(0, fallos - 0)) t["retries_left"] -= 1 t["state"] = "RETRY" t["next_run_at"] = ahora + delay else: t["state"] = "FAILED" return except Exception as e: # error NO reintentable (no está en excepciones) t["last_error"] = f"{type(e).__name__}: {e}" t["state"] = "FAILED" return # éxito t["state"] = "DONE" t["last_error"] = None
import pytestfrom reto_29_task_queue import TaskQueueclass RelojFake: def __init__(self, t0=0.0): self.t = float(t0) def ahora(self): return self.t def avanzar(self, s): self.t += float(s)def test_tarea_ok(): reloj = RelojFake(0.0) q = TaskQueue(max_reintentos=3, delay_inicial=1.0, factor=2.0, ahora_fn=reloj.ahora) estado = {"n": 0} def ok(): estado["n"] += 1 tid = q.enqueue(ok, excepciones=(ValueError,)) q.tick() st = q.status(tid) assert st["state"] == "DONE" assert st["attempts"] == 1 assert estado["n"] == 1def test_reintentos_con_backoff(): reloj = RelojFake(0.0) q = TaskQueue(max_reintentos=2, delay_inicial=1.0, factor=2.0, ahora_fn=reloj.ahora) estado = {"n": 0} def falla_dos_y_luego_ok(): estado["n"] += 1 if estado["n"] <= 2: raise ValueError("temporal") tid = q.enqueue(falla_dos_y_luego_ok, excepciones=(ValueError,)) # intento 1 -> falla, reprograma +1s q.tick() st = q.status(tid) assert st["state"] == "RETRY" assert st["attempts"] == 1 assert st["retries_left"] == 1 # antes de 1s no debe ejecutar q.tick() assert q.status(tid)["attempts"] == 1 reloj.avanzar(1.0) # intento 2 -> falla, reprograma +2s q.tick() st = q.status(tid) assert st["state"] == "RETRY" assert st["attempts"] == 2 assert st["retries_left"] == 0 reloj.avanzar(2.0) # intento 3 -> ok => DONE q.tick() st = q.status(tid) assert st["state"] == "DONE" assert st["attempts"] == 3def test_agota_reintentos_y_falla(): reloj = RelojFake(0.0) q = TaskQueue(max_reintentos=1, delay_inicial=1.0, factor=2.0, ahora_fn=reloj.ahora) def siempre_falla(): raise ValueError("siempre") tid = q.enqueue(siempre_falla, excepciones=(ValueError,)) # intento 1 falla, queda 0 retries q.tick() assert q.status(tid)["state"] == "RETRY" reloj.avanzar(1.0) # intento 2 falla y ya no hay retries -> FAILED q.tick() assert q.status(tid)["state"] == "FAILED"def test_excepcion_no_reintentable_falla_directo(): reloj = RelojFake(0.0) q = TaskQueue(max_reintentos=3, delay_inicial=1.0, factor=2.0, ahora_fn=reloj.ahora) def lanza_keyerror(): raise KeyError("no reintentable") tid = q.enqueue(lanza_keyerror, excepciones=(ValueError,)) q.tick() st = q.status(tid) assert st["state"] == "FAILED" assert st["attempts"] == 1def test_metrics(): reloj = RelojFake(0.0) q = TaskQueue(max_reintentos=0, delay_inicial=0.0, factor=2.0, ahora_fn=reloj.ahora) q.enqueue(lambda: None, task_id="a") q.enqueue(lambda: (_ for _ in ()).throw(ValueError("x")), task_id="b", excepciones=(ValueError,)) q.tick() # ejecuta "a" -> DONE q.tick() # ejecuta "b" -> FAILED (sin reintentos) m = q.metrics() assert m["done"] == 1 assert m["failed"] == 1 assert m["total"] == 2
Ejecuta:
pytest -q
Variantes para subir de nivel (opcional)
- Multiple workers por tick:
tick(n=5)procesa hasta N tareas elegibles. - Prioridades: tareas con
prioritymás alta se ejecutan antes. - Dead-letter queue: tareas fallidas se exportan a un archivo CSV/JSON.
- Integración con Circuit Breaker (Reto #28) por tipo de tarea.
- Persistencia: guardar/recuperar estado de la cola en disco.
Lo que aprendiste
- Modelar un sistema de cola con estados y reintentos
- Programación determinista con reloj inyectable
- Backoff exponencial sin depender de librerías
- Tests realistas sin esperar tiempo real
Accede al código completo y a los tests en GitHub para ejecutar y modificar la solución localmente.
Siguiente reto recomendado: Reto #30 — Scheduler (planificador) de tareas con cron simple para unir cola + tiempo + ejecución programada.