"""
This module implements the ssh (Secure SHell) protocol for encrypted
connections.
This depends on a generic session module that implements the actual
login procedure of the game, tracks sessions etc.
Using standard ssh client,
"""
import os
import re
from twisted.cred.checkers import credentials
from twisted.cred.portal import Portal
from twisted.conch.interfaces import IConchUser
_SSH_IMPORT_ERROR = """
ERROR: Missing crypto library for SSH. Install it with
pip install cryptography pyasn1 bcrypt
(On older Twisted versions you may have to do 'pip install pycrypto pyasn1' instead).
If you get a compilation error you must install a C compiler and the
SSL dev headers (On Debian-derived systems this is the gcc and libssl-dev
packages).
"""
try:
from twisted.conch.ssh.keys import Key
except ImportError:
raise ImportError(_SSH_IMPORT_ERROR)
from twisted.conch.ssh.userauth import SSHUserAuthServer
from twisted.conch.ssh import common
from twisted.conch.insults import insults
from twisted.conch.manhole_ssh import TerminalRealm, _Glue, ConchFactory
from twisted.conch.manhole import Manhole, recvline
from twisted.internet import defer, protocol
from twisted.conch import interfaces as iconch
from twisted.python import components
from django.conf import settings
from evennia.server import session
from evennia.accounts.models import AccountDB
from evennia.utils import ansi
from evennia.utils.utils import to_str
_RE_N = re.compile(r"\|n$")
_RE_SCREENREADER_REGEX = re.compile(
r"%s" % settings.SCREENREADER_REGEX_STRIP, re.DOTALL + re.MULTILINE
)
_GAME_DIR = settings.GAME_DIR
_PRIVATE_KEY_FILE = os.path.join(_GAME_DIR, "server", "ssh-private.key")
_PUBLIC_KEY_FILE = os.path.join(_GAME_DIR, "server", "ssh-public.key")
_KEY_LENGTH = 2048
CTRL_C = "\x03"
CTRL_D = "\x04"
CTRL_BACKSLASH = "\x1c"
CTRL_L = "\x0c"
_NO_AUTOGEN = """
Evennia could not generate SSH private- and public keys ({{err}})
Using conch default keys instead.
If this error persists, create the keys manually (using the tools for your OS)
and put them here:
{}
{}
""".format(
_PRIVATE_KEY_FILE, _PUBLIC_KEY_FILE
)
# not used atm
[docs]class SSHServerFactory(protocol.ServerFactory):
"This is only to name this better in logs"
noisy = False
[docs] def logPrefix(self):
return "SSH"
[docs]class SshProtocol(Manhole, session.Session):
"""
Each account connecting over ssh gets this protocol assigned to
them. All communication between game and account goes through
here.
"""
noisy = False
[docs] def __init__(self, starttuple):
"""
For setting up the account. If account is not None then we'll
login automatically.
Args:
starttuple (tuple): A (account, factory) tuple.
"""
self.protocol_key = "ssh"
self.authenticated_account = starttuple[0]
# obs must not be called self.factory, that gets overwritten!
self.cfactory = starttuple[1]
[docs] def terminalSize(self, width, height):
"""
Initialize the terminal and connect to the new session.
Args:
width (int): Width of terminal.
height (int): Height of terminal.
"""
# Clear the previous input line, redraw it at the new
# cursor position
self.terminal.eraseDisplay()
self.terminal.cursorHome()
self.width = width
self.height = height
# initialize the session
client_address = self.getClientAddress()
client_address = client_address.host if client_address else None
self.init_session("ssh", client_address, self.cfactory.sessionhandler)
# since we might have authenticated already, we might set this here.
if self.authenticated_account:
self.logged_in = True
self.uid = self.authenticated_account.id
self.sessionhandler.connect(self)
[docs] def connectionMade(self):
"""
This is called when the connection is first established.
"""
recvline.HistoricRecvLine.connectionMade(self)
self.keyHandlers[CTRL_C] = self.handle_INT
self.keyHandlers[CTRL_D] = self.handle_EOF
self.keyHandlers[CTRL_L] = self.handle_FF
self.keyHandlers[CTRL_BACKSLASH] = self.handle_QUIT
# initalize
[docs] def handle_INT(self):
"""
Handle ^C as an interrupt keystroke by resetting the current
input variables to their initial state.
"""
self.lineBuffer = []
self.lineBufferIndex = 0
self.terminal.nextLine()
self.terminal.write("KeyboardInterrupt")
self.terminal.nextLine()
[docs] def handle_EOF(self):
"""
Handles EOF generally used to exit.
"""
if self.lineBuffer:
self.terminal.write("\a")
else:
self.handle_QUIT()
[docs] def handle_FF(self):
"""
Handle a 'form feed' byte - generally used to request a screen
refresh/redraw.
"""
self.terminal.eraseDisplay()
self.terminal.cursorHome()
[docs] def handle_QUIT(self):
"""
Quit, end, and lose the connection.
"""
self.terminal.loseConnection()
[docs] def connectionLost(self, reason=None):
"""
This is executed when the connection is lost for whatever
reason. It can also be called directly, from the disconnect
method.
Args:
reason (str): Motivation for loosing connection.
"""
insults.TerminalProtocol.connectionLost(self, reason)
self.sessionhandler.disconnect(self)
self.terminal.loseConnection()
[docs] def getClientAddress(self):
"""
Get client address.
Returns:
address_and_port (tuple): The client's address and port in
a tuple. For example `('127.0.0.1', 41917)`.
"""
return self.terminal.transport.getPeer()
[docs] def lineReceived(self, string):
"""
Communication User -> Evennia. Any line return indicates a
command for the purpose of the MUD. So we take the user input
and pass it on to the game engine.
Args:
string (str): Input text.
"""
self.sessionhandler.data_in(self, text=string)
[docs] def sendLine(self, string):
"""
Communication Evennia -> User. Any string sent should
already have been properly formatted and processed before
reaching this point.
Args:
string (str): Output text.
"""
for line in string.split("\n"):
# the telnet-specific method for sending
self.terminal.write(line)
self.terminal.nextLine()
# session-general method hooks
[docs] def at_login(self):
"""
Called when this session gets authenticated by the server.
"""
pass
[docs] def disconnect(self, reason="Connection closed. Goodbye for now."):
"""
Disconnect from server.
Args:
reason (str): Motivation for disconnect.
"""
if reason:
self.data_out(text=((reason,), {}))
self.connectionLost(reason)
[docs] def data_out(self, **kwargs):
"""
Data Evennia -> User
Keyword Args:
kwargs (any): Options to the protocol.
"""
self.sessionhandler.data_out(self, **kwargs)
[docs] def send_text(self, *args, **kwargs):
"""
Send text data. This is an in-band telnet operation.
Args:
text (str): The first argument is always the text string to send. No other arguments
are considered.
Keyword Args:
options (dict): Send-option flags:
- mxp: Enforce MXP link support.
- ansi: Enforce no ANSI colors.
- xterm256: Enforce xterm256 colors, regardless of TTYPE setting.
- nocolor: Strip all colors.
- raw: Pass string through without any ansi processing
(i.e. include Evennia ansi markers but do not
convert them into ansi tokens)
- echo: Turn on/off line echo on the client. Turn
off line echo for client, for example for password.
Note that it must be actively turned back on again!
"""
# print "telnet.send_text", args,kwargs # DEBUG
text = args[0] if args else ""
if text is None:
return
text = to_str(text)
# handle arguments
options = kwargs.get("options", {})
flags = self.protocol_flags
xterm256 = options.get("xterm256", flags.get("XTERM256", True))
useansi = options.get("ansi", flags.get("ANSI", True))
raw = options.get("raw", flags.get("RAW", False))
nocolor = options.get("nocolor", flags.get("NOCOLOR") or not (xterm256 or useansi))
# echo = options.get("echo", None) # DEBUG
screenreader = options.get("screenreader", flags.get("SCREENREADER", False))
if screenreader:
# screenreader mode cleans up output
text = ansi.parse_ansi(text, strip_ansi=True, xterm256=False, mxp=False)
text = _RE_SCREENREADER_REGEX.sub("", text)
if raw:
# no processing
self.sendLine(text)
return
else:
# we need to make sure to kill the color at the end in order
# to match the webclient output.
linetosend = ansi.parse_ansi(
_RE_N.sub("", text) + ("||n" if text.endswith("|") else "|n"),
strip_ansi=nocolor,
xterm256=xterm256,
mxp=False,
)
self.sendLine(linetosend)
[docs] def send_prompt(self, *args, **kwargs):
self.send_text(*args, **kwargs)
[docs] def send_default(self, *args, **kwargs):
pass
[docs]class AccountDBPasswordChecker(object):
"""
Checks the django db for the correct credentials for
username/password otherwise it returns the account or None which is
useful for the Realm.
"""
noisy = False
credentialInterfaces = (credentials.IUsernamePassword,)
[docs] def __init__(self, factory):
"""
Initialize the factory.
Args:
factory (SSHFactory): Checker factory.
"""
self.factory = factory
super().__init__()
[docs] def requestAvatarId(self, c):
"""
Generic credentials.
"""
up = credentials.IUsernamePassword(c, None)
username = up.username
password = up.password
account = AccountDB.objects.get_account_from_name(username)
res = (None, self.factory)
if account and account.check_password(password):
res = (account, self.factory)
return defer.succeed(res)
[docs]class PassAvatarIdTerminalRealm(TerminalRealm):
"""
Returns an avatar that passes the avatarId through to the
protocol. This is probably not the best way to do it.
"""
noisy = False
def _getAvatar(self, avatarId):
comp = components.Componentized()
user = self.userFactory(comp, avatarId)
sess = self.sessionFactory(comp)
sess.transportFactory = self.transportFactory
sess.chainedProtocolFactory = lambda: self.chainedProtocolFactory(avatarId)
comp.setComponent(iconch.IConchUser, user)
comp.setComponent(iconch.ISession, sess)
return user
[docs]class TerminalSessionTransport_getPeer(object):
"""
Taken from twisted's TerminalSessionTransport which doesn't
provide getPeer to the transport. This one does.
"""
noisy = False
[docs] def __init__(self, proto, chainedProtocol, avatar, width, height):
self.proto = proto
self.avatar = avatar
self.chainedProtocol = chainedProtocol
session = self.proto.session
self.proto.makeConnection(
_Glue(
write=self.chainedProtocol.dataReceived,
loseConnection=lambda: avatar.conn.sendClose(session),
name="SSH Proto Transport",
)
)
def loseConnection():
self.proto.loseConnection()
def getPeer():
return session.conn.transport.transport.getPeer()
self.chainedProtocol.makeConnection(
_Glue(
getPeer=getPeer,
write=self.proto.write,
loseConnection=loseConnection,
name="Chained Proto Transport",
)
)
self.chainedProtocol.terminalProtocol.terminalSize(width, height)
[docs]def getKeyPair(pubkeyfile, privkeyfile):
"""
This function looks for RSA keypair files in the current directory. If they
do not exist, the keypair is created.
"""
if not (os.path.exists(pubkeyfile) and os.path.exists(privkeyfile)):
# No keypair exists. Generate a new RSA keypair
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
rsa_key = Key(
rsa.generate_private_key(
public_exponent=65537, key_size=_KEY_LENGTH, backend=default_backend()
)
)
public_key_string = rsa_key.public().toString(type="OPENSSH").decode()
private_key_string = rsa_key.toString(type="OPENSSH").decode()
# save keys for the future.
with open(privkeyfile, "wt") as pfile:
pfile.write(private_key_string)
print("Created SSH private key in '{}'".format(_PRIVATE_KEY_FILE))
with open(pubkeyfile, "wt") as pfile:
pfile.write(public_key_string)
print("Created SSH public key in '{}'".format(_PUBLIC_KEY_FILE))
else:
with open(pubkeyfile) as pfile:
public_key_string = pfile.read()
with open(privkeyfile) as pfile:
private_key_string = pfile.read()
return Key.fromString(public_key_string), Key.fromString(private_key_string)
[docs]def makeFactory(configdict):
"""
Creates the ssh server factory.
"""
def chainProtocolFactory(username=None):
return insults.ServerProtocol(
configdict["protocolFactory"],
*configdict.get("protocolConfigdict", (username,)),
**configdict.get("protocolKwArgs", {}),
)
rlm = PassAvatarIdTerminalRealm()
rlm.transportFactory = TerminalSessionTransport_getPeer
rlm.chainedProtocolFactory = chainProtocolFactory
factory = ConchFactory(Portal(rlm))
factory.sessionhandler = configdict["sessions"]
try:
# create/get RSA keypair
publicKey, privateKey = getKeyPair(_PUBLIC_KEY_FILE, _PRIVATE_KEY_FILE)
factory.publicKeys = {b"ssh-rsa": publicKey}
factory.privateKeys = {b"ssh-rsa": privateKey}
except Exception as err:
print(_NO_AUTOGEN.format(err=err))
factory.services = factory.services.copy()
factory.services["ssh-userauth"] = ExtraInfoAuthServer
factory.portal.registerChecker(AccountDBPasswordChecker(factory))
return factory