Source code for evennia.typeclasses.attributes

"""
Attributes are arbitrary data stored on objects. Attributes support
both pure-string values and pickled arbitrary data.

Attributes are also used to implement Nicks. This module also contains
the Attribute- and NickHandlers as well as the `NAttributeHandler`,
which is a non-db version of Attributes.


"""
import re
import fnmatch
import weakref

from django.db import models
from django.conf import settings
from django.utils.encoding import smart_str

from evennia.locks.lockhandler import LockHandler
from evennia.utils.idmapper.models import SharedMemoryModel
from evennia.utils.dbserialize import to_pickle, from_pickle
from evennia.utils.picklefield import PickledObjectField
from evennia.utils.utils import lazy_property, to_str, make_iter, is_iter

_TYPECLASS_AGGRESSIVE_CACHE = settings.TYPECLASS_AGGRESSIVE_CACHE

# -------------------------------------------------------------
#
#   Attributes
#
# -------------------------------------------------------------


class Attribute(SharedMemoryModel):
    """
    Attributes are things that are specific to different types of objects. For
    example, a drink container needs to store its fill level, whereas an exit
    needs to store its open/closed/locked/unlocked state. These are done via
    attributes, rather than making different classes for each object type and
    storing them directly. The added benefit is that we can add/remove
    attributes on the fly as we like.

    The Attribute class defines the following properties:
     - key (str): Primary identifier.
     - lock_storage (str): Perm strings.
     - model (str): A string defining the model this is connected to. This
        is a natural_key, like "objects.objectdb"
     - date_created (datetime): When the attribute was created.
     - value (any): The data stored in the attribute, in pickled form
        using wrappers to be able to store/retrieve models.
     - strvalue (str): String-only data. This data is not pickled and
        is thus faster to search for in the database.
     - category (str): Optional character string for grouping the
        Attribute.

    Of these, `key`, `strvalue`, `category`, `model`, `attrtype` and
    `date_created` are read-only wrappers around the matching `db_*` field,
    while assigning to `value` or `lock_storage` saves the changed field to
    the database immediately.

    """

    #
    # Attribute Database Model setup
    #
    # These database fields are all set using their corresponding properties,
    # named same as the field, but without the db_* prefix.
    db_key = models.CharField("key", max_length=255, db_index=True)
    db_value = PickledObjectField(
        "value",
        null=True,
        help_text="The data returned when the attribute is accessed. Must be "
        "written as a Python literal if editing through the admin "
        "interface. Attribute values which are not Python literals "
        "cannot be edited through the admin interface.",
    )
    db_strvalue = models.TextField(
        "strvalue", null=True, blank=True, help_text="String-specific storage for quick look-up"
    )
    db_category = models.CharField(
        "category",
        max_length=128,
        db_index=True,
        blank=True,
        null=True,
        help_text="Optional categorization of attribute.",
    )
    # Lock storage
    db_lock_storage = models.TextField(
        "locks", blank=True, help_text="Lockstrings for this object are stored here."
    )
    db_model = models.CharField(
        "model",
        max_length=32,
        db_index=True,
        blank=True,
        null=True,
        help_text="Which model of object this attribute is attached to (A "
        "natural key like 'objects.objectdb'). You should not change "
        "this value unless you know what you are doing.",
    )
    # subclass of Attribute (None or nick)
    db_attrtype = models.CharField(
        "attrtype",
        max_length=16,
        db_index=True,
        blank=True,
        null=True,
        help_text="Subclass of Attribute (None or nick)",
    )
    # time stamp
    db_date_created = models.DateTimeField("date_created", editable=False, auto_now_add=True)

    # Database manager
    # objects = managers.AttributeManager()

    @lazy_property
    def locks(self):
        """Return a `LockHandler` bound to this Attribute."""
        return LockHandler(self)

    class Meta(object):
        "Define Django meta options"
        verbose_name = "Evennia Attribute"

    # read-only wrappers (no setter is defined, so assignment raises AttributeError)
    key = property(lambda self: self.db_key)
    strvalue = property(lambda self: self.db_strvalue)
    category = property(lambda self: self.db_category)
    model = property(lambda self: self.db_model)
    attrtype = property(lambda self: self.db_attrtype)
    date_created = property(lambda self: self.db_date_created)

    def __lock_storage_get(self):
        """Getter. Return the raw lock string stored in `db_lock_storage`."""
        return self.db_lock_storage

    def __lock_storage_set(self, value):
        """Setter. Store `value` as the lock string and save only that field."""
        self.db_lock_storage = value
        self.save(update_fields=["db_lock_storage"])

    def __lock_storage_del(self):
        """Deleter. Reset the lock string to the empty string and save only that field."""
        self.db_lock_storage = ""
        self.save(update_fields=["db_lock_storage"])

    lock_storage = property(__lock_storage_get, __lock_storage_set, __lock_storage_del)

    # Wrapper properties to easily set database fields. These are
    # @property decorators that allows to access these fields using
    # normal python operations (without having to remember to save()
    # etc). So e.g. a property 'attr' has a get/set/del decorator
    # defined that allows the user to do self.attr = value,
    # value = self.attr and del self.attr respectively (where self
    # is the object in question).

    # value property (wraps db_value)
    def __value_get(self):
        """
        Getter. Allows for `value = self.value`.
        We cannot cache here since it makes certain cases (such
        as storing a dbobj which is then deleted elsewhere) out-of-sync.
        The overhead of unpickling seems hard to avoid.
        """
        return from_pickle(self.db_value, db_obj=self)

    def __value_set(self, new_value):
        """
        Setter. Allows for self.value = value. The new value is converted with
        `to_pickle` and only the `db_value` field is saved. We cannot cache
        here, see self.__value_get.
        """
        self.db_value = to_pickle(new_value)
        self.save(update_fields=["db_value"])

    def __value_del(self):
        """
        Deleter. Allows for del attr.value. This removes the entire
        attribute (it calls `self.delete()`), not just the stored value.
        """
        self.delete()

    value = property(__value_get, __value_set, __value_del)

    #
    #
    # Attribute methods
    #
    #

    def __str__(self):
        return smart_str("%s[category=%s](#%s)" % (self.db_key, self.db_category, self.id))

    def __repr__(self):
        return "%s[category=%s](#%s)" % (self.db_key, self.db_category, self.id)

    def access(self, accessing_obj, access_type="attrread", default=False, **kwargs):
        """
        Determines if another object has permission to access.

        Args:
            accessing_obj (object): Entity trying to access this one.
            access_type (str, optional): Type of access sought, see
                the lock documentation.
            default (bool, optional): What result to return if no lock
                of access_type was found. The default, `False`, means a lockdown
                policy, only allowing explicit access.
            kwargs (any, optional): Not used; here to make the API consistent with
                other access calls.

        Returns:
            result (bool): If the lock was passed or not.

        """
        result = self.locks.check(accessing_obj, access_type=access_type, default=default)
        return result
# # Handlers making use of the Attribute model #
class AttributeHandler(object):
    """
    Handler for adding Attributes to the object.

    Attribute objects are cached per handler in `self._cache` under keys of
    the form `"<key>-<category>"` (both lower-cased; a `None` category gives
    the suffix `-None`). A cached value of `None` records that a lookup found
    nothing. The cache is only used when the `TYPECLASS_AGGRESSIVE_CACHE`
    setting is enabled.

    """

    # name of the many-to-many field on the owning object that holds its Attributes
    _m2m_fieldname = "db_attributes"
    # lock access types checked when creating, editing/removing and reading
    _attrcreate = "attrcreate"
    _attredit = "attredit"
    _attrread = "attrread"
    # matched against Attribute.db_attrtype in every query
    _attrtype = None

    def __init__(self, obj):
        """
        Initialize handler.

        Args:
            obj (object): The entity to store Attributes on. It must expose
                `id` and `__dbclass__`, which are read here.

        """
        self.obj = obj
        self._objid = obj.id
        self._model = to_str(obj.__dbclass__.__name__.lower())
        self._cache = {}
        # store category names fully cached
        self._catcache = {}
        # full cache was run on all attributes
        self._cache_complete = False

    def _query_all(self):
        """
        Fetch all Attributes on this object.

        Returns:
            attrs (list): The Attribute objects of this handler's `_attrtype`
                connected to the object, straight from the database.

        """
        query = {
            "%s__id" % self._model: self._objid,
            "attribute__db_model__iexact": self._model,
            "attribute__db_attrtype": self._attrtype,
        }
        return [
            conn.attribute
            for conn in getattr(self.obj, self._m2m_fieldname).through.objects.filter(**query)
        ]

    def _fullcache(self):
        """
        Cache all attributes of this object.

        Does nothing when aggressive caching is turned off.

        """
        if not _TYPECLASS_AGGRESSIVE_CACHE:
            return
        self._cache = {
            "%s-%s"
            % (
                to_str(attr.db_key).lower(),
                attr.db_category.lower() if attr.db_category is not None else None,
            ): attr
            for attr in self._query_all()
        }
        self._cache_complete = True

    def _getcache(self, key=None, category=None):
        """
        Retrieve from cache or database (always caches)

        Args:
            key (str, optional): Attribute key to query for
            category (str, optional): Attribute category

        Returns:
            args (list): Returns a list of zero or more matches
                found from cache or database.
        Notes:
            When given a category only, a search for all objects
            of that category is done and the category *name* is
            stored. This tells the system on subsequent calls that the
            list of cached attributes of this category is up-to-date
            and that the cache can be queried for category matches
            without missing any.
            The TYPECLASS_AGGRESSIVE_CACHE=False setting will turn off
            caching, causing each attribute access to trigger a
            database lookup.

        """
        key = key.strip().lower() if key else None
        category = category.strip().lower() if category is not None else None
        if key:
            cachekey = "%s-%s" % (key, category)
            cachefound = False
            try:
                attr = _TYPECLASS_AGGRESSIVE_CACHE and self._cache[cachekey]
                cachefound = True
            except KeyError:
                attr = None

            if attr and hasattr(attr, "pk") and attr.pk is None:
                # clear out Attributes deleted from elsewhere (a deleted model
                # instance has no primary key). We must search this anew.
                attr = None
                cachefound = False
                del self._cache[cachekey]
            if cachefound and _TYPECLASS_AGGRESSIVE_CACHE:
                if attr:
                    return [attr]  # return cached entity
                else:
                    return []  # no such attribute: return an empty list
            else:
                query = {
                    "%s__id" % self._model: self._objid,
                    "attribute__db_model__iexact": self._model,
                    "attribute__db_attrtype": self._attrtype,
                    "attribute__db_key__iexact": key,
                    "attribute__db_category__iexact": category if category else None,
                }
                if not self.obj.pk:
                    return []
                conn = getattr(self.obj, self._m2m_fieldname).through.objects.filter(**query)
                if conn:
                    attr = conn[0].attribute
                    if _TYPECLASS_AGGRESSIVE_CACHE:
                        self._cache[cachekey] = attr
                    return [attr] if attr.pk else []
                else:
                    # There is no such attribute. We will explicitly save that
                    # in our cache to avoid firing another query if we try to
                    # retrieve that (non-existent) attribute again.
                    if _TYPECLASS_AGGRESSIVE_CACHE:
                        self._cache[cachekey] = None
                    return []
        else:
            # only category given (even if it's None) - we can't
            # assume the cache to be complete unless we have queried
            # for this category before
            catkey = "-%s" % category
            if _TYPECLASS_AGGRESSIVE_CACHE and catkey in self._catcache:
                return [
                    attr
                    for cachekey, attr in self._cache.items()
                    if cachekey.endswith(catkey) and attr
                ]
            else:
                # we have to query to make this category up-date in the cache
                query = {
                    "%s__id" % self._model: self._objid,
                    "attribute__db_model__iexact": self._model,
                    "attribute__db_attrtype": self._attrtype,
                    "attribute__db_category__iexact": category if category else None,
                }
                attrs = [
                    conn.attribute
                    for conn in getattr(self.obj, self._m2m_fieldname).through.objects.filter(
                        **query
                    )
                ]
                if _TYPECLASS_AGGRESSIVE_CACHE:
                    for attr in attrs:
                        if attr.pk:
                            # lower-case the key like every other cache writer does
                            cachekey = "%s-%s" % (to_str(attr.db_key).lower(), category)
                            self._cache[cachekey] = attr
                    # mark category cache as up-to-date
                    self._catcache[catkey] = True
                return attrs

    def _setcache(self, key, category, attr_obj):
        """
        Update cache.

        Args:
            key (str): A cleaned key string
            category (str or None): A cleaned category name
            attr_obj (Attribute): The newly saved attribute

        """
        if not _TYPECLASS_AGGRESSIVE_CACHE:
            return
        if not key:  # don't allow an empty key in cache
            return
        cachekey = "%s-%s" % (key, category)
        catkey = "-%s" % category
        self._cache[cachekey] = attr_obj
        # mark that the category cache is no longer up-to-date
        self._catcache.pop(catkey, None)
        self._cache_complete = False

    def _delcache(self, key, category):
        """
        Remove attribute from cache

        Args:
            key (str): A cleaned key string. If empty, every cached entry
                of `category` is dropped.
            category (str or None): A cleaned category name

        """
        catkey = "-%s" % category
        if key:
            cachekey = "%s-%s" % (key, category)
            self._cache.pop(cachekey, None)
        else:
            self._cache = {
                cachekey: attrobj
                for cachekey, attrobj in self._cache.items()
                if not cachekey.endswith(catkey)
            }
        # mark that the category cache is no longer up-to-date
        self._catcache.pop(catkey, None)
        self._cache_complete = False

    def reset_cache(self):
        """
        Reset cache from the outside.
        """
        self._cache_complete = False
        self._cache = {}
        self._catcache = {}

    def has(self, key=None, category=None):
        """
        Checks if the given Attribute (or list of Attributes) exists on
        the object.

        Args:
            key (str or iterable): The Attribute key or keys to check for.
                If `None`, search by category.
            category (str or None): Limit the check to Attributes with this
                category (note, that `None` is the default category).

        Returns:
            has_attribute (bool or list): If the Attribute exists on
                this object or not. If `key` was given as an iterable of
                more than one key, the return is a list with one boolean
                per key, in the same order.

        """
        category = category.strip().lower() if category is not None else None
        ret = []
        for keystr in make_iter(key):
            if keystr is not None:
                keystr = keystr.strip().lower()
            # one boolean per requested key, so the result lines up with the input
            ret.append(bool(self._getcache(keystr, category)))
        return ret[0] if len(ret) == 1 else ret

    def get(
        self,
        key=None,
        default=None,
        category=None,
        return_obj=False,
        strattr=False,
        raise_exception=False,
        accessing_obj=None,
        default_access=True,
        return_list=False,
    ):
        """
        Get the Attribute.

        Args:
            key (str or list, optional): the attribute identifier or
                multiple attributes to get. if a list of keys, the
                method will return a list.
            category (str, optional): the category within which to
                retrieve attribute(s).
            default (any, optional): The value to return if an
                Attribute was not defined. With `return_list`, it is
                returned in a one-item list (unless it is `None`).
            return_obj (bool, optional): If set, the return is not the value of the
                Attribute but the Attribute object itself.
            strattr (bool, optional): Return the `strvalue` field of
                the Attribute rather than the usual `value`, this is a
                string-only value for quick database searches.
            raise_exception (bool, optional): When an Attribute is not
                found, the return from this is usually `default`. If this
                is set, an exception is raised instead.
            accessing_obj (object, optional): If set, an `attrread`
                permission lock will be checked before returning each
                looked-after Attribute.
            default_access (bool, optional): If no `attrread` lock is set on
                object, this determines if the lock should then be passed or not.
            return_list (bool, optional): If set, always return a list, also
                when there is only a single match or no match at all.

        Returns:
            result (any or list): One or more matches for keys and/or
                categories. Each match will be the value of the found Attribute(s)
                unless `return_obj` is True, at which point it will be the
                attribute object itself or None. If `return_list` is True, this
                will always be a list, regardless of the number of elements.

        Raises:
            AttributeError: If `raise_exception` is set and no matching Attribute
                was found matching `key`.

        """
        ret = []
        for keystr in make_iter(key):
            # it's okay to send a None key
            attr_objs = self._getcache(keystr, category)
            if attr_objs:
                ret.extend(attr_objs)
            elif raise_exception:
                raise AttributeError(
                    "No Attribute matching key %r (category %r) was found." % (keystr, category)
                )
            elif return_obj:
                ret.append(None)

        if accessing_obj:
            # check 'attrread' locks. The None placeholders (added above for
            # missing Attributes) have no lock to check and are passed through.
            ret = [
                attr
                for attr in ret
                if attr is None
                or attr.access(accessing_obj, self._attrread, default=default_access)
            ]
        if not return_obj:
            if strattr:
                ret = [attr.strvalue for attr in ret if attr]
            else:
                ret = [attr.value for attr in ret if attr]

        if return_list:
            return ret if ret else [default] if default is not None else []
        return ret[0] if ret and len(ret) == 1 else ret or default

    def add(
        self,
        key,
        value,
        category=None,
        lockstring="",
        strattr=False,
        accessing_obj=None,
        default_access=True,
    ):
        """
        Add attribute to object, with optional `lockstring`.

        Args:
            key (str): An Attribute name to add.
            value (any or str): The value of the Attribute. If
                `strattr` keyword is set, this *must* be a string.
            category (str, optional): The category for the Attribute.
                The default `None` is the normal category used.
            lockstring (str, optional): A lock string limiting access
                to the attribute. When empty, the locks of an already
                existing Attribute are left untouched.
            strattr (bool, optional): Make this a string-only Attribute.
                This is only ever useful for optimization purposes.
            accessing_obj (object, optional): An entity to check for
                the `attrcreate` access-type. If not passing, this method
                will be exited.
            default_access (bool, optional): What access to grant if
                `accessing_obj` is given but no lock of the type
                `attrcreate` is defined on the Attribute in question.

        """
        if accessing_obj and not self.obj.access(
            accessing_obj, self._attrcreate, default=default_access
        ):
            # check create access
            return

        if not key:
            return

        category = category.strip().lower() if category is not None else None
        keystr = key.strip().lower()
        attr_objs = self._getcache(keystr, category)

        if attr_objs:
            # update an existing attribute object
            attr_obj = attr_objs[0]
            if lockstring:
                # store the lock string the same way batch_add does.
                # NOTE(review): a LockHandler already created through
                # `attr_obj.locks` may need a refresh to see this - confirm.
                attr_obj.db_lock_storage = lockstring
                attr_obj.save(update_fields=["db_lock_storage"])
            if strattr:
                # store as a simple string (will not notify OOB handlers)
                attr_obj.db_strvalue = value
                attr_obj.save(update_fields=["db_strvalue"])
            else:
                # store normally (this will also notify OOB handlers)
                attr_obj.value = value
        else:
            # create a new Attribute (no OOB handlers can be notified)
            new_attr = Attribute(
                db_key=keystr,
                db_category=category,
                db_model=self._model,
                db_attrtype=self._attrtype,
                db_value=None if strattr else to_pickle(value),
                db_strvalue=value if strattr else None,
                db_lock_storage=lockstring or "",
            )
            new_attr.save()
            getattr(self.obj, self._m2m_fieldname).add(new_attr)
            # update cache
            self._setcache(keystr, category, new_attr)

    def batch_add(self, *args, **kwargs):
        """
        Batch-version of `add()`. This is more efficient than
        repeat-calling add when having many Attributes to add.

        Args:
            *args (tuple): Each argument should be a tuples (can be of varying
                length) representing the Attribute to add to this object.
                Supported tuples are
                    - `(key, value)`
                    - `(key, value, category)`
                    - `(key, value, category, lockstring)`
                    - `(key, value, category, lockstring, default_access)`
                (a fifth `default_access` element is accepted but not used
                by this method).

        Keyword Args:
            strattr (bool): If `True`, value must be a string. This
                will save the value without pickling which is less
                flexible but faster to search (not often used except
                internally).

        Raises:
            RuntimeError: If trying to pass a non-iterable as argument,
                or an iterable with fewer than two elements.

        Notes:
            The indata tuple order matters, so if you want a lockstring
            but no category, set the category to `None`. This method
            does not have the ability to check editing permissions like
            normal .add does, and is mainly used internally. It does not
            use the normal self.add but apply the Attributes directly
            to the database.

        """
        new_attrobjs = []
        strattr = kwargs.get("strattr", False)
        for tup in args:
            if not is_iter(tup) or len(tup) < 2:
                # wrap in a 1-tuple so a tuple argument is shown whole instead
                # of being unpacked as the %-format arguments
                raise RuntimeError(
                    "batch_add requires iterables as arguments (got %r)." % (tup,)
                )
            ntup = len(tup)
            keystr = str(tup[0]).strip().lower()
            new_value = tup[1]
            category = str(tup[2]).strip().lower() if ntup > 2 and tup[2] is not None else None
            lockstring = tup[3] if ntup > 3 else ""

            attr_objs = self._getcache(keystr, category)

            if attr_objs:
                attr_obj = attr_objs[0]
                # update an existing attribute object
                attr_obj.db_category = category
                attr_obj.db_lock_storage = lockstring or ""
                attr_obj.save(update_fields=["db_category", "db_lock_storage"])
                if strattr:
                    # store as a simple string (will not notify OOB handlers)
                    attr_obj.db_strvalue = new_value
                    attr_obj.save(update_fields=["db_strvalue"])
                else:
                    # store normally (this will also notify OOB handlers)
                    attr_obj.value = new_value
            else:
                # create a new Attribute (no OOB handlers can be notified)
                new_attr = Attribute(
                    db_key=keystr,
                    db_category=category,
                    db_model=self._model,
                    db_attrtype=self._attrtype,
                    db_value=None if strattr else to_pickle(new_value),
                    db_strvalue=new_value if strattr else None,
                    db_lock_storage=lockstring or "",
                )
                new_attr.save()
                new_attrobjs.append(new_attr)
                self._setcache(keystr, category, new_attr)
        if new_attrobjs:
            # Add new objects to m2m field all at once
            getattr(self.obj, self._m2m_fieldname).add(*new_attrobjs)

    def remove(
        self,
        key=None,
        raise_exception=False,
        category=None,
        accessing_obj=None,
        default_access=True,
    ):
        """
        Remove attribute or a list of attributes from object.

        Args:
            key (str or list, optional): An Attribute key to remove or a list of keys. If
                multiple keys, they must all be of the same `category`. If None and
                category is not given, remove all Attributes.
            raise_exception (bool, optional): If set, not finding the
                Attribute to delete will raise an exception instead of
                just quietly failing.
            category (str, optional): The category within which to
                remove the Attribute.
            accessing_obj (object, optional): An object to check
                against the `attredit` lock. If not given, the check will
                be skipped.
            default_access (bool, optional): The fallback access to
                grant if `accessing_obj` is given but there is no
                `attredit` lock set on the Attribute in question.

        Raises:
            AttributeError: If `raise_exception` is set and no matching Attribute
                was found matching `key`.

        Notes:
            If neither key nor category is given, this acts
            as clear().

        """
        if key is None:
            self.clear(
                category=category, accessing_obj=accessing_obj, default_access=default_access
            )
            return

        category = category.strip().lower() if category is not None else None

        for keystr in make_iter(key):
            # clean the key exactly like _getcache does, so that the cache
            # entry removed below is the one that was looked up
            keystr = keystr.strip().lower()

            attr_objs = self._getcache(keystr, category)
            for attr_obj in attr_objs:
                if not (
                    accessing_obj
                    and not attr_obj.access(accessing_obj, self._attredit, default=default_access)
                ):
                    try:
                        attr_obj.delete()
                    except AssertionError:
                        # this happens if the attr was already deleted
                        pass
                    finally:
                        self._delcache(keystr, category)
            if not attr_objs and raise_exception:
                raise AttributeError(
                    "No Attribute matching key %r (category %r) was found." % (keystr, category)
                )

    def clear(self, category=None, accessing_obj=None, default_access=True):
        """
        Remove all Attributes on this object.

        Args:
            category (str, optional): If given, clear only Attributes
                of this category.
            accessing_obj (object, optional): If given, check the
                `attredit` lock on each Attribute before continuing.
            default_access (bool, optional): Use this permission as
                fallback if `access_obj` is given but there is no lock of
                type `attredit` on the Attribute in question.

        """
        category = category.strip().lower() if category is not None else None

        if _TYPECLASS_AGGRESSIVE_CACHE:
            if not self._cache_complete:
                self._fullcache()
            candidates = list(self._cache.values())
        else:
            # nothing is ever cached in this mode, so go to the database
            candidates = self._query_all()

        for attr in candidates:
            if not attr or attr.pk is None:
                # negative cache entry (None) or an Attribute already deleted
                continue
            if category is not None:
                attr_category = attr.category.lower() if attr.category is not None else None
                if attr_category != category:
                    continue
            if accessing_obj and not attr.access(
                accessing_obj, self._attredit, default=default_access
            ):
                continue
            attr.delete()

        self._cache = {}
        self._catcache = {}
        self._cache_complete = False

    def all(self, accessing_obj=None, default_access=True):
        """
        Return all Attribute objects on this object, regardless of category.

        Args:
            accessing_obj (object, optional): Check the `attrread`
                lock on each attribute before returning them. If not
                given, this check is skipped.
            default_access (bool, optional): Use this permission as a
                fallback if `accessing_obj` is given but one or more
                Attributes has no lock of type `attrread` defined on them.

        Returns:
            Attributes (list): All the Attribute objects (note: Not
                their values!) in the handler, sorted by id.

        """
        if _TYPECLASS_AGGRESSIVE_CACHE:
            if not self._cache_complete:
                self._fullcache()
            found = self._cache.values()
        else:
            found = self._query_all()
        # skip negative cache entries and Attributes deleted elsewhere (these
        # have no id and cannot be sorted)
        attrs = sorted(
            (attr for attr in found if attr and attr.pk is not None), key=lambda o: o.id
        )

        if accessing_obj:
            # listing is a read operation, so check the read lock (as `get` does)
            return [
                attr
                for attr in attrs
                if attr.access(accessing_obj, self._attrread, default=default_access)
            ]
        else:
            return attrs
# Nick templating
#
# This supports the use of replacement templates in nicks.
#
# This happens in three steps:
#
# 1) The user supplies a template that is converted to a regex according
#    to the unix-like templating language.
# 2) This regex is tested against nicks depending on which nick replacement
#    strategy is considered (most commonly inputline).
# 3) If there is a template match and there are templating markers,
#    these are replaced with the arguments actually given.
#
# For example, the template
#
#     @desc $1 $2 $3
#
# is converted (by `initialize_nick_templates`) to a regex along the lines of
#
#     @desc\s+(?P<arg1>.+?)\s+(?P<arg2>.+?)\s+(?P<arg3>.+?)
#
# (the exact wrapping added by `fnmatch.translate` depends on the Python
# version).
#
# Supported template markers (through fnmatch)
#     *       matches everything
#     ?       matches any single character
#     [seq]   matches any entry in sequence
#     [!seq]  matches entries not in sequence
# Custom arg markers
#     $N      argument position (1-99)

# a `$N` marker as it appears after `fnmatch.translate` has escaped the `$`
_RE_NICK_ARG = re.compile(r"\\(\$)([1-9][0-9]?)")
# a plain `$N` marker, as written in the replacement template
_RE_NICK_TEMPLATE_ARG = re.compile(r"(\$)([1-9][0-9]?)")
# a backslash-escaped space
_RE_NICK_SPACE = re.compile(r"\\ ")
class NickTemplateInvalid(ValueError):
    """Raised when a nick's in- and out-template do not use the same `$N` markers."""
def initialize_nick_templates(in_template, out_template):
    """
    Build the matching regex and the replacement template for a nick.

    Args:
        in_template (str): The template to be used for nick recognition.
        out_template (str): The template to be used to replace the string
            matched by the in_template.

    Returns:
        (str, str): The regex (as an uncompiled string) to match input
            against, and the replacement template in which every `$N`
            marker has become `{argN}`, ready for `str.format`.

    Raises:
        attributes.NickTemplateInvalid: If the in- and out-template do not
            use the same set of `$N` markers.

    """
    # Translate the fnmatch-style template to a regex, then swap the trailing
    # two-character end anchor for one that also tolerates a line break coming
    # over the wire.
    # NOTE-PYTHON3: fnmatch.translate format changed since Python2
    pattern = fnmatch.translate(in_template)[:-2] + r"(?:[\n\r]*?)\Z"

    # both templates must refer to exactly the same argument numbers
    in_args = {found.group(2) for found in _RE_NICK_ARG.finditer(pattern)}
    out_args = {found.group(2) for found in _RE_NICK_TEMPLATE_ARG.finditer(out_template)}
    if in_args != out_args:
        # We don't have the same $-tags in input/output.
        raise NickTemplateInvalid

    # escaped spaces match any run of whitespace; $N markers become named groups
    pattern = _RE_NICK_SPACE.sub(r"\\s+", pattern)
    pattern = _RE_NICK_ARG.sub(lambda found: "(?P<arg%s>.+?)" % found.group(2), pattern)
    replacement = _RE_NICK_TEMPLATE_ARG.sub(
        lambda found: "{arg%s}" % found.group(2), out_template
    )

    return pattern, replacement
def parse_nick_template(string, template_regex, outtemplate):
    """
    Match a text against a nick regex and, on success, map it to a template.

    Args:
        string (str): The input string to process.
        template_regex (regex): A compiled regex built from the pattern
            returned by `initialize_nick_templates`.
        outtemplate (str): The template to fill in with the named groups
            captured by `template_regex`, using `{name}` format fields
            (such as `{arg1}`, `{arg2}`).

    Returns:
        (bool, str): `(True, replaced_string)` if the regex matched,
            otherwise `(False, string)` with the input unchanged.

    """
    found = template_regex.match(string)
    if not found:
        return False, string
    return True, outtemplate.format(**found.groupdict())
class NickHandler(AttributeHandler):
    """
    Handles the addition and removal of Nicks. Nicks are special
    versions of Attributes with an `_attrtype` hardcoded to `nick`.
    Each nick is stored as the tuple
    `(regex_string, replacement_template, key, replacement)`.

    """

    _attrtype = "nick"

    def __init__(self, *args, **kwargs):
        """Initialize as an `AttributeHandler` and add a cache of compiled nick regexes."""
        super().__init__(*args, **kwargs)
        self._regex_cache = {}

    def has(self, key, category="inputline"):
        """
        Check if a nick exists.

        Args:
            key (str or iterable): The Nick key or keys to check for.
            category (str): Limit the check to Nicks with this
                category (note, that `None` is the default category).

        Returns:
            has_nick (bool or list): If the Nick exists on this object
                or not. If `key` was given as an iterable then the return
                is a list of booleans.

        """
        return super().has(key, category=category)

    def get(self, key=None, category="inputline", return_tuple=False, **kwargs):
        """
        Get the replacement value matching the given key and category

        Args:
            key (str or list, optional): the attribute identifier or
                multiple attributes to get. if a list of keys, the
                method will return a list.
            category (str, optional): the category within which to
                retrieve the nick. The "inputline" means replacing data
                sent by the user.
            return_tuple (bool, optional): return the full nick tuple rather
                than just the replacement. For non-template nicks this is just
                a string.
            kwargs (any, optional): These are passed on to `AttributeHandler.get`.
                If a `return_obj` keyword is passed at all, the result of
                `AttributeHandler.get` is returned unprocessed.

        Returns:
            result (any): The replacement string (or a list of them), the
                unprocessed lookup result, or `None` if nothing was found.

        """
        found = super().get(key=key, category=category, **kwargs)
        if return_tuple or "return_obj" in kwargs:
            return found
        if not found:
            return None
        if isinstance(found, tuple):
            # a single nick tuple; index 3 holds the replacement
            return found[3]
        return [tup[3] for tup in make_iter(found)]

    def add(self, key, replacement, category="inputline", **kwargs):
        """
        Add a new nick.

        Args:
            key (str): A key (or template) for the nick to match for.
            replacement (str): The string (or template) to replace `key` with (the "nickname").
            category (str, optional): the category within which to
                store the nick. The "inputline" means replacing data
                sent by the user.
            kwargs (any, optional): These are passed on to `AttributeHandler.add`.

        """
        if category == "channel":
            # channel nicks get a trailing argument marker on both templates
            in_template, out_template = key + " $1", replacement + " $1"
        else:
            in_template, out_template = key, replacement
        nick_regex, nick_template = initialize_nick_templates(in_template, out_template)
        super().add(key, (nick_regex, nick_template, key, replacement), category=category, **kwargs)

    def remove(self, key, category="inputline", **kwargs):
        """
        Remove Nick with matching category.

        Args:
            key (str): A key for the nick to match for.
            category (str, optional): the category within which to
                remove the nick. The "inputline" means replacing data
                sent by the user.
            kwargs (any, optional): These are passed on to `AttributeHandler.remove`.

        """
        super().remove(key, category=category, **kwargs)

    def nickreplace(self, raw_string, categories=("inputline", "channel"), include_account=True):
        """
        Apply nick replacement of entries in raw_string with nick replacement.

        Args:
            raw_string (str): The string in which to perform nick
                replacement.
            categories (tuple, optional): Replacement categories in
                which to perform the replacement, such as "inputline",
                "channel" etc.
            include_account (bool, optional): Also include replacement
                with nicks stored on the Account level.

        Returns:
            string (str): A string with matching keys replaced with
                their nick equivalents. Only the first matching nick is
                applied.

        """
        nicks = {}

        def _collect(handler):
            # gather the nick objects of all requested categories, keyed on nick key
            for category in make_iter(categories):
                for nick in make_iter(handler.get(category=category, return_obj=True)):
                    if nick and nick.key:
                        nicks[nick.key] = nick

        _collect(self)
        if include_account and self.obj.has_account:
            # account-level nicks are added last, replacing same-key nicks from above
            _collect(self.obj.account.nicks)

        for nick in nicks.values():
            nick_regex, template, _, _ = nick.value
            regex = self._regex_cache.get(nick_regex)
            if not regex:
                regex = re.compile(nick_regex, re.I | re.DOTALL | re.U)
                self._regex_cache[nick_regex] = regex

            is_match, raw_string = parse_nick_template(raw_string.strip(), regex, template)
            if is_match:
                break
        return raw_string
class NAttributeHandler(object):
    """
    This stand-alone handler manages non-database saving.
    It is similar to `AttributeHandler` and is used
    by the `.ndb` handler in the same way as `.db` does
    for the `AttributeHandler`.

    Values live in a plain dict on the handler; the owning object is
    only referenced through a weak proxy.

    """

    def __init__(self, obj):
        """
        Initialized on the object

        Args:
            obj (object): The entity this handler belongs to. It must
                support weak references.

        """
        self._store = {}
        self.obj = weakref.proxy(obj)

    def has(self, key):
        """
        Check if object has this attribute or not.

        Args:
            key (str): The Nattribute key to check.

        Returns:
            has_nattribute (bool): If Nattribute is set or not.

        """
        return key in self._store

    def get(self, key):
        """
        Get the named key value.

        Args:
            key (str): The Nattribute key to get.

        Returns:
            the value of the Nattribute, or `None` if it is not set.

        """
        return self._store.get(key, None)

    def add(self, key, value):
        """
        Add new key and value.

        Args:
            key (str): The name of Nattribute to add.
            value (any): The value to store.

        """
        self._store[key] = value

    def remove(self, key):
        """
        Remove Nattribute from storage. A missing key is ignored.

        Args:
            key (str): The name of the Nattribute to remove.

        """
        self._store.pop(key, None)

    def clear(self):
        """
        Remove all NAttributes from handler.

        """
        self._store = {}

    def all(self, return_tuples=False):
        """
        List the contents of the handler.

        Args:
            return_tuples (bool, optional): Defines if the Nattributes
                are returns as a list of keys or as a list of `(key, value)`.

        Returns:
            nattributes (list): A list of keys `[key, key, ...]` or a
                list of tuples `[(key, value), ...]` depending on the
                setting of `return_tuples`. Keys starting with an
                underscore are left out.

        """
        # str(key) in both branches, so non-string keys are handled alike
        if return_tuples:
            return [
                (key, value)
                for (key, value) in self._store.items()
                if not str(key).startswith("_")
            ]
        return [key for key in self._store if not str(key).startswith("_")]