Source code for evennia.typeclasses.attributes

"""
Attributes are arbitrary data stored on objects. Attributes supports
both pure-string values and pickled arbitrary data.

Attributes are also used to implement Nicks. This module also contains
the Attribute- and NickHandlers as well as the `NAttributeHandler`,
which is a non-db version of Attributes.


"""

import fnmatch
import re
from collections import defaultdict
from copy import copy

from django.conf import settings
from django.db import models
from django.utils.encoding import smart_str
from evennia.locks.lockhandler import LockHandler
from evennia.utils.dbserialize import from_pickle, to_pickle
from evennia.utils.idmapper.models import SharedMemoryModel
from evennia.utils.picklefield import PickledObjectField
from evennia.utils.utils import is_iter, lazy_property, make_iter, to_str

_TYPECLASS_AGGRESSIVE_CACHE = settings.TYPECLASS_AGGRESSIVE_CACHE

# -------------------------------------------------------------
#
#   Attributes
#
# -------------------------------------------------------------


[docs]class IAttribute: """ Attributes are things that are specific to different types of objects. For example, a drink container needs to store its fill level, whereas an exit needs to store its open/closed/locked/unlocked state. These are done via attributes, rather than making different classes for each object type and storing them directly. The added benefit is that we can add/remove attributes on the fly as we like. The Attribute class defines the following properties: - key (str): Primary identifier. - lock_storage (str): Perm strings. - model (str): A string defining the model this is connected to. This is a natural_key, like "objects.objectdb" - date_created (datetime): When the attribute was created. - value (any): The data stored in the attribute, in pickled form using wrappers to be able to store/retrieve models. - strvalue (str): String-only data. This data is not pickled and is thus faster to search for in the database. - category (str): Optional character string for grouping the Attribute. This class is an API/Interface/Abstract base class; do not instantiate it directly. """
[docs] @lazy_property def locks(self): return LockHandler(self)
key = property(lambda self: self.db_key) strvalue = property(lambda self: getattr(self, 'db_strvalue', None)) category = property(lambda self: self.db_category) model = property(lambda self: self.db_model) attrtype = property(lambda self: self.db_attrtype) date_created = property(lambda self: self.db_date_created) def __lock_storage_get(self): return self.db_lock_storage def __lock_storage_set(self, value): self.db_lock_storage = value def __lock_storage_del(self): self.db_lock_storage = "" lock_storage = property(__lock_storage_get, __lock_storage_set, __lock_storage_del)
[docs] def access(self, accessing_obj, access_type="read", default=False, **kwargs): """ Determines if another object has permission to access. Args: accessing_obj (object): Entity trying to access this one. access_type (str, optional): Type of access sought, see the lock documentation. default (bool, optional): What result to return if no lock of access_type was found. The default, `False`, means a lockdown policy, only allowing explicit access. kwargs (any, optional): Not used; here to make the API consistent with other access calls. Returns: result (bool): If the lock was passed or not. """ result = self.locks.check(accessing_obj, access_type=access_type, default=default) return result
# # # Attribute methods # # def __str__(self): return smart_str("%s(%s)" % (self.db_key, self.id)) def __repr__(self): return "%s(%s)" % (self.db_key, self.id)
[docs]class InMemoryAttribute(IAttribute): """ This Attribute is used purely for NAttributes/NAttributeHandler. It has no database backend. """ # Primary Key has no meaning for an InMemoryAttribute. This merely serves to satisfy other code.
[docs] def __init__(self, pk, **kwargs): """ Create an Attribute that exists only in Memory. Args: pk (int): This is a fake 'primary key' / id-field. It doesn't actually have to be unique, but is fed an incrementing number from the InMemoryBackend by default. This is needed only so Attributes can be sorted. Some parts of the API also see the lack of a .pk field as a sign that the Attribute was deleted. **kwargs: Other keyword arguments are used to construct the actual Attribute. """ self.id = pk self.pk = pk # Copy all kwargs to local properties. We use db_ for compatability here. for key, value in kwargs.items(): # Value and locks are special. We must call the wrappers. if key == "value": self.value = value elif key == "strvalue": self.db_strvalue = value elif key == "lock_storage": self.lock_storage = value else: setattr(self, f"db_{key}", value)
# value property (wraps db_value) def __value_get(self): return self.db_value def __value_set(self, new_value): self.db_value = new_value def __value_del(self): pass value = property(__value_get, __value_set, __value_del)
[docs]class AttributeProperty: """ AttributeProperty. """ attrhandler_name = "attributes" cached_default_name_template = "_property_attribute_default_{key}"
[docs] def __init__(self, default=None, category=None, strattr=False, lockstring="", autocreate=True): """ Allows for specifying Attributes as Django-like 'fields' on the class level. Note that while one can set a lock on the Attribute, there is no way to *check* said lock when accessing via the property - use the full `AttributeHandler` if you need to do access checks. Note however that if you use the full `AttributeHandler` to access this Attribute, the `at_get/at_set` methods on this class will _not_ fire (because you are bypassing the `AttributeProperty` entirely in that case). Initialize an Attribute as a property descriptor. Keyword Args: default (any): A default value if the attr is not set. If a callable, this will be run without any arguments and is expected to return the default value. category (str): The attribute's category. If unset, use class default. strattr (bool): If set, this Attribute *must* be a simple string, and will be stored more efficiently. lockstring (str): This is not itself useful with the property, but only if using the full AttributeHandler.get(accessing_obj=...) to access the Attribute. autocreate (bool): True by default; this means Evennia makes sure to create a new copy of the Attribute (with the default value) whenever a new object with this property is created. If `False`, no Attribute will be created until the property is explicitly assigned a value. This makes it more efficient while it retains its default (there's no db access), but without an actual Attribute generated, one cannot access it via .db, the AttributeHandler or see it with `examine`. Example: :: class Character(DefaultCharacter): foo = AttributeProperty(default="Bar") """ self._default = default self._category = category self._strattr = strattr self._lockstring = lockstring self._autocreate = autocreate self._key = ""
def __set_name__(self, cls, name): """ Called when descriptor is first assigned to the class. It is called with the name of the field. """ self._key = name def _get_and_cache_default(self, instance): """ Get and cache the default value for this attribute. We make sure to convert any mutables into _Saver* equivalent classes here and cache the result on the instance's AttributeHandler. """ attrhandler = getattr(instance, self.attrhandler_name) value = getattr(attrhandler, self.cached_default_name_template.format(key=self._key), None) if not value: if callable(self._default): value = self._default() else: value = copy(self._default) value = from_pickle(value, db_obj=instance) setattr(attrhandler, self.cached_default_name_template.format(key=self._key), value) return value def __get__(self, instance, owner): """ Called when the attrkey is retrieved from the instance. """ value = self._get_and_cache_default(instance) try: value = self.at_get( getattr(instance, self.attrhandler_name).get( key=self._key, default=value, category=self._category, strattr=self._strattr, raise_exception=self._autocreate, ), instance, ) except AttributeError: if self._autocreate: # attribute didn't exist and autocreate is set self.__set__(instance, value) else: raise return value def __set__(self, instance, value): """ Called when assigning to the property (and when auto-creating an Attribute). """ ( getattr(instance, self.attrhandler_name).add( self._key, self.at_set(value, instance), category=self._category, lockstring=self._lockstring, strattr=self._strattr, ) ) def __delete__(self, instance): """ Called when running `del` on the property. Will remove/clear the Attribute. Note that the Attribute will be recreated next retrieval unless the AttributeProperty is also removed in code! """ getattr(instance, self.attrhandler_name).remove(key=self._key, category=self._category)
[docs] def at_set(self, value, obj): """ The value to set is passed through the method. It can be used to customize/validate the input in a custom child class. Args: value (any): The value about to the stored in this Attribute. obj (object): Object the attribute is attached to Returns: any: The value to store. Raises: AttributeError: If the value is invalid to store. Notes: This is will only fire if you actually set the Attribute via this `AttributeProperty`. That is, if you instead set it via the `AttributeHandler` (or via `.db`), you are bypassing this `AttributeProperty` entirely and this method is never reached. """ return value
[docs] def at_get(self, value, obj): """ The value returned from the Attribute is passed through this method. It can be used to react to the retrieval or modify the result in some way. Args: value (any): Value returned from the Attribute. obj (object): Object the attribute is attached to Returns: any: The value to return to the caller. Notes: This is will only fire if you actually get the Attribute via this `AttributeProperty`. That is, if you instead get it via the `AttributeHandler` (or via `.db`), you are bypassing this `AttributeProperty` entirely and this method is never reached. """ return value
[docs]class NAttributeProperty(AttributeProperty): """ NAttribute property descriptor. Allows for specifying NAttributes as Django-like 'fields' on the class level. Example: :: class Character(DefaultCharacter): foo = NAttributeProperty(default="Bar") """ attrhandler_name = "nattributes"
[docs]class Attribute(IAttribute, SharedMemoryModel): """ This attribute is stored via Django. Most Attributes will be using this class. """ # # Attribute Database Model setup # # These database fields are all set using their corresponding properties, # named same as the field, but without the db_* prefix. db_key = models.CharField("key", max_length=255, db_index=True) db_value = PickledObjectField( "value", null=True, help_text=( "The data returned when the attribute is accessed. Must be " "written as a Python literal if editing through the admin " "interface. Attribute values which are not Python literals " "cannot be edited through the admin interface." ), ) db_strvalue = models.TextField( "strvalue", null=True, blank=True, help_text="String-specific storage for quick look-up" ) db_category = models.CharField( "category", max_length=128, db_index=True, blank=True, null=True, help_text="Optional categorization of attribute.", ) # Lock storage db_lock_storage = models.TextField( "locks", blank=True, help_text="Lockstrings for this object are stored here." ) db_model = models.CharField( "model", max_length=32, db_index=True, blank=True, null=True, help_text=( "Which model of object this attribute is attached to (A " "natural key like 'objects.objectdb'). You should not change " "this value unless you know what you are doing." ), ) # subclass of Attribute (None or nick) db_attrtype = models.CharField( "attrtype", max_length=16, db_index=True, blank=True, null=True, help_text="Subclass of Attribute (None or nick)", ) # time stamp db_date_created = models.DateTimeField("date_created", editable=False, auto_now_add=True) # Database manager # objects = managers.AttributeManager() class Meta: "Define Django meta options" verbose_name = "Attribute" # Wrapper properties to easily set database fields. These are # @property decorators that allows to access these fields using # normal python operations (without having to remember to save() # etc). So e.g. a property 'attr' has a get/set/del decorator # defined that allows the user to do self.attr = value, # value = self.attr and del self.attr respectively (where self # is the object in question). # lock_storage wrapper. Overloaded for saving to database. def __lock_storage_get(self): return self.db_lock_storage def __lock_storage_set(self, value): self.db_lock_storage = value self.save(update_fields=["db_lock_storage"]) def __lock_storage_del(self): self.db_lock_storage = "" self.save(update_fields=["db_lock_storage"]) lock_storage = property(__lock_storage_get, __lock_storage_set, __lock_storage_del) # value property (wraps db_value) @property def value(self): """ Getter. Allows for `value = self.value`. We cannot cache here since it makes certain cases (such as storing a dbobj which is then deleted elsewhere) out-of-sync. The overhead of unpickling seems hard to avoid. """ return from_pickle(self.db_value, db_obj=self) @value.setter def value(self, new_value): """ Setter. Allows for self.value = value. We cannot cache here, see self.__value_get. """ self.db_value = to_pickle(new_value) self.save(update_fields=["db_value"]) @value.deleter def value(self): """Deleter. Allows for del attr.value. This removes the entire attribute.""" self.delete()
# # Handlers making use of the Attribute model #
[docs]class IAttributeBackend: """ Abstract interface for the backends used by the Attribute Handler. All Backends must implement this base class. """ _attrcreate = "attrcreate" _attredit = "attredit" _attrread = "attrread" _attrclass = None
[docs] def __init__(self, handler, attrtype): self.handler = handler self.obj = handler.obj self._attrtype = attrtype self._objid = handler.obj.id self._cache = {} # store category names fully cached self._catcache = {} # full cache was run on all attributes self._cache_complete = False
[docs] def query_all(self): """ Fetch all Attributes from this object. Returns: attrlist (list): A list of Attribute objects. """ raise NotImplementedError()
[docs] def query_key(self, key, category): """ Args: key (str): The key of the Attribute being searched for. category (str or None): The category of the desired Attribute. Returns: attribute (IAttribute): A single Attribute. """ raise NotImplementedError()
[docs] def query_category(self, category): """ Returns every matching Attribute as a list, given a category. This method calls up whatever storage the backend uses. Args: category (str or None): The category to query. Returns: attrs (list): The discovered Attributes. """ raise NotImplementedError()
def _full_cache(self): """Cache all attributes of this object""" if not _TYPECLASS_AGGRESSIVE_CACHE: return attrs = self.query_all() self._cache = { f"{to_str(attr.key).lower()}-{attr.category.lower() if attr.category else None}": attr for attr in attrs } self._cache_complete = True def _get_cache_key(self, key, category): """ Fetch cache key. Args: key (str): The key of the Attribute being searched for. category (str or None): The category of the desired Attribute. Returns: attribute (IAttribute): A single Attribute. """ cachekey = "%s-%s" % (key, category) cachefound = False try: attr = _TYPECLASS_AGGRESSIVE_CACHE and self._cache[cachekey] cachefound = True except KeyError: attr = None if attr and (not hasattr(attr, "pk") and attr.pk is None): # clear out Attributes deleted from elsewhere. We must search this anew. attr = None cachefound = False del self._cache[cachekey] if cachefound and _TYPECLASS_AGGRESSIVE_CACHE: if attr: return [attr] # return cached entity else: return [] # no such attribute: return an empty list else: conn = self.query_key(key, category) if conn: attr = conn[0].attribute if _TYPECLASS_AGGRESSIVE_CACHE: self._cache[cachekey] = attr return [attr] if attr.pk else [] else: # There is no such attribute. We will explicitly save that # in our cache to avoid firing another query if we try to # retrieve that (non-existent) attribute again. if _TYPECLASS_AGGRESSIVE_CACHE: self._cache[cachekey] = None return [] def _get_cache_category(self, category): """ Retrieves Attribute list (by category) from cache. Args: category (str or None): The category to query. Returns: attrs (list): The discovered Attributes. """ catkey = "-%s" % category if _TYPECLASS_AGGRESSIVE_CACHE and catkey in self._catcache: return [attr for key, attr in self._cache.items() if key.endswith(catkey) and attr] else: # we have to query to make this category up-date in the cache attrs = self.query_category(category) if _TYPECLASS_AGGRESSIVE_CACHE: for attr in attrs: if attr.pk: cachekey = "%s-%s" % (attr.key, category) self._cache[cachekey] = attr # mark category cache as up-to-date self._catcache[catkey] = True return attrs def _get_cache(self, key=None, category=None): """ Retrieve from cache or database (always caches) Args: key (str, optional): Attribute key to query for category (str, optional): Attribiute category Returns: args (list): Returns a list of zero or more matches found from cache or database. Notes: When given a category only, a search for all objects of that cateogory is done and the category *name* is stored. This tells the system on subsequent calls that the list of cached attributes of this category is up-to-date and that the cache can be queried for category matches without missing any. The TYPECLASS_AGGRESSIVE_CACHE=False setting will turn off caching, causing each attribute access to trigger a database lookup. """ key = key.strip().lower() if key else None category = category.strip().lower() if category is not None else None if key: return self._get_cache_key(key, category) return self._get_cache_category(category)
[docs] def get(self, key=None, category=None): """ Frontend for .get_cache. Retrieves Attribute(s). Args: key (str, optional): Attribute key to query for category (str, optional): Attribiute category Returns: args (list): Returns a list of zero or more matches found from cache or database. """ return self._get_cache(key, category)
def _set_cache(self, key, category, attr_obj): """ Update cache. Args: key (str): A cleaned key string category (str or None): A cleaned category name attr_obj (IAttribute): The newly saved attribute """ if not _TYPECLASS_AGGRESSIVE_CACHE: return if not key: # don't allow an empty key in cache return cachekey = "%s-%s" % (key, category) catkey = "-%s" % category self._cache[cachekey] = attr_obj # mark that the category cache is no longer up-to-date self._catcache.pop(catkey, None) self._cache_complete = False def _delete_cache(self, key, category): """ Remove attribute from cache Args: key (str): A cleaned key string category (str or None): A cleaned category name """ catkey = "-%s" % category if key: cachekey = "%s-%s" % (key, category) self._cache.pop(cachekey, None) else: self._cache = { key: attrobj for key, attrobj in list(self._cache.items()) if not key.endswith(catkey) } # mark that the category cache is no longer up-to-date self._catcache.pop(catkey, None) self._cache_complete = False
[docs] def reset_cache(self): """ Reset cache from the outside. """ self._cache_complete = False self._cache = {} self._catcache = {}
[docs] def do_create_attribute(self, key, category, lockstring, value, strvalue): """ Does the hard work of actually creating Attributes, whatever is needed. Args: key (str): The Attribute's key. category (str or None): The Attribute's category, or None lockstring (str): Any locks for the Attribute. value (obj): The Value of the Attribute. strvalue (bool): Signifies if this is a strvalue Attribute. Value MUST be a string or this will lead to Trouble. Ignored for InMemory attributes. Returns: attr (IAttribute): The new Attribute. """ raise NotImplementedError()
[docs] def create_attribute(self, key, category, lockstring, value, strvalue=False, cache=True): """ Creates Attribute (using the class specified for the backend), (optionally) caches it, and returns it. This MUST actively save the Attribute to whatever database backend is used, AND call self.set_cache(key, category, new_attrobj) Args: key (str): The Attribute's key. category (str or None): The Attribute's category, or None lockstring (str): Any locks for the Attribute. value (obj): The Value of the Attribute. strvalue (bool): Signifies if this is a strvalue Attribute. Value MUST be a string or this will lead to Trouble. Ignored for InMemory attributes. cache (bool): Whether to cache the new Attribute Returns: attr (IAttribute): The new Attribute. """ attr = self.do_create_attribute(key, category, lockstring, value, strvalue) if cache: self._set_cache(key, category, attr) return attr
[docs] def do_update_attribute(self, attr, value, strvalue): """ Simply sets a new Value to an Attribute. Args: attr (IAttribute): The Attribute being changed. value (obj): The Value for the Attribute. strvalue (bool): If True, `value` is expected to be a string. """ raise NotImplementedError()
[docs] def do_batch_update_attribute(self, attr_obj, category, lock_storage, new_value, strvalue): """ Called opnly by batch add. For the database backend, this is a method of updating that can alter category and lock-storage. Args: attr_obj (IAttribute): The Attribute being altered. category (str or None): The attribute's (new) category. lock_storage (str): The attribute's new locks. new_value (obj): The Attribute's new value. strvalue (bool): Signifies if this is a strvalue Attribute. Value MUST be a string or this will lead to Trouble. Ignored for InMemory attributes. """ raise NotImplementedError()
[docs] def do_batch_finish(self, attr_objs): """ Called after batch_add completed. Used for handling database operations and/or caching complications. Args: attr_objs (list of IAttribute): The Attributes created/updated thus far. """ raise NotImplementedError()
[docs] def batch_add(self, *args, **kwargs): """ Batch-version of `.add()`. This is more efficient than repeat-calling `.add` when having many Attributes to add. Args: *args (tuple): Tuples of varying length representing the Attribute to add to this object. Supported tuples are - (key, value) - (key, value, category) - (key, value, category, lockstring) - (key, value, category, lockstring, default_access) Raises: RuntimeError: If trying to pass a non-iterable as argument. Notes: The indata tuple order matters, so if you want a lockstring but no category, set the category to `None`. This method does not have the ability to check editing permissions and is mainly used internally. It does not use the normal `self.add` but applies the Attributes directly to the database. """ new_attrobjs = [] strattr = kwargs.get("strattr", False) for tup in args: if not is_iter(tup) or len(tup) < 2: raise RuntimeError("batch_add requires iterables as arguments (got %r)." % tup) ntup = len(tup) keystr = str(tup[0]).strip().lower() new_value = tup[1] category = str(tup[2]).strip().lower() if ntup > 2 and tup[2] is not None else None lockstring = tup[3] if ntup > 3 else "" attr_objs = self._get_cache(keystr, category) if attr_objs: attr_obj = attr_objs[0] # update an existing attribute object self.do_batch_update_attribute(attr_obj, category, lockstring, new_value, strattr) else: new_attr = self.do_create_attribute( keystr, category, lockstring, new_value, strvalue=strattr ) new_attrobjs.append(new_attr) if new_attrobjs: self.do_batch_finish(new_attrobjs)
[docs] def do_delete_attribute(self, attr): """ Does the hard work of actually deleting things. Args: attr (IAttribute): The attribute to delete. """ raise NotImplementedError()
[docs] def delete_attribute(self, attr): """ Given an Attribute, deletes it. Also remove it from cache. Args: attr (IAttribute): The attribute to delete. """ if not attr: return self._delete_cache(attr.key, attr.category) self.do_delete_attribute(attr)
[docs] def update_attribute(self, attr, value, strattr=False): """ Simply updates an Attribute. Args: attr (IAttribute): The attribute to delete. value (obj): The new value. strattr (bool): If set, the `value` is a raw string. """ self.do_update_attribute(attr, value, strattr)
[docs] def do_batch_delete(self, attribute_list): """ Given a list of attributes, deletes them all. The default implementation is fine, but this is overridable since some databases may allow for a better method. Args: attribute_list (list of IAttribute): """ for attribute in attribute_list: self.delete_attribute(attribute)
[docs] def clear_attributes(self, category, accessing_obj, default_access): """ Remove all Attributes on this object. Args: category (str, optional): If given, clear only Attributes of this category. accessing_obj (object, optional): If given, check the `attredit` lock on each Attribute before continuing. default_access (bool, optional): Use this permission as fallback if `access_obj` is given but there is no lock of type `attredit` on the Attribute in question. """ category = category.strip().lower() if category is not None else None if not self._cache_complete: self._full_cache() if category is not None: attrs = [attr for attr in self._cache.values() if attr.category == category] else: attrs = self._cache.values() if accessing_obj: self.do_batch_delete( [ attr for attr in attrs if attr.access(accessing_obj, self._attredit, default=default_access) ] ) else: # have to cast the results to a list or we'll get a RuntimeError for removing from the # dict we're iterating self.do_batch_delete(list(attrs)) self.reset_cache()
[docs] def get_all_attributes(self): """ Simply returns all Attributes of this object, sorted by their IDs. Returns: attributes (list of IAttribute) """ if _TYPECLASS_AGGRESSIVE_CACHE: if not self._cache_complete: self._full_cache() return sorted([attr for attr in self._cache.values() if attr], key=lambda o: o.id) else: return sorted([attr for attr in self.query_all() if attr], key=lambda o: o.id)
[docs]class InMemoryAttributeBackend(IAttributeBackend): """ This Backend for Attributes stores NOTHING in the database. Everything is kept in memory, and normally lost on a crash, reload, shared memory flush, etc. It generates IDs for the Attributes it manages, but these are of little importance beyond sorting and satisfying the caching logic to know an Attribute hasn't been deleted out from under the cache's nose. """ _attrclass = InMemoryAttribute
[docs] def __init__(self, handler, attrtype): super().__init__(handler, attrtype) self._storage = dict() self._category_storage = defaultdict(list) self._id_counter = 0
def _next_id(self): """ Increments the internal ID counter and returns the new value. Returns: next_id (int): A simple integer. """ self._id_counter += 1 return self._id_counter
[docs] def query_all(self): return self._storage.values()
[docs] def query_key(self, key, category): found = self._storage.get((key, category), None) if found: return [found] return []
[docs] def query_category(self, category): if category is None: return self._storage.values() return self._category_storage.get(category, [])
[docs] def do_create_attribute(self, key, category, lockstring, value, strvalue): """ See parent class. strvalue has no meaning for InMemory attributes. """ new_attr = self._attrclass( pk=self._next_id(), key=key, category=category, lock_storage=lockstring, value=value ) self._storage[(key, category)] = new_attr self._category_storage[category].append(new_attr) return new_attr
[docs] def do_update_attribute(self, attr, value, strvalue): attr.value = value
[docs] def do_batch_update_attribute(self, attr_obj, category, lock_storage, new_value, strvalue): """ No need to bother saving anything. Just set some values. """ attr_obj.db_category = category attr_obj.db_lock_storage = lock_storage if lock_storage else "" attr_obj.value = new_value
[docs] def do_batch_finish(self, attr_objs): """ Nothing to do here for In-Memory. Args: attr_objs (list of IAttribute): The Attributes created/updated thus far. """ pass
[docs] def do_delete_attribute(self, attr): """ Removes the Attribute from local storage. Once it's out of the cache, garbage collection will handle the rest. Args: attr (IAttribute): The attribute to delete. """ del self._storage[(attr.key, attr.category)] self._category_storage[attr.category].remove(attr)
[docs]class ModelAttributeBackend(IAttributeBackend): """ Uses Django models for storing Attributes. """ _attrclass = Attribute _m2m_fieldname = "db_attributes"
[docs] def __init__(self, handler, attrtype): super().__init__(handler, attrtype) self._model = to_str(handler.obj.__dbclass__.__name__.lower())
[docs] def query_all(self): query = { "%s__id" % self._model: self._objid, "attribute__db_model__iexact": self._model, "attribute__db_attrtype": self._attrtype, } return [ conn.attribute for conn in getattr(self.obj, self._m2m_fieldname).through.objects.filter(**query) ]
[docs] def query_key(self, key, category): query = { "%s__id" % self._model: self._objid, "attribute__db_model__iexact": self._model, "attribute__db_attrtype": self._attrtype, "attribute__db_key__iexact": key.lower(), "attribute__db_category__iexact": category.lower() if category else None, } if not self.obj.pk: return [] return getattr(self.obj, self._m2m_fieldname).through.objects.filter(**query)
[docs] def query_category(self, category): query = { "%s__id" % self._model: self._objid, "attribute__db_model__iexact": self._model, "attribute__db_attrtype": self._attrtype, "attribute__db_category__iexact": category.lower() if category else None, } return [ conn.attribute for conn in getattr(self.obj, self._m2m_fieldname).through.objects.filter(**query) ]
[docs] def do_create_attribute(self, key, category, lockstring, value, strvalue): kwargs = { "db_key": key, "db_category": category, "db_model": self._model, "db_lock_storage": lockstring if lockstring else "", "db_attrtype": self._attrtype, } if strvalue: kwargs["db_value"] = None kwargs["db_strvalue"] = value else: kwargs["db_value"] = to_pickle(value) kwargs["db_strvalue"] = None new_attr = self._attrclass(**kwargs) new_attr.save() getattr(self.obj, self._m2m_fieldname).add(new_attr) self._set_cache(key, category, new_attr) return new_attr
[docs] def do_update_attribute(self, attr, value, strvalue): if strvalue: attr.value = None attr.db_strvalue = value else: attr.value = value attr.db_strvalue = None attr.save(update_fields=["db_strvalue", "db_value"])
[docs] def do_batch_update_attribute(self, attr_obj, category, lock_storage, new_value, strvalue): attr_obj.db_category = category attr_obj.db_lock_storage = lock_storage if lock_storage else "" if strvalue: # store as a simple string (will not notify OOB handlers) attr_obj.db_strvalue = new_value attr_obj.value = None else: # store normally (this will also notify OOB handlers) attr_obj.value = new_value attr_obj.db_strvalue = None attr_obj.save(update_fields=["db_strvalue", "db_value", "db_category", "db_lock_storage"])
[docs] def do_batch_finish(self, attr_objs): # Add new objects to m2m field all at once getattr(self.obj, self._m2m_fieldname).add(*attr_objs)
[docs] def do_delete_attribute(self, attr): try: attr.delete() except AssertionError: # This could happen if the Attribute has already been deleted. pass
[docs]class AttributeHandler: """ Handler for adding Attributes to the object. """ _attrcreate = "attrcreate" _attredit = "attredit" _attrread = "attrread" _attrtype = None
[docs] def __init__(self, obj, backend_class): """ Setup the AttributeHandler. Args: obj (TypedObject): An Account, Object, Channel, ServerSession (not technically a typed object), etc. backend_class (IAttributeBackend class): The class of the backend to use. """ self.obj = obj self.backend = backend_class(self, self._attrtype)
[docs] def has(self, key=None, category=None): """ Checks if the given Attribute (or list of Attributes) exists on the object. Args: key (str or iterable): The Attribute key or keys to check for. If `None`, search by category. category (str or None): Limit the check to Attributes with this category (note, that `None` is the default category). Returns: has_attribute (bool or list): If the Attribute exists on this object or not. If `key` was given as an iterable then the return is a list of booleans. """ ret = [] category = category.strip().lower() if category is not None else None for keystr in make_iter(key): keystr = key.strip().lower() ret.extend(bool(attr) for attr in self.backend.get(keystr, category)) return ret[0] if len(ret) == 1 else ret
[docs] def get( self, key=None, default=None, category=None, return_obj=False, strattr=False, raise_exception=False, accessing_obj=None, default_access=True, return_list=False, ): """ Get the Attribute. Args: key (str or list, optional): the attribute identifier or multiple attributes to get. if a list of keys, the method will return a list. default (any, optional): The value to return if an Attribute was not defined. If set, it will be returned in a one-item list. category (str, optional): the category within which to retrieve attribute(s). return_obj (bool, optional): If set, the return is not the value of the Attribute but the Attribute object itself. strattr (bool, optional): Return the `strvalue` field of the Attribute rather than the usual `value`, this is a string-only value for quick database searches. raise_exception (bool, optional): When an Attribute is not found, the return from this is usually `default`. If this is set, an exception is raised instead. accessing_obj (object, optional): If set, an `attrread` permission lock will be checked before returning each looked-after Attribute. default_access (bool, optional): If no `attrread` lock is set on object, this determines if the lock should then be passed or not. return_list (bool, optional): Always return a list, also if there is only one or zero matches found. Returns: result (any or list): One or more matches for keys and/or categories. Each match will be the value of the found Attribute(s) unless `return_obj` is True, at which point it will be the attribute object itself or None. If `return_list` is True, this will always be a list, regardless of the number of elements. Raises: AttributeError: If `raise_exception` is set and no matching Attribute was found matching `key`. """ ret = [] for keystr in make_iter(key): # it's okay to send a None key attr_objs = self.backend.get(keystr, category) if attr_objs: ret.extend(attr_objs) elif raise_exception: raise AttributeError elif return_obj: ret.append(None) if accessing_obj: # check 'attrread' locks ret = [ attr for attr in ret if attr.access(accessing_obj, self._attrread, default=default_access) ] if strattr: ret = ret if return_obj else [attr.strvalue for attr in ret if attr] else: ret = ret if return_obj else [attr.value for attr in ret if attr] if return_list: return ret if ret else [default] if default is not None else [] return ret[0] if ret and len(ret) == 1 else ret or default
[docs] def add( self, key, value, category=None, lockstring="", strattr=False, accessing_obj=None, default_access=True, ): """ Add attribute to object, with optional `lockstring`. Args: key (str): An Attribute name to add. value (any or str): The value of the Attribute. If `strattr` keyword is set, this *must* be a string. category (str, optional): The category for the Attribute. The default `None` is the normal category used. lockstring (str, optional): A lock string limiting access to the attribute. strattr (bool, optional): Make this a string-only Attribute. This is only ever useful for optimization purposes. accessing_obj (object, optional): An entity to check for the `attrcreate` access-type. If not passing, this method will be exited. default_access (bool, optional): What access to grant if `accessing_obj` is given but no lock of the type `attrcreate` is defined on the Attribute in question. """ if accessing_obj and not self.obj.access( accessing_obj, self._attrcreate, default=default_access ): # check create access return if not key: return category = category.strip().lower() if category is not None else None keystr = key.strip().lower() attr_obj = self.backend.get(key, category) if attr_obj: # update an existing attribute object attr_obj = attr_obj[0] self.backend.update_attribute(attr_obj, value, strattr) else: # create a new Attribute (no OOB handlers can be notified) self.backend.create_attribute(keystr, category, lockstring, value, strattr)
[docs] def batch_add(self, *args, **kwargs): """ Batch-version of `add()`. This is more efficient than repeat-calling add when having many Attributes to add. Args: *args (tuple): Each argument should be a tuples (can be of varying length) representing the Attribute to add to this object. Supported tuples are - (key, value) - (key, value, category) - (key, value, category, lockstring) - (key, value, category, lockstring, default_access) Keyword Args: strattr (bool): If `True`, value must be a string. This will save the value without pickling which is less flexible but faster to search (not often used except internally). Raises: RuntimeError: If trying to pass a non-iterable as argument. Notes: The indata tuple order matters, so if you want a lockstring but no category, set the category to `None`. This method does not have the ability to check editing permissions like normal .add does, and is mainly used internally. It does not use the normal self.add but apply the Attributes directly to the database. """ self.backend.batch_add(*args, **kwargs)
[docs] def remove( self, key=None, category=None, raise_exception=False, accessing_obj=None, default_access=True, ): """ Remove attribute or a list of attributes from object. Args: key (str or list, optional): An Attribute key to remove or a list of keys. If multiple keys, they must all be of the same `category`. If None and category is not given, remove all Attributes. category (str, optional): The category within which to remove the Attribute. raise_exception (bool, optional): If set, not finding the Attribute to delete will raise an exception instead of just quietly failing. accessing_obj (object, optional): An object to check against the `attredit` lock. If not given, the check will be skipped. default_access (bool, optional): The fallback access to grant if `accessing_obj` is given but there is no `attredit` lock set on the Attribute in question. Raises: AttributeError: If `raise_exception` is set and no matching Attribute was found matching `key`. Notes: If neither key nor category is given, this acts as clear(). """ if key is None: self.clear( category=category, accessing_obj=accessing_obj, default_access=default_access ) return category = category.strip().lower() if category is not None else None for keystr in make_iter(key): keystr = keystr.lower() attr_objs = self.backend.get(keystr, category) for attr_obj in attr_objs: if not ( accessing_obj and not attr_obj.access(accessing_obj, self._attredit, default=default_access) ): self.backend.delete_attribute(attr_obj) if not attr_objs and raise_exception: raise AttributeError
[docs] def clear(self, category=None, accessing_obj=None, default_access=True): """ Remove all Attributes on this object. Args: category (str, optional): If given, clear only Attributes of this category. accessing_obj (object, optional): If given, check the `attredit` lock on each Attribute before continuing. default_access (bool, optional): Use this permission as fallback if `access_obj` is given but there is no lock of type `attredit` on the Attribute in question. """ self.backend.clear_attributes(category, accessing_obj, default_access)
[docs] def all(self, category=None, accessing_obj=None, default_access=True): """ Return all Attribute objects on this object, regardless of category. Args: category (str, optional): A given category to limit results to. accessing_obj (object, optional): Check the `attrread` lock on each attribute before returning them. If not given, this check is skipped. default_access (bool, optional): Use this permission as a fallback if `accessing_obj` is given but one or more Attributes has no lock of type `attrread` defined on them. Returns: Attributes (list): All the Attribute objects (note: Not their values!) in the handler. """ attrs = self.backend.get_all_attributes() if category: attrs = [attr for attr in attrs if attr.category == category] if accessing_obj: return [ attr for attr in attrs if attr.access(accessing_obj, self._attrread, default=default_access) ] else: return attrs
[docs] def reset_cache(self): self.backend.reset_cache()
# DbHolders for .db and .ndb properties on Typeclasses. _GA = object.__getattribute__ _SA = object.__setattr__
[docs]class DbHolder: "Holder for allowing property access of attributes"
[docs] def __init__(self, obj, name, manager_name="attributes"): _SA(self, name, _GA(obj, manager_name)) _SA(self, "name", name)
def __getattribute__(self, attrname): if attrname == "all": # we allow to overload our default .all attr = _GA(self, _GA(self, "name")).get("all") return attr if attr else _GA(self, "all") return _GA(self, _GA(self, "name")).get(attrname) def __setattr__(self, attrname, value): _GA(self, _GA(self, "name")).add(attrname, value) def __delattr__(self, attrname): _GA(self, _GA(self, "name")).remove(attrname)
[docs] def get_all(self): return _GA(self, _GA(self, "name")).backend.get_all_attributes()
all = property(get_all)
# # Nick templating # """ This supports the use of replacement templates in nicks: This happens in two steps: 1) The user supplies a template that is converted to a regex according to the unix-like templating language. 2) This regex is tested against nicks depending on which nick replacement strategy is considered (most commonly inputline). 3) If there is a template match and there are templating markers, these are replaced with the arguments actually given. @desc $1 $2 $3 This will be converted to the following regex: \@desc (?P<1>\w+) (?P<2>\w+) $(?P<3>\w+) Supported template markers (through fnmatch) * matches anything (non-greedy) -> .*? ? matches any single character -> [seq] matches any entry in sequence [!seq] matches entries not in sequence Custom arg markers $N argument position (1-99) """ _RE_OR = re.compile(r"(?<!\\)\|") _RE_NICK_RE_ARG = re.compile(r"arg([1-9][0-9]?)") _RE_NICK_ARG = re.compile(r"\\(\$)([1-9][0-9]?)") _RE_NICK_RAW_ARG = re.compile(r"(\$)([1-9][0-9]?)") _RE_NICK_SPACE = re.compile(r"\\ ")
[docs]class NickTemplateInvalid(ValueError): pass
[docs]def initialize_nick_templates(pattern, replacement, pattern_is_regex=False): """ Initialize the nick templates for matching and remapping a string. Args: pattern (str): The pattern to be used for nick recognition. This will be parsed for shell patterns into a regex, unless `pattern_is_regex` is `True`, in which case it must be an already valid regex string. In this case, instead of `$N`, numbered arguments must instead be given as matching groups named as `argN`, such as `(?P<arg1>.+?)`. replacement (str): The template to be used to replace the string matched by the pattern. This can contain `$N` markers and is never parsed into a regex. pattern_is_regex (bool): If set, `pattern` is a full regex string instead of containing shell patterns. Returns: regex, template (str): Regex to match against strings and template with markers ``{arg1}, {arg2}``, etc for replacement using the standard `.format` method. Raises: evennia.typecalasses.attributes.NickTemplateInvalid: If the in/out template does not have a matching number of `$args`. Examples: - `pattern` (shell syntax): `"grin $1"` - `pattern` (regex): `"grin (?P<arg1.+?>)"` - `replacement`: `"emote gives a wicked grin to $1"` """ # create the regex from the pattern if pattern_is_regex: # Note that for a regex we can't validate in the way we do for the shell # pattern, since you may have complex OR statements or optional arguments. # Explicit regex given from the onset - this already contains argN # groups. we need to split out any | - separated parts so we can # attach the line-break/ending extras all regexes require. pattern_regex_string = r"|".join( or_part + r"(?:[\n\r]*?)\Z" for or_part in _RE_OR.split(pattern) ) else: # Shell pattern syntax - convert $N to argN groups # for the shell pattern we make sure we have matching $N on both sides pattern_args = [match.group(1) for match in _RE_NICK_RAW_ARG.finditer(pattern)] replacement_args = [match.group(1) for match in _RE_NICK_RAW_ARG.finditer(replacement)] if set(pattern_args) != set(replacement_args): # We don't have the same amount of argN/$N tags in input/output. raise NickTemplateInvalid("Nicks: Both in/out-templates must contain the same $N tags.") # generate regex from shell pattern pattern_regex_string = fnmatch.translate(pattern) pattern_regex_string = _RE_NICK_SPACE.sub(r"\\s+", pattern_regex_string) pattern_regex_string = _RE_NICK_ARG.sub( lambda m: "(?P<arg%s>.+?)" % m.group(2), pattern_regex_string ) # we must account for a possible line break coming over the wire pattern_regex_string = pattern_regex_string[:-2] + r"(?:[\n\r]*?)\Z" # map the replacement to match the arg1 group-names, to make replacement easy replacement_string = _RE_NICK_RAW_ARG.sub(lambda m: "{arg%s}" % m.group(2), replacement) return pattern_regex_string, replacement_string
[docs]def parse_nick_template(string, template_regex, outtemplate): """ Parse a text using a template and map it to another template Args: string (str): The input string to process template_regex (regex): A template regex created with initialize_nick_template. outtemplate (str): The template to which to map the matches produced by the template_regex. This should have $1, $2, etc to match the template-regex. Un-found $N-markers (possible if the regex has optional matching groups) are replaced with empty strings. """ match = template_regex.match(string) if match: matchdict = { key: value if value is not None else "" for key, value in match.groupdict().items() } return True, outtemplate.format_map(matchdict) return False, string
[docs]class NickHandler(AttributeHandler): """ Handles the addition and removal of Nicks. Nicks are special versions of Attributes with an `_attrtype` hardcoded to `nick`. They also always use the `strvalue` fields for their data. """ _attrtype = "nick"
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._regex_cache = {}
[docs] def has(self, key, category="inputline"): """ Args: key (str or iterable): The Nick key or keys to check for. category (str): Limit the check to Nicks with this category (note, that `None` is the default category). Returns: has_nick (bool or list): If the Nick exists on this object or not. If `key` was given as an iterable then the return is a list of booleans. """ return super().has(key, category=category)
[docs] def get(self, key=None, category="inputline", return_tuple=False, **kwargs): """ Get the replacement value matching the given key and category Args: key (str or list, optional): the attribute identifier or multiple attributes to get. if a list of keys, the method will return a list. category (str, optional): the category within which to retrieve the nick. The "inputline" means replacing data sent by the user. return_tuple (bool, optional): return the full nick tuple rather than just the replacement. For non-template nicks this is just a string. kwargs (any, optional): These are passed on to `AttributeHandler.get`. Returns: str or tuple: The nick replacement string or nick tuple. """ if return_tuple or "return_obj" in kwargs: return super().get(key=key, category=category, **kwargs) else: retval = super().get(key=key, category=category, **kwargs) if retval: return ( retval[3] if isinstance(retval, tuple) else [tup[3] for tup in make_iter(retval)] ) return None
[docs] def add(self, pattern, replacement, category="inputline", pattern_is_regex=False, **kwargs): """ Add a new nick, a mapping pattern -> replacement. Args: pattern (str): A pattern to match for. This will be parsed for shell patterns using the `fnmatch` library and can contain `$N`-markers to indicate the locations of arguments to catch. If `pattern_is_regex=True`, this must instead be a valid regular expression and the `$N`-markers must be named `argN` that matches numbered regex groups (see examples). replacement (str): The string (or template) to replace `key` with (the "nickname"). This may contain `$N` markers to indicate where to place the argument-matches category (str, optional): the category within which to retrieve the nick. The "inputline" means replacing data sent by the user. pattern_is_regex (bool): If `True`, the `pattern` will be parsed as a raw regex string. Instead of using `$N` markers in this string, one then must mark numbered arguments as a named regex-groupd named `argN`. For example, `(?P<arg1>.+?)` will match the behavior of using `$1` in the shell pattern. **kwargs (any, optional): These are passed on to `AttributeHandler.get`. Notes: For most cases, the shell-pattern is much shorter and easier. The regex pattern form can be useful for more complex matchings though, for example in order to add optional arguments, such as with `(?P<argN>.*?)`. Example: - pattern (default shell syntax): `"gr $1 at $2"` - pattern (with pattern_is_regex=True): `r"gr (?P<arg1>.+?) at (?P<arg2>.+?)"` - replacement: `"emote With a flourish, $1 grins at $2."` """ nick_regex, nick_template = initialize_nick_templates( pattern, replacement, pattern_is_regex=pattern_is_regex ) super().add( pattern, (nick_regex, nick_template, pattern, replacement), category=category, **kwargs )
[docs] def remove(self, key, category="inputline", **kwargs): """ Remove Nick with matching category. Args: key (str): A key for the nick to match for. category (str, optional): the category within which to removethe nick. The "inputline" means replacing data sent by the user. kwargs (any, optional): These are passed on to `AttributeHandler.get`. """ super().remove(key, category=category, **kwargs)
[docs] def nickreplace(self, raw_string, categories=("inputline", "channel"), include_account=True): """ Apply nick replacement of entries in raw_string with nick replacement. Args: raw_string (str): The string in which to perform nick replacement. categories (tuple, optional): Replacement categories in which to perform the replacement, such as "inputline", "channel" etc. include_account (bool, optional): Also include replacement with nicks stored on the Account level. kwargs (any, optional): Not used. Returns: string (str): A string with matching keys replaced with their nick equivalents. """ nicks = {} for category in make_iter(categories): nicks.update( { nick.key: nick for nick in make_iter(self.get(category=category, return_obj=True)) if nick and nick.key } ) if include_account and self.obj.has_account: for category in make_iter(categories): nicks.update( { nick.key: nick for nick in make_iter( self.obj.account.nicks.get(category=category, return_obj=True) ) if nick and nick.key } ) for key, nick in nicks.items(): nick_regex, template, _, _ = nick.value regex = self._regex_cache.get(nick_regex) if not regex: try: regex = re.compile(nick_regex, re.I + re.DOTALL + re.U) except re.error: from evennia.utils import logger logger.log_trace("Probably nick being created with unvalidated regex mapping.") continue self._regex_cache[nick_regex] = regex is_match, raw_string = parse_nick_template(raw_string, regex, template) if is_match: break return raw_string