Source code for evennia.server.throttle

import time
from collections import deque

from django.core.cache import caches
from django.utils.translation import gettext as _

from evennia.utils import logger


class Throttle:
    """
    Keeps a running count of failed actions per IP address.

    Available methods indicate whether or not the number of failures exceeds a
    particular threshold.

    This version of the throttle is usable by both the terminal server as well
    as the web server, imposes limits on memory consumption by using deques
    with length limits instead of open-ended lists, and uses native Django
    caches for automatic key eviction and persistence configurability.

    Two kinds of cache entries are maintained, all prefixed with the throttle
    name (see `get_cache_key`):

    - one entry per IP, holding a deque of failure timestamps;
    - one "keys" entry (the key register), holding the set of raw,
      un-prefixed IPs currently tracked, since the cache API offers no way to
      enumerate stored keys.

    """

    error_msg = _("Too many failed attempts; you must wait a few minutes before trying again.")

    def __init__(self, **kwargs):
        """
        Allows setting of throttle parameters.

        Keyword Args:
            name (str): Name of this throttle. Used as prefix for all cache
                keys. Defaults to "undefined-throttle".
            limit (int): Max number of failures before imposing limiter. If
                `None`, the throttle is disabled. Defaults to 5.
            timeout (int): number of timeout seconds after max number of tries
                has been reached. Defaults to 300.
            cache_size (int): Max number of attempts to record per IP within a
                rolling window; this is NOT the same as the limit after which
                the throttle is imposed! Defaults to the value of `limit`.

        """
        try:
            self.storage = caches["throttle"]
        except Exception:
            # Deliberate best-effort: any problem resolving the dedicated
            # "throttle" cache alias is logged and the default cache is used.
            logger.log_trace("Throttle: Errors encountered; using default cache.")
            self.storage = caches["default"]

        self.name = kwargs.get("name", "undefined-throttle")
        self.limit = kwargs.get("limit", 5)
        self.cache_size = kwargs.get("cache_size", self.limit)
        self.timeout = kwargs.get("timeout", 5 * 60)

    def get_cache_key(self, *args, **kwargs):
        """
        Creates a 'prefixed' key containing arbitrary terms to prevent key
        collisions in the same namespace.

        Args:
            *args: Terms to append to the throttle name. Each term is
                converted with `str()` so that non-string identifiers map to
                the same key everywhere in this class.

        Returns:
            str: The throttle name and all terms joined by "-".

        """
        return "-".join((self.name, *(str(term) for term in args)))

    def touch(self, key, *args, **kwargs):
        """
        Refreshes the timeout on a given key and ensures it is recorded in the
        key register.

        Args:
            key (str): Un-prefixed key (the raw IP) of the entry to renew. The
                cache prefix is added here, so do not pass an already
                prefixed cache key.

        """
        cache_key = self.get_cache_key(key)
        # storage.touch() reports whether the entry existed; only then is
        # there something worth (re-)registering.
        if self.storage.touch(cache_key, self.timeout):
            self.record_ip(key)

    def get(self, ip=None):
        """
        Convenience function that returns the storage table, or part of.

        Args:
            ip (str, optional): IP address of requestor

        Returns:
            dict or deque: When no IP is provided, a dict of everything
            currently tracked, mapping each prefixed cache key to the
            timestamps of its recent failures. When an IP is provided, a
            deque of timestamps of recent failures only for that IP (an
            empty deque if nothing is stored).

        """
        if ip:
            cache_key = self.get_cache_key(str(ip))
            return self.storage.get(cache_key, deque(maxlen=self.cache_size))

        keys_key = self.get_cache_key("keys")
        tracked = self.storage.get_or_set(keys_key, set(), self.timeout)

        # Map each prefixed cache key back to the raw IP it was built from so
        # the register can be pruned without storing prefixed keys in it.
        ip_by_cache_key = {self.get_cache_key(entry): entry for entry in tracked}
        data = self.storage.get_many(list(ip_by_cache_key))

        # Entries missing from `data` are no longer in the cache; drop them
        # from the register, keeping the raw (un-prefixed) IPs.
        live_ips = {ip_by_cache_key[cache_key] for cache_key in data}
        if len(live_ips) != len(tracked):
            self.storage.set(keys_key, live_ips, self.timeout)

        return data

    def update(self, ip, failmsg="Exceeded threshold."):
        """
        Store the time of the latest failure.

        Args:
            ip (str): IP address of requestor
            failmsg (str, optional): Message to display in logs upon
                activation of throttle.

        Returns:
            None

        """
        cache_key = self.get_cache_key(ip)

        # Get current status
        previously_throttled = self.check(ip)

        # Get previous failures, if any
        entries = self.storage.get(cache_key, [])
        entries.append(time.time())

        # Store updated record, bounded to the rolling window size
        self.storage.set(cache_key, deque(entries, maxlen=self.cache_size), self.timeout)

        # See if this update caused a change in status
        currently_throttled = self.check(ip)

        # If this makes it engage, log a single activation event
        if not previously_throttled and currently_throttled:
            logger.log_sec(
                f"Throttle Activated: {failmsg} (IP: {ip}, "
                f"{self.limit} hits in {self.timeout} seconds.)"
            )

        self.record_ip(ip)

    def remove(self, ip, *args, **kwargs):
        """
        Clears data stored for an IP from the throttle.

        Args:
            ip (str): IP to clear.

        Returns:
            bool: False if nothing was stored for the IP; otherwise True if
            no data remains for it after deletion.

        """
        exists = self.get(ip)
        if not exists:
            return False

        cache_key = self.get_cache_key(ip)
        self.storage.delete(cache_key)
        self.unrecord_ip(ip)

        # Return True if NOT exists
        return not bool(self.get(ip))

    def record_ip(self, ip, *args, **kwargs):
        """
        Tracks keys as they are added to the cache (since there is no way to
        get a list of keys after-the-fact).

        Args:
            ip (str): IP being added to cache. This should be the original
                IP, not the cache-prefixed key.

        Returns:
            bool: Always True.

        """
        keys_key = self.get_cache_key("keys")
        keys = self.storage.get(keys_key, set())
        # Stored as str so the entry matches what check()/remove() look up.
        keys.add(str(ip))
        self.storage.set(keys_key, keys, self.timeout)
        return True

    def unrecord_ip(self, ip, *args, **kwargs):
        """
        Forces removal of a key from the key registry.

        Args:
            ip (str): IP to remove from list of keys.

        Returns:
            bool: True if the IP was registered and has been removed, False
            if it was not in the registry.

        """
        keys_key = self.get_cache_key("keys")
        keys = self.storage.get(keys_key, set())
        try:
            keys.remove(str(ip))
        except KeyError:
            return False
        self.storage.set(keys_key, keys, self.timeout)
        return True

    def check(self, ip):
        """
        This will check the session's address against the storage dictionary
        to check they haven't spammed too many fails recently.

        Args:
            ip (str): IP address of requestor

        Returns:
            bool: True if throttling is active, False otherwise.

        """
        if self.limit is None:
            # throttle is disabled
            return False

        now = time.time()
        ip = str(ip)
        cache_key = self.get_cache_key(ip)

        latest_fails = self.storage.get(cache_key)
        if not latest_fails or len(latest_fails) < self.limit:
            return False

        # too many fails recently
        if now - latest_fails[-1] < self.timeout:
            # too soon - timeout in play. touch() adds the cache prefix
            # itself, so it must be given the raw IP, not `cache_key`.
            self.touch(ip)
            return True

        # timeout has passed. clear faillist
        self.remove(ip)
        return False