#!/usr/bin/env python3
"""
Rate limiting utilities for API requests
"""

import time
import threading
from typing import Any, Dict, Optional
from collections import deque
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)


class RateLimiter:
    """Sliding-window rate limiter.

    Keeps the timestamps of accepted requests in a deque and allows a new
    request only while fewer than ``max_requests`` timestamps fall inside the
    last ``time_window_seconds`` seconds. All state is guarded by a lock.
    """

    def __init__(self, max_requests: int, time_window_seconds: int):
        """
        Initialize rate limiter

        Args:
            max_requests: Maximum number of requests allowed
            time_window_seconds: Time window in seconds
        """
        self.max_requests = max_requests
        self.time_window = time_window_seconds
        # Timestamps (time.time()) of accepted requests, oldest first.
        self.requests = deque()
        self._lock = threading.Lock()

    def is_allowed(self) -> bool:
        """Check if a request is allowed and, if so, record it.

        Returns:
            True when the request fits in the current window (its timestamp
            is stored), False otherwise.
        """
        with self._lock:
            now = time.time()

            # Remove old requests outside the time window
            cutoff = now - self.time_window
            while self.requests and self.requests[0] <= cutoff:
                self.requests.popleft()

            # Check if we can make a new request
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True

            return False

    def wait_time(self) -> float:
        """Get the time to wait before next request is allowed.

        Returns:
            0.0 when a slot is free, otherwise the number of seconds until
            the oldest recorded request leaves the window. Returns
            ``float("inf")`` when ``max_requests`` is not positive, because
            no request can ever be allowed then.
        """
        with self._lock:
            if len(self.requests) < self.max_requests:
                return 0.0

            if not self.requests:
                # Only reachable when max_requests <= 0: there is no oldest
                # request to wait for, and no slot will ever open.
                return float("inf")

            # Time until the oldest request expires
            oldest_request = self.requests[0]
            return max(0.0, (oldest_request + self.time_window) - time.time())

    def wait_if_needed(self) -> None:
        """Sleep until the oldest recorded request leaves the window.

        This only sleeps; it does not record a request. Call ``is_allowed``
        afterwards to actually claim the slot.

        Raises:
            RuntimeError: If the limiter can never allow a request
                (``max_requests`` is not positive).
        """
        wait_time = self.wait_time()
        if wait_time == float("inf"):
            raise RuntimeError(
                "Rate limiter allows no requests (max_requests <= 0)"
            )
        if wait_time > 0:
            logger.info(f"Rate limit reached, waiting {wait_time:.2f} seconds")
            time.sleep(wait_time)


class AdaptiveRateLimiter:
    """Adaptive rate limiter that adjusts based on response times"""

    def __init__(self, initial_rate: float = 1.0, min_rate: float = 0.1,
                 max_rate: float = 10.0):
        """
        Initialize adaptive rate limiter

        Args:
            initial_rate: Initial requests per second
            min_rate: Minimum requests per second
            max_rate: Maximum requests per second
        """
        # The rate is used as a divisor in wait_and_record, so a zero rate
        # raises ZeroDivisionError there.
        self.current_rate = initial_rate
        self.min_rate = min_rate
        self.max_rate = max_rate
        self.last_request_time = 0.0
        # Only the 10 most recent response times are kept for averaging.
        self.response_times = deque(maxlen=10)
        self._lock = threading.Lock()

    def wait_and_record(self, response_time: Optional[float] = None):
        """Wait for rate limit and record response time.

        Sleeps until at least ``1 / current_rate`` seconds have passed since
        the previous call, then stores the call time. The lock is held for
        the whole call, including the sleep, so concurrent callers are
        spaced one after another.

        Args:
            response_time: Duration in seconds of a finished request; when
                given, it is added to the history and the rate is adjusted.
        """
        with self._lock:
            now = time.time()

            # Calculate wait time based on current rate
            time_since_last = now - self.last_request_time
            min_interval = 1.0 / self.current_rate

            if time_since_last < min_interval:
                wait_time = min_interval - time_since_last
                time.sleep(wait_time)
                now = time.time()

            self.last_request_time = now

            # Record response time and adjust rate
            if response_time is not None:
                self.response_times.append(response_time)
                self._adjust_rate()

    def _adjust_rate(self):
        """Adjust rate based on recent response times.

        Does nothing until at least three samples are stored. An average
        above 5 seconds scales the rate by 0.8 (not below ``min_rate``); an
        average below 1 second scales it by 1.1 (not above ``max_rate``).
        """
        if len(self.response_times) < 3:
            return

        avg_response_time = sum(self.response_times) / len(self.response_times)

        # Decrease rate if responses are slow
        if avg_response_time > 5.0:  # 5 seconds
            self.current_rate = max(self.min_rate, self.current_rate * 0.8)
            logger.info(f"Decreased rate to {self.current_rate:.2f} req/s due to slow responses")

        # Increase rate if responses are fast
        elif avg_response_time < 1.0:  # 1 second
            self.current_rate = min(self.max_rate, self.current_rate * 1.1)
            logger.debug(f"Increased rate to {self.current_rate:.2f} req/s")


class RequestTracker:
    """Track API request statistics"""

    def __init__(self):
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.total_response_time = 0.0
        self.last_request_time = None
        # Maps error type name -> number of failed requests of that type.
        self.error_count_by_type = {}
        self._lock = threading.Lock()

    def record_request(self, success: bool, response_time: float,
                       error_type: Optional[str] = None):
        """Record a request.

        Args:
            success: Whether the request succeeded.
            response_time: Duration of the request, added to the running
                total used for the average.
            error_type: Label counted in the error breakdown; only used
                when ``success`` is False and the label is non-empty.
        """
        with self._lock:
            self.total_requests += 1
            self.total_response_time += response_time
            self.last_request_time = datetime.now()

            if success:
                self.successful_requests += 1
            else:
                self.failed_requests += 1
                if error_type:
                    self.error_count_by_type[error_type] = (
                        self.error_count_by_type.get(error_type, 0) + 1
                    )

    def get_stats(self) -> Dict[str, Any]:
        """Get request statistics.

        Returns:
            A dict with the same keys whether or not any request has been
            recorded: ``total_requests``, ``successful_requests``,
            ``failed_requests``, ``success_rate``, ``average_response_time``,
            ``last_request_time`` (ISO string or None) and
            ``error_breakdown`` (a copy of the per-type error counts).
        """
        with self._lock:
            if self.total_requests == 0:
                # Same key set as the populated case so callers can index
                # any field on a fresh tracker without a KeyError.
                return {
                    'total_requests': 0,
                    'successful_requests': 0,
                    'failed_requests': 0,
                    'success_rate': 0.0,
                    'average_response_time': 0.0,
                    'last_request_time': None,
                    'error_breakdown': {},
                }

            return {
                'total_requests': self.total_requests,
                'successful_requests': self.successful_requests,
                'failed_requests': self.failed_requests,
                'success_rate': self.successful_requests / self.total_requests,
                'average_response_time': self.total_response_time / self.total_requests,
                'last_request_time': self.last_request_time.isoformat() if self.last_request_time else None,
                'error_breakdown': dict(self.error_count_by_type),
            }