SolveConPython

Python Reto #29 — Cola de tareas (queue) con workers y reintentos

Nivel: Avanzado (práctico)
Tema: Colas, workers, reintentos con backoff, diseño de clases, control de flujo, tests deterministas
Objetivo: Construir una cola de tareas sencilla que ejecute “jobs” con uno o más workers, reintente fallos temporales y exponga métricas.

Para mantenerlo testeable y sin concurrencia real, este reto simula “workers” procesando en pasos (ticks). No usamos threads.

Enunciado

Implementa una clase TaskQueue que soporte:

1) Crear la cola

TaskQueue(max_reintentos=3, delay_inicial=0.1, factor=2.0, ahora_fn=None)

Reglas:

  • max_reintentos debe ser int >= 0
  • delay_inicial numérico >= 0
  • factor numérico > 0
  • ahora_fn callable opcional (default time.time)
  • La cola debe ser determinista en tests usando ahora_fn

2) Encolar tareas

enqueue(func, *, task_id=None, excepciones=(Exception,)) -> str

  • func debe ser callable y se ejecuta como func()
  • task_id si no se pasa, se genera automáticamente (por ejemplo "t1", "t2", …)
  • excepciones es la tupla de excepciones consideradas “reintentables”
  • Devuelve el task_id

3) Procesar (worker step)

tick() -> None

En cada tick():

  • Si hay tareas “ready” (listas para correr), ejecuta una (simula 1 worker por tick)
  • Si la tarea tiene next_run_at en el futuro, no se ejecuta aún
  • Si func() termina bien → la tarea pasa a DONE
  • Si lanza una excepción en excepciones:
    • si aún quedan reintentos → pasa a RETRY y se reprograma con backoff exponencial
    • si ya no quedan → pasa a FAILED y guarda el error final
  • Si lanza una excepción que no está en excepciones:
    • pasa a FAILED inmediatamente (sin reintentos)

4) Consultar estado

status(task_id) -> dict

Debe devolver:

Python
{
"task_id": str,
"state": "PENDING" | "RETRY" | "DONE" | "FAILED",
"attempts": int, # intentos ejecutados (incluye fallidos)
"retries_left": int,
"next_run_at": float | None,
"last_error": str | None,
}

5) Métricas

metrics() -> dict

Debe devolver:

{"pending": int, "retry": int, "done": int, "failed": int, "total": int}

Estados y reglas clave

  • Una tarea recién encolada empieza en PENDING, con attempts=0.
  • Cada ejecución incrementa attempts.
  • Backoff: delays por reintento:
    • intento 1 fallido → espera delay_inicial
    • intento 2 fallido → espera delay_inicial * factor
    • intento 3 fallido → espera delay_inicial * factor^2
  • next_run_at = ahora + delay
  • La cola debe procesar tareas en orden de inserción, pero respetando next_run_at.

Pistas

  1. Representa cada tarea como dict interno:
    • func, state, attempts, retries_left, next_run_at, last_error, excepciones
  2. En tick():
    • busca la primera tarea con estado PENDING o RETRY cuya next_run_at sea None o <= ahora
  3. Para tests, crea un RelojFake con avanzar().

Python
import time
class TaskQueue:
def __init__(self, max_reintentos=3, delay_inicial=0.1, factor=2.0, ahora_fn=None):
if max_reintentos is None or not isinstance(max_reintentos, int):
raise TypeError("'max_reintentos' debe ser int.")
if max_reintentos < 0:
raise ValueError("'max_reintentos' debe ser >= 0.")
if delay_inicial is None or not isinstance(delay_inicial, (int, float)):
raise TypeError("'delay_inicial' debe ser numérico.")
if delay_inicial < 0:
raise ValueError("'delay_inicial' debe ser >= 0.")
if factor is None or not isinstance(factor, (int, float)):
raise TypeError("'factor' debe ser numérico.")
if factor <= 0:
raise ValueError("'factor' debe ser > 0.")
if ahora_fn is not None and not callable(ahora_fn):
raise TypeError("'ahora_fn' debe ser callable si se proporciona.")
self.max_reintentos = max_reintentos
self.delay_inicial = float(delay_inicial)
self.factor = float(factor)
self.ahora_fn = ahora_fn or time.time
self._tasks = [] # lista de dicts, preserva orden
self._id_counter = 0
def _new_id(self) -> str:
self._id_counter += 1
return f"t{self._id_counter}"
def _validar_excepciones(self, excepciones):
if not isinstance(excepciones, tuple) or not excepciones:
raise TypeError("'excepciones' debe ser una tupla no vacía de excepciones.")
for exc in excepciones:
if not isinstance(exc, type) or not issubclass(exc, BaseException):
raise TypeError("'excepciones' debe contener solo clases de excepción.")
def enqueue(self, func, *, task_id=None, excepciones=(Exception,)) -> str:
if not callable(func):
raise TypeError("El parámetro 'func' debe ser callable.")
self._validar_excepciones(excepciones)
if task_id is None:
task_id = self._new_id()
elif not isinstance(task_id, str) or task_id.strip() == "":
raise TypeError("'task_id' debe ser str no vacío.")
# Evitar ids duplicados
if any(t["task_id"] == task_id for t in self._tasks):
raise ValueError("task_id ya existe en la cola.")
self._tasks.append(
{
"task_id": task_id,
"func": func,
"excepciones": excepciones,
"state": "PENDING",
"attempts": 0,
"retries_left": self.max_reintentos,
"next_run_at": None,
"last_error": None,
}
)
return task_id
def _find_task(self, task_id):
for t in self._tasks:
if t["task_id"] == task_id:
return t
return None
def status(self, task_id) -> dict:
t = self._find_task(task_id)
if t is None:
raise KeyError("task_id no encontrado.")
return {
"task_id": t["task_id"],
"state": t["state"],
"attempts": t["attempts"],
"retries_left": t["retries_left"],
"next_run_at": t["next_run_at"],
"last_error": t["last_error"],
}
def metrics(self) -> dict:
counts = {"PENDING": 0, "RETRY": 0, "DONE": 0, "FAILED": 0}
for t in self._tasks:
counts[t["state"]] += 1
return {
"pending": counts["PENDING"],
"retry": counts["RETRY"],
"done": counts["DONE"],
"failed": counts["FAILED"],
"total": len(self._tasks),
}
def tick(self) -> None:
ahora = float(self.ahora_fn())
# Buscar primera tarea elegible (PENDING/RETRY) cuyo next_run_at <= ahora
elegible = None
for t in self._tasks:
if t["state"] not in ("PENDING", "RETRY"):
continue
nra = t["next_run_at"]
if nra is None or nra <= ahora:
elegible = t
break
if elegible is None:
return # nada que ejecutar ahora
t = elegible
t["attempts"] += 1
t["next_run_at"] = None # se intenta ejecutar ya
try:
t["func"]()
except t["excepciones"] as e:
# error reintentable
t["last_error"] = f"{type(e).__name__}: {e}"
if t["retries_left"] > 0:
# calcular delay según número de fallos ya ocurridos (attempts-1)
fallos = t["attempts"] - 1
delay = self.delay_inicial * (self.factor ** max(0, fallos - 0))
t["retries_left"] -= 1
t["state"] = "RETRY"
t["next_run_at"] = ahora + delay
else:
t["state"] = "FAILED"
return
except Exception as e:
# error NO reintentable (no está en excepciones)
t["last_error"] = f"{type(e).__name__}: {e}"
t["state"] = "FAILED"
return
# éxito
t["state"] = "DONE"
t["last_error"] = None

Python
import pytest
from reto_29_task_queue import TaskQueue
class RelojFake:
def __init__(self, t0=0.0):
self.t = float(t0)
def ahora(self):
return self.t
def avanzar(self, s):
self.t += float(s)
def test_tarea_ok():
reloj = RelojFake(0.0)
q = TaskQueue(max_reintentos=3, delay_inicial=1.0, factor=2.0, ahora_fn=reloj.ahora)
estado = {"n": 0}
def ok():
estado["n"] += 1
tid = q.enqueue(ok, excepciones=(ValueError,))
q.tick()
st = q.status(tid)
assert st["state"] == "DONE"
assert st["attempts"] == 1
assert estado["n"] == 1
def test_reintentos_con_backoff():
reloj = RelojFake(0.0)
q = TaskQueue(max_reintentos=2, delay_inicial=1.0, factor=2.0, ahora_fn=reloj.ahora)
estado = {"n": 0}
def falla_dos_y_luego_ok():
estado["n"] += 1
if estado["n"] <= 2:
raise ValueError("temporal")
tid = q.enqueue(falla_dos_y_luego_ok, excepciones=(ValueError,))
# intento 1 -> falla, reprograma +1s
q.tick()
st = q.status(tid)
assert st["state"] == "RETRY"
assert st["attempts"] == 1
assert st["retries_left"] == 1
# antes de 1s no debe ejecutar
q.tick()
assert q.status(tid)["attempts"] == 1
reloj.avanzar(1.0)
# intento 2 -> falla, reprograma +2s
q.tick()
st = q.status(tid)
assert st["state"] == "RETRY"
assert st["attempts"] == 2
assert st["retries_left"] == 0
reloj.avanzar(2.0)
# intento 3 -> ok => DONE
q.tick()
st = q.status(tid)
assert st["state"] == "DONE"
assert st["attempts"] == 3
def test_agota_reintentos_y_falla():
reloj = RelojFake(0.0)
q = TaskQueue(max_reintentos=1, delay_inicial=1.0, factor=2.0, ahora_fn=reloj.ahora)
def siempre_falla():
raise ValueError("siempre")
tid = q.enqueue(siempre_falla, excepciones=(ValueError,))
# intento 1 falla, queda 0 retries
q.tick()
assert q.status(tid)["state"] == "RETRY"
reloj.avanzar(1.0)
# intento 2 falla y ya no hay retries -> FAILED
q.tick()
assert q.status(tid)["state"] == "FAILED"
def test_excepcion_no_reintentable_falla_directo():
reloj = RelojFake(0.0)
q = TaskQueue(max_reintentos=3, delay_inicial=1.0, factor=2.0, ahora_fn=reloj.ahora)
def lanza_keyerror():
raise KeyError("no reintentable")
tid = q.enqueue(lanza_keyerror, excepciones=(ValueError,))
q.tick()
st = q.status(tid)
assert st["state"] == "FAILED"
assert st["attempts"] == 1
def test_metrics():
reloj = RelojFake(0.0)
q = TaskQueue(max_reintentos=0, delay_inicial=0.0, factor=2.0, ahora_fn=reloj.ahora)
q.enqueue(lambda: None, task_id="a")
q.enqueue(lambda: (_ for _ in ()).throw(ValueError("x")), task_id="b", excepciones=(ValueError,))
q.tick() # ejecuta "a" -> DONE
q.tick() # ejecuta "b" -> FAILED (sin reintentos)
m = q.metrics()
assert m["done"] == 1
assert m["failed"] == 1
assert m["total"] == 2

Ejecuta:

  • pytest -q

Variantes para subir de nivel (opcional)

  1. Multiple workers por tick: tick(n=5) procesa hasta N tareas elegibles.
  2. Prioridades: tareas con priority más alta se ejecutan antes.
  3. Dead-letter queue: tareas fallidas se exportan a un archivo CSV/JSON.
  4. Integración con Circuit Breaker (Reto #28) por tipo de tarea.
  5. Persistencia: guardar/recuperar estado de la cola en disco.

Lo que aprendiste

  • Modelar un sistema de cola con estados y reintentos
  • Programación determinista con reloj inyectable
  • Backoff exponencial sin depender de librerías
  • Tests realistas sin esperar tiempo real

Accede al código completo y a los tests en GitHub para ejecutar y modificar la solución localmente.

Siguiente reto recomendado: Reto #30 — Scheduler (planificador) de tareas con cron simple para unir cola + tiempo + ejecución programada.