"""
This module implements the telnet protocol.
This depends on a generic session module that implements
the actual login procedure of the game, tracks
sessions etc.
"""
import re
from django.conf import settings
from twisted.conch.telnet import (
ECHO,
GA,
IAC,
LINEMODE,
LINEMODE_EDIT,
LINEMODE_TRAPSIG,
MODE,
NOP,
NULL,
WILL,
WONT,
StatefulTelnetProtocol,
Telnet,
)
from twisted.internet import protocol
from twisted.internet.task import LoopingCall
from evennia.server.portal import mssp, naws, suppress_ga, telnet_oob, ttype
from evennia.server.portal.mccp import MCCP, Mccp, mccp_compress
from evennia.server.portal.mxp import Mxp, mxp_parse
from evennia.utils import ansi
from evennia.utils.utils import class_from_module, to_bytes
_RE_N = re.compile(r"\|n$")
_RE_LEND = re.compile(rb"\n$|\r$|\r\n$|\r\x00$|", re.MULTILINE)
_RE_LINEBREAK = re.compile(rb"\n\r|\r\n|\n|\r", re.DOTALL + re.MULTILINE)
_RE_SCREENREADER_REGEX = re.compile(
r"%s" % settings.SCREENREADER_REGEX_STRIP, re.DOTALL + re.MULTILINE
)
_IDLE_COMMAND = str.encode(settings.IDLE_COMMAND + "\n")
# identify HTTP indata
_HTTP_REGEX = re.compile(
b"(GET|HEAD|POST|PUT|DELETE|TRACE|OPTIONS|CONNECT|PATCH) (.*? HTTP/[0-9]\.[0-9])", re.I
)
_HTTP_WARNING = bytes(
"""
This is Evennia's Telnet port and cannot be used for regular HTTP traffic.
Use a telnet client to connect here and point your browser to the server's
dedicated web port instead.
""".strip(),
"utf-8",
)
_BASE_SESSION_CLASS = class_from_module(settings.BASE_SESSION_CLASS)
[docs]class TelnetServerFactory(protocol.ServerFactory):
"""
This exists only to name this better in logs.
"""
noisy = False
[docs] def logPrefix(self):
return "Telnet"
[docs]class TelnetProtocol(Telnet, StatefulTelnetProtocol, _BASE_SESSION_CLASS):
"""
Each player connecting over telnet (ie using most traditional mud
clients) gets a telnet protocol instance assigned to them. All
communication between game and player goes through here.
"""
[docs] def __init__(self, *args, **kwargs):
self.protocol_key = "telnet"
super().__init__(*args, **kwargs)
[docs] def dataReceived(self, data):
"""
Unused by default, but a good place to put debug printouts
of incoming data.
"""
# print(f"telnet dataReceived: {data}")
try:
super().dataReceived(data)
except ValueError as err:
from evennia.utils import logger
logger.log_err(f"Malformed telnet input: {err}")
[docs] def connectionMade(self):
"""
This is called when the connection is first established.
"""
# important in order to work normally with standard telnet
self.do(LINEMODE).addErrback(self._wont_linemode)
# initialize the session
self.line_buffer = b""
client_address = self.transport.client
client_address = client_address[0] if client_address else None
# this number is counted down for every handshake that completes.
# when it reaches 0 the portal/server syncs their data
self.handshakes = 8 # suppress-go-ahead, naws, ttype, mccp, mssp, msdp, gmcp, mxp
self.init_session(self.protocol_key, client_address, self.factory.sessionhandler)
self.protocol_flags["ENCODING"] = settings.ENCODINGS[0] if settings.ENCODINGS else "utf-8"
# add this new connection to sessionhandler so
# the Server becomes aware of it.
self.sessionhandler.connect(self)
# change encoding to ENCODINGS[0] which reflects Telnet default encoding
# suppress go-ahead
self.sga = suppress_ga.SuppressGA(self)
# negotiate client size
self.naws = naws.Naws(self)
# negotiate ttype (client info)
# Obs: mudlet ttype does not seem to work if we start mccp before ttype. /Griatch
self.ttype = ttype.Ttype(self)
# negotiate mccp (data compression) - turn this off for wireshark analysis
self.mccp = Mccp(self)
# negotiate mssp (crawler communication)
self.mssp = mssp.Mssp(self)
# oob communication (MSDP, GMCP) - two handshake calls!
self.oob = telnet_oob.TelnetOOB(self)
# mxp support
self.mxp = Mxp(self)
from evennia.utils.utils import delay
# timeout the handshakes in case the client doesn't reply at all
self._handshake_delay = delay(2, callback=self.handshake_done, timeout=True)
# TCP/IP keepalive watches for dead links
self.transport.setTcpKeepAlive(1)
# The TCP/IP keepalive is not enough for some networks;
# we have to complement it with a NOP keep-alive.
self.protocol_flags["NOPKEEPALIVE"] = True
self.nop_keep_alive = None
self.toggle_nop_keepalive()
def _wont_linemode(self, *args):
"""
Client refuses do(linemode). This is common for MUD-specific
clients, but we must ask for the sake of raw telnet. We ignore
this error.
"""
pass
def _send_nop_keepalive(self):
"""
Send NOP keepalive unless flag is set
"""
if self.protocol_flags.get("NOPKEEPALIVE"):
self._write(IAC + NOP)
[docs] def toggle_nop_keepalive(self):
"""
Allow to toggle the NOP keepalive for those sad clients that
can't even handle a NOP instruction. This is turned off by the
protocol_flag NOPKEEPALIVE (settable e.g. by the default
`option` command).
"""
if self.nop_keep_alive and self.nop_keep_alive.running:
self.nop_keep_alive.stop()
else:
self.nop_keep_alive = LoopingCall(self._send_nop_keepalive)
self.nop_keep_alive.start(30, now=False)
[docs] def handshake_done(self, timeout=False):
"""
This is called by all telnet extensions once they are finished.
When all have reported, a sync with the server is performed.
The system will force-call this sync after a small time to handle
clients that don't reply to handshakes at all.
"""
if timeout:
if self.handshakes > 0:
self.handshakes = 0
self.sessionhandler.sync(self)
else:
self.handshakes -= 1
if self.handshakes <= 0:
# do the sync
self.sessionhandler.sync(self)
[docs] def at_login(self):
"""
Called when this session gets authenticated by the server.
"""
pass
[docs] def enableRemote(self, option):
"""
This sets up the remote-activated options we allow for this protocol.
Args:
option (char): The telnet option to enable.
Returns:
enable (bool): If this option should be enabled.
"""
if option == LINEMODE:
# make sure to activate line mode with local editing for all clients
self.requestNegotiation(
LINEMODE, MODE + bytes(chr(ord(LINEMODE_EDIT) + ord(LINEMODE_TRAPSIG)), "ascii")
)
return True
else:
return (
option == ttype.TTYPE
or option == naws.NAWS
or option == MCCP
or option == mssp.MSSP
or option == ECHO
or option == suppress_ga.SUPPRESS_GA
)
[docs] def disableRemote(self, option):
return (
option == LINEMODE
or option == ttype.TTYPE
or option == naws.NAWS
or option == MCCP
or option == mssp.MSSP
or option == ECHO
or option == suppress_ga.SUPPRESS_GA
)
[docs] def enableLocal(self, option):
"""
Call to allow the activation of options for this protocol
Args:
option (char): The telnet option to enable locally.
Returns:
enable (bool): If this option should be enabled.
"""
return (
option == LINEMODE
or option == MCCP
or option == ECHO
or option == suppress_ga.SUPPRESS_GA
)
[docs] def disableLocal(self, option):
"""
Disable a given option locally.
Args:
option (char): The telnet option to disable locally.
"""
if option == LINEMODE:
return True
if option == ECHO:
return True
if option == MCCP:
self.mccp.no_mccp(option)
return True
else:
try:
return super().disableLocal(option)
except Exception:
from evennia.utils import logger
logger.log_trace()
[docs] def connectionLost(self, reason):
"""
this is executed when the connection is lost for whatever
reason. it can also be called directly, from the disconnect
method
Args:
reason (str): Motivation for losing connection.
"""
self.sessionhandler.disconnect(self)
self.transport.loseConnection()
[docs] def applicationDataReceived(self, data):
"""
Telnet method called when non-telnet-command data is coming in
over the telnet connection. We pass it on to the game engine
directly.
Args:
data (str): Incoming data.
"""
if not data:
data = [data]
elif data.strip() == NULL:
# this is an ancient type of keepalive used by some
# legacy clients. There should never be a reason to send a
# lone NULL character so this seems to be a safe thing to
# support for backwards compatibility. It also stops the
# NULL from continuously popping up as an unknown command.
data = [_IDLE_COMMAND]
else:
data = _RE_LINEBREAK.split(data)
if len(data) > 2 and _HTTP_REGEX.match(data[0]):
# guard against HTTP request on the Telnet port; we
# block and kill the connection.
self.transport.write(_HTTP_WARNING)
self.transport.loseConnection()
return
if self.line_buffer and len(data) > 1:
# buffer exists, it is terminated by the first line feed
data[0] = self.line_buffer + data[0]
self.line_buffer = b""
# if the last data split is empty, it means all splits have
# line breaks, if not, it is unterminated and must be
# buffered.
self.line_buffer += data.pop()
# send all data chunks
for dat in data:
self.data_in(text=dat + b"\n")
def _write(self, data):
"""
Hook overloading the one used in plain telnet
"""
data = data.replace(b"\n", b"\r\n").replace(b"\r\r\n", b"\r\n")
super()._write(mccp_compress(self, data))
[docs] def sendLine(self, line):
"""
Hook overloading the one used by linereceiver.
Args:
line (str): Line to send.
"""
line = to_bytes(line, self)
# escape IAC in line mode, and correctly add \r\n (the TELNET end-of-line)
line = line.replace(IAC, IAC + IAC)
line = line.replace(b"\n", b"\r\n")
if not line.endswith(b"\r\n") and self.protocol_flags.get("FORCEDENDLINE", True):
line += b"\r\n"
if not self.protocol_flags.get("NOGOAHEAD", True):
line += IAC + GA
return self.transport.write(mccp_compress(self, line))
# Session hooks
[docs] def disconnect(self, reason=""):
"""
Generic hook for the engine to call in order to
disconnect this protocol.
Args:
reason (str, optional): Reason for disconnecting.
"""
self.data_out(text=((reason,), {}))
self.connectionLost(reason)
[docs] def data_in(self, **kwargs):
"""
Data User -> Evennia
Keyword Args:
kwargs (any): Options from the protocol.
"""
# from evennia.server.profiling.timetrace import timetrace # DEBUG
# text = timetrace(text, "telnet.data_in") # DEBUG
self.sessionhandler.data_in(self, **kwargs)
[docs] def data_out(self, **kwargs):
"""
Data Evennia -> User
Keyword Args:
kwargs (any): Options to the protocol
"""
self.sessionhandler.data_out(self, **kwargs)
# send_* methods
[docs] def send_text(self, *args, **kwargs):
"""
Send text data. This is an in-band telnet operation.
Args:
text (str): The first argument is always the text string to send. No other arguments
are considered.
Keyword Args:
options (dict): Send-option flags
- mxp: Enforce MXP link support.
- ansi: Enforce no ANSI colors.
- xterm256: Enforce xterm256 colors, regardless of TTYPE.
- noxterm256: Enforce no xterm256 color support, regardless of TTYPE.
- nocolor: Strip all Color, regardless of ansi/xterm256 setting.
- raw: Pass string through without any ansi processing
(i.e. include Evennia ansi markers but do not
convert them into ansi tokens)
- echo: Turn on/off line echo on the client. Turn
off line echo for client, for example for password.
Note that it must be actively turned back on again!
"""
text = args[0] if args else ""
if text is None:
return
# handle arguments
options = kwargs.get("options", {})
flags = self.protocol_flags
xterm256 = options.get(
"xterm256", flags.get("XTERM256", False) if flags.get("TTYPE", False) else True
)
useansi = options.get(
"ansi", flags.get("ANSI", False) if flags.get("TTYPE", False) else True
)
raw = options.get("raw", flags.get("RAW", False))
nocolor = options.get("nocolor", flags.get("NOCOLOR") or not (xterm256 or useansi))
echo = options.get("echo", None)
mxp = options.get("mxp", flags.get("MXP", False))
screenreader = options.get("screenreader", flags.get("SCREENREADER", False))
if screenreader:
# screenreader mode cleans up output
text = ansi.parse_ansi(text, strip_ansi=True, xterm256=False, mxp=False)
text = _RE_SCREENREADER_REGEX.sub("", text)
if options.get("send_prompt"):
# send a prompt instead.
prompt = text
if not raw:
# processing
prompt = ansi.parse_ansi(
_RE_N.sub("", prompt) + ("||n" if prompt.endswith("|") else "|n"),
strip_ansi=nocolor,
xterm256=xterm256,
)
if mxp:
prompt = mxp_parse(prompt)
prompt = to_bytes(prompt, self)
prompt = prompt.replace(IAC, IAC + IAC).replace(b"\n", b"\r\n")
if not self.protocol_flags.get(
"NOPROMPTGOAHEAD", self.protocol_flags.get("NOGOAHEAD", True)
):
prompt += IAC + GA
self.transport.write(mccp_compress(self, prompt))
else:
if echo is not None:
# turn on/off echo. Note that this is a bit turned around since we use
# echo as if we are "turning off the client's echo" when telnet really
# handles it the other way around.
if echo:
# by telling the client that WE WON'T echo, the client knows
# that IT should echo. This is the expected behavior from
# our perspective.
self.transport.write(mccp_compress(self, IAC + WONT + ECHO))
else:
# by telling the client that WE WILL echo, the client can
# safely turn OFF its OWN echo.
self.transport.write(mccp_compress(self, IAC + WILL + ECHO))
if raw:
# no processing
self.sendLine(text)
return
else:
# we need to make sure to kill the color at the end in order
# to match the webclient output.
linetosend = ansi.parse_ansi(
_RE_N.sub("", text) + ("||n" if text.endswith("|") else "|n"),
strip_ansi=nocolor,
xterm256=xterm256,
mxp=mxp,
)
if mxp:
linetosend = mxp_parse(linetosend)
self.sendLine(linetosend)
[docs] def send_prompt(self, *args, **kwargs):
"""
Send a prompt - a text without a line end. See send_text for argument options.
"""
kwargs["options"].update({"send_prompt": True})
self.send_text(*args, **kwargs)
[docs] def send_default(self, cmdname, *args, **kwargs):
"""
Send other oob data
"""
if not cmdname == "options":
self.oob.data_out(cmdname, *args, **kwargs)