SolveConPython

Python Reto #26 — Rate limiter (Token Bucket simple)

Nivel: Intermedio+
Tema: Control de tasa (rate limiting), tiempo, estado, diseño defensivo, tests con pytest
Objetivo: Implementar un rate limiter estilo Token Bucket para limitar cuántas operaciones se permiten por unidad de tiempo.

Reto #26 Rate limiter (Token Bucket simple)
Reto #26 Rate limiter (Token Bucket simple)

Concepto (rápido)

Un Token Bucket tiene:

  • una capacidad máxima de tokens,
  • una tasa de recarga (tokens por segundo),
  • cada operación consume 1 token,
  • si no hay tokens disponibles, la operación se rechaza.

Enunciado

Crea una clase TokenBucketRateLimiter con:

Constructor

TokenBucketRateLimiter(capacidad, tokens_por_segundo, ahora_fn=None)

Reglas:

  1. capacidad debe ser int > 0 (si no: TypeError o ValueError)
  2. tokens_por_segundo debe ser float o int > 0 (si no: TypeError o ValueError)
  3. ahora_fn (opcional) debe ser callable si se proporciona.
    • Si no se proporciona, usa time.time.
    • Esto permite tests deterministas.

Método

allow() -> bool

Debe:

  1. “Recargar” tokens según el tiempo transcurrido desde la última actualización:
    • tokens += (tiempo_transcurrido * tokens_por_segundo)
    • nunca superar capacidad
  2. Si hay al menos 1 token disponible:
    • consumir 1 token,
    • devolver True
  3. Si no:
    • devolver False
  4. Debe funcionar correctamente aunque se llame muchas veces seguidas.

Ejemplos

rl = TokenBucketRateLimiter(capacidad=3, tokens_por_segundo=1)

rl.allow() # True (2 tokens quedan)
rl.allow() # True (1 token queda)
rl.allow() # True (0 tokens quedan)
rl.allow() # False (sin tokens)

# tras ~1 segundo, debería recargar ~1 token

Pistas

  1. Guarda:
    • self.tokens como float (para recargas fraccionales),
    • self.ultimo_tiempo
  2. En allow():
    • calcula delta = ahora - ultimo_tiempo,
    • recarga,
    • actualiza ultimo_tiempo,
    • decide si permitir.
  3. Para tests, inyecta ahora_fn que tú puedas controlar.

Solución explicada (paso a paso)

  1. Al crear el limiter:
    • inicia tokens = capacidad
    • ultimo_tiempo = ahora_fn()
  2. En cada allow():
    • calcula cuánto tiempo pasó,
    • recarga tokens proporcionalmente,
    • limita tokens a capacidad,
    • si tokens >= 1: consume y permite,
    • si no: rechaza.

Texto plano
import time
class TokenBucketRateLimiter:
"""
Rate limiter tipo Token Bucket.
- capacidad: tokens máximos
- tokens_por_segundo: ritmo de recarga
- allow(): consume 1 token si hay disponibilidad
"""
def __init__(self, capacidad: int, tokens_por_segundo: float, ahora_fn=None):
if capacidad is None or not isinstance(capacidad, int):
raise TypeError("El parámetro 'capacidad' debe ser un entero (int).")
if capacidad <= 0:
raise ValueError("'capacidad' debe ser mayor que 0.")
if tokens_por_segundo is None or not isinstance(tokens_por_segundo, (int, float)):
raise TypeError("El parámetro 'tokens_por_segundo' debe ser numérico (int o float).")
if tokens_por_segundo <= 0:
raise ValueError("'tokens_por_segundo' debe ser mayor que 0.")
if ahora_fn is not None and not callable(ahora_fn):
raise TypeError("El parámetro 'ahora_fn' debe ser callable si se proporciona.")
self.capacidad = capacidad
self.tokens_por_segundo = float(tokens_por_segundo)
self.ahora_fn = ahora_fn or time.time
self.tokens = float(capacidad)
self.ultimo_tiempo = float(self.ahora_fn())
def allow(self) -> bool:
ahora = float(self.ahora_fn())
delta = ahora - self.ultimo_tiempo
if delta < 0:
# Reloj "retrocede": tratamos como 0 para no penalizar
delta = 0.0
# Recarga proporcional
self.tokens = min(self.capacidad, self.tokens + (delta * self.tokens_por_segundo))
self.ultimo_tiempo = ahora
if self.tokens >= 1.0:
self.tokens -= 1.0
return True
return False

Python
import pytest
from reto_26_rate_limiter_token_bucket import TokenBucketRateLimiter
class RelojFake:
def __init__(self, t0=0.0):
self.t = float(t0)
def ahora(self):
return self.t
def avanzar(self, segundos):
self.t += float(segundos)
def test_consumo_inicial_y_bloqueo():
reloj = RelojFake(0.0)
rl = TokenBucketRateLimiter(capacidad=3, tokens_por_segundo=1, ahora_fn=reloj.ahora)
assert rl.allow() is True
assert rl.allow() is True
assert rl.allow() is True
assert rl.allow() is False # ya no quedan tokens
def test_recarga_con_tiempo():
reloj = RelojFake(0.0)
rl = TokenBucketRateLimiter(capacidad=2, tokens_por_segundo=1, ahora_fn=reloj.ahora)
assert rl.allow() is True
assert rl.allow() is True
assert rl.allow() is False
reloj.avanzar(1.0) # recarga ~1 token
assert rl.allow() is True
assert rl.allow() is False
def test_recarga_no_supera_capacidad():
reloj = RelojFake(0.0)
rl = TokenBucketRateLimiter(capacidad=2, tokens_por_segundo=10, ahora_fn=reloj.ahora)
assert rl.allow() is True
assert rl.allow() is True
assert rl.allow() is False
reloj.avanzar(10.0) # debería recargar mucho, pero tope=capacidad
assert rl.allow() is True
assert rl.allow() is True
assert rl.allow() is False
def test_validaciones_constructor():
with pytest.raises(TypeError):
TokenBucketRateLimiter(None, 1)
with pytest.raises(ValueError):
TokenBucketRateLimiter(0, 1)
with pytest.raises(TypeError):
TokenBucketRateLimiter(1, None)
with pytest.raises(ValueError):
TokenBucketRateLimiter(1, 0)
with pytest.raises(TypeError):
TokenBucketRateLimiter(1, 1, ahora_fn="no_callable")

Ejecuta:

  • pytest -q

Variantes para subir de nivel (opcional)

  1. Coste por operación: allow(cost=1) consumiendo más de 1 token
  2. Multiple buckets por “cliente” (diccionario de buckets por API key)
  3. Devolver tiempo estimado de espera cuando allow() es False
  4. Persistencia (guardar estado entre ejecuciones)

Lo que aprendiste

  • Modelo Token Bucket y por qué se usa en APIs
  • Cómo manejar tiempo y estado de forma segura
  • Tests deterministas inyectando un reloj
  • Validación defensiva de parámetros

Accede al código completo y a los tests en GitHub para ejecutar y modificar la solución localmente.

Siguiente reto recomendado: Reto #27 — Retry con backoff exponencial (sin librerías) para patrones reales de resiliencia en redes y automatización.