Source code for evennia.server.throttle
from collections import defaultdict, deque
from evennia.utils import logger
import time
[docs]class Throttle(object):
"""
Keeps a running count of failed actions per IP address.
Available methods indicate whether or not the number of failures exceeds a
particular threshold.
This version of the throttle is usable by both the terminal server as well
as the web server, imposes limits on memory consumption by using deques
with length limits instead of open-ended lists, and removes sparse keys when
no recent failures have been recorded.
"""
error_msg = "Too many failed attempts; you must wait a few minutes before trying again."
[docs] def __init__(self, **kwargs):
"""
Allows setting of throttle parameters.
Keyword Args:
limit (int): Max number of failures before imposing limiter
timeout (int): number of timeout seconds after
max number of tries has been reached.
cache_size (int): Max number of attempts to record per IP within a
rolling window; this is NOT the same as the limit after which
the throttle is imposed!
"""
self.storage = defaultdict(deque)
self.cache_size = self.limit = kwargs.get("limit", 5)
self.timeout = kwargs.get("timeout", 5 * 60)
[docs] def get(self, ip=None):
"""
Convenience function that returns the storage table, or part of.
Args:
ip (str, optional): IP address of requestor
Returns:
storage (dict): When no IP is provided, returns a dict of all
current IPs being tracked and the timestamps of their recent
failures.
timestamps (deque): When an IP is provided, returns a deque of
timestamps of recent failures only for that IP.
"""
if ip:
return self.storage.get(ip, deque(maxlen=self.cache_size))
else:
return self.storage
[docs] def update(self, ip, failmsg="Exceeded threshold."):
"""
Store the time of the latest failure.
Args:
ip (str): IP address of requestor
failmsg (str, optional): Message to display in logs upon activation
of throttle.
Returns:
None
"""
# Get current status
previously_throttled = self.check(ip)
# Enforce length limits
if not self.storage[ip].maxlen:
self.storage[ip] = deque(maxlen=self.cache_size)
self.storage[ip].append(time.time())
# See if this update caused a change in status
currently_throttled = self.check(ip)
# If this makes it engage, log a single activation event
if not previously_throttled and currently_throttled:
logger.log_sec(
"Throttle Activated: %s (IP: %s, %i hits in %i seconds.)"
% (failmsg, ip, self.limit, self.timeout)
)
[docs] def check(self, ip):
"""
This will check the session's address against the
storage dictionary to check they haven't spammed too many
fails recently.
Args:
ip (str): IP address of requestor
Returns:
throttled (bool): True if throttling is active,
False otherwise.
"""
now = time.time()
ip = str(ip)
# checking mode
latest_fails = self.storage[ip]
if latest_fails and len(latest_fails) >= self.limit:
# too many fails recently
if now - latest_fails[-1] < self.timeout:
# too soon - timeout in play
return True
else:
# timeout has passed. clear faillist
del self.storage[ip]
return False
else:
return False