"""
Internal types for dirty models
"""
import itertools
from functools import wraps
from .base import BaseData, InnerFieldTypeMixin
[docs]def modified_data_decorator(function):
"""
Decorator to initialise the modified_data if necessary. To be used in list functions
to modify the list
"""
@wraps(function)
def func(self, *args, **kwargs):
"""Decorator function"""
if not self.get_read_only() or not self.is_locked():
self.initialise_modified_data()
return function(self, *args, **kwargs)
return lambda: None
return func
def restore_list_model_from_data(list_class, field, common_data, original_list, modified_list):
model = list_class(field_type=field[0](**field[1]))
if original_list is not None:
model.__original_data__ = [common_data[i] for i in original_list]
list(map(model._prepare_child, model.__original_data__))
if modified_list is not None:
model.__modified_data__ = [common_data[i] for i in modified_list]
list(map(model._prepare_child, model.__modified_data__))
return model
[docs]class ListModel(InnerFieldTypeMixin, BaseData):
"""
Dirty model for a list. It has the behavior to work as a list implementing its methods
and has also the methods export_data, export_modified_data, import_data and flat_data
to work also as a model, storing original and modified values.
"""
def __init__(self, seq=None, *args, **kwargs):
super(ListModel, self).__init__(*args, **kwargs)
self.__original_data__ = []
self.__modified_data__ = None
if seq is not None:
self.extend(seq)
[docs] def get_validated_object(self, value):
"""
Returns the value validated by the field_type
"""
try:
if self.get_field_type().check_value(value) or self.get_field_type().can_use_value(value):
data = self.get_field_type().use_value(value)
self._prepare_child(data)
return data
else:
return None
except AttributeError:
return value
[docs] def initialise_modified_data(self):
"""
Initialise the modified_data if necessary
"""
if self.__modified_data__ is None:
if self.__original_data__:
self.__modified_data__ = list(self.__original_data__)
else:
self.__modified_data__ = []
@modified_data_decorator
def __setitem__(self, key, value):
"""
Function to set a value to an element e.g list[key] = value
"""
validated_value = self.get_validated_object(value)
if validated_value is not None:
self.__modified_data__.__setitem__(key, validated_value)
def __getitem__(self, item):
"""
Function to get an item from a list e.g list[key]
"""
if not isinstance(item, (str, int, slice)):
raise TypeError("Item must be an integer, slice or string")
if isinstance(item, str):
try:
return self.get_1st_attr_by_path(item)
except AttributeError as ex:
raise KeyError(str(ex))
if self.__modified_data__ is not None:
val = self.__modified_data__.__getitem__(item)
if val is not None:
return val
return self.__original_data__.__getitem__(item)
@modified_data_decorator
def __delitem__(self, key):
"""
Delete item from a list
"""
del self.__modified_data__[key]
def __len__(self):
"""
Function to get the list length
"""
if self.__modified_data__ is not None:
return len(self.__modified_data__)
return len(self.__original_data__)
[docs] @modified_data_decorator
def append(self, item):
"""
Appending elements to our list
"""
validated_value = self.get_validated_object(item)
if validated_value is not None:
self.__modified_data__.append(validated_value)
[docs] @modified_data_decorator
def insert(self, index, p_object):
"""
Insert an element to a list
"""
validated_value = self.get_validated_object(p_object)
if validated_value is not None:
self.__modified_data__.insert(index, validated_value)
[docs] def index(self, value):
"""
Gets the index in the list for a value
"""
if self.__modified_data__ is not None:
return self.__modified_data__.index(value)
return self.__original_data__.index(value)
[docs] def clear(self):
"""
Resets our list, keeping original data
"""
self.__modified_data__ = None
[docs] def clear_all(self):
"""
Resets our list
"""
self.__original_data__ = []
self.__modified_data__ = None
[docs] @modified_data_decorator
def remove(self, value):
"""
Deleting an element from the list
"""
return self.__modified_data__.remove(value)
[docs] @modified_data_decorator
def extend(self, iterable):
"""
Given an iterable, it adds the elements to our list
"""
for value in iterable:
self.append(value)
[docs] @modified_data_decorator
def pop(self, *args):
"""
Obtains and delete the element from the list
"""
if self.__modified_data__ is not None:
return self.__modified_data__.pop(*args)
[docs] def count(self, value):
"""
Gives the number of occurrencies of a value in the list
"""
if self.__modified_data__ is not None:
return self.__modified_data__.count(value)
return self.__original_data__.count(value)
[docs] @modified_data_decorator
def reverse(self):
"""
Reverses the list order
"""
if self.__modified_data__:
self.__modified_data__.reverse()
[docs] @modified_data_decorator
def sort(self):
"""
Sorts the list
"""
if self.__modified_data__:
self.__modified_data__.sort()
def __iter__(self):
"""
Defined behaviour for our iterable to be iterated
"""
if self.__modified_data__ is not None:
return self.__modified_data__.__iter__()
return self.__original_data__.__iter__()
[docs] def flat_data(self):
"""
Function to pass our modified values to the original ones
"""
def flat_field(value):
"""
Flat item
"""
try:
value.flat_data()
return value
except AttributeError:
return value
modified_data = self.__modified_data__ if self.__modified_data__ is not None else self.__original_data__
if modified_data is not None:
self.__original_data__ = [flat_field(value) for value in modified_data]
self.__modified_data__ = None
[docs] def export_data(self):
"""
Retrieves the data in a jsoned form
"""
def export_field(value):
"""
Export item
"""
try:
return value.export_data()
except AttributeError:
return value
if self.__modified_data__ is not None:
return [export_field(value) for value in self.__modified_data__]
return [export_field(value) for value in self.__original_data__]
[docs] def export_modified_data(self):
"""
Retrieves the modified data in a jsoned form
"""
def export_modfield(value, is_modified_seq=True):
"""
Export modified item
"""
try:
return value.export_modified_data()
except AttributeError:
if is_modified_seq:
return value
if self.__modified_data__ is not None:
return [export_modfield(value) for value in self.__modified_data__]
return list(x for x in [export_modfield(value) for value in self.__original_data__] if x is not None)
[docs] def export_modifications(self):
"""
Returns list modifications.
"""
if self.__modified_data__ is not None:
return self.export_data()
result = {}
for key, value in enumerate(self.__original_data__):
try:
if not value.is_modified():
continue
modifications = value.export_modifications()
except AttributeError:
continue
try:
result.update({'{}.{}'.format(key, f): v for f, v in modifications.items()})
except AttributeError:
result[key] = modifications
return result
[docs] def export_original_data(self):
"""
Retrieves the original_data
"""
def export_field(value):
"""
Export item
"""
try:
return value.export_original_data()
except AttributeError:
return value
return [export_field(val) for val in self.__original_data__]
[docs] def import_data(self, data):
"""
Uses data to add it to the list
"""
if hasattr(data, '__iter__'):
self.extend(data)
[docs] def import_deleted_fields(self, data):
"""
Set data fields to deleted
"""
def child_delete_from_str(data_str):
"""
Inner function to set children fields to deleted
"""
parts = data_str.split('.', 1)
if parts[0].isnumeric:
self[int(parts[0])].import_deleted_fields(parts[1])
if not self.get_read_only() or not self.is_locked():
if isinstance(data, str):
data = [data]
if isinstance(data, list):
for key in data:
child_delete_from_str(key)
[docs] def export_deleted_fields(self):
"""
Returns a list with any deleted fields form original data.
In tree models, deleted fields on children will be appended.
"""
result = []
if self.__modified_data__ is not None:
return result
for index, item in enumerate(self):
try:
deleted_fields = item.export_deleted_fields()
result.extend(['{}.{}'.format(index, key) for key in deleted_fields])
except AttributeError:
pass
return result
[docs] def is_modified(self):
"""
Returns whether list is modified or not
"""
if self.__modified_data__ is not None:
return True
for value in self.__original_data__:
try:
if value.is_modified():
return True
except AttributeError:
pass
return False
[docs] def clear_modified_data(self):
"""
Clears only the modified data
"""
self.__modified_data__ = None
for value in self.__original_data__:
try:
value.clear_modified_data()
except AttributeError:
pass
def _update_read_only(self):
for value in itertools.chain(self.__original_data__ if self.__original_data__ else [],
self.__modified_data__ if self.__modified_data__ else []):
try:
value.set_read_only(self.get_read_only())
except AttributeError:
pass
[docs] def get_attrs_by_path(self, field_path, stop_first=False):
"""
It returns list of values looked up by field path.
Field path is dot-formatted string path: ``parent_field.child_field``.
:param field_path: field path. It allows ``*`` as wildcard.
:type field_path: list or None.
:param stop_first: Stop iteration on first value looked up. Default: False.
:type stop_first: bool
:return: value
"""
index_list, next_field = self._get_indexes_by_path(field_path)
values = []
for idx in index_list:
if next_field:
try:
res = self[idx].get_attrs_by_path(next_field, stop_first=stop_first)
if res is None:
continue
values.extend(res)
if stop_first and len(values):
break
except AttributeError:
pass
else:
if stop_first:
return [self[idx], ]
values.append(self[idx])
return values if len(values) else None
[docs] def get_1st_attr_by_path(self, field_path, **kwargs):
"""
It returns first value looked up by field path.
Field path is dot-formatted string path: ``parent_field.child_field``.
:param field_path: field path. It allows ``*`` as wildcard.
:type field_path: str
:param default: Default value if field does not exist.
If it is not defined :class:`AttributeError` exception will be raised.
:return: value
"""
res = self.get_attrs_by_path(field_path, stop_first=True)
if res is None:
if 'default' in kwargs:
return kwargs['default']
raise AttributeError("Field '{0}' does not exist".format(field_path))
return res.pop()
[docs] def delete_attr_by_path(self, field):
"""
Function for deleting a field specifying the path in the whole model as described
in :func:`dirty:models.models.BaseModel.perform_function_by_path`
"""
index_list, next_field = self._get_indexes_by_path(field)
if index_list:
for index in reversed(index_list):
if next_field:
self[index].delete_attr_by_path(next_field)
else:
self.pop(index)
[docs] def reset_attr_by_path(self, field):
"""
Function for restoring a field specifying the path in the whole model as described
in :func:`dirty:models.models.BaseModel.perform_function_by_path`
"""
index_list, next_field = self._get_indexes_by_path(field)
if index_list:
if next_field:
for index in index_list:
self[index].reset_attr_by_path(next_field)
else:
for index in index_list:
try:
self[index].clear_modified_data()
except (AttributeError, IndexError):
return
def _get_indexes_by_path(self, field):
"""
Returns a list of indexes by field path.
:param field: Field structure as following:
*.subfield_2 would apply the function to the every subfield_2 of the elements
1.subfield_2 would apply the function to the subfield_2 of the element 1
* would apply the function to every element
1 would apply the function to element 1
"""
try:
field, next_field = field.split('.', 1)
except ValueError:
next_field = ''
if field == '*':
index_list = []
for item in self:
index_list.append(self.index(item))
if index_list:
return index_list, next_field
return [], None
elif field.isnumeric():
index = int(field)
if index >= len(self):
return None, None
return [index], next_field
def __repr__(self):
return str(self)
def __str__(self):
return str([item for item in self])
def __contains__(self, item):
return item in self.__modified_data__ if self.__modified_data__ is not None else item in self.__original_data__
def __reduce__(self):
orginal_list = [id(i) for i in self.__original_data__]
common_data = {id(i): i for i in self.__original_data__}
try:
modified_list = [id(i) for i in self.__modified_data__]
common_data.update({id(i): i for i in self.__modified_data__ if id(i) not in common_data})
except TypeError:
modified_list = None
return restore_list_model_from_data, (self.__class__,
(self.get_field_type().__class__,
self.get_field_type().export_definition()),
common_data,
orginal_list,
modified_list)