Source code for evennia.contrib.utils.auditing.server

"""
Auditable Server Sessions:
Extension of the stock ServerSession that yields objects representing
user inputs and system outputs.

Evennia contribution - Johnny 2017
"""
import os
import re
import socket

from django.conf import settings as ev_settings
from django.utils import timezone

from evennia.server.serversession import ServerSession
from evennia.utils import get_evennia_version, logger, mod_import, utils

# Attributes governing auditing of commands and where to send log objects
AUDIT_CALLBACK = getattr(
    ev_settings, "AUDIT_CALLBACK", "evennia.contrib.utils.auditing.outputs.to_file"
)
AUDIT_IN = getattr(ev_settings, "AUDIT_IN", False)
AUDIT_OUT = getattr(ev_settings, "AUDIT_OUT", False)
AUDIT_ALLOW_SPARSE = getattr(ev_settings, "AUDIT_ALLOW_SPARSE", False)
AUDIT_MASKS = [
    {"connect": r"^[@\s]*[connect]{5,8}\s+(\".+?\"|[^\s]+)\s+(?P<secret>.+)"},
    {"connect": r"^[@\s]*[connect]{5,8}\s+(?P<secret>[\w]+)"},
    {"create": r"^[^@]?[create]{5,6}\s+(\w+|\".+?\")\s+(?P<secret>[\w]+)"},
    {"create": r"^[^@]?[create]{5,6}\s+(?P<secret>[\w]+)"},
    {"userpassword": r"^[@\s]*[userpassword]{11,14}\s+(\w+|\".+?\")\s+=*\s*(?P<secret>[\w]+)"},
    {"userpassword": r"^.*new password set to '(?P<secret>[^']+)'\."},
    {"userpassword": r"^.* has changed your password to '(?P<secret>[^']+)'\."},
    {"password": r"^[@\s]*[password]{6,9}\s+(?P<secret>.*)"},
] + getattr(ev_settings, "AUDIT_MASKS", [])


if AUDIT_CALLBACK:
    try:
        AUDIT_CALLBACK = getattr(
            mod_import(".".join(AUDIT_CALLBACK.split(".")[:-1])), AUDIT_CALLBACK.split(".")[-1]
        )
        logger.log_sec("Auditing module online.")
        logger.log_sec(
            "Audit record User input: {}, output: {}.\n"
            "Audit sparse recording: {}, Log callback: {}".format(
                AUDIT_IN, AUDIT_OUT, AUDIT_ALLOW_SPARSE, AUDIT_CALLBACK
            )
        )
    except Exception as e:
        logger.log_err("Failed to activate Auditing module. %s" % e)


[docs]class AuditedServerSession(ServerSession): """ This particular implementation parses all server inputs and/or outputs and passes a dict containing the parsed metadata to a callback method of your creation. This is useful for recording player activity where necessary for security auditing, usage analysis or post-incident forensic discovery. *** WARNING *** All strings are recorded and stored in plaintext. This includes those strings which might contain sensitive data (create, connect, @password). These commands have their arguments masked by default, but you must mask or mask any custom commands of your own that handle sensitive information. See README.md for installation/configuration instructions. """
[docs] def audit(self, **kwargs): """ Extracts messages and system data from a Session object upon message send or receive. Keyword Args: src (str): Source of data; 'client' or 'server'. Indicates direction. text (str or list): Client sends messages to server in the form of lists. Server sends messages to client as string. Returns: log (dict): Dictionary object containing parsed system and user data related to this message. """ # Get time at start of processing time_obj = timezone.now() time_str = str(time_obj) session = self src = kwargs.pop("src", "?") bytecount = 0 # Do not log empty lines if not kwargs: return {} # Get current session's IP address client_ip = session.address # Capture Account name and dbref together account = session.get_account() account_token = "" if account: account_token = "%s%s" % (account.key, account.dbref) # Capture Character name and dbref together char = session.get_puppet() char_token = "" if char: char_token = "%s%s" % (char.key, char.dbref) # Capture Room name and dbref together room = None room_token = "" if char: room = char.location room_token = "%s%s" % (room.key, room.dbref) # Try to compile an input/output string def drill(obj, bucket): if isinstance(obj, dict): return bucket elif utils.is_iter(obj): for sub_obj in obj: bucket.extend(drill(sub_obj, [])) else: bucket.append(obj) return bucket text = kwargs.pop("text", "") if utils.is_iter(text): text = "|".join(drill(text, [])) # Mask any PII in message, where possible bytecount = len(text.encode("utf-8")) text = self.mask(text) # Compile the IP, Account, Character, Room, and the message. log = { "time": time_str, "hostname": socket.getfqdn(), "application": "%s" % ev_settings.SERVERNAME, "version": get_evennia_version(), "pid": os.getpid(), "direction": "SND" if src == "server" else "RCV", "protocol": self.protocol_key, "ip": client_ip, "session": "session#%s" % self.sessid, "account": account_token, "character": char_token, "room": room_token, "text": text.strip(), "bytes": bytecount, "data": kwargs, "objects": { "time": time_obj, "session": self, "account": account, "character": char, "room": room, }, } # Remove any keys with blank values if AUDIT_ALLOW_SPARSE is False: log["data"] = {k: v for k, v in log["data"].items() if v} log["objects"] = {k: v for k, v in log["objects"].items() if v} log = {k: v for k, v in log.items() if v} return log
[docs] def mask(self, msg): """ Masks potentially sensitive user information within messages before writing to log. Recording cleartext password attempts is bad policy. Args: msg (str): Raw text string sent from client <-> server Returns: msg (str): Text string with sensitive information masked out. """ # Check to see if the command is embedded within server output _msg = msg is_embedded = False match = re.match(".*Command.*'(.+)'.*is not available.*", msg, flags=re.IGNORECASE) if match: msg = match.group(1).replace("\\", "") submsg = msg is_embedded = True for mask in AUDIT_MASKS: for command, regex in mask.items(): try: match = re.match(regex, msg, flags=re.IGNORECASE) except Exception as e: logger.log_err(regex) logger.log_err(e) continue if match: term = match.group("secret") masked = re.sub(term, "*" * len(term.zfill(8)), msg) if is_embedded: msg = re.sub( submsg, "%s <Masked: %s>" % (masked, command), _msg, flags=re.IGNORECASE ) else: msg = masked return msg return _msg
[docs] def data_out(self, **kwargs): """ Generic hook for sending data out through the protocol. Keyword Args: kwargs (any): Other data to the protocol. """ if AUDIT_CALLBACK and AUDIT_OUT: try: log = self.audit(src="server", **kwargs) if log: AUDIT_CALLBACK(log) except Exception as e: logger.log_err(e) super().data_out(**kwargs)
[docs] def data_in(self, **kwargs): """ Hook for protocols to send incoming data to the engine. Keyword Args: kwargs (any): Other data from the protocol. """ if AUDIT_CALLBACK and AUDIT_IN: try: log = self.audit(src="client", **kwargs) if log: AUDIT_CALLBACK(log) except Exception as e: logger.log_err(e) super().data_in(**kwargs)