"""
Batch processors
These commands implement the 'batch-command' and 'batch-code'
processors, using the functionality in evennia.utils.batchprocessors.
They allow for offline world-building.
Batch-command is the simpler system. This reads a file (*.ev)
containing a list of in-game commands and executes them in sequence as
if they had been entered in the game (including permission checks
etc).
Batch-code is a full-fledged python code interpreter that reads blocks
of python code (*.py) and executes them in sequence. This allows for
much more power than Batch-command, but requires knowing Python and
the Evennia API. It is also a severe security risk and should
therefore always be limited to superusers only.
"""
import re
from django.conf import settings
from evennia.commands.cmdset import CmdSet
from evennia.utils import logger, utils
from evennia.utils.batchprocessors import BATCHCMD, BATCHCODE
_RE_COMMENT = re.compile(r"^#.*?$", re.MULTILINE + re.DOTALL)
_RE_CODE_START = re.compile(r"^# batchcode code:", re.MULTILINE)
_COMMAND_DEFAULT_CLASS = utils.class_from_module(settings.COMMAND_DEFAULT_CLASS)
# limit symbols for API inclusion
__all__ = ("CmdBatchCommands", "CmdBatchCode")
_HEADER_WIDTH = 70
_UTF8_ERROR = """
|rDecode error in '%s'.|n
This file contains non-ascii character(s). This is common if you
wrote some input in a language that has more letters and special
symbols than English; such as accents or umlauts. This is usually
fine and fully supported! But for Evennia to know how to decode such
characters in a universal way, the batchfile must be saved with the
international 'UTF-8' encoding. This file is not.
Please re-save the batchfile with the UTF-8 encoding (refer to the
documentation of your text editor on how to do this, or switch to a
better featured one) and try again.
Error reported was: '%s'
"""
# -------------------------------------------------------------
# Helper functions
# -------------------------------------------------------------
def format_header(caller, entry):
    """
    Build the one-line header displayed for a batch entry.

    Args:
        caller: The object running the batch processor. Its `ndb` must
            hold `batch_batchmode`, `batch_stackptr` and `batch_stack`.
        entry (str): Raw text of the batch entry to summarise.

    Returns:
        str: A header of the form `|wNN/MM|G: <cropped entry>|n`, padded
        with spaces on the right and with newlines shown as an escaped
        two-character sequence so the header stays on a single line.
    """
    width = _HEADER_WIDTH - 10
    if caller.ndb.batch_batchmode != "batch_commands":
        # Only batchcode entries get cleaned up: keep what follows the
        # "# batchcode code:" marker and drop comment lines. Indexing with
        # [-1] equals [1] when the marker is present, and falls back to the
        # whole entry (instead of raising IndexError) when it is absent.
        entry = _RE_CODE_START.split(entry, 1)[-1]
        entry = _RE_COMMENT.sub("", entry).strip()
    header = utils.crop(entry, width=width)
    ptr = caller.ndb.batch_stackptr + 1  # shown 1-based to the user
    stacklen = len(caller.ndb.batch_stack)
    header = "|w%02i/%02i|G: %s|n" % (ptr, stacklen, header)
    # add extra space to the side for padding.
    header = "%s%s" % (header, " " * (width - len(header)))
    header = header.replace("\n", "\\n")
    return header
def format_code(entry):
    """
    Format code or error text for display.

    Every line of `entry` is prefixed with the `|G>>>|n` marker, and the
    assembled text is stripped of leading/trailing whitespace.

    Args:
        entry (str): The text to format; split on newline characters.

    Returns:
        str: The marker-prefixed text.
    """
    # join() builds the result in one pass instead of repeated `+=`.
    return "".join("\n|G>>>|n %s" % line for line in entry.split("\n")).strip()
def batch_cmd_exec(caller):
    """
    Execute the batch-command entry the stack pointer currently points at.

    The entry's header is sent to the caller first, then the entry is run
    through `caller.execute_cmd`.

    Args:
        caller: The object running the batch processor.

    Returns:
        bool: True if the command ran without raising; False if an
        exception was raised (the traceback is logged).
    """
    position = caller.ndb.batch_stackptr
    entry = caller.ndb.batch_stack[position]
    caller.msg(format_header(caller, entry))
    try:
        caller.execute_cmd(entry)
    except Exception:
        logger.log_trace()
        return False
    else:
        return True
def batch_code_exec(caller):
    """
    Execute the batch-code entry the stack pointer currently points at.

    The entry's header is sent to the caller, then the code is handed to
    `BATCHCODE.code_exec` with `caller` available in its environment.

    Args:
        caller: The object running the batch processor.

    Returns:
        bool: True if `code_exec` reported no error; False otherwise (the
        formatted error is sent to the caller).
    """
    position = caller.ndb.batch_stackptr
    entries = caller.ndb.batch_stack
    debug_mode = caller.ndb.batch_debug
    block = entries[position]
    caller.msg(format_header(caller, block))
    error = BATCHCODE.code_exec(block, extra_environ={"caller": caller}, debug=debug_mode)
    if not error:
        return True
    caller.msg(format_code(error))
    return False
def step_pointer(caller, step=1):
    """
    Move the caller's batch stack pointer by `step` entries.

    The new position is clamped to the valid index range of
    `caller.ndb.batch_stack`. The caller is told when the move reaches (or
    would pass) the beginning or the end of the batch file.

    Args:
        caller: The object running the batch processor; its
            `ndb.batch_stackptr` is updated in place.
        step (int): How many entries to move; negative values move
            backwards.
    """
    current = caller.ndb.batch_stackptr
    last_index = len(caller.ndb.batch_stack) - 1
    target = current + step
    if target <= 0:
        caller.msg("|RBeginning of batch file.")
    if target > last_index:
        caller.msg("|REnd of batch file.")
    caller.ndb.batch_stackptr = max(0, min(last_index, target))
def show_curr(caller, showall=False):
    """
    Send the caller a summary of the entry at the current stack position.

    If the stored pointer lies past the end of the stack it is first moved
    back to the last entry.

    Args:
        caller: The object running the batch processor.
        showall (bool): When True, the full text of the entry is appended
            below the header, one `|G||n`-prefixed line per source line.
    """
    position = caller.ndb.batch_stackptr
    entries = caller.ndb.batch_stack
    if position >= len(entries):
        # pointer ran off the end (e.g. after a reload): clamp to the last entry
        position = len(entries) - 1
        caller.ndb.batch_stackptr = position
    entry = entries[position]
    parts = [format_header(caller, entry) + "|G(hh for help)"]
    if showall:
        parts.extend("|G||n %s" % line for line in entry.strip().split("\n"))
    caller.msg("\n".join(parts))
def purge_processor(caller):
    """
    Remove all batch-processor state from the caller.

    Deletes the temporary `ndb` attributes set up by the batch commands and
    restores the cmdset stack saved in `ndb.batch_cmdset_backup`. If no
    backup is available, the caller's cmdsets are cleared instead.

    Args:
        caller: The object the batch processor was running on.
    """
    # Delete each attribute independently: previously a single try wrapped
    # all deletions, so one missing attribute left the rest behind.
    for attrname in (
        "batch_stack",
        "batch_stackptr",
        "batch_pythonpath",
        "batch_batchmode",
        "batch_debug",
    ):
        try:
            delattr(caller.ndb, attrname)
        except Exception:
            # something might have already been erased; it's not critical
            pass
    # clear everything back to the state before the batch call
    if caller.ndb.batch_cmdset_backup:
        caller.cmdset.cmdset_stack = caller.ndb.batch_cmdset_backup
        caller.cmdset.update()
        del caller.ndb.batch_cmdset_backup
    else:
        # something went wrong. Purge cmdset except default
        caller.cmdset.clear()
        # caller.scripts.validate()  # this will purge interactive mode
# -------------------------------------------------------------
# main access commands
# -------------------------------------------------------------
class CmdBatchCommands(_COMMAND_DEFAULT_CLASS):
    """
    build from batch-command file

    Usage:
     batchcommands[/interactive] <python.path.to.file>

    Switch:
       interactive - this mode will offer more control when
                     executing the batch file, like stepping,
                     skipping, reloading etc.

    Runs batches of commands from a batch-cmd text file (*.ev).
    """

    key = "batchcommands"
    aliases = ["batchcommand", "batchcmd"]
    switch_options = ("interactive",)
    locks = "cmd:perm(batchcommands) or perm(Developer)"
    help_category = "Building"

    def func(self):
        """
        Parse the given batch-command file and start processing it.

        The parsed commands and bookkeeping values are stored on
        `caller.ndb`, and the current cmdset stack is backed up before the
        batch cmdsets are added. With the `inter`/`interactive` switch the
        interactive cmdset is added and the first entry is shown; otherwise
        all commands are run in sequence and the processor state is purged
        afterwards.
        """
        caller = self.caller
        python_path = self.args
        if not python_path:
            caller.msg("Usage: batchcommands[/interactive] <path.to.file>")
            return

        # parse indata file
        try:
            commands = BATCHCMD.parse_file(python_path)
        except UnicodeDecodeError as err:
            caller.msg(_UTF8_ERROR % (python_path, err))
            return
        except IOError as err:
            reason = f"{err}\n" if err else ""
            string = (
                "%s'%s' could not load. You have to supply python paths "
                "from one of the defined batch-file directories\n (%s)."
            )
            caller.msg(
                string % (reason, python_path, ", ".join(settings.BASE_BATCHPROCESS_PATHS))
            )
            return
        if not commands:
            caller.msg(f"File {python_path} seems empty of valid commands.")
            return

        switches = self.switches

        # Store work data in cache
        caller.ndb.batch_stack = commands
        caller.ndb.batch_stackptr = 0
        caller.ndb.batch_pythonpath = python_path
        caller.ndb.batch_batchmode = "batch_commands"
        # we use list() here to create a new copy of the cmdset stack
        caller.ndb.batch_cmdset_backup = list(caller.cmdset.cmdset_stack)
        caller.cmdset.add(BatchSafeCmdSet)

        if "inter" in switches or "interactive" in switches:
            # Allow more control over how batch file is executed
            # Set interactive state directly
            caller.cmdset.add(BatchInteractiveCmdSet)
            caller.msg(f"\nBatch-command processor - Interactive mode for {python_path} ...")
            show_curr(caller)
        else:
            caller.msg(
                "Running Batch-command processor - Automatic mode "
                f"for {python_path} (this might take some time) ..."
            )
            # run in-process (might block)
            for _ in range(len(commands)):
                # loop through the batch file
                if not batch_cmd_exec(caller):
                    # NOTE(review): a failing command stops the run without
                    # calling purge_processor(), so the batch state and the
                    # safety cmdset stay in place - confirm this is intended.
                    return
                step_pointer(caller, 1)
            # clean out the safety cmdset and clean out all other
            # temporary attrs.
            caller.msg(f"|G Batchfile '{python_path}' applied.")
            purge_processor(caller)
class CmdBatchCode(_COMMAND_DEFAULT_CLASS):
    """
    build from batch-code file

    Usage:
     batchcode[/interactive] <python path to file>

    Switch:
       interactive - this mode will offer more control when
                     executing the batch file, like stepping,
                     skipping, reloading etc.
       debug - auto-delete all objects that has been marked as
               deletable in the script file (see example files for
               syntax). This is useful so as to not leave multiple
               object copies behind when testing out the script.

    Runs batches of commands from a batch-code text file (*.py).
    """

    key = "batchcode"
    aliases = ["batchcodes"]
    switch_options = ("interactive", "debug")
    locks = "cmd:superuser()"
    help_category = "Building"

    def func(self):
        """
        Parse the given batch-code file and start processing it.

        The parsed code blocks, the debug flag and bookkeeping values are
        stored on `caller.ndb`, and the current cmdset stack is backed up
        before the batch cmdsets are added. With the `inter`/`interactive`
        switch the interactive cmdset is added and the first entry is
        shown; otherwise all blocks are run in sequence and the processor
        state is purged afterwards.
        """
        caller = self.caller
        python_path = self.args
        if not python_path:
            caller.msg("Usage: batchcode[/interactive/debug] <path.to.file>")
            return
        debug = "debug" in self.switches

        # parse indata file
        try:
            codes = BATCHCODE.parse_file(python_path)
        except UnicodeDecodeError as err:
            caller.msg(_UTF8_ERROR % (python_path, err))
            return
        except IOError as err:
            reason = f"{err}\n" if err else ""
            string = (
                "%s'%s' could not load. You have to supply python paths "
                "from one of the defined batch-file directories\n (%s)."
            )
            caller.msg(
                string % (reason, python_path, ", ".join(settings.BASE_BATCHPROCESS_PATHS))
            )
            return
        if not codes:
            caller.msg(f"File {python_path} seems empty of functional code.")
            return

        switches = self.switches

        # Store work data in cache
        caller.ndb.batch_stack = codes
        caller.ndb.batch_stackptr = 0
        caller.ndb.batch_pythonpath = python_path
        caller.ndb.batch_batchmode = "batch_code"
        caller.ndb.batch_debug = debug
        # we use list() here to create a new copy of cmdset_stack
        caller.ndb.batch_cmdset_backup = list(caller.cmdset.cmdset_stack)
        caller.cmdset.add(BatchSafeCmdSet)

        if "inter" in switches or "interactive" in switches:
            # Allow more control over how batch file is executed
            # Set interactive state directly
            caller.cmdset.add(BatchInteractiveCmdSet)
            caller.msg(f"\nBatch-code processor - Interactive mode for {python_path} ...")
            show_curr(caller)
        else:
            caller.msg(f"Running Batch-code processor - Automatic mode for {python_path} ...")
            for _ in range(len(codes)):
                # loop through the batch file
                if not batch_code_exec(caller):
                    # NOTE(review): a failing block stops the run without
                    # calling purge_processor(), so the batch state and the
                    # safety cmdset stay in place - confirm this is intended.
                    return
                step_pointer(caller, 1)
            # clean out the safety cmdset and clean out all other
            # temporary attrs.
            caller.msg(f"|G Batchfile '{python_path}' applied.")
            purge_processor(caller)
# -------------------------------------------------------------
# State-commands for the interactive batch processor modes
# (these are the same for both processors)
# -------------------------------------------------------------
class CmdStateAbort(_COMMAND_DEFAULT_CLASS):
    """
    abort

    This is a safety feature. It force-ejects us out of the processor and to
    the default cmdset, regardless of what current cmdset the processor might
    have put us in (e.g. when testing buggy scripts etc).
    """

    key = "abort"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """Purge all batch-processor state from the caller and confirm."""
        caller = self.caller
        purge_processor(caller)
        self.msg("Exited processor and reset out active cmdset back to the default one.")
class CmdStateLL(_COMMAND_DEFAULT_CLASS):
    """
    ll

    Look at the full source for the current
    command definition.
    """

    key = "ll"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """Show the current entry together with its full text."""
        caller = self.caller
        show_curr(caller, showall=True)
class CmdStatePP(_COMMAND_DEFAULT_CLASS):
    """
    pp

    Process the currently shown command definition.
    """

    key = "pp"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """
        Execute the current entry without moving the stack pointer.

        The executor is picked from the batch mode stored on the caller:
        batch-code entries go to `batch_code_exec`, everything else to
        `batch_cmd_exec`.
        """
        caller = self.caller
        in_code_mode = caller.ndb.batch_batchmode == "batch_code"
        run_entry = batch_code_exec if in_code_mode else batch_cmd_exec
        run_entry(caller)
class CmdStateRR(_COMMAND_DEFAULT_CLASS):
    """
    rr

    Reload the batch file, keeping the current
    position in it.
    """

    key = "rr"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """Re-parse the batch file into the stack, leaving the pointer untouched."""
        caller = self.caller
        in_code_mode = caller.ndb.batch_batchmode == "batch_code"
        processor = BATCHCODE if in_code_mode else BATCHCMD
        caller.ndb.batch_stack = processor.parse_file(caller.ndb.batch_pythonpath)
        caller.msg(format_code("File reloaded. Staying on same command."))
        show_curr(caller)
class CmdStateRRR(_COMMAND_DEFAULT_CLASS):
    """
    rrr

    Reload the batch file, starting over
    from the beginning.
    """

    key = "rrr"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """
        Re-parse the batch file and restart from its first entry.

        The freshly parsed entries replace `caller.ndb.batch_stack`;
        previously the parse result was thrown away, so the old stack kept
        being used despite the "File reloaded" message.
        """
        caller = self.caller
        if caller.ndb.batch_batchmode == "batch_code":
            new_data = BATCHCODE.parse_file(caller.ndb.batch_pythonpath)
        else:
            new_data = BATCHCMD.parse_file(caller.ndb.batch_pythonpath)
        caller.ndb.batch_stack = new_data
        caller.ndb.batch_stackptr = 0
        caller.msg(format_code("File reloaded. Restarting from top."))
        show_curr(caller)
class CmdStateNN(_COMMAND_DEFAULT_CLASS):
    """
    nn

    Go to next command. No commands are executed.
    """

    key = "nn"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """Step forward by the given number of entries (default 1) and show the result."""
        caller = self.caller
        raw = self.args
        # anything that is not a plain number falls back to a single step
        count = int(raw) if raw and raw.isdigit() else 1
        step_pointer(caller, count)
        show_curr(caller)
class CmdStateNL(_COMMAND_DEFAULT_CLASS):
    """
    nl

    Go to next command, viewing its full source.
    No commands are executed.
    """

    key = "nl"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """Step forward (default 1 entry) and show the new entry with its full text."""
        caller = self.caller
        raw = self.args
        # anything that is not a plain number falls back to a single step
        count = int(raw) if raw and raw.isdigit() else 1
        step_pointer(caller, count)
        show_curr(caller, showall=True)
class CmdStateBB(_COMMAND_DEFAULT_CLASS):
    """
    bb

    Backwards to previous command. No commands
    are executed.
    """

    key = "bb"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """Step backwards by the given number of entries (default 1) and show the result."""
        caller = self.caller
        raw = self.args
        # anything that is not a plain number falls back to a single step back
        offset = -int(raw) if raw and raw.isdigit() else -1
        step_pointer(caller, offset)
        show_curr(caller)
class CmdStateBL(_COMMAND_DEFAULT_CLASS):
    """
    bl

    Backwards to previous command, viewing its full
    source. No commands are executed.
    """

    key = "bl"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """Step backwards (default 1 entry) and show the new entry with its full text."""
        caller = self.caller
        raw = self.args
        # anything that is not a plain number falls back to a single step back
        offset = -int(raw) if raw and raw.isdigit() else -1
        step_pointer(caller, offset)
        show_curr(caller, showall=True)
class CmdStateSS(_COMMAND_DEFAULT_CLASS):
    """
    ss [steps]

    Process current command, then step to the next
    one. If steps is given,
    process this many commands.
    """

    key = "ss"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """
        Execute the current entry and advance, repeated `steps` times.

        After each executed entry the pointer moves one step forward and
        the header of the new current entry is shown.
        """
        caller = self.caller
        raw = self.args
        repeats = int(raw) if raw and raw.isdigit() else 1
        for _ in range(repeats):
            # the mode is re-read every round, as the executed entry runs arbitrary commands
            in_code_mode = caller.ndb.batch_batchmode == "batch_code"
            run_entry = batch_code_exec if in_code_mode else batch_cmd_exec
            run_entry(caller)
            step_pointer(caller, 1)
            show_curr(caller)
class CmdStateSL(_COMMAND_DEFAULT_CLASS):
    """
    sl [steps]

    Process current command, then step to the next
    one, viewing its full source. If steps is given,
    process this many commands.
    """

    key = "sl"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """
        Execute the current entry and advance, repeated `steps` times.

        After each executed entry the pointer moves one step forward and
        the new current entry is shown with its full source, matching the
        other "look" variants (nl, bl, jl). Previously only the header was
        shown, which made this command identical to `ss`.
        """
        caller = self.caller
        arg = self.args
        if arg and arg.isdigit():
            step = int(arg)
        else:
            step = 1
        for _ in range(step):
            if caller.ndb.batch_batchmode == "batch_code":
                batch_code_exec(caller)
            else:
                batch_cmd_exec(caller)
            step_pointer(caller, 1)
            show_curr(caller, showall=True)
class CmdStateCC(_COMMAND_DEFAULT_CLASS):
    """
    cc

    Continue to process all remaining
    commands.
    """

    key = "cc"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """
        Execute every entry from the current position to the end of the stack.

        When done, the batch-processor state is purged from the caller and
        a completion message is sent.
        """
        caller = self.caller
        nstack = len(caller.ndb.batch_stack)
        ptr = caller.ndb.batch_stackptr
        # number of entries left, including the current one
        step = nstack - ptr
        for _ in range(step):
            if caller.ndb.batch_batchmode == "batch_code":
                batch_code_exec(caller)
            else:
                batch_cmd_exec(caller)
            step_pointer(caller, 1)
            show_curr(caller)
        # The state lives on the caller, not on this command instance;
        # passing `self` here purged nothing and could raise.
        purge_processor(caller)
        caller.msg(format_code("Finished processing batch file."))
class CmdStateJJ(_COMMAND_DEFAULT_CLASS):
    """
    jj <command number>

    Jump to specific command number
    """

    key = "jj"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """Move the pointer to the given 1-based entry number and show that entry."""
        caller = self.caller
        raw = self.args
        if not (raw and raw.isdigit()):
            caller.msg(format_code("You must give a number index."))
            return
        # users count from 1, the stack is indexed from 0
        target = int(raw) - 1
        step_pointer(caller, target - caller.ndb.batch_stackptr)
        show_curr(caller)
class CmdStateJL(_COMMAND_DEFAULT_CLASS):
    """
    jl <command number>

    Jump to specific command number and view its full source.
    """

    key = "jl"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """Move the pointer to the given 1-based entry number and show its full text."""
        caller = self.caller
        raw = self.args
        if not (raw and raw.isdigit()):
            caller.msg(format_code("You must give a number index."))
            return
        # users count from 1, the stack is indexed from 0
        target = int(raw) - 1
        step_pointer(caller, target - caller.ndb.batch_stackptr)
        show_curr(caller, showall=True)
class CmdStateQQ(_COMMAND_DEFAULT_CLASS):
    """
    qq

    Quit the batchprocessor.
    """

    key = "qq"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """Purge all batch-processor state from the caller and confirm the abort."""
        caller = self.caller
        purge_processor(caller)
        self.msg("Aborted interactive batch mode.")
class CmdStateHH(_COMMAND_DEFAULT_CLASS):
    """Help command"""

    key = "hh"
    help_category = "BatchProcess"
    locks = "cmd:perm(batchcommands)"

    def func(self):
        """Send the static list of interactive batch-processor commands."""
        # The literal below is sent verbatim, so its text and line breaks
        # are part of the output and must not be reformatted.
        string = """
Interactive batch processing commands:
nn [steps] - next command (no processing)
nl [steps] - next & look
bb [steps] - back to previous command (no processing)
bl [steps] - back & look
jj <N> - jump to command nr N (no processing)
jl <N> - jump & look
pp - process currently shown command (no step)
ss [steps] - process & step
sl [steps] - process & step & look
ll - look at full definition of current command
rr - reload batch file (stay on current)
rrr - reload batch file (start from first)
hh - this help list
cc - continue processing to end, then quit.
qq - quit (abort all remaining commands)
abort - this is a safety command that always is available
regardless of what cmdsets gets added to us during
batch-command processing. It immediately shuts down
the processor and returns us to the default cmdset.
"""
        self.msg(string)
# -------------------------------------------------------------
#
# Defining the cmdsets for the interactive batchprocessor
# mode (same for both processors)
#
# -------------------------------------------------------------
class BatchSafeCmdSet(CmdSet):
    """
    The base cmdset for the batch processor.
    This sets a 'safe' abort command that will
    always be available to get out of everything.
    """

    key = "Batch_default"
    priority = 150  # override other cmdsets.

    def at_cmdset_creation(self):
        """Populate the cmdset with the abort command only."""
        abort_command = CmdStateAbort()
        self.add(abort_command)
class BatchInteractiveCmdSet(CmdSet):
    """
    The cmdset for the interactive batch processor mode.
    """

    key = "Batch_interactive"
    priority = 104

    def at_cmdset_creation(self):
        """Populate the cmdset with one instance of every interactive state command."""
        state_commands = (
            CmdStateAbort,
            CmdStateLL,
            CmdStatePP,
            CmdStateRR,
            CmdStateRRR,
            CmdStateNN,
            CmdStateNL,
            CmdStateBB,
            CmdStateBL,
            CmdStateSS,
            CmdStateSL,
            CmdStateCC,
            CmdStateJJ,
            CmdStateJL,
            CmdStateQQ,
            CmdStateHH,
        )
        # added in the same order as before
        for command_class in state_commands:
            self.add(command_class())