Source code for evennia.server.portal.ssh

"""
This module implements the ssh (Secure SHell) protocol for encrypted
connections.

This depends on a generic session module that implements the actual
login procedure of the game, tracks sessions etc.

Using standard ssh client,

"""

import os
import re

from twisted.conch.interfaces import IConchUser
from twisted.cred import checkers
from twisted.cred.portal import Portal

_SSH_IMPORT_ERROR = """
ERROR: Missing crypto library for SSH. Install it with

       pip install cryptography pyasn1 bcrypt

(On older Twisted versions you may have to do 'pip install pycrypto pyasn1' instead).

If you get a compilation error you must install a C compiler and the
SSL dev headers (On Debian-derived systems this is the gcc and libssl-dev
packages).
"""

try:
    from twisted.conch.ssh.keys import Key
except ImportError:
    raise ImportError(_SSH_IMPORT_ERROR)

from django.conf import settings
from evennia.accounts.models import AccountDB
from evennia.utils import ansi
from evennia.utils.utils import class_from_module, to_str
from twisted.conch import interfaces as iconch
from twisted.conch.insults import insults
from twisted.conch.manhole import Manhole, recvline
from twisted.conch.manhole_ssh import ConchFactory, TerminalRealm, _Glue
from twisted.conch.ssh import common
from twisted.conch.ssh.userauth import SSHUserAuthServer
from twisted.internet import defer, protocol
from twisted.python import components

_RE_N = re.compile(r"\|n$")
_RE_SCREENREADER_REGEX = re.compile(
    r"%s" % settings.SCREENREADER_REGEX_STRIP, re.DOTALL + re.MULTILINE
)
_GAME_DIR = settings.GAME_DIR
_PRIVATE_KEY_FILE = os.path.join(_GAME_DIR, "server", "ssh-private.key")
_PUBLIC_KEY_FILE = os.path.join(_GAME_DIR, "server", "ssh-public.key")
_KEY_LENGTH = 2048

CTRL_C = "\x03"
CTRL_D = "\x04"
CTRL_BACKSLASH = "\x1c"
CTRL_L = "\x0c"

_NO_AUTOGEN = f"""
Evennia could not generate SSH private- and public keys ({{err}})
Using conch default keys instead.

If this error persists, create the keys manually (using the tools for your OS)
and put them here:
    {_PRIVATE_KEY_FILE}
    {_PUBLIC_KEY_FILE}
"""

_BASE_SESSION_CLASS = class_from_module(settings.BASE_SESSION_CLASS)


# not used atm
[docs]class SSHServerFactory(protocol.ServerFactory): """ This is only to name this better in logs """ noisy = False
[docs] def logPrefix(self): return "SSH"
[docs]class SshProtocol(Manhole, _BASE_SESSION_CLASS): """ Each account connecting over ssh gets this protocol assigned to them. All communication between game and account goes through here. """ noisy = False
[docs] def __init__(self, starttuple): """ For setting up the account. If account is not None then we'll login automatically. Args: starttuple (tuple): A (account, factory) tuple. """ self.protocol_key = "ssh" self.authenticated_account = starttuple[0] # obs must not be called self.factory, that gets overwritten! self.cfactory = starttuple[1]
[docs] def terminalSize(self, width, height): """ Initialize the terminal and connect to the new session. Args: width (int): Width of terminal. height (int): Height of terminal. """ # Clear the previous input line, redraw it at the new # cursor position self.terminal.eraseDisplay() self.terminal.cursorHome() self.width = width self.height = height # Set color defaults for color in ("ANSI", "XTERM256", "TRUECOLOR"): self.protocol_flags[color] = True # initialize the session client_address = self.getClientAddress() client_address = client_address.host if client_address else None self.init_session("ssh", client_address, self.cfactory.sessionhandler) # since we might have authenticated already, we might set this here. if self.authenticated_account: self.logged_in = True self.uid = self.authenticated_account.id self.sessionhandler.connect(self)
[docs] def connectionMade(self): """ This is called when the connection is first established. """ recvline.HistoricRecvLine.connectionMade(self) self.keyHandlers[CTRL_C] = self.handle_INT self.keyHandlers[CTRL_D] = self.handle_EOF self.keyHandlers[CTRL_L] = self.handle_FF self.keyHandlers[CTRL_BACKSLASH] = self.handle_QUIT
# initalize
[docs] def handle_INT(self): """ Handle ^C as an interrupt keystroke by resetting the current input variables to their initial state. """ self.lineBuffer = [] self.lineBufferIndex = 0 self.terminal.nextLine() self.terminal.write("KeyboardInterrupt") self.terminal.nextLine()
[docs] def handle_EOF(self): """ Handles EOF generally used to exit. """ if self.lineBuffer: self.terminal.write("\a") else: self.handle_QUIT()
[docs] def handle_FF(self): """ Handle a 'form feed' byte - generally used to request a screen refresh/redraw. """ self.terminal.eraseDisplay() self.terminal.cursorHome()
[docs] def handle_QUIT(self): """ Quit, end, and lose the connection. """ self.terminal.loseConnection()
[docs] def connectionLost(self, reason=None): """ This is executed when the connection is lost for whatever reason. It can also be called directly, from the disconnect method. Args: reason (str): Motivation for loosing connection. """ insults.TerminalProtocol.connectionLost(self, reason) self.sessionhandler.disconnect(self) self.terminal.loseConnection()
[docs] def getClientAddress(self): """ Get client address. Returns: address_and_port (tuple): The client's address and port in a tuple. For example `('127.0.0.1', 41917)`. """ return self.terminal.transport.getPeer()
[docs] def lineReceived(self, string): """ Communication User -> Evennia. Any line return indicates a command for the purpose of the MUD. So we take the user input and pass it on to the game engine. Args: string (str): Input text. """ self.sessionhandler.data_in(self, text=string)
[docs] def sendLine(self, string): """ Communication Evennia -> User. Any string sent should already have been properly formatted and processed before reaching this point. Args: string (str): Output text. """ for line in string.split("\n"): # the telnet-specific method for sending self.terminal.write(line) self.terminal.nextLine()
# session-general method hooks
[docs] def at_login(self): """ Called when this session gets authenticated by the server. """ pass
[docs] def disconnect(self, reason="Connection closed. Goodbye for now."): """ Disconnect from server. Args: reason (str): Motivation for disconnect. """ if reason: self.data_out(text=((reason,), {})) self.connectionLost(reason)
[docs] def data_out(self, **kwargs): """ Data Evennia -> User Keyword Args: kwargs (any): Options to the protocol. """ self.sessionhandler.data_out(self, **kwargs)
[docs] def send_text(self, *args, **kwargs): """ Send text data. This is an in-band telnet operation. Args: text (str): The first argument is always the text string to send. No other arguments are considered. Keyword Args: options (dict): Send-option flags (booleans) - mxp: enforce mxp link support. - ansi: enforce no ansi colors. - xterm256: enforce xterm256 colors, regardless of ttype setting. - nocolor: strip all colors. - raw: pass string through without any ansi processing (i.e. include evennia ansi markers but do not convert them into ansi tokens) - echo: turn on/off line echo on the client. turn off line echo for client, for example for password. note that it must be actively turned back on again! """ # print "telnet.send_text", args,kwargs # DEBUG text = args[0] if args else "" if text is None: return text = to_str(text) # handle arguments options = kwargs.get("options", {}) flags = self.protocol_flags xterm256 = options.get("xterm256", flags.get("XTERM256", True)) useansi = options.get("ansi", flags.get("ANSI", True)) raw = options.get("raw", flags.get("RAW", False)) nocolor = options.get("nocolor", flags.get("NOCOLOR") or not (xterm256 or useansi)) # echo = options.get("echo", None) # DEBUG screenreader = options.get("screenreader", flags.get("SCREENREADER", False)) if screenreader: # screenreader mode cleans up output text = ansi.parse_ansi(text, strip_ansi=True, xterm256=False, mxp=False) text = _RE_SCREENREADER_REGEX.sub("", text) if raw: # no processing self.sendLine(text) return else: # we need to make sure to kill the color at the end in order # to match the webclient output. linetosend = ansi.parse_ansi( _RE_N.sub("", text) + ("||n" if text.endswith("|") else "|n"), strip_ansi=nocolor, xterm256=xterm256, mxp=False, ) self.sendLine(linetosend)
[docs] def send_prompt(self, *args, **kwargs): self.send_text(*args, **kwargs)
[docs] def send_default(self, *args, **kwargs): pass
[docs]class ExtraInfoAuthServer(SSHUserAuthServer): noisy = False
[docs] def auth_password(self, packet): """ Password authentication. Used mostly for setting up the transport so we can query username and password later. Args: packet (Packet): Auth packet. """ password = common.getNS(packet[1:])[0] c = checkers.UsernamePassword(self.user, password) c.transport = self.transport return self.portal.login(c, None, IConchUser).addErrback(self._ebPassword)
[docs]class AccountDBPasswordChecker(object): """ Checks the django db for the correct credentials for username/password otherwise it returns the account or None which is useful for the Realm. """ noisy = False credentialInterfaces = (checkers.IUsernamePassword,)
[docs] def __init__(self, factory): """ Initialize the factory. Args: factory (SSHFactory): Checker factory. """ self.factory = factory super().__init__()
[docs] def requestAvatarId(self, c): """ Generic credentials. """ up = checkers.IUsernamePassword(c, None) username = up.username password = up.password account = AccountDB.objects.get_account_from_name(username) res = (None, self.factory) if account and account.check_password(password): res = (account, self.factory) return defer.succeed(res)
[docs]class PassAvatarIdTerminalRealm(TerminalRealm): """ Returns an avatar that passes the avatarId through to the protocol. This is probably not the best way to do it. """ noisy = False def _getAvatar(self, avatarId): comp = components.Componentized() user = self.userFactory(comp, avatarId) sess = self.sessionFactory(comp) sess.transportFactory = self.transportFactory sess.chainedProtocolFactory = lambda: self.chainedProtocolFactory(avatarId) comp.setComponent(iconch.IConchUser, user) comp.setComponent(iconch.ISession, sess) return user
[docs]class TerminalSessionTransport_getPeer(object): """ Taken from twisted's TerminalSessionTransport which doesn't provide getPeer to the transport. This one does. """ noisy = False
[docs] def __init__(self, proto, chainedProtocol, avatar, width, height): self.proto = proto self.avatar = avatar self.chainedProtocol = chainedProtocol session = self.proto.session self.proto.makeConnection( _Glue( write=self.chainedProtocol.dataReceived, loseConnection=lambda: avatar.conn.sendClose(session), name="SSH Proto Transport", ) ) def loseConnection(): self.proto.loseConnection() def getPeer(): return session.conn.transport.transport.getPeer() self.chainedProtocol.makeConnection( _Glue( getPeer=getPeer, write=self.proto.write, loseConnection=loseConnection, name="Chained Proto Transport", ) ) self.chainedProtocol.terminalProtocol.terminalSize(width, height)
[docs]def getKeyPair(pubkeyfile, privkeyfile): """ This function looks for RSA keypair files in the current directory. If they do not exist, the keypair is created. """ if not (os.path.exists(pubkeyfile) and os.path.exists(privkeyfile)): # No keypair exists. Generate a new RSA keypair from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa rsa_key = Key( rsa.generate_private_key( public_exponent=65537, key_size=_KEY_LENGTH, backend=default_backend() ) ) public_key_string = rsa_key.public().toString(type="OPENSSH").decode() private_key_string = rsa_key.toString(type="OPENSSH").decode() # save keys for the future. with open(privkeyfile, "wt") as pfile: pfile.write(private_key_string) print("Created SSH private key in '{}'".format(_PRIVATE_KEY_FILE)) with open(pubkeyfile, "wt") as pfile: pfile.write(public_key_string) print("Created SSH public key in '{}'".format(_PUBLIC_KEY_FILE)) else: with open(pubkeyfile) as pfile: public_key_string = pfile.read() with open(privkeyfile) as pfile: private_key_string = pfile.read() return Key.fromString(public_key_string), Key.fromString(private_key_string)
[docs]def makeFactory(configdict): """ Creates the ssh server factory. """ def chainProtocolFactory(username=None): return insults.ServerProtocol( configdict["protocolFactory"], *configdict.get("protocolConfigdict", (username,)), **configdict.get("protocolKwArgs", {}), ) rlm = PassAvatarIdTerminalRealm() rlm.transportFactory = TerminalSessionTransport_getPeer rlm.chainedProtocolFactory = chainProtocolFactory factory = ConchFactory(Portal(rlm)) factory.sessionhandler = configdict["sessions"] try: # create/get RSA keypair publicKey, privateKey = getKeyPair(_PUBLIC_KEY_FILE, _PRIVATE_KEY_FILE) factory.publicKeys = {b"ssh-rsa": publicKey} factory.privateKeys = {b"ssh-rsa": privateKey} except Exception as err: print(_NO_AUTOGEN.format(err=err)) factory.services = factory.services.copy() factory.services["ssh-userauth"] = ExtraInfoAuthServer factory.portal.registerChecker(AccountDBPasswordChecker(factory)) return factory