# Source code for evennia.server.portal.telnet

"""
This module implements the telnet protocol.

This depends on a generic session module that implements
the actual login procedure of the game, tracks
sessions etc.

"""

import re

from django.conf import settings
from twisted.conch.telnet import (
    ECHO,
    GA,
    IAC,
    LINEMODE,
    LINEMODE_EDIT,
    LINEMODE_TRAPSIG,
    MODE,
    NOP,
    NULL,
    WILL,
    WONT,
    StatefulTelnetProtocol,
    Telnet,
)
from twisted.internet import protocol
from twisted.internet.task import LoopingCall

from evennia.server.portal import mssp, naws, suppress_ga, telnet_oob, ttype
from evennia.server.portal.mccp import MCCP, Mccp, mccp_compress
from evennia.server.portal.mxp import Mxp, mxp_parse
from evennia.server.portal.naws import NAWS
from evennia.utils import ansi
from evennia.utils.utils import class_from_module, to_bytes

# A "|n" marker sitting at the very end of a text.
_RE_N = re.compile(r"\|n$")
# Line endings at the end of a line.
# NOTE(review): the trailing "|" adds an empty alternative, so this pattern
# can also match the empty string - confirm that this is intended.
_RE_LEND = re.compile(rb"\n$|\r$|\r\n$|\r\x00$|", re.MULTILINE)
# Any of the line-break conventions a telnet client may send.
_RE_LINEBREAK = re.compile(rb"\n\r|\r\n|\n|\r", re.DOTALL | re.MULTILINE)
# Project-configured pattern removed from output in screenreader mode.
_RE_SCREENREADER_REGEX = re.compile(
    r"%s" % settings.SCREENREADER_REGEX_STRIP,
    re.DOTALL | re.MULTILINE,
)
# The configured idle command as a newline-terminated byte string.
_IDLE_COMMAND = (settings.IDLE_COMMAND + "\n").encode()

# identify HTTP indata
_HTTP_REGEX = re.compile(
    b"(GET|HEAD|POST|PUT|DELETE|TRACE|OPTIONS|CONNECT|PATCH) (.*? HTTP/[0-9]\.[0-9])", re.I
)

_HTTP_WARNING = bytes(
    """
    This is Evennia's Telnet port and cannot be used for regular HTTP traffic.
    Use a telnet client to connect here and point your browser to the server's
    dedicated web port instead.

    """.strip(),
    "utf-8",
)


# Session base class used as a parent of TelnetProtocol; it is resolved from
# the BASE_SESSION_CLASS setting when this module is imported.
_BASE_SESSION_CLASS = class_from_module(settings.BASE_SESSION_CLASS)


class TelnetServerFactory(protocol.ServerFactory):
    """
    This exists only to name this better in logs.

    """

    noisy = False

    def logPrefix(self):
        """
        Return the prefix used for this factory in log output.

        Returns:
            prefix (str): Always the string "Telnet".

        """
        return "Telnet"
class TelnetProtocol(Telnet, StatefulTelnetProtocol, _BASE_SESSION_CLASS):
    """
    Each player connecting over telnet (ie using most traditional mud
    clients) gets a telnet protocol instance assigned to them. All
    communication between game and player goes through here.

    """

    def __init__(self, *args, **kwargs):
        """
        Set the protocol key and initialize the parent classes.

        """
        self.protocol_key = "telnet"
        super().__init__(*args, **kwargs)

    def dataReceived(self, data):
        """
        Handle raw incoming data before it is parsed as telnet.

        A 9-byte chunk containing the NAWS option triggers a session sync
        when the AUTORESIZE protocol flag is set. The data is then handed
        to the parent class. This is also a good place to put debug
        printouts of incoming data.

        Args:
            data (bytes): Raw data received from the transport.

        Notes:
            A `ValueError` raised while handling the data is logged as
            malformed telnet input instead of being propagated.

        """
        try:
            # Do we have a NAWS update?
            if (
                NAWS in data
                and len(data) == 9
                # Is auto resizing on?
                and self.protocol_flags.get("AUTORESIZE")
            ):
                self.sessionhandler.sync(self.sessionhandler.get(self.sessid))

            super().dataReceived(data)
        except ValueError as err:
            from evennia.utils import logger

            logger.log_err(f"Malformed telnet input: {err}")

    def connectionMade(self):
        """
        This is called when the connection is first established.

        """
        # important in order to work normally with standard telnet
        self.do(LINEMODE).addErrback(self._wont_linemode)
        # initialize the session
        self.line_buffer = b""
        client_address = self.transport.client
        client_address = client_address[0] if client_address else None
        # this number is counted down for every handshake that completes.
        # when it reaches 0 the portal/server syncs their data
        self.handshakes = 8  # suppress-go-ahead, naws, ttype, mccp, mssp, msdp, gmcp, mxp

        self.init_session(self.protocol_key, client_address, self.factory.sessionhandler)
        # change encoding to ENCODINGS[0] which reflects Telnet default encoding
        self.protocol_flags["ENCODING"] = settings.ENCODINGS[0] if settings.ENCODINGS else "utf-8"
        # add this new connection to sessionhandler so
        # the Server becomes aware of it.
        self.sessionhandler.connect(self)

        # suppress go-ahead
        self.sga = suppress_ga.SuppressGA(self)
        # negotiate client size
        self.naws = naws.Naws(self)
        # negotiate ttype (client info)
        # Obs: mudlet ttype does not seem to work if we start mccp before ttype. /Griatch
        self.ttype = ttype.Ttype(self)
        # negotiate mccp (data compression) - turn this off for wireshark analysis
        self.mccp = Mccp(self)
        # negotiate mssp (crawler communication)
        self.mssp = mssp.Mssp(self)
        # oob communication (MSDP, GMCP) - two handshake calls!
        self.oob = telnet_oob.TelnetOOB(self)
        # mxp support
        self.mxp = Mxp(self)

        from evennia.utils.utils import delay

        # timeout the handshakes in case the client doesn't reply at all
        self._handshake_delay = delay(2, callback=self.handshake_done, timeout=True)

        # TCP/IP keepalive watches for dead links
        self.transport.setTcpKeepAlive(1)
        # The TCP/IP keepalive is not enough for some networks;
        # we have to complement it with a NOP keep-alive.
        self.protocol_flags["NOPKEEPALIVE"] = True
        self.nop_keep_alive = None
        self.toggle_nop_keepalive()

    def _wont_linemode(self, *args):
        """
        Client refuses do(linemode). This is common for MUD-specific
        clients, but we must ask for the sake of raw telnet. We ignore
        this error.

        """
        pass

    def _send_nop_keepalive(self):
        """
        Send a NOP keepalive, but only while the NOPKEEPALIVE protocol
        flag is set.

        """
        if self.protocol_flags.get("NOPKEEPALIVE"):
            self._write(IAC + NOP)

    def toggle_nop_keepalive(self):
        """
        Allow to toggle the NOP keepalive for those sad clients that
        can't even handle a NOP instruction. This is turned off by the
        protocol_flag NOPKEEPALIVE (settable e.g. by the default
        `option` command).

        If the keepalive loop is running it is stopped, otherwise a new
        loop is created and started with a 30 second interval.

        """
        if self.nop_keep_alive and self.nop_keep_alive.running:
            self.nop_keep_alive.stop()
        else:
            self.nop_keep_alive = LoopingCall(self._send_nop_keepalive)
            self.nop_keep_alive.start(30, now=False)

    def handshake_done(self, timeout=False):
        """
        This is called by all telnet extensions once they are finished.
        When all have reported, a sync with the server is performed.
        The system will force-call this sync after a small time to handle
        clients that don't reply to handshakes at all.

        Args:
            timeout (bool, optional): Set when called by the handshake
                timeout. If handshakes are still outstanding, the counter
                is zeroed and the sync is forced.

        """
        if timeout:
            if self.handshakes > 0:
                self.handshakes = 0
                self.sessionhandler.sync(self)
        else:
            self.handshakes -= 1
            if self.handshakes <= 0:
                # do the sync
                self.sessionhandler.sync(self)

    def at_login(self):
        """
        Called when this session gets authenticated by the server.

        """
        pass

    def enableRemote(self, option):
        """
        This sets up the remote-activated options we allow for this protocol.

        Args:
            option (char): The telnet option to enable.

        Returns:
            enable (bool): If this option should be enabled.

        """
        if option == LINEMODE:
            # make sure to activate line mode with local editing for all clients
            self.requestNegotiation(
                LINEMODE, MODE + bytes(chr(ord(LINEMODE_EDIT) + ord(LINEMODE_TRAPSIG)), "ascii")
            )
            return True
        return option in (
            ttype.TTYPE,
            naws.NAWS,
            MCCP,
            mssp.MSSP,
            ECHO,
            suppress_ga.SUPPRESS_GA,
        )

    def disableRemote(self, option):
        """
        Signal which remote options this protocol allows to be disabled.

        Args:
            option (char): The telnet option to disable.

        Returns:
            disable (bool): True if the option is one of those handled
                by this protocol.

        """
        return option in (
            LINEMODE,
            ttype.TTYPE,
            naws.NAWS,
            MCCP,
            mssp.MSSP,
            ECHO,
            suppress_ga.SUPPRESS_GA,
        )

    def enableLocal(self, option):
        """
        Call to allow the activation of options for this protocol

        Args:
            option (char): The telnet option to enable locally.

        Returns:
            enable (bool): If this option should be enabled.

        """
        return option in (LINEMODE, MCCP, ECHO, suppress_ga.SUPPRESS_GA)

    def disableLocal(self, option):
        """
        Disable a given option locally.

        Args:
            option (char): The telnet option to disable locally.

        Returns:
            disabled (bool): True for LINEMODE, ECHO and MCCP. For other
                options the parent's result is returned, or False if the
                parent raised (the traceback is logged).

        """
        if option in (LINEMODE, ECHO):
            return True
        if option == MCCP:
            self.mccp.no_mccp(option)
            return True
        try:
            return super().disableLocal(option)
        except Exception:
            from evennia.utils import logger

            logger.log_trace()
            return False

    def connectionLost(self, reason):
        """
        this is executed when the connection is lost for whatever
        reason. it can also be called directly, from the disconnect
        method

        Args:
            reason (str): Motivation for losing connection.

        """
        self.sessionhandler.disconnect(self)
        if self.nop_keep_alive and self.nop_keep_alive.running:
            # the keepalive loop is running, so toggling stops it
            self.toggle_nop_keepalive()
        self.transport.loseConnection()

    def applicationDataReceived(self, data):
        """
        Telnet method called when non-telnet-command data is coming in
        over the telnet connection. We pass it on to the game engine
        directly.

        Args:
            data (bytes): Incoming data.

        """
        if not data:
            data = [data]
        elif data.strip() == NULL:
            # this is an ancient type of keepalive used by some
            # legacy clients. There should never be a reason to send a
            # lone NULL character so this seems to be a safe thing to
            # support for backwards compatibility. It also stops the
            # NULL from continuously popping up as an unknown command.
            data = [_IDLE_COMMAND]
        else:
            data = _RE_LINEBREAK.split(data)

            if len(data) > 2 and _HTTP_REGEX.match(data[0]):
                # guard against HTTP request on the Telnet port; we
                # block and kill the connection.
                self.transport.write(_HTTP_WARNING)
                self.transport.loseConnection()
                return

            if self.line_buffer and len(data) > 1:
                # buffer exists, it is terminated by the first line feed
                data[0] = self.line_buffer + data[0]
                self.line_buffer = b""
            # if the last data split is empty, it means all splits have
            # line breaks, if not, it is unterminated and must be
            # buffered.
            self.line_buffer += data.pop()
        # send all data chunks
        for dat in data:
            self.data_in(text=dat + b"\n")

    def _write(self, data):
        """
        Hook overloading the one used in plain telnet. Line feeds are
        normalized to CRLF and the data is passed through
        `mccp_compress` before being written.

        Args:
            data (bytes): Data to write.

        """
        data = data.replace(b"\n", b"\r\n").replace(b"\r\r\n", b"\r\n")
        super()._write(mccp_compress(self, data))

    def sendLine(self, line):
        """
        Hook overloading the one used by linereceiver.

        Args:
            line (str or bytes): Line to send. It is converted with
                `to_bytes` before being written.

        Returns:
            result (any): Whatever the transport's `write` returns.

        """
        line = to_bytes(line, self)
        # escape IAC in line mode, and correctly add \r\n (the TELNET end-of-line)
        line = line.replace(IAC, IAC + IAC)
        line = line.replace(b"\n", b"\r\n")
        if not line.endswith(b"\r\n") and self.protocol_flags.get("FORCEDENDLINE", True):
            line += b"\r\n"
        if not self.protocol_flags.get("NOGOAHEAD", True):
            line += IAC + GA
        return self.transport.write(mccp_compress(self, line))

    # Session hooks

    def disconnect(self, reason=""):
        """
        Generic hook for the engine to call in order to
        disconnect this protocol.

        Args:
            reason (str, optional): Reason for disconnecting.

        """
        self.data_out(text=((reason,), {}))
        self.connectionLost(reason)

    def data_in(self, **kwargs):
        """
        Data User -> Evennia

        Keyword Args:
            kwargs (any): Options from the protocol.

        """
        # from evennia.server.profiling.timetrace import timetrace  # DEBUG
        # text = timetrace(text, "telnet.data_in")  # DEBUG

        self.sessionhandler.data_in(self, **kwargs)

    def data_out(self, **kwargs):
        """
        Data Evennia -> User

        Keyword Args:
            kwargs (any): Options to the protocol

        """
        self.sessionhandler.data_out(self, **kwargs)

    # send_* methods

    def send_text(self, *args, **kwargs):
        """
        Send text data. This is an in-band telnet operation.

        Args:
            text (str): The first argument is always the text string to send.
                No other arguments are considered. Nothing is sent if it
                is None.

        Keyword Args:
            options (dict): Send-option flags

               - mxp: Enforce MXP link support.
               - ansi: Enforce ANSI color support on/off, regardless of TTYPE.
               - xterm256: Enforce xterm256 colors, regardless of TTYPE.
               - noxterm256: Enforce no xterm256 color support, regardless of TTYPE.
               - nocolor: Strip all Color, regardless of ansi/xterm256 setting.
               - truecolor: Enforce truecolor, regardless of TTYPE.
               - raw: Pass string through without any ansi processing
                    (i.e. include Evennia ansi markers but do not
                    convert them into ansi tokens)
               - echo: Turn on/off line echo on the client. Turn
                    off line echo for client, for example for password.
                    Note that it must be actively turned back on again!
               - screenreader: Strip ansi and apply the screenreader
                    regex to the text before sending.
               - send_prompt: Send the text as a prompt (no forced line
                    end) instead of as a regular line.

        """
        text = args[0] if args else ""
        if text is None:
            return

        # handle arguments
        options = kwargs.get("options", {})
        flags = self.protocol_flags
        # until TTYPE has been negotiated, the color modes default to on
        ttype_known = flags.get("TTYPE", False)
        xterm256 = options.get("xterm256", flags.get("XTERM256", False) if ttype_known else True)
        truecolor = options.get(
            "truecolor", flags.get("TRUECOLOR", False) if ttype_known else True
        )
        useansi = options.get("ansi", flags.get("ANSI", False) if ttype_known else True)
        raw = options.get("raw", flags.get("RAW", False))
        nocolor = options.get("nocolor", flags.get("NOCOLOR") or not (xterm256 or useansi))
        echo = options.get("echo", None)
        mxp = options.get("mxp", flags.get("MXP", False))
        screenreader = options.get("screenreader", flags.get("SCREENREADER", False))

        if screenreader:
            # screenreader mode cleans up output
            text = ansi.parse_ansi(text, strip_ansi=True, xterm256=False, mxp=False)
            text = _RE_SCREENREADER_REGEX.sub("", text)

        if options.get("send_prompt"):
            # send a prompt instead.
            prompt = text
            if not raw:
                # processing
                prompt = ansi.parse_ansi(
                    _RE_N.sub("", prompt) + ("||n" if prompt.endswith("|") else "|n"),
                    strip_ansi=nocolor,
                    xterm256=xterm256,
                    truecolor=truecolor,
                )
                if mxp:
                    prompt = mxp_parse(prompt)
            prompt = to_bytes(prompt, self)
            prompt = prompt.replace(IAC, IAC + IAC).replace(b"\n", b"\r\n")
            if not flags.get("NOPROMPTGOAHEAD", flags.get("NOGOAHEAD", True)):
                prompt += IAC + GA
            self.transport.write(mccp_compress(self, prompt))
        else:
            if echo is not None:
                # turn on/off echo. Note that this is a bit turned around since we use
                # echo as if we are "turning off the client's echo" when telnet really
                # handles it the other way around.
                if echo:
                    # by telling the client that WE WON'T echo, the client knows
                    # that IT should echo. This is the expected behavior from
                    # our perspective.
                    self.transport.write(mccp_compress(self, IAC + WONT + ECHO))
                else:
                    # by telling the client that WE WILL echo, the client can
                    # safely turn OFF its OWN echo.
                    self.transport.write(mccp_compress(self, IAC + WILL + ECHO))
            if raw:
                # no processing
                self.sendLine(text)
                return
            # we need to make sure to kill the color at the end in order
            # to match the webclient output.
            linetosend = ansi.parse_ansi(
                _RE_N.sub("", text) + ("||n" if text.endswith("|") else "|n"),
                strip_ansi=nocolor,
                xterm256=xterm256,
                mxp=mxp,
                truecolor=truecolor,
            )
            if mxp:
                linetosend = mxp_parse(linetosend)
            self.sendLine(linetosend)

    def send_prompt(self, *args, **kwargs):
        """
        Send a prompt - a text without a line end. See send_text for
        argument options.

        Notes:
            The `options` dict is copied before the `send_prompt` flag is
            added, so a missing `options` keyword does not raise and the
            caller's dict is left untouched.

        """
        options = dict(kwargs.get("options") or {})
        options["send_prompt"] = True
        kwargs["options"] = options
        self.send_text(*args, **kwargs)

    def send_default(self, cmdname, *args, **kwargs):
        """
        Send other oob data. The "options" command name is skipped; all
        other commands are passed on to the oob handler.

        Args:
            cmdname (str): Name of the command to send.
            *args (any): Passed on to the oob handler.

        Keyword Args:
            kwargs (any): Passed on to the oob handler.

        """
        if cmdname != "options":
            self.oob.data_out(cmdname, *args, **kwargs)