Source code for dirty_models.models

"""
Base models for dirty_models.
"""

import itertools
from datetime import datetime, date, time, timedelta
from enum import Enum

from collections import Mapping
from copy import deepcopy

from dirty_models.fields import DateField, TimeField, TimedeltaField, EnumField
from .base import BaseData, InnerFieldTypeMixin
from .fields import IntegerField, FloatField, BooleanField, StringField, DateTimeField, \
    BaseField, ModelField, ArrayField
from .model_types import ListModel

__all__ = ['BaseModel', 'DynamicModel', 'FastDynamicModel', 'HashMapModel']


class DirtyModelMeta(type):
    """
    Metaclass for dirty_models. It sets automatic fieldnames and
    automatic model_class for ModelField fields.
    """

    def __init__(cls, name, bases, classdict):
        super(DirtyModelMeta, cls).__init__(name, bases, classdict)

        fields = {key: field for key, field in cls.__dict__.items() if isinstance(field, BaseField)}

        structure = {}
        read_only_fields = []
        for key, field in fields.items():
            cls.process_base_field(field, key)
            structure[field.name] = field
            if field.read_only:
                read_only_fields.append(field.name)

        cls.__structure__ = structure
        default_data = {}
        for p in bases:
            try:
                default_data.update(deepcopy(p.__default_data__))
            except AttributeError:
                pass

        default_data.update(deepcopy(cls.get_default_data()))
        default_data.update({f.name: f.default for f in structure.values() if f.default is not None})

        cls.__structure__ = {}
        for p in bases:
            try:
                cls.__structure__.update(deepcopy(p.get_structure()))
            except AttributeError:
                pass

        cls.__structure__.update(structure)
        cls.check_structure()
        cls.__default_data__ = {k: v for k, v in default_data.items() if k in cls.__structure__.keys()}

    def process_base_field(cls, field, key):
        """
        Preprocess field instances.

        :param field: Field object
        :param key: Key where field was found
        """
        if not field.name:
            field.name = key
        elif key != field.name:
            if not isinstance(field.alias, list):
                field.alias = [key]
            else:
                field.alias.insert(0, key)
            setattr(cls, field.name, field)

        cls.prepare_field(field)

        if field.alias:
            for alias_name in field.alias:
                if key is not alias_name:
                    setattr(cls, alias_name, field)

    def prepare_field(cls, field):
        if isinstance(field, ModelField) and not field.model_class:
            field.model_class = cls
            field.__doc__ = field.get_field_docstring()

        try:
            cls.prepare_field(field.field_type)
            field.field_type.__doc__ = field.field_type.get_field_docstring()
        except AttributeError:
            pass

        try:
            for inner_field in field.field_types:
                cls.prepare_field(inner_field)
        except AttributeError:
            pass

    def check_structure(cls):
        names = set()
        fields = {key: field for key, field in cls.__dict__.items() if isinstance(field, BaseField)}
        try:
            name, field = fields.popitem()
        except KeyError:
            field = None

        while field:
            [fields.pop(n) for n, f in fields.copy().items() if f is field]

            alias = set(field.alias or [])
            alias.add(field.name)
            for n in alias:
                if n in names:
                    raise RuntimeError("Field '{0}' used twice on model '{1}'".format(n, cls.__name__))
                names.add(n)

            try:
                name, field = fields.popitem()
            except KeyError:
                field = None


class CamelCaseMeta(DirtyModelMeta):
    """
    Metaclass for dirty_models. Sets camel case version of field's name as default field name.
    """

    def process_base_field(self, field, key):
        from .utils import underscore_to_camel

        if not field.name:
            field.name = underscore_to_camel(key)
        super(CamelCaseMeta, self).process_base_field(field, key)


def set_model_internal_data(model, original_data, modified_data, deleted_data):
    """
    Set internal data to model.
    """
    model.__original_data__ = original_data
    list(map(model._prepare_child, model.__original_data__))

    model.__modified_data__ = modified_data
    list(map(model._prepare_child, model.__modified_data__))

    model.__deleted_fields__ = deleted_data

    return model


def recover_model_from_data(model_class, original_data, modified_data, deleted_data):
    """
    Function to reconstruct a model from DirtyModel basic information: original data, the modified and deleted
    fields.
    Necessary for pickle an object
    """
    model = model_class()
    return set_model_internal_data(model, original_data, modified_data, deleted_data)


[docs]class BaseModel(BaseData, metaclass=DirtyModelMeta): """ Base model with dirty feature. It stores original data and saves modifications in other side. """ __default_data__ = {} def __init__(self, data=None, flat=False, *args, **kwargs): super(BaseModel, self).__init__(*args, **kwargs) BaseModel.__setattr__(self, '__original_data__', {}) BaseModel.__setattr__(self, '__modified_data__', {}) BaseModel.__setattr__(self, '__deleted_fields__', []) from .base import Unlocker with Unlocker(self): self.import_data(self.__default_data__) if isinstance(data, (dict, Mapping)): self.import_data(data) self.import_data(kwargs) if flat: self.flat_data() def __reduce__(self): """ Reduce function to allow dumpable by pickle """ return recover_model_from_data, (self.__class__, self.__original_data__, self.__modified_data__, self.__deleted_fields__,) def get_real_name(self, name): obj = self.get_field_obj(name) try: return obj.name except AttributeError: return None
[docs] def set_field_value(self, name, value): """ Set the value to the field modified_data """ name = self.get_real_name(name) if not name or not self._can_write_field(name): return if name in self.__deleted_fields__: self.__deleted_fields__.remove(name) if self.__original_data__.get(name) == value: try: self.__modified_data__.pop(name) except KeyError: pass else: self.__modified_data__[name] = value self._prepare_child(value) if name not in self.__structure__ or not self.__structure__[name].read_only: return try: value.set_read_only(True) except AttributeError:
pass
[docs] def get_field_value(self, name): """ Get the field value from the modified data or the original one """ name = self.get_real_name(name) if not name or name in self.__deleted_fields__: return None modified = self.__modified_data__.get(name) if modified is not None: return modified
return self.__original_data__.get(name)
[docs] def delete_field_value(self, name): """ Mark this field to be deleted """ name = self.get_real_name(name) if name and self._can_write_field(name): if name in self.__modified_data__: self.__modified_data__.pop(name) if name in self.__original_data__ and name not in self.__deleted_fields__:
self.__deleted_fields__.append(name)
[docs] def reset_field_value(self, name): """ Resets value of a field """ name = self.get_real_name(name) if name and self._can_write_field(name): if name in self.__modified_data__: del self.__modified_data__[name] if name in self.__deleted_fields__: self.__deleted_fields__.remove(name) try: self.__original_data__[name].clear_modified_data() except (KeyError, AttributeError):
pass
[docs] def is_modified_field(self, name): """ Returns whether a field is modified or not """ name = self.get_real_name(name) if name in self.__modified_data__ or name in self.__deleted_fields__: return True try: return self.get_field_value(name).is_modified() except Exception:
return False
[docs] def import_data(self, data): """ Set the fields established in data to the instance """ if self.get_read_only() and self.is_locked(): return if isinstance(data, BaseModel): data = data.export_data() if not isinstance(data, (dict, Mapping)): raise TypeError('Impossible to import data')
self._import_data(data) def _import_data(self, data): for key, value in data.items(): if not self.get_field_obj(key): self._not_allowed_field(key) continue setattr(self, key, value) def _not_allowed_field(self, name): pass def _not_allowed_value(self, name, value): pass def _not_allowed_modify(self, name): pass
[docs] def import_deleted_fields(self, data): """ Set data fields to deleted """ if self.get_read_only() and self.is_locked(): return if isinstance(data, str): data = [data] for key in data: if hasattr(self, key): delattr(self, key) continue keys = key.split('.', 1) if len(keys) != 2: continue child = getattr(self, keys[0])
child.import_deleted_fields(keys[1])
[docs] def export_data(self): """ Get the results with the modified_data """ result = {} data = self.__original_data__.copy() data.update(self.__modified_data__) for key, value in data.items(): if key in self.__deleted_fields__: continue try: result[key] = value.export_data() except AttributeError: result[key] = value
return result
[docs] def export_modified_data(self): """ Get the modified data """ # TODO: why None? Try to get a better flag result = {key: None for key in self.__deleted_fields__} for key, value in self.__modified_data__.items(): if key in result.keys(): continue try: result[key] = value.export_modified_data() except AttributeError: result[key] = value for key, value in self.__original_data__.items(): if key in result.keys(): continue try: if value.is_modified(): result[key] = value.export_modified_data() except AttributeError: pass
return result
[docs] def export_modifications(self): """ Returns model modifications. """ result = {} for key, value in self.__modified_data__.items(): try: result[key] = value.export_data() except AttributeError: result[key] = value for key, value in self.__original_data__.items(): if key in result.keys() or key in self.__deleted_fields__: continue try: if not value.is_modified(): continue modifications = value.export_modifications() except AttributeError: continue try: result.update({'{}.{}'.format(key, f): v for f, v in modifications.items()}) except AttributeError: result[key] = modifications
return result
[docs] def get_original_field_value(self, name): """ Returns original field value or None """ name = self.get_real_name(name) try: value = self.__original_data__[name] except KeyError: return None try: return value.export_original_data() except AttributeError:
return value
[docs] def export_original_data(self): """ Get the original data """
return {key: self.get_original_field_value(key) for key in self.__original_data__.keys()}
[docs] def export_deleted_fields(self): """ Resturns a list with any deleted fields form original data. In tree models, deleted fields on children will be appended. """ result = self.__deleted_fields__.copy() for key, value in self.__original_data__.items(): if key in result: continue try: partial = value.export_deleted_fields() result.extend(['.'.join([key, key2]) for key2 in partial]) except AttributeError: pass
return result
[docs] def flat_data(self): """ Pass all the data from modified_data to original_data """ def flat_field(value): """ Flat field data """ try: value.flat_data() return value except AttributeError: return value modified_dict = self.__original_data__ modified_dict.update(self.__modified_data__) self.__original_data__ = {k: flat_field(v) for k, v in modified_dict.items() if k not in self.__deleted_fields__}
self.clear_modified_data()
[docs] def clear_modified_data(self): """ Clears only the modified data """ self.__modified_data__ = {} self.__deleted_fields__ = [] for value in self.__original_data__.values(): try: value.clear_modified_data() except AttributeError:
pass
[docs] def clear(self): """ Clears all the data in the object, keeping original data """ self.__modified_data__ = {}
self.__deleted_fields__ = [field for field in self.__original_data__.keys()]
[docs] def clear_all(self): """ Clears all the data in the object """ self.__modified_data__ = {} self.__original_data__ = {}
self.__deleted_fields__ = []
[docs] def get_fields(self): """ Returns used fields of model """ result = [key for key in self.__original_data__.keys() if key not in self.__deleted_fields__] result.extend([key for key in self.__modified_data__.keys() if key not in result and key not in self.__deleted_fields__])
return result
[docs] def is_modified(self): """ Returns whether model is modified or not """ if len(self.__modified_data__) or len(self.__deleted_fields__): return True for value in self.__original_data__.values(): try: if value.is_modified(): return True except AttributeError: pass
return False
[docs] def copy(self): """ Creates a copy of model """
return self.__class__(data=self.export_data()) def __iter__(self): def iterfunc(): for field in self.get_fields(): yield (field, getattr(self, field)) return iterfunc() def _can_write_field(self, name): if name not in self.__structure__ or (not self.__structure__[name].read_only and not self.get_read_only()) or not self.is_locked(): return True else: self._not_allowed_modify(name) return False def _update_read_only(self): for value in itertools.chain(self.__original_data__.values(), self.__modified_data__.values()): try: value.set_read_only(self.get_read_only()) except AttributeError: pass def __str__(self): return '{0}({1})'.format(self.__class__.__name__, ",".join(["'{0}': {1}".format(field, repr(self.get_field_value(field))) for field in sorted(self.get_fields())])) def __repr__(self): return str(self) def __contains__(self, item): return item in self.__modified_data__ or (item in self.__original_data__ and item not in self.__deleted_fields__) @classmethod def get_field_obj(cls, name): obj_field = getattr(cls, name, None) return obj_field if isinstance(obj_field, BaseField) else None def _get_fields_by_path(self, field): try: field, next_field = field.split('.', 1) except ValueError: next_field = '' if field == '*': return self.get_fields(), next_field else: return [field], next_field
[docs] def get_attrs_by_path(self, field_path, stop_first=False): """ It returns list of values looked up by field path. Field path is dot-formatted string path: ``parent_field.child_field``. :param field_path: field path. It allows ``*`` as wildcard. :type field_path: list or None. :param stop_first: Stop iteration on first value looked up. Default: False. :type stop_first: bool :return: A list of values or None it was a invalid path. :rtype: :class:`list` or :class:`None` """ fields, next_field = self._get_fields_by_path(field_path) values = [] for field in fields: if next_field: try: res = self.get_field_value(field).get_attrs_by_path(next_field, stop_first=stop_first) if res is None: continue values.extend(res) if stop_first and len(values): break except AttributeError: pass else: value = self.get_field_value(field) if value is None: continue if stop_first: return [value, ] values.append(value)
return values if len(values) else None
[docs] def get_1st_attr_by_path(self, field_path, **kwargs): """ It returns first value looked up by field path. Field path is dot-formatted string path: ``parent_field.child_field``. :param field_path: field path. It allows ``*`` as wildcard. :type field_path: str :param default: Default value if field does not exist. If it is not defined :class:`AttributeError` exception will be raised. :return: value """ res = self.get_attrs_by_path(field_path, stop_first=True) if res is None: try: return kwargs['default'] except KeyError: raise AttributeError("Field '{0}' does not exist".format(field_path))
return res.pop()
[docs] def delete_attr_by_path(self, field_path): """ It deletes fields looked up by field path. Field path is dot-formatted string path: ``parent_field.child_field``. :param field_path: field path. It allows ``*`` as wildcard. :type field_path: str """ fields, next_field = self._get_fields_by_path(field_path) for field in fields: if next_field: try: self.get_field_value(field).delete_attr_by_path(next_field) except AttributeError: pass else:
self.delete_field_value(field)
[docs] def reset_attr_by_path(self, field_path): """ It restores original values for fields looked up by field path. Field path is dot-formatted string path: ``parent_field.child_field``. :param field_path: field path. It allows ``*`` as wildcard. :type field_path: str """ fields, next_field = self._get_fields_by_path(field_path) for field in fields: if next_field: try: self.get_field_value(field).reset_attr_by_path(next_field) except AttributeError: pass else:
self.reset_field_value(field) def __getitem__(self, key): try: return self.get_1st_attr_by_path(key) except AttributeError as ex: raise KeyError(str(ex))
[docs] @classmethod def get_structure(cls): """ Returns a dictionary with model field objects. :return: dict """
return cls.__structure__.copy()
[docs] @classmethod def get_default_data(cls): """ Returns a dictionary with default data. :return: dict """
return deepcopy(cls.__default_data__) def __len__(self):
return len(self.export_data()) class BaseDynamicModel(BaseModel): """ """ __dynamic_model__ = None def __getattr__(self, name): try: return getattr(super(BaseDynamicModel, self), name) except AttributeError: return self.get_field_value(name) def _get_field_type(self, key, value): """ Helper to create field object based on value type """ if isinstance(value, bool): return BooleanField(name=key) elif isinstance(value, int): return IntegerField(name=key) elif isinstance(value, float): return FloatField(name=key) elif isinstance(value, str): return StringField(name=key) elif isinstance(value, time): return TimeField(name=key) elif isinstance(value, datetime): return DateTimeField(name=key) elif isinstance(value, date): return DateField(name=key) elif isinstance(value, timedelta): return TimedeltaField(name=key) elif isinstance(value, Enum): return EnumField(name=key, enum_class=type(value)) elif isinstance(value, (dict, BaseDynamicModel, Mapping)): return ModelField(name=key, model_class=self.__dynamic_model__ or self.__class__) elif isinstance(value, BaseModel): return ModelField(name=key, model_class=value.__class__) elif isinstance(value, (list, set, ListModel)): if not len(value): return None field_type = self._get_field_type(None, value[0]) return ArrayField(name=key, field_type=field_type) elif value is None: return None else: raise TypeError("Invalid parameter: %s. Type not supported." % (key,)) def _import_data(self, data): """ Set the fields established in data to the instance """ for key, value in data.items(): if key.startswith('__'): self._not_allowed_field(key) continue if not self.get_field_obj(key) and not self._define_new_field_by_value(key, value): self._not_allowed_value(key, value) continue setattr(self, key, value) def recover_dynamic_model_from_data(model_class, original_data, modified_data, deleted_data, structure): """ Function to reconstruct a model from DirtyModel basic information: original data, the modified and deleted fields. Necessary for pickle an object """ model = model_class() model.__structure__ = {k: d[0](**d[1]) for k, d in structure.items()} return set_model_internal_data(model, original_data, modified_data, deleted_data)
[docs]class DynamicModel(BaseDynamicModel): """ DynamicModel allow to create model with no structure. Each instance has its own derivated class from DynamicModels. """ _next_id = 0 def __init__(self, *args, **kwargs): super(DynamicModel, self).__init__(*args, **kwargs) self.__structure__ = {} def __new__(cls, *args, **kwargs): new_class = type('DynamicModel_' + str(cls._next_id), (cls,), {'__dynamic_model__': DynamicModel}) cls._next_id = id(new_class) return super(DynamicModel, new_class).__new__(new_class) def _define_new_field_by_value(self, name, value): field_type = self._get_field_type(name, value) if not field_type: return False self.__structure__[field_type.name] = field_type setattr(self.__class__, name, field_type) return True def __setattr__(self, name, value): if not self.__hasattr__(name): if not self.get_read_only() or not self.is_locked(): if not self._define_new_field_by_value(name, value): self._not_allowed_value(name, value) return super(DynamicModel, self).__setattr__(name, value) # def get_field_obj(self, name): # return self.__structure__[name] def __hasattr__(self, name): try: getattr(super(DynamicModel, self), name) except AttributeError: try: self.__dict__[name] except KeyError: try: self.__class__.__dict__[name] except KeyError: return False return True def __reduce__(self): """ Reduce function to allow dumpable by pickle """ return recover_dynamic_model_from_data, (DynamicModel, self.__original_data__, self.__modified_data__, self.__deleted_fields__, {field.name: (field.__class__, field.export_definition())
for field in self.__structure__.values()}) def recover_hashmap_model_from_data(model_class, original_data, modified_data, deleted_data, field_type): """ Function to reconstruct a model from DirtyModel basic information: original data, the modified and deleted fields. Necessary for pickle an object """ model = model_class(field_type=field_type[0](**field_type[1])) return set_model_internal_data(model, original_data, modified_data, deleted_data)
[docs]class HashMapModel(InnerFieldTypeMixin, BaseModel): """ Hash map model with dirty feature. It stores original data and saves modifications in other side. """ def __reduce__(self): """ Reduce function to allow dumpable by pickle """ return recover_hashmap_model_from_data, (self.__class__, self.__original_data__, self.__modified_data__, self.__deleted_fields__, (self.get_field_type().__class__, self.get_field_type().export_definition())) def get_real_name(self, name): new_name = super(HashMapModel, self).get_real_name(name) return new_name if new_name else name def get_field_obj(self, name): return super(HashMapModel, self).get_field_obj(name) or self.__field_type__
[docs] def copy(self): """ Creates a copy of model """
return self.__class__(field_type=self.get_field_type(), data=self.export_data())
[docs] def get_validated_object(self, value): """ Returns the value validated by the field_type """ try: if self.get_field_type().check_value(value) or self.get_field_type().can_use_value(value): data = self.get_field_type().use_value(value) self._prepare_child(data) return data else: return None except AttributeError:
return value def _import_data(self, data): """ Set the fields in data to the hashmap instance. """ for key, value in data.items(): if key.startswith('__'): self._not_allowed_field(key) continue setattr(self, key, value) def __setattr__(self, name, value): if not self.__hasattr__(name) and (not self.get_read_only() or not self.is_locked()): if value is None: delattr(self, name) return validated_value = self.get_validated_object(value) if validated_value is not None and \ (name not in self.__original_data__ or self.__original_data__[name] != validated_value): self.set_field_value(name, validated_value) return super(HashMapModel, self).__setattr__(name, value) def __hasattr__(self, name): try: getattr(super(HashMapModel, self), name) except AttributeError: try: self.__dict__[name] except KeyError: try: self.__class__.__dict__[name] except KeyError: return False return True def __getattr__(self, name): try: return getattr(super(HashMapModel, self), name) except AttributeError: return self.get_field_value(name) def __delattr__(self, name): if not self.__hasattr__(name) and (not self.get_read_only() or not self.is_locked()): self.delete_field_value(name) return
super(HashMapModel, self).__delattr__(name) def recover_fast_dynamic_model_from_data(model_class, original_data, modified_data, deleted_data, field_types): """ Function to reconstruct a model from DirtyModel basic information: original data, the modified and deleted fields. Necessary for pickle an object """ model = model_class() model.__field_types__ = {k: d[0](**d[1]) for k, d in field_types.items()} return set_model_internal_data(model, original_data, modified_data, deleted_data)
[docs]class FastDynamicModel(BaseDynamicModel): """ FastDynamicModel allow to create model with no structure. """ __field_types__ = None def __init__(self, *args, **kwargs): self.__field_types__ = {} self.__dynamic_model__ = FastDynamicModel super(FastDynamicModel, self).__init__(*args, **kwargs) def get_real_name(self, name): return super(FastDynamicModel, self).get_real_name(name) or name
[docs] def get_validated_object(self, field_type, value): """ Returns the value validated by the field_type """ if field_type.check_value(value) or field_type.can_use_value(value): data = field_type.use_value(value) self._prepare_child(data) return data else:
return None
[docs] def get_current_structure(self): """ Returns a dictionary with model field objects. :return: dict """ struct = self.__class__.get_structure() struct.update(self.__field_types__)
return struct def _define_new_field_by_value(self, name, value): field_type = self._get_field_type(name, value) if not field_type: return False self.__field_types__[name] = field_type return True def __setattr__(self, name, value): if self.__field_types__ is not None and not self.__hasattr__(name) \ and (not self.get_read_only() or not self.is_locked()): if value is None: delattr(self, name) return try: field_type = self.__field_types__[name] except KeyError: if not self._define_new_field_by_value(name, value): self._not_allowed_value(name, value) return field_type = self.__field_types__[name] validated_value = self.get_validated_object(field_type, value) if validated_value is not None and \ (name not in self.__original_data__ or self.__original_data__[name] != validated_value): self.set_field_value(name, validated_value) return super(FastDynamicModel, self).__setattr__(name, value) def __hasattr__(self, name): try: getattr(super(FastDynamicModel, self), name) except AttributeError: try: self.__dict__[name] except KeyError: try: self.__class__.__dict__[name] except KeyError: return False return True def __delattr__(self, name): if not self.__hasattr__(name) and (not self.get_read_only() or not self.is_locked()): self.delete_field_value(name) return super(FastDynamicModel, self).__delattr__(name) def get_field_obj(self, name): try: return self.__field_types__[name] except KeyError: return super(FastDynamicModel, self).get_field_obj(name) def __reduce__(self): """ Reduce function to allow dumpable by pickle """ return recover_fast_dynamic_model_from_data, (self.__class__, self.__original_data__, self.__modified_data__, self.__deleted_fields__, {field.name: (field.__class__, field.export_definition())
for field in self.__field_types__.values()})